跳到主要内容
版本:3.0

Rpc功能

定义

命名空间:TouchSocket.Dmtp.Rpc
程序集:TouchSocket.Dmtp.dll

一、说明

RPC(Remote Procedure Call)远程过程调用协议,一种通过网络从远程计算机上请求服务,而不需要了解底层网络技术的协议。RPC它假定某些协议的存在,例如TPC/UDP等,为通信程序之间携带信息数据。在OSI网络七层模型中,RPC跨越了传输层和应用层,RPC使得开发,包括网络分布式多程序在内的应用程序更加容易。

过程是什么? 过程就是业务处理、计算任务,更直白的说,就是程序,就是想调用本地方法一样调用远程的过程。

本Rpc是基于Dmtp协议的Rpc组件。其功能包括:

  • 支持客户端主动调用服务器。
  • 支持服务主动调用客户端。
  • 支持客户端之间互相调用。
  • 支持绝大多数数据类型及自定义实体类。
  • 支持自定义序列化。
  • 支持out,ref关键字。

二、使用Rpc服务

2.1 定义服务

  1. 服务器端中新建一个类名为MyRpcServer
  2. 继承于RpcServer类、或实现IRpcServer。亦或者将服务器声明为瞬时生命的服务,继承TransientRpcServer、或ITransientRpcServer。
  3. 在该类中写公共方法,并用DmtpRpc属性标签标记。
public partial class MyRpcServer : RpcServer
{
[Description("登录")]//服务描述,在生成代理时,会变成注释。
[DmtpRpc("Login")]//服务注册的函数键,此处为显式指定。默认不传参的时候,为该函数类全名+方法名的全小写。
public bool Login(string account, string password)
{
if (account == "123" && password == "abc")
{
return true;
}

return false;
}
}
public partial class MyRpcServer : TransientRpcServer
{
[Description("登录")]//服务描述,在生成代理时,会变成注释。
[DmtpRpc("Login")]//服务注册的函数键,此处为显式指定。默认不传参的时候,为该函数类全名+方法名的全小写。
public bool Login(string account,string password)
{
if (account=="123"&&password=="abc")
{
return true;
}

return false;
}
}
信息

ITransientRpcServerIRpcServer相比,意为瞬时生命服务,即实现ITransientRpcServer的服务,在每次被调用时,都会创建一个新的服务实例。其优点为可以直接通过this.CallContext属性获得调用上下文。其缺点则是每次调用时会多消耗一些性能。

2.2 启动Dmtp并注册Rpc服务

以下仅示例基于Tcp协议Dmtp。其他协议的服务器请看创建Dmtp服务器

更多注册Rpc的方法请看注册Rpc服务

var service = new TcpDmtpService();
var config = new TouchSocketConfig()//配置
.SetListenIPHosts(7789)
.ConfigureContainer(a=>
{
a.AddRpcStore(store =>
{
store.RegisterServer<MyRpcServer>();//注册服务
});
})
.ConfigurePlugins(a =>
{
a.UseDmtpRpc();
})
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Rpc"//连接验证口令。
});

await service.SetupAsync(config);

await service.StartAsync();

service.Logger.Info($"{service.GetType().Name}已启动");

2.3 调用Rpc

2.3.1 直接调用

直接调用,则是不使用任何代理,使用字符串参数直接Call Rpc,使用比较简单。

下列以TcpDmtpClient为例,其他客户端请看创建Dmtp客户端

var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
.SetRemoteIPHost("127.0.0.1:7789")
.ConfigurePlugins(a =>
{
a.UseDmtpRpc();
})
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Rpc"//连接验证口令。
}));
await client.ConnectAsync();

bool result =(bool) client.GetDmtpRpcActor().Invoke(typeof(bool),"Login", InvokeOption.WaitInvoke, "123", "abc");
信息

直接调用时,第一个参数为返回值类型,当没有返回值时则可以不用。第二个参数为调用键,调用键默认情况下为服务类的“命名空间+类名+方法名”的全小写。但在本案例中直接指定了以“Login”为调用键。第三个参数为调用配置参数,可设置调用超时时间,取消调用等功能。示例中使用的预设,实际上可以自行new InvokeOption()。后续参数为调用参数

或者使用InvokeT的扩展方法调用。在有返回值时,可以直接泛型传参。

