Rpc执行调度器
一、说明
Rpc执行调度器(RpcDispatcher)是TouchSocket.Rpc框架中的核心组件之一,用于实现远程过程调用(Rpc)的服务执行方式。
目前,内置了ConcurrencyDispatcher、QueueRpcDispatcher、ImmediateRpcDispatcher三种调度器,分别用于实现不同场景下的服务执行方式。
调度器的选择对RPC服务的性能和行为有重要影响,应根据实际业务场景选择合适的调度器。
二、调度器说明
RpcDispatcher是直接应用于Rpc Caller的。所以它在被调用方可能是以多个实例的形式存在。
例如DmtpRpc,对于服务端而言,如果有多个Rpc客户端连接,那么每个连接都会拥有一个RpcDispatcher实例。
2.1 并发调度器(ConcurrencyDispatcher)
并发调度器(ConcurrencyDispatcher)是在收到Rpc请求后,直接使用线程池(Task.Run)直接执行Rpc。所以它是完全并发的。
适用场景:
- 需要高并发处理的场景
- 各个RPC调用之间无依赖关系
- 追求最大吞吐量的场景
配置示例:
2.2 队列调度器(QueueRpcDispatcher)
队列调度器(QueueRpcDispatcher)是在收到Rpc请求后,先将请求放入队列中,再通过线程(Task.Run)执行Rpc。所以它在当前实例中(可以简单理解为一个连接)是有顺序的。
适用场景:
- 需要保证同一连接的RPC调用按顺序执行
- 避免并发执行导致的资源竞争
- 需要控制执行流程的场景
配置示例:
2.3 立即调度器(ImmediateRpcDispatcher)
立即调度器(ImmediateRpcDispatcher)是在收到Rpc请求后,使用Rpc接收线程,直接执行Rpc。所以它的同步性是依赖接收线程的。
适用场景:
- 需要最低延迟的场景
- RPC方法执行时间极短
- 不希望引入额外线程切换开销
配置示例:
三、使用
Rpc调度器并非强制要求,具体还得看Rpc框架本身支不支持多样的调度方式。
例如,DmtpRpc框架就支持所有调度方式,但是对于WebApi框架,它只支持ImmediateRpcDispatcher。
所以实际使用还得看具体框架的支持情况。
3.1 创建客户端并调用
3.2 并发调用示例
四、可重入性
可重入性也表示的是在一个Caller的Rpc调用中的并发性。不过可以使用[Reenterable]特性来控制方法、服务级别的并发。
可重入性会受调度器的完全影响。如果调度器本身不支持可重入性,那么它将不能支持可重入性。
例如,对于并发调度器(ConcurrencyDispatcher),它默认支持可重入性。
但是对于有的Rpc方法,我们希望仅这个方法不支持可重入性,那么我们可以使用[Reenterable]特性来控制。
4.1 使用Reenterable特性
通过在接口方法上添加[Reenterable(false)]特性,可以禁止该方法的可重入调用,即使使用的是并发调度器,该方法也会按顺序执行。
4.2 调度器与可重入性对照表
| 调度器类型 | 默认可重入性 | 支持Reenterable特性 |
|---|---|---|
| ConcurrencyDispatcher | 是 | 是 |
| QueueRpcDispatcher | 否 | 是 |
| ImmediateRpcDispatcher | 否 | 是 |
- 队列调度器和立即调度器默认不支持可重入,即使设置
[Reenterable(true)]也不会生效 - 只有在并发调度器下,才能通过
[Reenterable(false)]来禁止特定方法的并发执行 - 可重入性控制是针对单个连接(Caller)的,不同连接之间不受影响
五、全局队列调度器
除了上述三种调度器外,TouchSocket还提供了一个全局队列调度器(GlobalQueueRpcDispatcher),它与普通队列调度器的区别在于:
- 普通队列调度器:每个连接拥有独立的队列实例
- 全局队列调度器:所有连接共享同一个队列实例
适用场景:
- 需要在所有连接之间保证执行顺序
- 限制系统整体的并发执行数量
- 实现全局的任务调度
配置方式:
.ConfigurePlugins(a =>
{
a.UseDmtpRpc(options =>
{
options.UseGlobalQueueRpcDispatcher();
});
})
六、自定义调度器
如果内置的调度器无法满足需求,可以通过实现IRpcDispatcher接口来自定义调度器。
示例:
public class MyCustomDispatcher : IRpcDispatcher<IDmtpActor, IDmtpRpcCallContext>
{
public async Task ExecuteAsync(IDmtpActor caller, IDmtpRpcCallContext callContext, Func<Task> action)
{
// 自定义调度逻辑
await action();
}
}
// 配置使用自定义调度器
.ConfigurePlugins(a =>
{
a.UseDmtpRpc(options =>
{
options.CreateDispatcher = (actor) => new MyCustomDispatcher();
});
})
七、最佳实践
- 高并发场景:使用
ConcurrencyDispatcher以获得最大吞吐量 - 顺序执行需求:使用
QueueRpcDispatcher保证同一连接的调用顺序 - 低延迟需求:使用
ImmediateRpcDispatcher,但要确保RPC方法执行时间短 - 全局控制:使用
GlobalQueueRpcDispatcher实现跨连接的顺序控制 - 特殊场景:通过自定义调度器实现特定的业务需求
- 避免在
ImmediateRpcDispatcher中执行耗时操作,否则会阻塞接收线程 - 对于I/O密集型操作,推荐使用
ConcurrencyDispatcher - 对于CPU密集型操作,可以考虑使用
QueueRpcDispatcher控制并发数