跳到主要内容
版本:1.3.9

Rpc大数据流式传输

一、说明

在RPC中,并没有对传输的数据做限制,但是因为RPC默认使用的固定包头适配器中,默认设置的可传递数据为10Mb,所以在RPC中,用户可一次性传递的数据包大约为9.9Mb。所以,如果用户传递超出阈值的数据,适配器则会触发异常,而无法接收。但在实际上RPC的使用中,大数据的传输也是很重要的一个环节,所以RRQM已经做了大数据的传输思路建议,希望能有效解决大家的麻烦。

二、设置适配器参数(推荐指数:⭐)

操作原理:在固定包头适配器中,默认限制了单次可发送数据包的最大值,所以可以修改此值实现目的。

该方法简单粗暴,能够解决一定程度的大数据问题,但并不建议这么做。

注意:客户端必须同样设置。

TouchSocketConfig config = new TouchSocketConfig()//配置
.SetMaxPackageSize(1024 * 1024 * 10)

三、RPC嵌套Channel(推荐指数:⭐⭐⭐⭐⭐)

操作原理:先利用RPC让客户端与服务器约定特定的Channel,然后后续数据通过Channel传递,最后由RPC返回结果。

3.1 请求流数据

【Service端】

/// <summary>
/// "测试ServiceToClient创建通道,从而实现流数据的传输"
/// </summary>
/// <param name="callContext"></param>
/// <param name="channelID"></param>
[Description("测试ServiceToClient创建通道,从而实现流数据的传输")]
[TouchRpc(MethodFlags = MethodFlags.IncludeCallContext)]
public int RpcPullChannel(ICallContext callContext, int channelID)
{
int size = 0;
int package = 1024 * 1024;
if (callContext.Caller is TcpTouchRpcSocketClient socketClient)
{
if (socketClient.TrySubscribeChannel(channelID, out Channel channel))
{
for (int i = 0; i < 1024; i++)
{
size += package;
channel.Write(new byte[package]);
}
channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
}
}
return size;
}

【Client端】

ChannelStatus status = ChannelStatus.Default;
int size = 0;
Channel channel = client.CreateChannel();//创建通道
Task task = Task.Run(() =>//这里必须用异步
{
using (channel)
{
while (channel.MoveNext())
{
byte[] data = channel.GetCurrent();
size += data.Length;
}
status = channel.Status;
}
});
int result = client.RpcPullChannel(channel.ID);//RpcPullChannel是代理方法,此处会阻塞至服务器全部发送完成。
await task;//等待异步接收完成
Console.WriteLine($"状态:{status},size={size}");

3.2 推送流数据

【Service端】

/// <summary>
/// "测试推送"
/// </summary>
/// <param name="callContext"></param>
/// <param name="channelID"></param>
[Description("测试ServiceToClient创建通道,从而实现流数据的传输")]
[TouchRpc(MethodFlags = MethodFlags.IncludeCallContext)]
public int RpcPushChannel(ICallContext callContext, int channelID)
{
int size = 0;

if (callContext.Caller is TcpTouchRpcSocketClient socketClient)
{
if (socketClient.TrySubscribeChannel(channelID, out Channel channel))
{
while (channel.MoveNext())
{
size += channel.GetCurrent().Length;
}
}
}
return size;
}

【Client端】

ChannelStatus status = ChannelStatus.Default;
int size = 0;
int package = 1024 * 1024;
Channel channel = client.CreateChannel();//创建通道
Task task = Task.Run(() =>//这里必须用异步
{
for (int i = 0; i < 1024; i++)
{
size += package;
channel.Write(new byte[package]);
}
channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
});
int result = client.RpcPushChannel(channel.ID);//RpcPushChannel是代理方法,此处会阻塞至服务器全部完成。
await task;//等待异步接收完成
Console.WriteLine($"状态:{status},result={result}");