Rpc功能
定义
命名空间:TouchSocket.Dmtp.Rpc
程序集:TouchSocket.Dmtp.dll
一、说明
RPC(Remote Procedure Call)远程过程调用协议,一种通过网络从远程计算机上请求服务,而不需要了解底层网络技术的协议。RPC它假定某些协议的存在,例如TPC/UDP等,为通信程序之间携带信息数据。在OSI网络七层模型中,RPC跨越了传输层和应用层,RPC使得开发,包括网络分布式多程序在内的应用程序更加容易。
过程是什么? 过程就是业务处理、计算任务,更直白的说,就是程序,就是想调用本地方法一样调用远程的过程。
本Rpc是基于Dmtp协议的Rpc组件。其功能包括:
- 支持客户端主动调用服务器。
- 支持服务主动调用客户端。
- 支持客户端之间互相调用。
- 支持绝大多数数据类型及自定义实体类。
- 支持自定义序列化。
二、使用Rpc服务
2.1 定义服务
- 在服务器端中新建一个类名为
MyRpcServer
。 - 继承于
SingletonRpcServer
类、或实现ISingletonRpcServer
。亦或者将服务器声明为瞬时生命的服务,继承TransientRpcServer
、或ITransientRpcServer
。 - 在该类中写公共方法,并用
DmtpRpc
属性标签标记。
public partial class MyRpcServer : SingletonRpcServer
{
[Description("登录")]//服务描述,在生成代理时,会变成注释。
[DmtpRpc(InvokeKey ="Login")]//服务注册的函数键,此处为显式指定。默认不传参的时候,为该函数类全名+方法 名的全小写。
public bool Login(string account, string password)
{
if (account == "123" && password == "abc")
{
return true;
}
return false;
}
}
public partial class MyRpcServer : TransientRpcServer
{
[Description("登录")]//服务描述,在生成代理时,会变成注释。
[DmtpRpc(InvokeKey ="Login")]//服务注册的函数键,此处为显式指定。默认不传参的时候,为该函数类全名+方法名的全小写。
public bool Login(string account,string password)
{
if (account=="123"&&password=="abc")
{
return true;
}
return false;
}
}
ITransientRpcServer
和ISingletonRpcServer
相比,意为瞬时生命服务,即实现ITransientRpcServer
的服务,在每次被调用时,都会创建一个新的服务实例。其优点为可以直接通过this.CallContext
属性获得调用上下文。其缺点则是每次调用时会多消耗一些性能。
2.2 启动Dmtp并注册Rpc服务
以下仅示例基于Tcp协议Dmtp。其他协议的服务器请看创建Dmtp服务器
更多注册Rpc的方法请看注册Rpc服务
var service = new TcpDmtpService();
var config = new TouchSocketConfig()//配置
.SetListenIPHosts(7789)
.ConfigureContainer(a=>
{
a.AddRpcStore(store =>
{
store.RegisterServer<MyRpcServer>();//注册服务
});
})
.ConfigurePlugins(a =>
{
a.UseDmtpRpc();
})
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Rpc"//连接验证口令。
});
await service.SetupAsync(config);
await service.StartAsync();
service.Logger.Info($"{service.GetType().Name}已启动");
2.3 调用Rpc
2.3.1 直接调用
直接调用,则是不使用任何代理,使用字符串和参数直接Call Rpc,使用比较简单。
下列以TcpDmtpClient为例,其他客户端请看创建Dmtp客户端。
var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
.SetRemoteIPHost("127.0.0.1:7789")
.ConfigurePlugins(a =>
{
a.UseDmtpRpc();
})
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Rpc"//连接验证口令。
}));
await client.ConnectAsync();
bool result =(bool) client.GetDmtpRpcActor().Invoke(typeof(bool),"Login", InvokeOption.WaitInvoke, "123", "abc");
直接调用时,第一个参数为返回值
类型,当没有返回值时则可以不用。第二个参数为调用键
,调用键默认情况下为服务类的“命名空间+类名+方法名
”的全小写
。但在本案例中直接指定了以“Login”为调用键。第三个参数为调用配置
参数,可设置调用超时时间,取消调用等功能。示例中使用的预设,实际上可以自行new InvokeOption() 。后续参数为调用参数
。
或者使用InvokeT
的扩展方法调用。在有返回值时,可以直接泛型传参。
bool result =client.GetDmtpRpcActor().InvokeT<bool>("Login", InvokeOption.WaitInvoke, "123", "abc");
2.3.2 代理调用
代理调用的便捷在于,客户端不用再知道哪些服务可调,也不用再纠结调用的参数类型正不正确,因为这些,代理工具都会替你做好。
详细步骤:
- 生成代理文件
- 将生成的cs文件添加到调用端一起编译。
以上示例,会生成下列代理代码。
生成的代理
using System;
using TouchSocket.Core;
using TouchSocket.Sockets;
using TouchSocket.Rpc;
using TouchSocket.Dmtp.Rpc;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
namespace RpcProxy
{
public interface IMyRpcServer : TouchSocket.Rpc.IRemoteServer
{
///<summary>
///登录
///</summary>
/// <exception cref="System.TimeoutException">调用超 时</exception>
/// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
/// <exception cref="System.Exception">其他异常</exception>
System.Boolean Login(System.String account, System.String password, IInvokeOption invokeOption = default);
///<summary>
///登录
///</summary>
/// <exception cref="System.TimeoutException">调用超时</exception>
/// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
/// <exception cref="System.Exception">其他异常</exception>
Task<System.Boolean> LoginAsync(System.String account, System.String password, IInvokeOption invokeOption = default);
}
public class MyRpcServer : IMyRpcServer
{
public MyRpcServer(IRpcClient client)
{
this.Client = client;
}
public IRpcClient Client { get; private set; }
///<summary>
///登录
///</summary>
/// <exception cref="System.TimeoutException">调用超时</exception>
/// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
/// <exception cref="System.Exception">其他异常</exception>
public System.Boolean Login(System.String account, System.String password, IInvokeOption invokeOption = default)
{
if (Client == null)
{
throw new RpcException("IRpcClient为空,请先初始化或者进行赋值");
}
object[] parameters = new object[] { account, password };
System.Boolean returnData = (System.Boolean)Client.Invoke(typeof(System.Boolean), "Login", invokeOption, parameters);
return returnData;
}
///<summary>
///登录
///</summary>
public async Task<System.Boolean> LoginAsync(System.String account, System.String password, IInvokeOption invokeOption = default)
{
if (Client == null)
{
throw new RpcException("IRpcClient为空,请先初始化或者进行赋值");
}
object[] parameters = new object[] { account, password };
return (System.Boolean)await Client.InvokeAsync(typeof(System.Boolean), "Login", invokeOption, parameters);
}
}
public static class MyRpcServerExtensions
{
///<summary>
///登录
///</summary>
/// <exception cref="System.TimeoutException">调用超时</exception>
/// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
/// <exception cref="System.Exception">其他异常</exception>
public static System.Boolean Login<TClient>(this TClient client, System.String account, System.String password, IInvokeOption invokeOption = default) where TClient :
TouchSocket.Rpc.IRpcClient
{
object[] parameters = new object[] { account, password };
System.Boolean returnData = (System.Boolean)client.Invoke(typeof(System.Boolean), "Login", invokeOption, parameters);
return returnData;
}
///<summary>
///登录
///</summary>
public static async Task<System.Boolean> LoginAsync<TClient>(this TClient client, System.String account, System.String password, IInvokeOption invokeOption = default) where TClient :
TouchSocket.Rpc.IRpcClient
{
object[] parameters = new object[] { account, password };
return (System.Boolean)await client.InvokeAsync(typeof(System.Boolean), "Login", invokeOption, parameters);
}
}
}
使用代理扩展直接调用。
var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
.SetRemoteIPHost("127.0.0.1:7789")
.ConfigurePlugins(a =>
{
a.UseDmtpRpc();
})
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Rpc"//连接验证口令。
}));
await client.ConnectAsync();
bool result = client.GetDmtpRpcActor().Login("123", "abc", InvokeOption.WaitInvoke);//Login是生成的代理扩展方法。可能需要额外添加命名空间。
client.GetDmtpRpcActor()的 操作,内部还需要执行字典的查询操作。所以,如果为效率考虑的话,在连接稳定的前提下,可以保存好client.GetDmtpRpcActor()的返回值对象,直接执行Rpc操作。但是需要的注意的是,一旦重新连接,则该对象也需要重新获取。
三、反向Rpc
一般的rpc服务都是客户端发起,服务器响应。但是有时候也需要服务器发起,客户端响应,所以需要反向rpc。
3.1 定义、发布反向Rpc服务
实际上,Dmtp的全称(Duplex Message Transport Protocol双工消息传输协议),Duplex意为双工,则表明,当Dmtp客户端连接到服务以后,拥有与服务器同等的通讯权限与功能。所以客户端发布Rpc服务的步骤和服务器完全一致。即:当客户端和服务器建立连接以后,就不再区分谁是客户端,谁是服务器了。只关心,谁能提供服务,谁在调用服务。
下列就以简单的示例下,由客户端声明服务,服务器调用服务。
具体步骤:
- 在客户端项目中定义Rpc服务,名为
ReverseCallbackServer
。 - 用DmtpRpc标记需要公开的公共方法。
public partial class ReverseCallbackServer : SingletonRpcServer
{
[DmtpRpc(MethodInvoke = true)]//使用方法名作为调用键
public string SayHello(string name)
{
return $"{name},hi";
}
}
【客户端注册发布服务】
var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigureContainer(a =>
{
a.AddRpcStore(store =>
{
store.RegisterServer<ReverseCallbackServer>();
});
})
.ConfigurePlugins(a =>
{
a.UseDmtpRpc();
})
.SetRemoteIPHost("127.0.0.1:7789")
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Rpc"//连接验证口令。
}));
await client.ConnectAsync();
client.Logger.Info($"连接成功,Id={client.Id}");
3.2 调用反向Rpc
服务器回调客户端,最终必须通过服务器辅助类客户端(ITcpSessionClient
的派生类),以TcpDmtpService
为例,其辅助客户端为TcpDmtpSessionClient
(或其接口:ITcpDmtpSessionClient
)。
下列示例以TcpDmtpSessionClient为例,其余一致。
3.2.1 通过服务器直接获取
可以获取所有TcpDmtpSessionClient
,进行广播式调用。
foreach (var item in service.GetClients())
{
item.GetDmtpRpcActor().InvokeT<string>("SayHello", InvokeOption.WaitInvoke, "张三");
}
也可以先筛选Id,然后再调用。
var id = service.GetIds().FirstOrDefault(a => a.Equals("特定id"));
if (service.TryGetClient(id, out var SessionClient))
{
SessionClient.GetDmtpRpcActor().InvokeT<string>("SayHello", InvokeOption.WaitInvoke, "张三");
}
3.2.2 通过调用上下文获取
例如:下列声明在服务器端的Rpc服务MyRpcServer,使其使用瞬时服务(也可以通过函数注入服务)。
上下文的Caller,即为服务器辅助类终端,进行强转即可。
使用该方式可以实现,当客户端调用服务器的Add接口的时候,服务器又回调客户端的SayHello接口。
partial class MyRpcServer : TransientRpcServer
{
[DmtpRpc(MethodInvoke = true)]//使用函数名直接调用
public int Add(int a, int b)
{
if (this.CallContext.Caller is ITcpDmtpSessionClient SessionClient)
{
SessionClient.GetDmtpRpcActor().InvokeT<string>("SayHello",InvokeOption.WaitInvoke,"张三");
}
int sum = a + b;
return sum;
}
}
反向Rpc也可以使用代理调用。所有用法和常规Rpc一致。
四、客户端互Call Rpc
除了Rpc,反向Rpc,DmtpRpc还支持客户端之间互Call Rpc。服务的定义与Rpc一样。
4.1 互Call RPC
客户端1调用客户端2的方法,需要知道对方的Id。然后和调用Rpc方法一致。然后使用下列函数调用即可。
var client1 = GetTcpDmtpClient();
var client2 = GetTcpDmtpClient();
client1.GetDmtpRpcActor().InvokeT<bool>(client2.Id,"Notice",InvokeOption.WaitInvoke,"Hello");
亦或者
var targetRpcClient = client1.CreateTargetDmtpRpcActor(client2.Id);
targetRpcClient.InvokeT<bool>("Notice", InvokeOption.WaitInvoke, "Hello");
使用上述的CreateTargetDmtpRpcActor(),获取到的targetRpcClient也能使用代理调用Rpc。
互Call Rpc也支持调用上下文。
客户端互Call的时候,每个请求,都需要服务支持路由,且同意路由,才可以被转发。所以服务器需要配置路由策略和添加允许转发的插件。
配置路由。
.ConfigureContainer(a =>
{
a.AddDmtpRouteService();
a.AddConsoleLogger();
})
同意转发路由数据。
internal class MyPlugin : PluginBase,IDmtpRoutingPlugin
{
public async Task OnDmtpRouting(IDmtpActorObject client, PackageRouterEventArgs e)
{
if (e.RouterType == RouteType.Rpc)
{
e.IsPermitOperation = true;
return;
}
await e.InvokeNext();
}
}
五、调用配置
DmtpRpc支持单次调用配置。单次调用配置,就是在每次调用的时候,使用新建DmtpInvokeOption
对象,然后在Invoke
时,传入invokeOption
即可。
其中详细介绍如下:
5.1 FeedbackType
FeedbackType是调用反馈类型,其枚举值分别有OnlySend、WaitSend、WaitInvoke。
OnlySend
意为只发送Rpc请求,不进行任何等待。这在通知类调用时是非常快速的。WaitSend
意为发送Rpc请求,并等待接收结果。即,返回时, 仅表示对方收到了Rpc请求,但是具体执行如何,则不可知。这一般在不可靠协议中是有用的。WaitInvoke
意为发送Rpc请求,并等待执行结果。即,返回时,表示对方已经执行了Rpc请求,如果有执行返回值,则携带返回值。如果执行过程发生异常,则会将异常返回。
5.2 SerializationType
SerializationType
是序列化类型,其枚举值有FastBinary
、Json
、Xml
、SystemBinary
。其特点如下:
FastBinary | Json | Xml | SystemBinary |
---|---|---|---|
序列化方式速度快,数据量小,但是兼容的数据格式也比较有限。仅支持基础类型、自定义实体类、数组、List、字典 | 兼容性好,可读性强,但是受字符串影响,性能不出众,且数据量受限制 | 兼容性一般,可读性强,同样受字符串影响,性能不出众,且数据量受限制 | 序列化速度快。但是兼容性低。且要求类必须一致,不然需要重新指定图根。 |
5.2.1 配置默认序列化选择器
默认序列化选择器在初始化时,可以配置相关属性。例如:
- FastSerializerContext:快速序列化上下文属性
- JsonSerializerSettings:Json序列化设置属性
- SerializationBinder:系统二进制序列 化绑定器
例如:
.ConfigurePlugins(a =>
{
a.UseDmtpRpc()
.SetSerializationSelector(new DefaultSerializationSelector()
{
//仅示例,实际使用时,请赋值有效值
FastSerializerContext = default,
JsonSerializerSettings = default,
SerializationBinder = default,
});
})
5.2.2 自定义序列化
自定义序列化
Dmtp
除了上述的4中内置序列化,还支持自定义序列化。
首先,新建一个类,实现ISerializationConverter
接口,然后实现相关方法。
下列将使用MemoryPack 序列化为例。
public class MemoryPackSerializationSelector : ISerializationSelector
{
public object DeserializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, Type parameterType) where TByteBlock : IByteBlock
{
var len = byteBlock.ReadInt32();
var span = byteBlock.ReadToSpan(len);
return MemoryPackSerializer.Deserialize(parameterType, span);
}
public void SerializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, in object parameter) where TByteBlock : IByteBlock
{
var pos = byteBlock.Position;
byteBlock.Seek(4, SeekOrigin.Current);
var memoryPackWriter = new MemoryPackWriter<TByteBlock>(ref byteBlock, null);
MemoryPackSerializer.Serialize(parameter.GetType(), ref memoryPackWriter, parameter);
var newPos = byteBlock.Position;
byteBlock.Position = pos;
byteBlock.WriteInt32(memoryPackWriter.WrittenCount);
byteBlock.Position = newPos;
}
}
然后配置序列化器
.ConfigurePlugins(a =>
{
a.UseDmtpRpc()
.SetSerializationSelector(new MemoryPackSerializationSelector());
})
序列化器的配置,必须是调用端和响应端相同的配置。不然序列化不统一,则无法进行反序列化。
最后就是使用序列化。
在上述代码中,我们并没有判断SerializationType,所以在调用时无需特指,它都会以MemoryPack序列化工作。
但有时候我们希望能保留内置序列化类型,所以可以参考内置序列化选择器,然后把新加的序列化再做一次判断。
例如:
internal sealed class DefaultSerializationSelector : ISerializationSelector
{
/// <summary>
/// 根据指定的序列化类型反序列化字节块中的数据。
/// </summary>
/// <param name="byteBlock">包含序列化数据的字节块。</param>
/// <param name="serializationType">指定的序列化类型。</param>
/// <param name="parameterType">预期反序列化出的对象类型。</param>
/// <returns>反序列化后的对象。</returns>
/// <exception cref="RpcException">抛出当未识别序列化类型时。</exception>
public object DeserializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, Type parameterType) where TByteBlock : IByteBlock
{
// 根据序列化类型选择不同的反序列化方式
switch (serializationType)
{
case SerializationType.FastBinary:
// 使用FastBinary格式进行反序列化
return FastBinaryFormatter.Deserialize(ref byteBlock, parameterType);
case SerializationType.SystemBinary:
// 检查字节块是否为null
if (byteBlock.ReadIsNull())
{
// 如果为null,则返回该类型的默认值
return parameterType.GetDefault();
}
// 使用SystemBinary格式进行反序列化
using (var block = byteBlock.ReadByteBlock(1024*64))
{
// 将字节块转换为流并进行反序列化
return SerializeConvert.BinaryDeserialize(block.AsStream());
}
case SerializationType.Json:
// 检查字节块是否为null
if (byteBlock.ReadIsNull())
{
// 如果为null,则返回该类型的默认值
return parameterType.GetDefault();
}
// 使用Json格式进行反 序列化
return JsonConvert.DeserializeObject(byteBlock.ReadString(), parameterType);
case SerializationType.Xml:
// 检查字节块是否为null
if (byteBlock.ReadIsNull())
{
// 如果为null,则返回该类型的默认值
return parameterType.GetDefault();
}
// 使用Xml格式进行反序列化
return SerializeConvert.XmlDeserializeFromBytes(byteBlock.ReadBytesPackage(), parameterType);
case (SerializationType)4:
{
var Length = byteBlock.ReadInt32();
var span = byteBlock.ReadToSpan(Length);
return MemoryPackSerializer.Deserialize(parameterType, span);
}
default:
// 如果序列化类型未识别,则抛出异常
throw new RpcException("未指定的反序列化方式");
}
}
/// <summary>
/// 序列化参数
/// </summary>
/// <param name="byteBlock">字节块引用,用于存储序列化后的数据</param>
/// <param name="serializationType">序列化类型,决定了使用哪种方式序列化</param>
/// <param name="parameter">待序列化的参数对象</param>
/// <typeparam name="TByteBlock">字节块类型,必须实现IByteBlock接口</typeparam>
public void SerializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, in object parameter) where TByteBlock : IByteBlock
{
// 根据序列化类型选择不同的序列化方法
switch (serializationType)
{
case SerializationType.FastBinary:
{
// 使用FastBinaryFormatter进行序列化
FastBinaryFormatter.Serialize(ref byteBlock, parameter);
break;
}
case SerializationType.SystemBinary:
{
// 参数为null时,写入空值标记
if (parameter is null)
{
byteBlock.WriteNull();
}
else
{
// 参数不为null时,标记并序列化参数
byteBlock.WriteNotNull();
using (var block = new ByteBlock(1024 * 64))
{
// 使用System.Runtime.Serialization.BinaryFormatter进行序列化
SerializeConvert.BinarySerialize(block.AsStream(), parameter);
// 将序列化后的字节块写入byteBlock
byteBlock.WriteByteBlock(block);
}
}
break;
}
case SerializationType.Json:
{
// 参数为null时,写入空值标记
if (parameter is null)
{
byteBlock.WriteNull();
}
else
{
// 参数不为null时,标记并转换为JSON字符串
byteBlock.WriteNotNull();
byteBlock.WriteString(JsonConvert.SerializeObject(parameter));
}
break;
}
case SerializationType.Xml:
{
// 参数为null时,写入空值标记
if (parameter is null)
{
byteBlock.WriteNull();
}
else
{
// 参数不为null时,标记并转换为Xml字节
byteBlock.WriteNotNull();
byteBlock.WriteBytesPackage(SerializeConvert.XmlSerializeToBytes(parameter));
}
break;
}
case (SerializationType)4:
{
var pos = byteBlock.Position;
byteBlock.Seek(4, SeekOrigin.Current);
var memoryPackWriter = new MemoryPackWriter<TByteBlock>(ref byteBlock, null);
MemoryPackSerializer.Serialize(parameter.GetType(), ref memoryPackWriter, parameter);
var newPos = byteBlock.Position;
byteBlock.Position = pos;
byteBlock.WriteInt32(memoryPackWriter.WrittenCount);
byteBlock.Position = newPos;
break;
}
default:
// 抛出异常,提示未指定的序列化方式
throw new RpcException("未指定的序列化方式");
}
}
}
最后在使用时,因为序列化类型是枚举值,所以使用时需要强制转换一下。
var invokeOption = new DmtpInvokeOption()//调用配置
{
FeedbackType = FeedbackType.WaitInvoke,//调用反馈类型
SerializationType = (SerializationType)4,//序列化类型
Timeout = 5000,//调用超时设置
Token = tokenSource.Token//配置可取消令箭
};
5.3 Timeout
Timeout是超时时间,单位是毫秒。
5.4 CancellationToken
CancellationToken是取消令箭源,可用于取消Rpc的调用。
取消调用时,其取消消息可以传递到被调用端,所以,被调用端可以通过ICallContext(调用上下文)获取到Token。
var client = GetTcpDmtpClient();
//设置调用配置
var tokenSource = new CancellationTokenSource();//可取消令箭源,可用于取消Rpc的调用
var invokeOption = new DmtpInvokeOption()//调用配置
{
FeedbackType = FeedbackType.WaitInvoke,//调用反馈类型
SerializationType = SerializationType.FastBinary,//序列化类型
Timeout = 5000,//调用超时设置
Token = tokenSource.Token//配置可取消令箭
};
var sum = client.GetDmtpRpcActor().InvokeT<int>("Add", invokeOption, 10, 20);
client.Logger.Info($"调用Add方法成功,结果:{sum}");
5.5 Metadata 元数据
Metadata是字符串键值对,其作用类似http的headers,用于传递一些附加信息。
在请求时可以通过DmtpInvokeOption进行传参。
var invokeOption = new DmtpInvokeOption()//调用配置
{
FeedbackType = FeedbackType.WaitInvoke,//调用反馈类型
SerializationType = SerializationType.FastBinary,//序列化类型
Timeout = 5000,//调用超时设置
Metadata=new Metadata(){{"a","a"}}
};
var metadata = client.GetDmtpRpcActor().InvokeT<Metadata>("CallContextMetadata", invokeOption);
在接收可以通过IDmtpCallContext(调用上下文)获取到Metadata。
[DmtpRpc(MethodInvoke = true)]
public Metadata CallContextMetadata(IDmtpRpcCallContext callContext)
{
return callContext.Metadata;
}
六、Rpc大数据传输
在Rpc中,并没有对传输的数据做限制,但是因为Rpc默认使用的固定包头适配器中,默认设置的可传递数据为10Mb,所以在Rpc中,用户可一次性传递的数据包大约为9.9Mb。所以,如果用户传递超出阈值的数据,适配器则会触发异常,而无法接收。但在实际上Rpc的使用中,大数据的传输也是很重要的一个环节,所以在此做了大数据的传输思路建议,希望能有效解决大家的麻烦。
6.1 设置适配器参数(推荐指数:⭐)
操作原理:在固定包头适配器中,默认限制了单次可发送数据包的最大值,所以可以修改此值实现目的。
该方法简单粗暴,能够解决一定程度的大数据问题,但并不建议这么做。
TouchSocketConfig config = new TouchSocketConfig()//配置
.SetMaxPackageSize(1024 * 1024 * 10)
客户端必须同样设置。
6.2 Rpc嵌套Channel(推荐指数:⭐⭐⭐⭐⭐)
操作原理:先利用Rpc让客户端与服务器约定特定的Channel,然后后续数据通过Channel传递,最后由Rpc返回结果。
6.2.1 请求流数据
【Service端】
/// <summary>
/// 测试客户端请求,服务器响应大量流数据
/// </summary>
/// <param name="callContext"></param>
/// <param name="channelID"></param>
[Description("测试客户端请求,服务器响应大量流数据")]
[DmtpRpc]
public int RpcPullChannel(ICallContext callContext, int channelID)
{
var size = 0;
var package = 1024 * 64;
if (callContext.Caller is TcpDmtpSessionClient SessionClient)
{
if (SessionClient.TrySubscribeChannel(channelID, out var channel))
{
for (var i = 0; i < 10; i++)
{
size += package;
channel.Write(new byte[package]);
}
channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
}
}
return size;
}
【Client端】
using var client = GetTcpDmtpClient();
ChannelStatus status = ChannelStatus.Default;
int size = 0;
var channel = client.CreateChannel();//创建通道
Task task = Task.Run(() =>//这里必须用异步
{
using (channel)
{
foreach (var currentByteBlock in channel)
{
size += currentByteBlock.Length;//此处可以处理传递来的流数据
}
status = channel.Status;//最后状态
}
});
int result = client.GetDmtpRpcActor().RpcPullChannel(channel.Id);//RpcPullChannel是代理方法,此处会阻塞至服务器全部发送完成。
await task;//等待异步接收完成
Console.WriteLine($"状态:{status},size={size}");
6.2.2 推送流数据
【Service端】
/// <summary>
/// "测试推送"
/// </summary>
/// <param name="callContext"></param>
/// <param name="channelID"></param>
[Description("测试客户端推送流数据")]
[DmtpRpc]
public int RpcPushChannel(ICallContext callContext, int channelID)
{
int size = 0;
if (callContext.Caller is TcpDmtpSessionClient SessionClient)
{
if (SessionClient.TrySubscribeChannel(channelID, out var channel))
{
foreach (var item in channel)
{
size += item.Length;//此处处理流数据
}
}
}
return size;
}
【Client端】
using var client = GetTcpDmtpClient();
ChannelStatus status = ChannelStatus.Default;
int size = 0;
int package = 1024;
var channel = client.CreateChannel();//创建通道
Task task = Task.Run(() =>//这里必须用异步
{
for (int i = 0; i < 1024; i++)
{
size += package;
channel.Write(new byte[package]);
}
channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
status = channel.Status;
});
int result = client.GetDmtpRpcActor().RpcPushChannel(channel.Id);//RpcPushChannel是代理方法,此处会阻塞至服务器全部完成。
await task;//等待异步接收完成
Console.WriteLine($"状态:{status},result={result}");
七、限制代理接口
默认情况下,代理生成的接口,是面向IRpcClient
的,即:面向所有Rpc终端
。但是有时候我们希望不同的终端,只能调用不同的方法。甚至有时候希望,不同的终端可以重载方法。所以如果不对生成的接口做限制,就可能发生下图问题。

