Mqtt客户端
定义
一、说明
MqttTcpClient 是遵循Mqtt协议的消息客户端,支持连接Mqtt Broker、订阅主题、发布消息等功能,支持QoS 0/1/2三种消息质量等级,兼容 Mqtt 3.1.1 及 5.0+ 协议版本。
二、特点
- 轻量级协议支持
- QoS消息质量保障(0/1/2)
- 遗嘱消息支持
- 保留消息处理
- 自动重连机制
- TLS加密通信
- 主题通配符匹配(+/#)
- 插件化扩展体系
三、应用场景
- IoT设备数据上报
- 跨平台消息推送
- 低带宽环境通信
- 设备状态同步
- 远程设备控制
四、可配置项
继承所有TcpClient的配置。除此之外还支持Mqtt客户端专有配置。
可配置项
SetMqttConnectOptions
MqttConnectOptions 类用于配置 Mqtt 客户端连接到 Mqtt 服务端时的参数,支持 Mqtt 3.1.1 及 5.0+ 协议版本。以下是各配置项的详细说明:
基础连接选项(通用)
| 属性名 | Mqtt 版本支持 | 说明 |
|---|---|---|
CleanSession | 3.1.1+ | 是否清理会话(断开后删除未完成的遗嘱和离线消息)。 |
ClientId | 3.1.1+ | 客户端唯一标识符(服务端用于识别客户端);Mqtt 3.1.1 中必填(长度 1-23 字节,ASCII 字符);5.0+ 允许空字符串(仅当服务端允许时)。 |
KeepAlive | 3.1.1+ | 心跳保活时长(单位:秒),表示客户端与服务端保持连接的最长时间;需与服务端配置兼容(服务端可能拒绝过大或过小的值),默认值通常为 60 秒。 |
Password | 3.1.1+ | 连接认证密码(与 UserName 配合使用);若 UserName 为空则无效;5.0+ 支持二进制数据(需通过 AuthenticationData 传递)。 |
ProtocolName | 3.1.1+ | Mqtt 协议名称(固定为 "MQTT"),默认值为 "MQTT",通常无需手动设置。 |
UserName | 3.1.1+ | 连接认证用户名(与 Password 配合使用);5.0+ 支持二进制数据(需通过 AuthenticationData 传递)。 |
Version | 3.1.1+/5.0+ | Mqtt 协议版本(如 MqttProtocolVersion.V311 或 V500);需根据服务端支持的版本设置,否则可能导致连接失败。 |
遗嘱(Will)消息配置(可选)
当 WillFlag 为 true 时,服务端会在客户端异常断开时发布遗嘱消息。仅部分属性在 WillFlag=true 时生效。
| 属性名 | Mqtt 版本支持 | 说明 |
|---|---|---|
WillFlag | 3.1.1+ | 是否启用遗嘱消息(若为 true,需配置遗嘱相关属性)。 |
WillQos | 3.1.1+ | 遗嘱消息的服务质量等级(0/1/2);需与服务端支持的 QoS 等级兼容。 |
WillRetain | 3.1.1+ | 是否保留遗嘱消息(服务端保留最新遗嘱消息供新订阅者接收);默认值为 false;若为 true,服务端需支持保留消息功能。 |
WillMessage | 3.1.1+ | 遗嘱消息的内容(文本类型,Mqtt 5.0+ 支持二进制数据,需通过 WillContentType 和 WillPayloadFormatIndicator 配合)。 |
WillTopic | 3.1.1+ | 遗嘱消息发布的主题(需符合 Mqtt 主题命名规则);主题需提前订阅才能接收遗嘱消息。 |
WillContentType | 5.0+ | 遗嘱消息内容的 MIME 类型(如 "text/plain"、"application/json");描述 WillMessage 的格式(仅 5.0+ 有效)。 |
WillPayloadFormatIndicator | 5.0+ | 遗嘱消息负载格式指示符(如 Raw、Utf8);更细粒度描述负载格式(仅 5.0+ 有效)。 |
WillResponseTopic | 5.0+ | 遗嘱消息触发后,服务端向客户端发送响应的主题;需客户端提前订阅该主题以接收响应。 |
WillCorrelationData | 5.0+ | 遗嘱消息关联的上下文数据(用于匹配请求与响应);需与服务端约定格式(仅 5.0+ 有效)。 |
WillDelayInterval | 5.0+ | 遗嘱消息延迟发布的时间间隔(单位:秒);遗嘱触发后,延迟指定时间再发布(仅 5.0+ 有效)。 |
WillMessageExpiryInterval | 5.0+ | 遗嘱消息的有效期(单位:秒,过期后服务端删除);超过此时间未发布的遗嘱消息将被丢弃(仅 5.0+ 有效)。 |
Mqtt 5.0+ 扩展配置(可选)
仅当使用 Mqtt 5.0+ 协议时生效,用于更细粒度的连接控制。
| 属性名 | Mqtt 版本支持 | 说明 |
|---|---|---|
UserProperties | 5.0+ | 用户自定义属性(键值对,用于传递业务元数据);键值对需符合 Mqtt 5.0 规范(键为 UTF-8 字符串,值支持多种类型)。 |
AuthenticationMethod | 5.0+ | 认证方法名称(如 "OAuth2"、"Custom");需与服务端支持的认证方法一致(仅 5.0+ 支持扩展认证)。 |
AuthenticationData | 5.0+ | 认证方法的附加数据(如令牌、签名等);配合 AuthenticationMethod 使用(仅 5.0+ 有效)。 |
MaximumPacketSize | 5.0+ | 客户端请求的最大数据包大小(单位:字节);服务端会返回其支持的最大值(客户端需遵守)。 |
ReceiveMaximum | 5.0+ | 客户端能接收的最大 QoS 1/2 消息数量(防止消息堆积);默认值通常为 65535(需与服务端配置兼容)。 |
RequestProblemInformation | 5.0+ | 是否请求服务端在出错时返回详细问题信息(如原因码、原因字符串);默认值为 true(建议开启以便排查问题)。 |
RequestResponseInformation | 5.0+ | 是否请求服务端返回连接响应的额外信息(如会话过期时间);默认值为 false(仅当需要时启用)。 |
SessionExpiryInterval | 5.0+ | 会话过期时间间隔(单位:秒,0 表示立即过期);客户端断开后,服务端保留会话的时间(仅当 CleanSession=false 时有效)。 |
TopicAliasMaximum | 5.0+ | 客户端允许的最大主题别名值(服务端不能使用超过此值的别名);默认值为 0(表示不使用主题别名)。 |
- 协议版本兼容性:部分属性(如
UserProperties、AuthenticationMethod)仅在 Mqtt 5.0+ 中有效,需根据服务端版本选择配置。 - 遗嘱消息生效条件:
WillFlag必须为true,且WillTopic和WillMessage需有效配置,否则遗嘱不会被发送。 - 认证扩展:5.0+ 的
AuthenticationData需配合自定义认证插件使用,具体格式由服务端定义。
五、支持插件
| 插件方法 | 功能 |
|---|---|
| IMqttConnectingPlugin | 当Mqtt客户端正在连接之前调用此方法。 |
| IMqttConnectedPlugin | 当Mqtt客户端连接成功时调用。 |
| IMqttClosingPlugin | 当Mqtt客户端正在关闭时调用。 |
| IMqttClosedPlugin | 当Mqtt客户端断开连接后触发。 |
| IMqttReceivingPlugin | 在收到Mqtt所有消息时触发,可以通过e.MqttMessage获取到Mqtt的所有消息,包括订阅、订阅确认、发布、发布确认等。 |
| IMqttReceivedPlugin | 当接收到Mqtt发布消息,且成功接收时触发。可以通过e.MqttMessage获取到Mqtt的发布消息。 |
六、创建Mqtt客户端
6.1 简单创建
直接创建MqttTcpClient实例,配置连接参数和插件:
client.ConnectAsync()方法会连接到Mqtt服务器,如果连接失败会抛出异常。
6.2 配置连接选项
6.3 配置插件
例如,连接相关插件:
七、订阅与取消订阅
7.1 订阅主题
按照Mqtt协议,客户端可以通过Subscribe指令订阅一个或多个主题,服务端会返回订阅结果。
7.2 取消订阅
按照Mqtt协议,客户端可以通过Unsubscribe指令取消订阅一个或多个主题,服务端会返回所有取消订阅结果。
八、接收消息
在Mqtt客户端连接成功之后,Mqtt组件的消息都是通过插件抛出的。你可以订阅IMqttReceivingPlugin插件和IMqttReceivedPlugin插件来接收消息。
正如插件说明所示,IMqttReceivingPlugin插件可以在收到所有消息时触发,IMqttReceivedPlugin插件可以在收到发布消息时触发。
所以如果你只关心接收(已发布)消息的消息,那么可以只订阅IMqttReceivedPlugin插件即可。
但是对于客户端来说,也可能需要获取到的消息可能是订阅确认、发布、发布确认、取消订阅确认等。所以你可以通过IMqttReceivingPlugin的e.MqttMessage来获取到具体的消息类型。
下列将简单演示。
8.1 通过插件接收所有消息
8.2 接收发布消息
如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin插件即可:
九、发布消息
或者使用高性能的内存池方式发布数据。