跳到主要内容
版本:4.0-rc

Rpc执行调度器

一、说明

Rpc执行调度器(RpcDispatcher)是TouchSocket.Rpc框架中的核心组件之一,用于实现远程过程调用(Rpc)的服务执行方式。

目前,内置了ConcurrencyDispatcherQueueRpcDispatcherImmediateRpcDispatcher三种调度器,分别用于实现不同场景下的服务执行方式。

提示

调度器的选择对RPC服务的性能和行为有重要影响,应根据实际业务场景选择合适的调度器。

二、调度器说明

RpcDispatcher是直接应用于Rpc Caller的。所以它在被调用方可能是以多个实例的形式存在。

例如DmtpRpc,对于服务端而言,如果有多个Rpc客户端连接,那么每个连接都会拥有一个RpcDispatcher实例。

2.1 并发调度器(ConcurrencyDispatcher)

并发调度器(ConcurrencyDispatcher)是在收到Rpc请求后,直接使用线程池(Task.Run)直接执行Rpc。所以它是完全并发的。

适用场景:

  • 需要高并发处理的场景
  • 各个RPC调用之间无依赖关系
  • 追求最大吞吐量的场景

配置示例:

🔄 正在加载代码...

2.2 队列调度器(QueueRpcDispatcher)

队列调度器(QueueRpcDispatcher)是在收到Rpc请求后,先将请求放入队列中,再通过线程(Task.Run)执行Rpc。所以它在当前实例中(可以简单理解为一个连接)是有顺序的。

适用场景:

  • 需要保证同一连接的RPC调用按顺序执行
  • 避免并发执行导致的资源竞争
  • 需要控制执行流程的场景

配置示例:

🔄 正在加载代码...

2.3 立即调度器(ImmediateRpcDispatcher)

立即调度器(ImmediateRpcDispatcher)是在收到Rpc请求后,使用Rpc接收线程,直接执行Rpc。所以它的同步性是依赖接收线程的。

适用场景:

  • 需要最低延迟的场景
  • RPC方法执行时间极短
  • 不希望引入额外线程切换开销

配置示例:

🔄 正在加载代码...

三、使用

Rpc调度器并非强制要求,具体还得看Rpc框架本身支不支持多样的调度方式。

例如,DmtpRpc框架就支持所有调度方式,但是对于WebApi框架,它只支持ImmediateRpcDispatcher

所以实际使用还得看具体框架的支持情况。

3.1 创建客户端并调用

🔄 正在加载代码...

3.2 并发调用示例

🔄 正在加载代码...

四、可重入性

可重入性也表示的是在一个CallerRpc调用中的并发性。不过可以使用[Reenterable]特性来控制方法、服务级别的并发。

可重入性会受调度器的完全影响。如果调度器本身不支持可重入性,那么它将不能支持可重入性。

例如,对于并发调度器(ConcurrencyDispatcher),它默认支持可重入性。

但是对于有的Rpc方法,我们希望仅这个方法不支持可重入性,那么我们可以使用[Reenterable]特性来控制。

4.1 使用Reenterable特性

通过在接口方法上添加[Reenterable(false)]特性,可以禁止该方法的可重入调用,即使使用的是并发调度器,该方法也会按顺序执行。

🔄 正在加载代码...

4.2 调度器与可重入性对照表

调度器类型默认可重入性支持Reenterable特性
ConcurrencyDispatcher
QueueRpcDispatcher
ImmediateRpcDispatcher
注意
  • 队列调度器和立即调度器默认不支持可重入,即使设置[Reenterable(true)]也不会生效
  • 只有在并发调度器下,才能通过[Reenterable(false)]来禁止特定方法的并发执行
  • 可重入性控制是针对单个连接(Caller)的,不同连接之间不受影响

五、全局队列调度器

除了上述三种调度器外,TouchSocket还提供了一个全局队列调度器(GlobalQueueRpcDispatcher),它与普通队列调度器的区别在于:

  • 普通队列调度器:每个连接拥有独立的队列实例
  • 全局队列调度器:所有连接共享同一个队列实例

适用场景:

  • 需要在所有连接之间保证执行顺序
  • 限制系统整体的并发执行数量
  • 实现全局的任务调度

配置方式:

.ConfigurePlugins(a =>
{
a.UseDmtpRpc(options =>
{
options.UseGlobalQueueRpcDispatcher();
});
})

六、自定义调度器

如果内置的调度器无法满足需求,可以通过实现IRpcDispatcher接口来自定义调度器。

示例:

public class MyCustomDispatcher : IRpcDispatcher<IDmtpActor, IDmtpRpcCallContext>
{
public async Task ExecuteAsync(IDmtpActor caller, IDmtpRpcCallContext callContext, Func<Task> action)
{
// 自定义调度逻辑
await action();
}
}

// 配置使用自定义调度器
.ConfigurePlugins(a =>
{
a.UseDmtpRpc(options =>
{
options.CreateDispatcher = (actor) => new MyCustomDispatcher();
});
})

七、最佳实践

  1. 高并发场景:使用ConcurrencyDispatcher以获得最大吞吐量
  2. 顺序执行需求:使用QueueRpcDispatcher保证同一连接的调用顺序
  3. 低延迟需求:使用ImmediateRpcDispatcher,但要确保RPC方法执行时间短
  4. 全局控制:使用GlobalQueueRpcDispatcher实现跨连接的顺序控制
  5. 特殊场景:通过自定义调度器实现特定的业务需求
性能建议
  • 避免在ImmediateRpcDispatcher中执行耗时操作,否则会阻塞接收线程
  • 对于I/O密集型操作,推荐使用ConcurrencyDispatcher
  • 对于CPU密集型操作,可以考虑使用QueueRpcDispatcher控制并发数