跳到主要内容
版本:3.1

Mqtt服务器

定义

命名空间:TouchSocket.Mqtt
程序集:TouchSocket.Mqtt.dll

一、说明

MqttTcpService 是基于 Mqtt 协议的消息服务端,支持客户端接入管理、订阅关系维护、消息路由转发、遗嘱消息处理等功能,兼容 Mqtt 3.1.1 及 5.0+ 协议版本。

二、特点

  • 多协议版本支持(v3.1.1/v5.0)
  • 高性能异步架构设计
  • 主题树形管理机制
  • 精确的 QoS 保障
  • 遗嘱消息转发
  • 插件化扩展体系
  • TLS 加密通信
  • 客户端黑白名单控制

三、应用场景

  • 工业物联网平台
  • 实时数据监控中心
  • 智慧城市中枢系统
  • 私有化消息总线
  • 设备远程管理服务

四、可配置项

继承所有 TcpService 的配置。除此之外还支持 Mqtt 服务端专有配置。

可配置项

五、支持插件

插件方法功能
IMqttConnectingPlugin当Mqtt客户端正在连接之前调用此方法。
IMqttConnectedPlugin当Mqtt客户端连接成功时调用。
IMqttClosingPlugin当Mqtt客户端正在关闭时调用。
IMqttClosedPlugin当Mqtt客户端断开连接后触发。
IMqttReceivingPlugin在收到Mqtt所有消息时触发,可以通过e.MqttMessage获取到Mqtt的所有消息,包括订阅、订阅确认、发布、发布确认等。
IMqttReceivedPlugin当接收到Mqtt发布消息,且成功接收时触发。可以通过e.MqttMessage获取到Mqtt的发布消息。

六、创建Mqtt服务端

var service = new MqttTcpService();
await service.SetupAsync(new TouchSocketConfig()
.SetListenIPHosts(7789) // 监听所有网卡
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.AddMqttClientConnectedPlugin(async (service, e) =>
{
Console.WriteLine($"Client Connected:{e.ClientId}");
await e.InvokeNext();
});

a.AddMqttClientDisconnectedPlugin(async (service, e) =>
{
Console.WriteLine($"Client Disconnected:{e.ClientId}");
await e.InvokeNext();
});
}));
await service.StartAsync();

七、接收消息

在Mqtt服务端创建好之后,Mqtt组件的消息都是通过插件抛出的。你可以订阅IMqttReceivingPlugin插件和IMqttReceivedPlugin插件来接收消息。

正如插件说明所示,IMqttReceivingPlugin插件可以在收到所有消息时触发,IMqttReceivedPlugin插件可以在收到发布消息时触发。

所以如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin插件即可。

但是对于服务器来说,也可能需要获取到的消息可能是订阅、发布、取消订阅等。所以你可以通过IMqttReceivingPlugine.MqttMessage来获取到具体的消息类型。

下列将简单演示。

7.1 订阅消息

.ConfigurePlugins(a =>
{
a.AddMqttReceivingPlugin(async (client, e) =>
{
switch (e.MqttMessage)
{
case MqttSubscribeMessage message:
{
//订阅消息
Console.WriteLine("Reving:" + e.MqttMessage.MessageType);
foreach (var subscribeRequest in message.SubscribeRequests)
{
var topic = subscribeRequest.Topic;
var qosLevel = subscribeRequest.QosLevel;
//或者其他属性
Console.WriteLine($"Subscribe Topic:{topic},QosLevel:{qosLevel}");
}
break;
}
default:
break;
}
Console.WriteLine("Reving:" + e.MqttMessage.MessageType);
await e.InvokeNext();
});
})

7.2 取消订阅消息

.ConfigurePlugins(a =>
{
a.AddMqttReceivingPlugin(async (client, e) =>
{
switch (e.MqttMessage)
{
case MqttUnsubscribeMessage message:
{

//取消订阅消息
Console.WriteLine("Reving:" + e.MqttMessage.MessageType);
foreach (var topic in message.TopicFilters)
{
//取消订阅的主题
Console.WriteLine($"Unsubscribe Topic:{topic}");
}
break;
}
default:
break;
}
Console.WriteLine("Reving:" + e.MqttMessage.MessageType);
await e.InvokeNext();
});
})

7.3 发布消息

所以如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin插件即可。

你可以直接使用委托实现订阅:

.ConfigurePlugins(a =>
{
a.AddMqttReceivedPlugin(async (client, e) =>
{
var mqttMessage = e.MqttMessage;
Console.WriteLine("Reved:" + mqttMessage);

//订阅消息的主题
var topicName = mqttMessage.TopicName;

//订阅消息的Qos级别
var qosLevel = mqttMessage.QosLevel;

//订阅消息的Payload
var payload = mqttMessage.Payload;
await e.InvokeNext();
});
})