跳到主要内容
版本:3.0

MQTT客户端使用

定义

命名空间:TouchSocket.MQTT
程序集:TouchSocket.dll

一、说明

MQTTClient是遵循MQTT协议的消息客户端,支持连接MQTT Broker、订阅主题、发布消息等功能,支持QoS 0/1/2三种消息质量等级。

二、特点

  • 轻量级协议支持
  • QoS消息质量保障
  • 遗嘱消息支持
  • 保留消息处理
  • TLS加密通信
  • 主题通配符匹配(+/#)

三、应用场景

  • IoT设备数据上报
  • 跨平台消息推送
  • 低带宽环境通信
  • 设备状态同步

四、可配置项

SetRemoteIPHost

MQTT Broker地址,格式:

  • mqtt://127.0.0.1:1883
  • mqtts://broker.emqx.io:8883 (TLS加密)

SetClientId

客户端唯一标识,默认自动生成GUID

SetKeepAliveInterval

心跳间隔(秒),默认60秒

SetCleanSession

清除会话标志,默认true

SetWillMessage

配置遗嘱消息:

五、支持插件

| 插件接口 | 功能 | | --- | --- | | IMqttConnectingPlugin | 连接Broker前触发 | | IMqttConnectedPlugin | 成功连接后触发 | | IMqttSubscribedPlugin | 订阅主题成功时触发 | | IMqttUnsubscribedPlugin | 取消订阅时触发 | | IMqttMessageReceivedPlugin | 收到消息时触发 |

六、创建MQTT客户端

var mqttClient = new MQTTClient();
mqttClient.Connected = (client, e) => {
Console.WriteLine($"Connected to {e.Options.Server}");
return EasyTask.CompletedTask;
};

await mqttClient.SetupAsync(new TouchSocketConfig()
.SetRemoteIPHost("mqtt://127.0.0.1:1883")
.SetClientId("device_001"));

await mqttClient.ConnectAsync();

七、订阅主题

// 订阅单个主题
await mqttClient.SubscribeAsync("home/sensor/temperature", QoSLevel.AtLeastOnce);

// 批量订阅
var topics = new List<MqttSubscribeOption> {
new("home/+/status", QoSLevel.AtMostOnce),
new("factory/#", QoSLevel.ExactlyOnce)
};
await mqttClient.SubscribeAsync(topics);

八、消息处理

mqttClient.Received = (client, e) => {
var topic = e.Topic;
var payload = e.Payload.ToText();
Console.WriteLine($"收到 [{topic}]: {payload}");
return EasyTask.CompletedTask;
};

九、发布消息

// 发布QoS 0消息
await mqttClient.PublishAsync("device/status", "online", QoSLevel.AtMostOnce);

// 发布保留消息
var message = new MqttPublishMessage {
Topic = "config/update",
Payload = Encoding.UTF8.GetBytes("v1.2.0"),
QoS = QoSLevel.ExactlyOnce,
Retain = true
};
await mqttClient.PublishAsync(message);

十、断线重连

.ConfigurePlugins(a => {
a.UseMqttReconnection()
.SetCheckInterval(TimeSpan.FromSeconds(5));
})