bool result =client.GetDmtpRpcActor().InvokeT<bool>("Login", InvokeOption.WaitInvoke, "123", "abc");

2.3.2 代理调用

代理调用的便捷在于,客户端不用再知道哪些服务可调,也不用再纠结调用的参数类型正不正确,因为这些,代理工具都会替你做好。

详细步骤:

  1. 生成代理文件
  2. 将生成的cs文件添加到调用端一起编译。
备注

以上示例,会生成下列代理代码。

【生成的代理】

using System;
using TouchSocket.Core;
using TouchSocket.Sockets;
using TouchSocket.Rpc;
using TouchSocket.Dmtp.Rpc;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
namespace RpcProxy
{
public interface IMyRpcServer : TouchSocket.Rpc.IRemoteServer
{
///<summary>
///登录
///</summary>
/// <exception cref="System.TimeoutException">调用超时</exception>
/// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
/// <exception cref="System.Exception">其他异常</exception>
System.Boolean Login(System.String account, System.String password, IInvokeOption invokeOption = default);
///<summary>
///登录
///</summary>
/// <exception cref="System.TimeoutException">调用超时</exception>
/// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
/// <exception cref="System.Exception">其他异常</exception>
Task<System.Boolean> LoginAsync(System.String account, System.String password, IInvokeOption invokeOption = default);
}
public class MyRpcServer : IMyRpcServer
{
public MyRpcServer(IRpcClient client)
{
this.Client = client;
}
public IRpcClient Client { get; private set; }
///<summary>
///登录
///</summary>
/// <exception cref="System.TimeoutException">调用超时</exception>
/// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
/// <exception cref="System.Exception">其他异常</exception>
public System.Boolean Login(System.String account, System.String password, IInvokeOption invokeOption = default)
{
if (Client == null)
{
throw new RpcException("IRpcClient为空,请先初始化或者进行赋值");
}
object[] parameters = new object[] { account, password };
System.Boolean returnData = (System.Boolean)Client.Invoke(typeof(System.Boolean), "Login", invokeOption, parameters);
return returnData;
}
///<summary>
///登录
///</summary>
public async Task<System.Boolean> LoginAsync(System.String account, System.String password, IInvokeOption invokeOption = default)
{
if (Client == null)
{
throw new RpcException("IRpcClient为空,请先初始化或者进行赋值");
}
object[] parameters = new object[] { account, password };
return (System.Boolean)await Client.InvokeAsync(typeof(System.Boolean), "Login", invokeOption, parameters);
}
}
public static class MyRpcServerExtensions
{
///<summary>
///登录
///</summary>
/// <exception cref="System.TimeoutException">调用超时</exception>
/// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
/// <exception cref="System.Exception">其他异常</exception>
public static System.Boolean Login<TClient>(this TClient client, System.String account, System.String password, IInvokeOption invokeOption = default) where TClient :
TouchSocket.Rpc.IRpcClient
{
object[] parameters = new object[] { account, password };
System.Boolean returnData = (System.Boolean)client.Invoke(typeof(System.Boolean), "Login", invokeOption, parameters);
return returnData;
}
///<summary>
///登录
///</summary>
public static async Task<System.Boolean> LoginAsync<TClient>(this TClient client, System.String account, System.String password, IInvokeOption invokeOption = default) where TClient :
TouchSocket.Rpc.IRpcClient
{
object[] parameters = new object[] { account, password };
return (System.Boolean)await client.InvokeAsync(typeof(System.Boolean), "Login", invokeOption, parameters);
}
}
}

使用代理扩展直接调用。

var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
.SetRemoteIPHost("127.0.0.1:7789")
.ConfigurePlugins(a =>
{
a.UseDmtpRpc();
})
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Rpc"//连接验证口令。
}));
await client.ConnectAsync();

bool result = client.GetDmtpRpcActor().Login("123", "abc", InvokeOption.WaitInvoke);//Login是生成的代理扩展方法。可能需要额外添加命名空间。
提示

client.GetDmtpRpcActor()的操作,内部还需要执行字典的查询操作。所以,如果为效率考虑的话,在连接稳定的前提下,可以保存好client.GetDmtpRpcActor()的返回值对象,直接执行Rpc操作。但是需要的注意的是,一旦重新连接,则该对象也需要重新获取。

