MqttRpc 使用指南
定义
一、概述
MqttRpc 是基于 Mqtt 协议实现的 RPC 组件,允许 Mqtt 客户端之间进行远程过程调用。它以 Mqtt Broker 为消息中转中心,使用 JSON 对请求和响应进行编解码,具备良好的跨语言兼容性。
与传统的 Tcp/Http RPC 不同,MqttRpc 天然契合物联网场景,客户端既可以作为 RPC 服务端(处理请求),也可以作为 RPC 调用端(发起请求),甚至可以同时承担两种角色。
1.1 架构原理
调用流程说明:
- 调用端向
mqttrpc/req主题发布 JSON 格式的请求报文,报文中携带响应主题地址mqttrpc/res/{callerId} - Broker 将请求路由到订阅了
mqttrpc/req的所有服务端客户端 - 服务端收到请求后,通过注册的 RpcStore 匹配并执行对应的 RPC 方法
- 服务端将方法返回值序列化为 JSON,发布到请求报文中指定的响应主题
- 调用端接收响应并将结果返回给上层调用代码
1.2 核心特点
- 基于 Mqtt 协议,天然支持 QoS 0/1/2 消息可靠性保障
- 使用 JSON 编码,跨语言、跨平台兼容
- 支持同步等待响应(
WaitInvoke)和仅发送(OnlySend)两种调用模式 - 支持调用超时与
CancellationToken取消 - 支持 RPC 调用上下文(
IMqttRpcCallContext),可获取调用方信息 - 支持 DispatchProxy 强类型代理调用
- 支持源码生成器自动生成代理扩展方法
- 服务端与调用端可以是同一个 Mqtt 客户端,双向通信无缝衔接
- 可与任何标准 Mqtt Broker(EMQX、Mosquitto 等)配合使用
二、定义 RPC 服务
在服务端创建一个类,继承 SingletonRpcServer(或实现 ISingletonRpcServer),然后在该类中定义公共方法,并使用 [MqttRpc] 特性标记。
2.1 调用键说明
调用键(InvokeKey)默认格式为:命名空间.类名.方法名(全部小写)。
例如,MqttRpcConsoleApp.MyRpcServer.Add 方法的调用键为:mqttrpcconsoleapp.myrpcserver.add。
若在定义代理接口时继承自 ISingletonRpcServer,调用键基于接口的命名空间和接口名生成,而非实现类名。
2.2 调用上下文
[MqttRpc] 标记的方法支持将 IMqttRpcCallContext 作为第一个参数,该参数由框架自动注入,无需客户端传递。通过它可以获取调用方的 Mqtt 会话信息(如客户端 ID、连接状态等)。
更多 RPC 注册方式请参考:注册 Rpc 服务
三、创建 Mqtt Broker
MqttRpc 需要一个 Mqtt Broker 作为消息中转中心。MqttTcpService 本身即具备完整的 Broker(消息路由转发)能力。
在生产环境中,可使用任何标准 Mqtt Broker(如 EMQX、Mosquitto 等)替代,无需修改客户端代码。只需确保服务端和调用端连接到同一个 Broker 即可。
四、创建 RPC 服务端客户端
RPC 服务端是一个普通的 Mqtt 客户端,通过 UseMqttRpc 插件订阅请求主题并处理 RPC 调用。该模式下,客户端既能处理 RPC 请求,也能发起 RPC 调用。
- 在
ConfigureContainer中通过AddRpcStore注册 RPC 服务类(支持接口映射和直接注册两种方式) - 在
ConfigurePlugins中通过UseMqttRpc启用服务端+客户端双向 RPC 功能 UseMqttRpc会自动从容器中解析IRpcServerProvider,无需手动传入- 连接后插件会自动订阅
RequestTopic(处理传入请求)和ResponseTopicPrefix/+(接收响应)
五、创建 RPC 调用端客户端
调用端同样是一个 Mqtt 客户端,通过 UseMqttRpcClient 插件以纯客户端模式运行,只发起调用、不注册任何 RPC 服务。
UseMqttRpc(服务端+客户端)与 UseMqttRpcClient(纯客户端)的区别:
| 插件 | 订阅请求主题 | 订阅响应主题 | 适用场景 |
|---|---|---|---|
UseMqttRpc | ✅ | ✅ | 既要处理请求,也要发起调用 |
UseMqttRpcClient | ❌ | ✅ | 只发起调用,不处理 RPC 请求 |
六、配置选项(MqttRpcOption)
| 属性 | 类型 | 默认值 | 说明 |
|---|---|---|---|
RequestTopic | string | mqttrpc/req | RPC 请求主题,服务端订阅此主题接收请求 |
ResponseTopicPrefix | string | mqttrpc/res | 响应主题前缀,实际响应主题为 {前缀}/{唯一标识} |
QosLevel | QosLevel | AtLeastOnce | 请求/响应消息的 QoS 等级(AtMostOnce/AtLeastOnce/ExactlyOnce) |
SerializerOptions | JsonSerializerOptions | 默认 | JSON 序列化选项,可自定义序列化行为 |
服务端与调用端配置的 RequestTopic 和 ResponseTopicPrefix 必须保持一致,否则无法完成 RPC 调用。建议将这两个值集中定义为常量以避免配置不一致。
七、调用 RPC 方法
连接成功后,通过 GetMqttRpcClient() 扩展方法获取 IMqttRpcClient 接口,即可发起调用。
7.1 直接调用(InvokeAsync)
使用 InvokeAsync 方法进行原生调用,需要手动指定调用键和返回值类型。
InvokeOption 提供以下调用模式:
| 选项 | 说明 |
|---|---|
InvokeOption.WaitInvoke | 等待服务端执行完毕并返回结果(默认超时 5000ms) |
InvokeOption.OnlySend | 仅发送请求,不等待响应(fire-and-forget) |
也可以自定义调用选项:
7.2 代理调用(源码生成器)
通过定义接口并使用 [GeneratorRpcProxy] 特性,源码生成器会自动生成强类型代理扩展方法,消除字符串调用键和类型转换的繁琐操作。
第一步: 定义代理接口,方法签名与服务端保持一致(带 Async 后缀的异步版本),并标记 [GeneratorRpcProxy] 和 [MqttRpc]:
第二步: 直接使用生成的扩展方法调用,类型安全,无需字符串调用键:
MqttRpc 支持 RPC 平台的所有功能,如过滤器、调度器以及所有代理调用方式。更多代理调用方式请参考:服务端代理生成
八、序列化配置
MqttRpc 默认使用 System.Text.Json 进行序列化。可以通过 MqttRpcOption.SerializerOptions 自定义序列化行为:
服务端与调用端的 SerializerOptions 必须保持一致(例如属性命名策略、枚举处理方式等),否则反序列化可能失败。