Mqtt服务器
定义
一、说明
MqttTcpService 是基于 Mqtt 协议的消息服务端,支持客户端接入管理、订阅关系维护、消息路由转发、遗嘱消息处理等功能,兼容 Mqtt 3.1.1 及 5.0+ 协议版本。
二、特点
- 多协议版本支持(v3.1.1/v5.0)
- 高性能异步架构设计
- 主题树形管理机制
- 精确的 QoS 保障
- 遗嘱消息转发
- 插件化扩展体系
- TLS 加密通信
- 客户端黑白名单控制
三、应用场景
- 工业物联网平台
- 实时数据监控中心
- 智慧城市中枢系统
- 私有化消息总线
- 设备远程管理服务
四、可配置项
继承所有 TcpService 的配置。除此之外还支持 Mqtt 服务端专有配置。
可配置项
五、支持插件
| 插件方法 | 功能 |
|---|---|
| IMqttConnectingPlugin | 当Mqtt客户端正在连接之前调用此方法。 |
| IMqttConnectedPlugin | 当Mqtt客户端连接成功时调用。 |
| IMqttClosingPlugin | 当Mqtt客户端正在关闭时调用。 |
| IMqttClosedPlugin | 当Mqtt客户端断开连接后触发。 |
| IMqttReceivingPlugin | 在收到Mqtt所有消息时触发,可以通过e.MqttMessage获取到Mqtt的所有消息,包括订阅、订阅确认、发布、发布确认等。 |
| IMqttReceivedPlugin | 当接收到Mqtt发布消息,且成功接收时触发。可以通过e.MqttMessage获取到Mqtt的发布消息。 |
六、创建Mqtt服务端
6.1 基于Tcp协议创建
直接创建MqttTcpService,然后配置基本的监听地址和插件:
6.2 基于WebSocket协议创建
TouchSocket也支持基于WebSocket协议的Mqtt服务器。这种方式允许Web浏览器客户端直接连接到Mqtt服务器,特别适合于需要在浏览器环境中使用Mqtt的场景。
要创建基于WebSocket的Mqtt服务器,需要使用HttpService作为基础服务,然后通过插件启用MqttWebSocket功能:
- 基于WebSocket的Mqtt服务使用
HttpService作为基础服务 - 需要在容器中添加
AddMqttWebSocketService()注册服务 - 通过
UseMqttWebSocket("/mqtt")插件启用WebSocket协议支持,其中/mqtt是WebSocket的路径 - 客户端连接时使用ws://或wss://协议,例如:
ws://127.0.0.1:1883/mqtt - 支持所有标准的Mqtt插件和功能
七、接收消息
在Mqtt服务端创建好之后,Mqtt组件的消息都是通过插件抛出的。你可以订阅IMqttReceivingPlugin插件和IMqttReceivedPlugin插件来接收消息。
正如插件说明所示,IMqttReceivingPlugin插件可以在收到所有消息时触发,IMqttReceivedPlugin插件可以在收到发布消息时触发。
所以如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin插件即可。
但是对于服务器来说,也可能需要获取到的消息可能是订阅、发布、取消订阅等。所以你可以通过IMqttReceivingPlugin的e.MqttMessage来获取到具体的消息类型。
下列将简单演示。
7.1 通过插件接收所有消息
7.2 接收发布消息
如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin插件即可:
八、发布消息
Mqtt服务器可以主动向连接的客户端发布消息。
实际上,只要能拿到IMqttService实例,就可以使用MqttBroker属性,调用它的ForwardMessageAsync方法来发布消息。
8.1 向所有客户端发布消息
Tcp协议的Mqtt服务器,其本身实现了IMqttService接口,所以可以直接调用MqttBroker.ForwardMessageAsync方法来发布消息:
如果是基于WebSocket协议的Mqtt服务器,则需要通过依赖注入获取到IMqttService实例:
九、客户端管理
9.1 获取所有连接的客户端
9.2 断开指定客户端
十、订阅管理
MqttBroker 是 Mqtt 服务器的核心订阅路由引擎,负责维护客户端与主题之间的订阅关系,并将发布消息路由转发给所有匹配的订阅者。通过 service.MqttBroker 属性即可获取到当前服务器的 MqttBroker 实例。
10.1 查询所有主题
GetAllTopics 方法返回当前 Broker 中已注册的所有主题名称快照,可用于监控、调试或管理界面展示:
10.2 查询主题的订阅者
GetSubscribers 方法支持通配符匹配,返回与指定主题匹配的所有 Subscription 订阅者对象。每个订阅者包含 ClientId(客户端标识)和 QosLevel(订阅的服务质量等级)属性:
10.3 检查订阅关系
ContainsSubscriber 方法可精确判断某个客户端是否已订阅了指定主题:
10.4 手动添加订阅
除了客户端主动发送 Subscribe 指令之外,服务端也可以通过 RegisterActor 方法为指定客户端主动建立订阅关系,适用于需要服务端控制订阅权限或批量初始化订阅的场景:
RegisterActor 等同于在内部创建一个 Subscription 对象并调用 AddSubscriber,两者均可使用。手动添加的订阅关系与客户端主动订阅的行为一致,消息转发时会按 QoS 等级协商进行路由。
10.5 手动移除订阅
UnregisterActor 方法可从指定主题中移除某个客户端的订阅关系。若需要更精确地控制,也可以使用 RemoveSubscriber 方法直接传入 Subscription 对象:
10.6 清空订阅
ClearTopic 可移除指定主题下的所有订阅者,Clear 方法可一次性清空所有主题及全部订阅关系:
十一、服务器管理
11.1 停止服务器
11.2 释放资源
十二、示例Demo
十三、性能测试
本节通过基准测试对比 TouchSocket.Mqtt(TSMqtt)服务器 与 MQTTnet 服务器 在不同 QoS 等级和发布/订阅规模下的吞吐表现,客户端同时使用 MQTTnet 和 TSMqtt 两种实现进行测试,共覆盖 18 个测试场景,全部零丢包完成。
13.1 使用 MQTTnet 作为服务器
下表展示了在 MQTTnet Broker 托管下,两种客户端在各场景中的发送速率与接收速率(单位:条/秒):
[使用mqttnet作为服务器]
| 场景名称 | 客户端 | QoS | 发布x订阅 | 发送量 | 实际收 | 丢包 | 耗时(s) | 发送/s | 接收/s | 状态 |
|---|---|---|---|---|---|---|---|---|---|---|
| 1发布x1订阅 | MQTTnet | QoS0 | 1x1 | 50,000 | 50,000 | - | 1.87 | 26,786 | 26,786 | OK |
| 1发布x1订阅 | MQTTnet | QoS1 | 1x1 | 20,000 | 20,000 | - | 2.01 | 9,936 | 9,936 | OK |
| 1发布x1订阅 | MQTTnet | QoS2 | 1x1 | 5,000 | 5,000 | - | 0.93 | 5,385 | 5,385 | OK |
| 1发布x50订阅 | MQTTnet | QoS0 | 1x50 | 10,000 | 500,000 | - | 4.73 | 2,112 | 105,636 | OK |
| 1发布x50订阅 | MQTTnet | QoS1 | 1x50 | 5,000 | 250,000 | - | 3.38 | 1,479 | 73,996 | OK |
| 1发布x50订阅 | MQTTnet | QoS2 | 1x50 | 1,000 | 50,000 | - | 1.11 | 903 | 45,177 | OK |
| 50发布x50订阅 | MQTTnet | QoS0 | 50x50 | 50,000 | 2,500,000 | - | 26.82 | 1,864 | 93,219 | OK |
| 50发布x50订阅 | MQTTnet | QoS1 | 50x50 | 25,000 | 1,250,000 | - | 17.13 | 1,459 | 72,963 | OK |
| 50发布x50订阅 | MQTTnet | QoS2 | 50x50 | 5,000 | 250,000 | - | 5.68 | 879 | 43,987 | OK |
| 1发布x1订阅 | TSMqtt | QoS0 | 1x1 | 50,000 | 50,000 | - | 1.83 | 27,321 | 27,321 | OK |
| 1发布x1订阅 | TSMqtt | QoS1 | 1x1 | 20,000 | 20,000 | - | 1.83 | 10,912 | 10,912 | OK |
| 1发布x1订阅 | TSMqtt | QoS2 | 1x1 | 5,000 | 5,000 | - | 0.87 | 5,761 | 5,761 | OK |
| 1发布x50订阅 | TSMqtt | QoS0 | 1x50 | 10,000 | 500,000 | - | 5.20 | 1,923 | 96,175 | OK |
| 1发布x50订阅 | TSMqtt | QoS1 | 1x50 | 5,000 | 250,000 | - | 3.58 | 1,397 | 69,870 | OK |
| 1发布x50订阅 | TSMqtt | QoS2 | 1x50 | 1,000 | 50,000 | - | 1.16 | 865 | 43,288 | OK |
| 50发布x50订阅 | TSMqtt | QoS0 | 50x50 | 50,000 | 2,500,000 | - | 27.07 | 1,847 | 92,359 | OK |
| 50发布x50订阅 | TSMqtt | QoS1 | 50x50 | 25,000 | 1,250,000 | - | 15.75 | 1,587 | 79,362 | OK |
| 50发布x50订阅 | TSMqtt | QoS2 | 50x50 | 5,000 | 250,000 | - | 4.48 | 1,115 | 55,759 | OK |
| 合计 / 平均 | 18/18 | - | - | 342,000 | 9,750,000 | - | - | 5,752 | 53,216 | 全部OK |
MQTTnet vs TSMqtt 对比
| 场景 | QoS | MQTTnet 接收/s | TSMqtt 接收/s | 接收/s 差距 | MQTTnet 耗时(s) | TSMqtt 耗时(s) | 耗时差(s) |
|---|---|---|---|---|---|---|---|
| 1发布x1订阅 | QoS0 | 26,786 | 27,321 | +535 | 1.87 | 1.83 | -0.04 |
| 1发布x1订阅 | QoS1 | 9,936 | 10,912 | +976 | 2.01 | 1.83 | -0.18 |
| 1发布x1订阅 | QoS2 | 5,385 | 5,761 | +376 | 0.93 | 0.87 | -0.06 |
| 1发布x50订阅 | QoS0 | 105,636 | 96,175 | -9,461 | 4.73 | 5.20 | +0.47 |
| 1发布x50订阅 | QoS1 | 73,996 | 69,870 | -4,126 | 3.38 | 3.58 | +0.20 |
| 1发布x50订阅 | QoS2 | 45,177 | 43,288 | -1,889 | 1.11 | 1.16 | +0.05 |
| 50发布x50订阅 | QoS0 | 93,219 | 92,359 | -860 | 26.82 | 27.07 | +0.25 |
| 50发布x50订阅 | QoS1 | 72,963 | 79,362 | +6,399 | 17.13 | 15.75 | -1.38 |
| 50发布x50订阅 | QoS2 | 43,987 | 55,759 | +11,772 | 5.68 | 4.48 | -1.20 |
在 MQTTnet 服务器下的结论: 两种客户端的整体表现基本持平,TSMqtt 客户端在 QoS0 单发单订场景下与 MQTTnet 相近,在高 QoS 大规模(50x50)场景中 TSMqtt 接收速率略有优势(QoS1 +6,399/s,QoS2 +11,772/s)。可见客户端实现对服务端吞吐的影响在同一 Broker 下并不显著,性能瓶颈主要在 Broker 侧。
13.2 使用 TouchSocket.Mqtt 作为服务器
下表展示了在 TouchSocket.Mqtt Broker 托管下,两种客户端在各场景中的表现:
[使用TouchSocket.Mqtt作为服务器]
| 场景名称 | 客户端 | QoS | 发布x订阅 | 发送量 | 实际收 | 丢包 | 耗时(s) | 发送/s | 接收/s | 状态 |
|---|---|---|---|---|---|---|---|---|---|---|
| 1发布x1订阅 | MQTTnet | QoS0 | 1x1 | 50,000 | 50,000 | - | 1.58 | 31,667 | 31,667 | OK |
| 1发布x1订阅 | MQTTnet | QoS1 | 1x1 | 20,000 | 20,000 | - | 1.34 | 14,977 | 14,977 | OK |
| 1发布x1订阅 | MQTTnet | QoS2 | 1x1 | 5,000 | 5,000 | - | 0.59 | 8,403 | 8,403 | OK |
| 1发布x50订阅 | MQTTnet | QoS0 | 1x50 | 10,000 | 500,000 | - | 1.50 | 6,656 | 332,840 | OK |
| 1发布x50订阅 | MQTTnet | QoS1 | 1x50 | 5,000 | 250,000 | - | 2.14 | 2,334 | 116,736 | OK |
| 1发布x50订阅 | MQTTnet | QoS2 | 1x50 | 1,000 | 50,000 | - | 0.85 | 1,177 | 58,850 | OK |
| 50发布x50订阅 | MQTTnet | QoS0 | 50x50 | 50,000 | 2,500,000 | - | 2.43 | 20,548 | 1,027,408 | OK |
| 50发布x50订阅 | MQTTnet | QoS1 | 50x50 | 25,000 | 1,250,000 | - | 11.18 | 2,235 | 111,761 | OK |
| 50发布x50订阅 | MQTTnet | QoS2 | 50x50 | 5,000 | 250,000 | - | 4.79 | 1,042 | 52,138 | OK |
| 1发布x1订阅 | TSMqtt | QoS0 | 1x1 | 50,000 | 50,000 | - | 0.18 | 271,947 | 271,947 | OK |
| 1发布x1订阅 | TSMqtt | QoS1 | 1x1 | 20,000 | 20,000 | - | 1.17 | 17,130 | 17,130 | OK |
| 1发布x1订阅 | TSMqtt | QoS2 | 1x1 | 5,000 | 5,000 | - | 0.57 | 8,845 | 8,845 | OK |
| 1发布x50订阅 | TSMqtt | QoS0 | 1x50 | 10,000 | 500,000 | - | 0.20 | 50,635 | 2,531,772 | OK |
| 1发布x50订阅 | TSMqtt | QoS1 | 1x50 | 5,000 | 250,000 | - | 2.17 | 2,308 | 115,438 | OK |
| 1发布x50订阅 | TSMqtt | QoS2 | 1x50 | 1,000 | 50,000 | - | 0.88 | 1,133 | 56,684 | OK |
| 50发布x50订阅 | TSMqtt | QoS0 | 50x50 | 50,000 | 2,500,000 | - | 0.89 | 56,428 | 2,821,412 | OK |
| 50发布x50订阅 | TSMqtt | QoS1 | 50x50 | 25,000 | 1,250,000 | - | 12.33 | 2,026 | 101,346 | OK |
| 50发布x50订阅 | TSMqtt | QoS2 | 50x50 | 5,000 | 250,000 | - | 4.80 | 1,041 | 52,097 | OK |
| 合计 / 平均 | 18/18 | - | - | 342,000 | 9,750,000 | - | - | 27,807 | 429,525 | 全部OK |
MQTTnet vs TSMqtt 对比
| 场景 | QoS | MQTTnet 接收/s | TSMqtt 接收/s | 接收/s 差距 | MQTTnet 耗时(s) | TSMqtt 耗时(s) | 耗时差(s) |
|---|---|---|---|---|---|---|---|
| 1发布x1订阅 | QoS0 | 31,667 | 271,947 | +240,280 | 1.58 | 0.18 | -1.40 |
| 1发布x1订阅 | QoS1 | 14,977 | 17,130 | +2,153 | 1.34 | 1.17 | -0.17 |
| 1发布x1订阅 | QoS2 | 8,403 | 8,845 | +442 | 0.59 | 0.57 | -0.03 |
| 1发布x50订阅 | QoS0 | 332,840 | 2,531,772 | +2,198,932 | 1.50 | 0.20 | -1.30 |
| 1发布x50订阅 | QoS1 | 116,736 | 115,438 | -1,298 | 2.14 | 2.17 | +0.02 |
| 1发布x50订阅 | QoS2 | 58,850 | 56,684 | -2,166 | 0.85 | 0.88 | +0.03 |
| 50发布x50订阅 | QoS0 | 1,027,408 | 2,821,412 | +1,794,004 | 2.43 | 0.89 | -1.55 |
| 50发布x50订阅 | QoS1 | 111,761 | 101,346 | -10,415 | 11.18 | 12.33 | +1.15 |
| 50发布x50订阅 | QoS2 | 52,138 | 52,097 | -41 | 4.79 | 4.80 | +0.00 |
13.3 综合结论
TouchSocket.Mqtt 服务端在 QoS0 场景下具有极为显著的性能优势:
-
QoS0 单发单订(1x1):TSMqtt 客户端连接 TouchSocket 服务器时接收速率高达 271,947/s,是使用 MQTTnet 服务器时(27,321/s)的约 10 倍,耗时从 1.83s 降至 0.18s。这得益于 TouchSocket 在零确认路径上的内存零拷贝与无锁化分发设计。
-
QoS0 一发多订(1x50):TSMqtt + TouchSocket 组合的接收速率达到 253 万/s,而同场景下 MQTTnet 服务器仅能达到 10.5 万/s,提升约 24 倍。
-
QoS0 多发多订(50x50):TSMqtt + TouchSocket 组合接收速率高达 282 万/s,是 MQTTnet 服务器同场景的约 30 倍。
-
QoS1/QoS2 场景:受协议本身的握手确认机制约束,两者服务器在此类场景下表现接近,不存在量级差异。这说明 QoS1/2 的吞吐上限更多由协议往返延迟决定,而非 Broker 实现效率。
-
综合平均吞吐:TouchSocket 服务器的平均接收速率为 429,525/s,是 MQTTnet 服务器(53,216/s)的约 8 倍;平均发送速率也从 5,752/s 提升至 27,807/s(约 4.8 倍)。
在对 QoS0 吞吐有极高要求的场景(如大规模设备遥测数据上报、实时数据流分发等),推荐优先选用 TouchSocket.Mqtt 作为服务端并配合 TSMqtt 客户端,可获得最优性能表现。QoS1/2 场景下两种 Broker 表现相当,可根据团队技术栈灵活选择。