三、反向Rpc

一般的rpc服务都是客户端发起,服务器响应。但是有时候也需要服务器发起,客户端响应,所以需要反向rpc。

3.1 定义、发布反向Rpc服务

实际上,Dmtp的全称(Duplex Message Transport Protocol双工消息传输协议),Duplex意为双工,则表明,当Dmtp客户端连接到服务以后,拥有与服务器同等的通讯权限与功能。所以客户端发布Rpc服务的步骤和服务器完全一致。即:当客户端和服务器建立连接以后,就不再区分谁是客户端,谁是服务器了。只关心,谁能提供服务,谁在调用服务

下列就以简单的示例下,由客户端声明服务,服务器调用服务。

具体步骤:

  1. 客户端项目中定义Rpc服务,名为ReverseCallbackServer
  2. DmtpRpc标记需要公开的公共方法。
public partial class ReverseCallbackServer : RpcServer
{
[DmtpRpc(MethodInvoke = true)]//使用方法名作为调用键
public string SayHello(string name)
{
return $"{name},hi";
}
}

【客户端注册发布服务】

var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigureContainer(a =>
{
a.AddRpcStore(store =>
{
store.RegisterServer<ReverseCallbackServer>();
});
})
.ConfigurePlugins(a =>
{
a.UseDmtpRpc();
})
.SetRemoteIPHost("127.0.0.1:7789")
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Rpc"//连接验证口令。
}));
await client.ConnectAsync();
client.Logger.Info($"连接成功,Id={client.Id}");

3.2 调用反向Rpc

服务器回调客户端,最终必须通过服务器辅助类客户端ITcpSessionClient的派生类),以TcpDmtpService为例,其辅助客户端为TcpDmtpSessionClient(或其接口:ITcpDmtpSessionClient)。

下列示例以TcpDmtpSessionClient为例,其余一致。

3.2.1 通过服务器直接获取

可以获取所有TcpDmtpSessionClient,进行广播式调用。

foreach (var item in service.GetClients())
{
item.GetDmtpRpcActor().InvokeT<string>("SayHello", InvokeOption.WaitInvoke, "张三");
}

也可以先筛选Id,然后再调用。

var id = service.GetIds().FirstOrDefault(a => a.Equals("特定id"));
if (service.TryGetClient(id, out var SessionClient))
{
SessionClient.GetDmtpRpcActor().InvokeT<string>("SayHello", InvokeOption.WaitInvoke, "张三");
}

3.2.2 通过调用上下文获取

例如:下列声明在服务器端的Rpc服务MyRpcServer,使其使用瞬时服务(也可以通过函数注入服务)。

上下文的Caller,即为服务器辅助类终端,进行强转即可。

使用该方式可以实现,当客户端调用服务器的Add接口的时候,服务器又回调客户端的SayHello接口。

partial class MyRpcServer : TransientRpcServer
{
[DmtpRpc(MethodInvoke = true)]//使用函数名直接调用
public int Add(int a, int b)
{
if (this.CallContext.Caller is ITcpDmtpSessionClient SessionClient)
{
SessionClient.GetDmtpRpcActor().InvokeT<string>("SayHello",InvokeOption.WaitInvoke,"张三");
}
int sum = a + b;
return sum;
}
}
提示

反向Rpc也可以使用代理调用。所有用法和常规Rpc一致。

四、客户端互Call Rpc

除了Rpc,反向Rpc,DmtpRpc还支持客户端之间互Call Rpc。服务的定义与Rpc一样。

4.1 互Call RPC

客户端1调用客户端2的方法,需要知道对方的Id。然后和调用Rpc方法一致。然后使用下列函数调用即可。

var client1 = GetTcpDmtpClient();
var client2 = GetTcpDmtpClient();

client1.GetDmtpRpcActor().InvokeT<bool>(client2.Id,"Notice",InvokeOption.WaitInvoke,"Hello");

亦或者

var targetRpcClient = client1.CreateTargetDmtpRpcActor(client2.Id);
targetRpcClient.InvokeT<bool>("Notice", InvokeOption.WaitInvoke, "Hello");
提示

使用上述的CreateTargetDmtpRpcActor(),获取到的targetRpcClient也能使用代理调用Rpc。

提示

互Call Rpc也支持调用上下文。

服务器注意

