跳到主要内容
版本:4.0-rc

Mqtt客户端

定义

命名空间:
TouchSocket.Mqtt
安装:
dotnet add package TouchSocket.Mqtt

一、说明

MqttTcpClient 是遵循Mqtt协议的消息客户端,支持连接Mqtt Broker、订阅主题、发布消息等功能,支持QoS 0/1/2三种消息质量等级,兼容 Mqtt 3.1.1 及 5.0+ 协议版本。

二、特点

  • 轻量级协议支持
  • QoS消息质量保障(0/1/2)
  • 遗嘱消息支持
  • 保留消息处理
  • 自动重连机制
  • TLS加密通信
  • 主题通配符匹配(+/#)
  • 插件化扩展体系

三、应用场景

  • IoT设备数据上报
  • 跨平台消息推送
  • 低带宽环境通信
  • 设备状态同步
  • 远程设备控制

四、可配置项

继承所有TcpClient的配置。除此之外还支持Mqtt客户端专有配置。

可配置项

SetMqttConnectOptions

MqttConnectOptions 类用于配置 Mqtt 客户端连接到 Mqtt 服务端时的参数,支持 Mqtt 3.1.1 及 5.0+ 协议版本。以下是各配置项的详细说明:

基础连接选项(通用)

属性名Mqtt 版本支持说明
CleanSession3.1.1+是否清理会话(断开后删除未完成的遗嘱和离线消息)。
ClientId3.1.1+客户端唯一标识符(服务端用于识别客户端);Mqtt 3.1.1 中必填(长度 1-23 字节,ASCII 字符);5.0+ 允许空字符串(仅当服务端允许时)。
KeepAlive3.1.1+心跳保活时长(单位:秒),表示客户端与服务端保持连接的最长时间;需与服务端配置兼容(服务端可能拒绝过大或过小的值),默认值通常为 60 秒。
Password3.1.1+连接认证密码(与 UserName 配合使用);若 UserName 为空则无效;5.0+ 支持二进制数据(需通过 AuthenticationData 传递)。
ProtocolName3.1.1+Mqtt 协议名称(固定为 "MQTT"),默认值为 "MQTT",通常无需手动设置。
UserName3.1.1+连接认证用户名(与 Password 配合使用);5.0+ 支持二进制数据(需通过 AuthenticationData 传递)。
Version3.1.1+/5.0+Mqtt 协议版本(如 MqttProtocolVersion.V311V500);需根据服务端支持的版本设置,否则可能导致连接失败。

遗嘱(Will)消息配置(可选)

WillFlagtrue 时,服务端会在客户端异常断开时发布遗嘱消息。仅部分属性在 WillFlag=true 时生效。

属性名Mqtt 版本支持说明
WillFlag3.1.1+是否启用遗嘱消息(若为 true,需配置遗嘱相关属性)。
WillQos3.1.1+遗嘱消息的服务质量等级(0/1/2);需与服务端支持的 QoS 等级兼容。
WillRetain3.1.1+是否保留遗嘱消息(服务端保留最新遗嘱消息供新订阅者接收);默认值为 false;若为 true,服务端需支持保留消息功能。
WillMessage3.1.1+遗嘱消息的内容(文本类型,Mqtt 5.0+ 支持二进制数据,需通过 WillContentTypeWillPayloadFormatIndicator 配合)。
WillTopic3.1.1+遗嘱消息发布的主题(需符合 Mqtt 主题命名规则);主题需提前订阅才能接收遗嘱消息。
WillContentType5.0+遗嘱消息内容的 MIME 类型(如 "text/plain"、"application/json");描述 WillMessage 的格式(仅 5.0+ 有效)。
WillPayloadFormatIndicator5.0+遗嘱消息负载格式指示符(如 RawUtf8);更细粒度描述负载格式(仅 5.0+ 有效)。
WillResponseTopic5.0+遗嘱消息触发后,服务端向客户端发送响应的主题;需客户端提前订阅该主题以接收响应。
WillCorrelationData5.0+遗嘱消息关联的上下文数据(用于匹配请求与响应);需与服务端约定格式(仅 5.0+ 有效)。
WillDelayInterval5.0+遗嘱消息延迟发布的时间间隔(单位:秒);遗嘱触发后,延迟指定时间再发布(仅 5.0+ 有效)。
WillMessageExpiryInterval5.0+遗嘱消息的有效期(单位:秒,过期后服务端删除);超过此时间未发布的遗嘱消息将被丢弃(仅 5.0+ 有效)。

Mqtt 5.0+ 扩展配置(可选)

仅当使用 Mqtt 5.0+ 协议时生效,用于更细粒度的连接控制。

属性名Mqtt 版本支持说明
UserProperties5.0+用户自定义属性(键值对,用于传递业务元数据);键值对需符合 Mqtt 5.0 规范(键为 UTF-8 字符串,值支持多种类型)。
AuthenticationMethod5.0+认证方法名称(如 "OAuth2"、"Custom");需与服务端支持的认证方法一致(仅 5.0+ 支持扩展认证)。
AuthenticationData5.0+认证方法的附加数据(如令牌、签名等);配合 AuthenticationMethod 使用(仅 5.0+ 有效)。
MaximumPacketSize5.0+客户端请求的最大数据包大小(单位:字节);服务端会返回其支持的最大值(客户端需遵守)。
ReceiveMaximum5.0+客户端能接收的最大 QoS 1/2 消息数量(防止消息堆积);默认值通常为 65535(需与服务端配置兼容)。
RequestProblemInformation5.0+是否请求服务端在出错时返回详细问题信息(如原因码、原因字符串);默认值为 true(建议开启以便排查问题)。
RequestResponseInformation5.0+是否请求服务端返回连接响应的额外信息(如会话过期时间);默认值为 false(仅当需要时启用)。
SessionExpiryInterval5.0+会话过期时间间隔(单位:秒,0 表示立即过期);客户端断开后,服务端保留会话的时间(仅当 CleanSession=false 时有效)。
TopicAliasMaximum5.0+客户端允许的最大主题别名值(服务端不能使用超过此值的别名);默认值为 0(表示不使用主题别名)。

注意
  1. 协议版本兼容性:部分属性(如 UserPropertiesAuthenticationMethod)仅在 Mqtt 5.0+ 中有效,需根据服务端版本选择配置。
  2. 遗嘱消息生效条件WillFlag 必须为 true,且 WillTopicWillMessage 需有效配置,否则遗嘱不会被发送。
  3. 认证扩展:5.0+ 的 AuthenticationData 需配合自定义认证插件使用,具体格式由服务端定义。

五、支持插件

插件方法功能
IMqttConnectingPlugin当Mqtt客户端正在连接之前调用此方法。
IMqttConnectedPlugin当Mqtt客户端连接成功时调用。
IMqttClosingPlugin当Mqtt客户端正在关闭时调用。
IMqttClosedPlugin当Mqtt客户端断开连接后触发。
IMqttReceivingPlugin在收到Mqtt所有消息时触发,可以通过e.MqttMessage获取到Mqtt的所有消息,包括订阅、订阅确认、发布、发布确认等。
IMqttReceivedPlugin当接收到Mqtt发布消息,且成功接收时触发。可以通过e.MqttMessage获取到Mqtt的发布消息。

六、创建Mqtt客户端

6.1 简单创建

直接创建MqttTcpClient实例,配置连接参数和插件:

🔄 正在加载代码...
温馨提示

client.ConnectAsync()方法会连接到Mqtt服务器,如果连接失败会抛出异常。

6.2 配置连接选项

🔄 正在加载代码...

6.3 配置插件

🔄 正在加载代码...

例如,连接相关插件:

🔄 正在加载代码...

七、订阅与取消订阅

7.1 订阅主题

按照Mqtt协议,客户端可以通过Subscribe指令订阅一个或多个主题,服务端会返回订阅结果。

🔄 正在加载代码...

7.2 取消订阅

按照Mqtt协议,客户端可以通过Unsubscribe指令取消订阅一个或多个主题,服务端会返回所有取消订阅结果。

🔄 正在加载代码...

八、接收消息

在Mqtt客户端连接成功之后,Mqtt组件的消息都是通过插件抛出的。你可以订阅IMqttReceivingPlugin插件和IMqttReceivedPlugin插件来接收消息。

正如插件说明所示,IMqttReceivingPlugin插件可以在收到所有消息时触发,IMqttReceivedPlugin插件可以在收到发布消息时触发。

所以如果你只关心接收(已发布)消息的消息,那么可以只订阅IMqttReceivedPlugin插件即可。

但是对于客户端来说,也可能需要获取到的消息可能是订阅确认、发布、发布确认、取消订阅确认等。所以你可以通过IMqttReceivingPlugine.MqttMessage来获取到具体的消息类型。

下列将简单演示。

8.1 通过插件接收所有消息

🔄 正在加载代码...

8.2 接收发布消息

如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin插件即可:

🔄 正在加载代码...

九、发布消息

🔄 正在加载代码...

或者使用高性能的内存池方式发布数据。

🔄 正在加载代码...

十、断开连接

10.1 正常断开

🔄 正在加载代码...

10.2 释放资源

🔄 正在加载代码...

十一、连接管理

11.1 检查连接状态

🔄 正在加载代码...

11.2 自动重连配置

🔄 正在加载代码...