Mqtt服务器
定义
定义
一、说明
MqttTcpService
是基于 Mqtt 协议的消息服务端,支持客户端接入管理、订阅关系维护、消息路由转发、遗嘱消息处理等功能,兼容 Mqtt 3.1.1 及 5.0+ 协议版本。
二、特点
- 多协议版本支持(v3.1.1/v5.0)
- 高性能异步架构设计
- 主题树形管理机制
- 精确的 QoS 保障
- 遗嘱消息转发
- 插件化扩展体系
- TLS 加密通信
- 客户端黑白名单控制
三、应用场景
- 工业物联网平台
- 实时数据监控中心
- 智慧城市中枢系统
- 私有化消息总线
- 设备远程管理服务
四、可配置项
继承所有 TcpService 的配置。除此之外还支持 Mqtt 服务端专有配置。
可配置项
五、支持插件
插件方法 | 功能 |
---|---|
IMqttConnectingPlugin | 当Mqtt客户端正在连接之前调用此方法。 |
IMqttConnectedPlugin | 当Mqtt客户端连接成功时调用。 |
IMqttClosingPlugin | 当Mqtt客户端正在关闭时调用。 |
IMqttClosedPlugin | 当Mqtt客户端断开连接后触发。 |
IMqttReceivingPlugin | 在收到Mqtt所有消息时触发,可以通过e.MqttMessage 获取到Mqtt 的所有消息,包括订阅、订阅确认、发布、发布确认等。 |
IMqttReceivedPlugin | 当接收到Mqtt发布消息,且成功接收时触发。可以通过e.MqttMessage 获取到Mqtt 的发布消息。 |
六、创建Mqtt服务端
var service = new MqttTcpService();
await service.SetupAsync(new TouchSocketConfig()
.SetListenIPHosts(7789) // 监听所有网卡
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.AddMqttClientConnectedPlugin(async (service, e) =>
{
Console.WriteLine($"Client Connected:{e.ClientId}");
await e.InvokeNext();
});
a.AddMqttClientDisconnectedPlugin(async (service, e) =>
{
Console.WriteLine($"Client Disconnected:{e.ClientId}");
await e.InvokeNext();
});
}));
await service.StartAsync();
七、接收消息
在Mqtt服务端创建好之后,Mqtt组件的消息都是通过插件抛出的。你可以订阅IMqttReceivingPlugin
插件和IMqttReceivedPlugin
插件来接收消息。
正如插件说明所示,IMqttReceivingPlugin
插件可以在收到所有消息时触发,IMqttReceivedPlugin
插件可以在收到发布消息时触发。
所以如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin
插件即可。
但是对于服务器来说,也可能需要获取到的消息可能是订阅、发布、取消订阅等。所以你可以通过IMqttReceivingPlugin
的e.MqttMessage
来获取到具体的消息类型。
下列将简单演示。
7.1 订阅消息
.ConfigurePlugins(a =>
{
a.AddMqttReceivingPlugin(async (client, e) =>
{
switch (e.MqttMessage)
{
case MqttSubscribeMessage message:
{
//订阅消息
Console.WriteLine("Reving:" + e.MqttMessage.MessageType);
foreach (var subscribeRequest in message.SubscribeRequests)
{
var topic = subscribeRequest.Topic;
var qosLevel = subscribeRequest.QosLevel;
//或者其他属性
Console.WriteLine($"Subscribe Topic:{topic},QosLevel:{qosLevel}");
}
break;
}
default:
break;
}
Console.WriteLine("Reving:" + e.MqttMessage.MessageType);
await e.InvokeNext();
});
})
7.2 取消订阅消息
.ConfigurePlugins(a =>
{
a.AddMqttReceivingPlugin(async (client, e) =>
{
switch (e.MqttMessage)
{
case MqttUnsubscribeMessage message:
{
//取消订阅消息
Console.WriteLine("Reving:" + e.MqttMessage.MessageType);
foreach (var topic in message.TopicFilters)
{
//取消订阅的主题
Console.WriteLine($"Unsubscribe Topic:{topic}");
}
break;
}
default:
break;
}
Console.WriteLine("Reving:" + e.MqttMessage.MessageType);
await e.InvokeNext();
});
})
7.3 发布消息
所以如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin
插件即可。
你可以直接使用委托实现订阅:
.ConfigurePlugins(a =>
{
a.AddMqttReceivedPlugin(async (client, e) =>
{
var mqttMessage = e.MqttMessage;
Console.WriteLine("Reved:" + mqttMessage);
//订阅消息的主题
var topicName = mqttMessage.TopicName;
//订阅消息的Qos级别
var qosLevel = mqttMessage.QosLevel;
//订阅消息的Payload
var payload = mqttMessage.Payload;
await e.InvokeNext();
});
})