客户端互Call的时候,每个请求,都需要服务支持路由,且同意路由,才可以被转发。所以服务器需要配置路由策略和添加允许转发的插件。

配置路由。

.ConfigureContainer(a =>
{
a.AddDmtpRouteService();
a.AddConsoleLogger();
})

同意转发路由数据。

internal class MyPlugin : PluginBase,IDmtpRoutingPlugin
{
public async Task OnDmtpRouting(IDmtpActorObject client, PackageRouterEventArgs e)
{
if (e.RouterType == RouteType.Rpc)
{
e.IsPermitOperation = true;
return;
}

await e.InvokeNext();
}
}

五、调用配置

DmtpRpc支持单次调用配置。单次调用配置,就是在每次调用的时候,使用新建DmtpInvokeOption对象,然后在Invoke时,传入invokeOption即可。

其中详细介绍如下:

5.1 FeedbackType

FeedbackType是调用反馈类型,其枚举值分别有OnlySend、WaitSend、WaitInvoke。

  • OnlySend意为只发送Rpc请求,不进行任何等待。这在通知类调用时是非常快速的。
  • WaitSend意为发送Rpc请求,并等待接收结果。即,返回时,仅表示对方收到了Rpc请求,但是具体执行如何,则不可知。这一般在不可靠协议中是有用的。
  • WaitInvoke意为发送Rpc请求,并等待执行结果。即,返回时,表示对方已经执行了Rpc请求,如果有执行返回值,则携带返回值。如果执行过程发生异常,则会将异常返回。

5.2 SerializationType

SerializationType是序列化类型,其枚举值有FastBinaryJsonXmlSystemBinary。其特点如下:

FastBinaryJsonXmlSystemBinary
序列化方式速度快,数据量小,但是兼容的数据格式也比较有限。仅支持基础类型、自定义实体类、数组、List、字典兼容性好,可读性强,但是受字符串影响,性能不出众,且数据量受限制兼容性一般,可读性强,同样受字符串影响,性能不出众,且数据量受限制序列化速度快。但是兼容性低。且要求类必须一致,不然需要重新指定图根。

5.2.1 配置默认序列化选择器

默认序列化选择器在初始化时,可以配置相关属性。例如:

  • FastSerializerContext:快速序列化上下文属性
  • JsonSerializerSettings:Json序列化设置属性
  • SerializationBinder:系统二进制序列化绑定器

例如:

.ConfigurePlugins(a =>
{
a.UseDmtpRpc()
.SetSerializationSelector(new DefaultSerializationSelector()
{
//仅示例,实际使用时,请赋值有效值
FastSerializerContext = default,
JsonSerializerSettings = default,
SerializationBinder = default,
});
})

5.2.2 自定义序列化

自定义序列化

Dmtp除了上述的4中内置序列化,还支持自定义序列化。

首先,新建一个类,实现ISerializationConverter接口,然后实现相关方法。

下列将使用MemoryPack 序列化为例。

public class MemoryPackSerializationSelector : ISerializationSelector
{
public object DeserializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, Type parameterType) where TByteBlock : IByteBlock
{
var len = byteBlock.ReadInt32();
var span = byteBlock.ReadToSpan(len);
return MemoryPackSerializer.Deserialize(parameterType, span);
}

public void SerializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, in object parameter) where TByteBlock : IByteBlock
{
var pos = byteBlock.Position;
byteBlock.Seek(4, SeekOrigin.Current);
var memoryPackWriter = new MemoryPackWriter<TByteBlock>(ref byteBlock, null);

MemoryPackSerializer.Serialize(parameter.GetType(), ref memoryPackWriter, parameter);

var newPos = byteBlock.Position;
byteBlock.Position = pos;
byteBlock.WriteInt32(memoryPackWriter.WrittenCount);
byteBlock.Position = newPos;
}
}

然后配置序列化器

.ConfigurePlugins(a =>
{
a.UseDmtpRpc()
.SetSerializationSelector(new MemoryPackSerializationSelector());
})
注意

序列化器的配置,必须是调用端和响应端相同的配置。不然序列化不统一,则无法进行反序列化。

最后就是使用序列化。

在上述代码中,我们并没有判断SerializationType,所以在调用时无需特指,它都会以MemoryPack序列化工作。