关于代理代码生成接口限制,请看服务代理生成或者源服务代理生成
接下来就讲讲客户端如何实现接口限制。
首先按需求,声明多个继承IDmtpRpcActor
的接口,此处有IRpcClient1
与IRpcClient2
两个。
然后新建类,命名为MyDmtpRpcActor
,继承DmtpRpcActor
,然后分别实现IRpcClient1
与IRpcClient2
两个接口。
interface IRpcClient1:IDmtpRpcActor
{
}
interface IRpcClient2 : IDmtpRpcActor
{
}
class MyDmtpRpcActor : DmtpRpcActor, IRpcClient1, IRpcClient2
{
public MyDmtpRpcActor(IDmtpActor smtpActor) : base(smtpActor)
{
}
}
然后在UseDmtpRpc
时,设置SetCreateDmtpRpcActor
,这样获得的实际实例则会是MyDmtpRpcActor
类型。
var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
.ConfigurePlugins(a =>
{
a.UseDmtpRpc()
.SetCreateDmtpRpcActor((actor)=>new MyDmtpRpcActor(actor));
})
.SetRemoteIPHost("127.0.0.1:7789")
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Rpc"//连接验证口令。
}));
await client.ConnectAsync();
最后在获得RpcActor时,就可以按接口获取。然后配合服务器代码接口约束,就可以实现我们所期望的功能。
IRpcClient1 rpcClient1= client.GetDmtpRpcActor<IRpcClient1>();
IRpcClient2 rpcClient2= client.GetDmtpRpcActor<IRpcClient2>();