但有时候我们希望能保留内置序列化类型,所以可以参考内置序列化选择器,然后把新加的序列化再做一次判断。

例如:

internal sealed class DefaultSerializationSelector : ISerializationSelector
{
/// <summary>
/// 根据指定的序列化类型反序列化字节块中的数据。
/// </summary>
/// <param name="byteBlock">包含序列化数据的字节块。</param>
/// <param name="serializationType">指定的序列化类型。</param>
/// <param name="parameterType">预期反序列化出的对象类型。</param>
/// <returns>反序列化后的对象。</returns>
/// <exception cref="RpcException">抛出当未识别序列化类型时。</exception>
public object DeserializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, Type parameterType) where TByteBlock : IByteBlock
{
// 根据序列化类型选择不同的反序列化方式
switch (serializationType)
{
case SerializationType.FastBinary:
// 使用FastBinary格式进行反序列化
return FastBinaryFormatter.Deserialize(ref byteBlock, parameterType);
case SerializationType.SystemBinary:
// 检查字节块是否为null
if (byteBlock.ReadIsNull())
{
// 如果为null,则返回该类型的默认值
return parameterType.GetDefault();
}

// 使用SystemBinary格式进行反序列化
using (var block = byteBlock.ReadByteBlock())
{
// 将字节块转换为流并进行反序列化
return SerializeConvert.BinaryDeserialize(block.AsStream());
}
case SerializationType.Json:
// 检查字节块是否为null
if (byteBlock.ReadIsNull())
{
// 如果为null,则返回该类型的默认值
return parameterType.GetDefault();
}

// 使用Json格式进行反序列化
return JsonConvert.DeserializeObject(byteBlock.ReadString(), parameterType);

case SerializationType.Xml:
// 检查字节块是否为null
if (byteBlock.ReadIsNull())
{
// 如果为null,则返回该类型的默认值
return parameterType.GetDefault();
}
// 使用Xml格式进行反序列化
return SerializeConvert.XmlDeserializeFromBytes(byteBlock.ReadBytesPackage(), parameterType);
case (SerializationType)4:
{
var len = byteBlock.ReadInt32();
var span = byteBlock.ReadToSpan(len);
return MemoryPackSerializer.Deserialize(parameterType, span);
}
default:
// 如果序列化类型未识别,则抛出异常
throw new RpcException("未指定的反序列化方式");
}
}

/// <summary>
/// 序列化参数
/// </summary>
/// <param name="byteBlock">字节块引用,用于存储序列化后的数据</param>
/// <param name="serializationType">序列化类型,决定了使用哪种方式序列化</param>
/// <param name="parameter">待序列化的参数对象</param>
/// <typeparam name="TByteBlock">字节块类型,必须实现IByteBlock接口</typeparam>
public void SerializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, in object parameter) where TByteBlock : IByteBlock
{
// 根据序列化类型选择不同的序列化方法
switch (serializationType)
{
case SerializationType.FastBinary:
{
// 使用FastBinaryFormatter进行序列化
FastBinaryFormatter.Serialize(ref byteBlock, parameter);
break;
}
case SerializationType.SystemBinary:
{
// 参数为null时,写入空值标记
if (parameter is null)
{
byteBlock.WriteNull();
}
else
{
// 参数不为null时,标记并序列化参数
byteBlock.WriteNotNull();
using (var block = new ByteBlock(1024 * 64))
{
// 使用System.Runtime.Serialization.BinaryFormatter进行序列化
SerializeConvert.BinarySerialize(block.AsStream(), parameter);
// 将序列化后的字节块写入byteBlock
byteBlock.WriteByteBlock(block);
}
}
break;
}
case SerializationType.Json:
{
// 参数为null时,写入空值标记
if (parameter is null)
{
byteBlock.WriteNull();
}
else
{
// 参数不为null时,标记并转换为JSON字符串
byteBlock.WriteNotNull();
byteBlock.WriteString(JsonConvert.SerializeObject(parameter));
}
break;
}
case SerializationType.Xml:
{
// 参数为null时,写入空值标记
if (parameter is null)
{
byteBlock.WriteNull();
}
else
{
// 参数不为null时,标记并转换为Xml字节
byteBlock.WriteNotNull();
byteBlock.WriteBytesPackage(SerializeConvert.XmlSerializeToBytes(parameter));
}
break;
}
case (SerializationType)4:
{
var pos = byteBlock.Position;
byteBlock.Seek(4, SeekOrigin.Current);
var memoryPackWriter = new MemoryPackWriter<TByteBlock>(ref byteBlock, null);

MemoryPackSerializer.Serialize(parameter.GetType(), ref memoryPackWriter, parameter);

var newPos = byteBlock.Position;
byteBlock.Position = pos;
byteBlock.WriteInt32(memoryPackWriter.WrittenCount);
byteBlock.Position = newPos;

break;
}
default:
// 抛出异常,提示未指定的序列化方式
throw new RpcException("未指定的序列化方式");
}
}
}

最后在使用时,因为序列化类型是枚举值,所以使用时需要强制转换一下。

var invokeOption = new DmtpInvokeOption()//调用配置
{
FeedbackType = FeedbackType.WaitInvoke,//调用反馈类型
SerializationType = (SerializationType)4,//序列化类型
Timeout = 5000,//调用超时设置
Token = tokenSource.Token//配置可取消令箭
};

序列化选择器Demo

5.3 Timeout

Timeout是超时时间,单位是毫秒。

5.4 CancellationToken

CancellationToken是取消令箭源,可用于取消Rpc的调用。

提示

取消调用时,其取消消息可以传递到被调用端,所以,被调用端可以通过ICallContext(调用上下文)获取到Token。

var client = GetTcpDmtpClient();

//设置调用配置
var tokenSource = new CancellationTokenSource();//可取消令箭源,可用于取消Rpc的调用
var invokeOption = new DmtpInvokeOption()//调用配置
{
FeedbackType = FeedbackType.WaitInvoke,//调用反馈类型
SerializationType = SerializationType.FastBinary,//序列化类型
Timeout = 5000,//调用超时设置
Token = tokenSource.Token//配置可取消令箭
};

var sum = client.GetDmtpRpcActor().InvokeT<int>("Add", invokeOption, 10, 20);
client.Logger.Info($"调用Add方法成功,结果:{sum}");

5.5 Metadata 元数据

Metadata是字符串键值对,其作用类似http的headers,用于传递一些附加信息。

在请求时可以通过DmtpInvokeOption进行传参。

var invokeOption = new DmtpInvokeOption()//调用配置
{
FeedbackType = FeedbackType.WaitInvoke,//调用反馈类型
SerializationType = SerializationType.FastBinary,//序列化类型
Timeout = 5000,//调用超时设置
Metadata=new Metadata(){{"a","a"}}
};

var metadata = client.GetDmtpRpcActor().InvokeT<Metadata>("CallContextMetadata", invokeOption);

在接收可以通过IDmtpCallContext(调用上下文)获取到Metadata。

[DmtpRpc(MethodInvoke = true)]
public Metadata CallContextMetadata(IDmtpRpcCallContext callContext)
{
return callContext.Metadata;
}

六、Rpc大数据传输

在Rpc中,并没有对传输的数据做限制,但是因为Rpc默认使用的固定包头适配器中,默认设置的可传递数据为10Mb,所以在Rpc中,用户可一次性传递的数据包大约为9.9Mb。所以,如果用户传递超出阈值的数据,适配器则会触发异常,而无法接收。但在实际上Rpc的使用中,大数据的传输也是很重要的一个环节,所以在此做了大数据的传输思路建议,希望能有效解决大家的麻烦。

6.1 设置适配器参数(推荐指数:⭐)

操作原理:在固定包头适配器中,默认限制了单次可发送数据包的最大值,所以可以修改此值实现目的。

该方法简单粗暴,能够解决一定程度的大数据问题,但并不建议这么做。

TouchSocketConfig config = new TouchSocketConfig()//配置
.SetMaxPackageSize(1024 * 1024 * 10)
注意

客户端必须同样设置。

6.2 Rpc嵌套Channel(推荐指数:⭐⭐⭐⭐⭐)

操作原理:先利用Rpc让客户端与服务器约定特定的Channel,然后后续数据通过Channel传递,最后由Rpc返回结果。

6.2.1 请求流数据

【Service端】

/// <summary>
/// 测试客户端请求,服务器响应大量流数据
/// </summary>
/// <param name="callContext"></param>
/// <param name="channelID"></param>
[Description("测试客户端请求,服务器响应大量流数据")]
[DmtpRpc]
public int RpcPullChannel(ICallContext callContext, int channelID)
{
var size = 0;
var package = 1024 * 64;
if (callContext.Caller is TcpDmtpSessionClient SessionClient)
{
if (SessionClient.TrySubscribeChannel(channelID, out var channel))
{
for (var i = 0; i < 10; i++)
{
size += package;
channel.Write(new byte[package]);
}
channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
}
}
return size;
}

【Client端】

using var client = GetTcpDmtpClient();
ChannelStatus status = ChannelStatus.Default;
int size = 0;
var channel = client.CreateChannel();//创建通道
Task task = Task.Run(() =>//这里必须用异步
{
using (channel)
{
foreach (var currentByteBlock in channel)
{
size += currentByteBlock.Len;//此处可以处理传递来的流数据
}
status = channel.Status;//最后状态
}
});
int result = client.GetDmtpRpcActor().RpcPullChannel(channel.Id);//RpcPullChannel是代理方法,此处会阻塞至服务器全部发送完成。
await task;//等待异步接收完成
Console.WriteLine($"状态:{status},size={size}");

6.2.2 推送流数据

【Service端】

/// <summary>
/// "测试推送"
/// </summary>
/// <param name="callContext"></param>
/// <param name="channelID"></param>
[Description("测试客户端推送流数据")]
[DmtpRpc]
public int RpcPushChannel(ICallContext callContext, int channelID)
{
int size = 0;

if (callContext.Caller is TcpDmtpSessionClient SessionClient)
{
if (SessionClient.TrySubscribeChannel(channelID, out var channel))
{
foreach (var item in channel)
{
size += item.Len;//此处处理流数据
}
}
}
return size;
}

【Client端】

using var client = GetTcpDmtpClient();
ChannelStatus status = ChannelStatus.Default;
int size = 0;
int package = 1024;
var channel = client.CreateChannel();//创建通道
Task task = Task.Run(() =>//这里必须用异步
{
for (int i = 0; i < 1024; i++)
{
size += package;
channel.Write(new byte[package]);
}
channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose

status = channel.Status;
});
int result = client.GetDmtpRpcActor().RpcPushChannel(channel.Id);//RpcPushChannel是代理方法,此处会阻塞至服务器全部完成。
await task;//等待异步接收完成
Console.WriteLine($"状态:{status},result={result}");

七、限制代理接口

默认情况下,代理生成的接口,是面向IRpcClient的,即:面向所有Rpc终端。但是有时候我们希望不同的终端,只能调用不同的方法。甚至有时候希望,不同的终端可以重载方法。所以如果不对生成的接口做限制,就可能发生下图问题。

关于代理代码生成接口限制,请看服务代理生成或者源服务代理生成

接下来就讲讲客户端如何实现接口限制。

首先按需求,声明多个继承IDmtpRpcActor的接口,此处有IRpcClient1IRpcClient2两个。

然后新建类,命名为MyDmtpRpcActor,继承DmtpRpcActor,然后分别实现IRpcClient1IRpcClient2两个接口。

interface IRpcClient1:IDmtpRpcActor
{

}

interface IRpcClient2 : IDmtpRpcActor
{

}

class MyDmtpRpcActor : DmtpRpcActor, IRpcClient1, IRpcClient2
{
public MyDmtpRpcActor(IDmtpActor smtpActor) : base(smtpActor)
{
}
}

然后在UseDmtpRpc时,设置SetCreateDmtpRpcActor,这样获得的实际实例则会是MyDmtpRpcActor类型。

var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
.ConfigurePlugins(a =>
{
a.UseDmtpRpc()
.SetCreateDmtpRpcActor((actor)=>new MyDmtpRpcActor(actor));
})
.SetRemoteIPHost("127.0.0.1:7789")
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Rpc"//连接验证口令。
}));
await client.ConnectAsync();

最后在获得RpcActor时,就可以按接口获取。然后配合服务器代码接口约束,就可以实现我们所期望的功能。

IRpcClient1 rpcClient1= client.GetDmtpRpcActor<IRpcClient1>();
IRpcClient2 rpcClient2= client.GetDmtpRpcActor<IRpcClient2>();

本文示例Demo