Rpc功能
定义
定义
一、说明
RPC(Remote Procedure Call)远程过程调用协议,一种通过网络从远程计算机上请求服务,而不需要了解底层网络技术的协议。RPC它假定某些协议的存在,例如TPC/UDP等,为通信程序之间携带信息数据。在OSI网络七层模型中,RPC跨越了传输层和应用层,RPC使得开发,包括网络分布式多程序在内的应用程序更加容易。
过程是什么? 过程就是业务处理、计算任务,更直白的说,就是程序,就是想调用本地方法一样调用远程的过程。
本Rpc是基于Dmtp协议的Rpc组件。其功能包括:
- 支持客户端主动调用服务器。
 - 支持服务主动调用客户端。
 - 支持客户端之间互相调用。
 - 支持绝大多数数据类型及自定义实体类。
 - 支持自定义序列化。
 - 支持out,ref关键字。
 
二、使用Rpc服务
2.1 定义服务
- 在服务器端中新建一个类名为MyRpcServer。
 - 继承于RpcServer类、或实现IRpcServer。亦或者将服务器声明为瞬时生命的服务,继承TransientRpcServer、或ITransientRpcServer。
 - 在该类中写公共方法,并用DmtpRpc属性标签标记。
 
public partial class MyRpcServer : RpcServer
{
    [Description("登录")]//服务描述,在生成代理时,会变成注释。
    [DmtpRpc(InvokeKey ="Login")]//服务注册的函数键,此处为显式指定。默认不传参的时候,为该函数类全名+方法名的全小写。
    public bool Login(string account, string password)
    {
        if (account == "123" && password == "abc")
        {
            return true;
        }
        return false;
    }
}
public partial class MyRpcServer : TransientRpcServer
{
    [Description("登录")]//服务描述,在生成代理时,会变成注释。
    [DmtpRpc(InvokeKey ="Login")]//服务注册的函数键,此处为显式指定。默认不传参的时候,为该函数类全名+方法名的全小写。
    public bool Login(string account,string password)
    {
        if (account=="123"&&password=="abc")
        {
            return true;
        }
        return false;
    }
}
ITransientRpcServer和IRpcServer相比,意为瞬时生命服务,即实现ITransientRpcServer的服务,在每次被调用时,都会创建一个新的服务实例。其优点为可以直接通过this.CallContext属性获得调用上下文。其缺点则是每次调用时会多消耗一些性能。
2.2 启动Dmtp并注册Rpc服务
以下仅示例基于Tcp协议Dmtp。其他协议的服务器请看创建Dmtp服务器
更多注册Rpc的方法请看注册Rpc服务
var service = new TcpDmtpService();
var config = new TouchSocketConfig()//配置
       .SetListenIPHosts(7789)
       .ConfigureContainer(a=> 
       {
           a.AddRpcStore(store =>
           {
               store.RegisterServer<MyRpcServer>();//注册服务
           });
       })
       .ConfigurePlugins(a =>
       {
           a.UseDmtpRpc();
       })
       .SetDmtpOption(new DmtpOption()
       {
           VerifyToken = "Rpc"//连接验证口令。
       });
await service.SetupAsync(config);
await service.StartAsync();
service.Logger.Info($"{service.GetType().Name}已启动");
2.3 调用Rpc
2.3.1 直接调用
直接调用,则是不使用任何代理,使用字符串和参数直接Call Rpc,使用比较简单。
下列以TcpDmtpClient为例,其他客户端请看创建Dmtp客户端。
var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
    .SetRemoteIPHost("127.0.0.1:7789")
    .ConfigurePlugins(a =>
    {
        a.UseDmtpRpc();
    })
    .SetDmtpOption(new DmtpOption()
    {
        VerifyToken = "Rpc"//连接验证口令。
    }));
await client.ConnectAsync();
bool result =(bool) client.GetDmtpRpcActor().Invoke(typeof(bool),"Login", InvokeOption.WaitInvoke, "123", "abc");
直接调用时,第一个参数为返回值类型,当没有返回值时则可以不用。第二个参数为调用键,调用键默认情况下为服务类的“命名空间+类名+方法名”的全小写。但在本案例中直接指定了以“Login”为调用键。第三个参数为调用配置参数,可设置调用超时时间,取消调用等功能。示例中使用的预设,实际上可以自行new InvokeOption()。后续参数为调用参数。
或者使用InvokeT的扩展方法调用。在有返回值时,可以直接泛型传参。
bool result =client.GetDmtpRpcActor().InvokeT<bool>("Login", InvokeOption.WaitInvoke, "123", "abc");
2.3.2 代理调用
代理调用的便捷在于,客户端不用再知道哪些服务可调,也不用再纠结调用的参数类型正不正确,因为这些,代理工具都会替你做好。
详细步骤:
- 生成代理文件
 - 将生成的cs文件添加到调用端一起编译。
 
以上示例,会生成下列代理代码。
【生成的代理】
using System;
using TouchSocket.Core;
using TouchSocket.Sockets;
using TouchSocket.Rpc;
using TouchSocket.Dmtp.Rpc;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
namespace RpcProxy
{
    public interface IMyRpcServer : TouchSocket.Rpc.IRemoteServer
    {
        ///<summary>
        ///登录
        ///</summary>
        /// <exception cref="System.TimeoutException">调用超时</exception>
        /// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
        /// <exception cref="System.Exception">其他异常</exception>
        System.Boolean Login(System.String account, System.String password, IInvokeOption invokeOption = default);
        ///<summary>
        ///登录
        ///</summary>
        /// <exception cref="System.TimeoutException">调用超时</exception>
        /// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
        /// <exception cref="System.Exception">其他异常</exception>
        Task<System.Boolean> LoginAsync(System.String account, System.String password, IInvokeOption invokeOption = default);
    }
    public class MyRpcServer : IMyRpcServer
    {
        public MyRpcServer(IRpcClient client)
        {
            this.Client = client;
        }
        public IRpcClient Client { get; private set; }
        ///<summary>
        ///登录
        ///</summary>
        /// <exception cref="System.TimeoutException">调用超时</exception>
        /// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
        /// <exception cref="System.Exception">其他异常</exception>
        public System.Boolean Login(System.String account, System.String password, IInvokeOption invokeOption = default)
        {
            if (Client == null)
            {
                throw new RpcException("IRpcClient为空,请先初始化或者进行赋值");
            }
            object[] parameters = new object[] { account, password };
            System.Boolean returnData = (System.Boolean)Client.Invoke(typeof(System.Boolean), "Login", invokeOption, parameters);
            return returnData;
        }
        ///<summary>
        ///登录
        ///</summary>
        public async Task<System.Boolean> LoginAsync(System.String account, System.String password, IInvokeOption invokeOption = default)
        {
            if (Client == null)
            {
                throw new RpcException("IRpcClient为空,请先初始化或者进行赋值");
            }
            object[] parameters = new object[] { account, password };
            return (System.Boolean)await Client.InvokeAsync(typeof(System.Boolean), "Login", invokeOption, parameters);
        }
    }
    public static class MyRpcServerExtensions
    {
        ///<summary>
        ///登录
        ///</summary>
        /// <exception cref="System.TimeoutException">调用超时</exception>
        /// <exception cref="TouchSocket.Rpc.RpcInvokeException">Rpc调用异常</exception>
        /// <exception cref="System.Exception">其他异常</exception>
        public static System.Boolean Login<TClient>(this TClient client, System.String account, System.String password, IInvokeOption invokeOption = default) where TClient :
        TouchSocket.Rpc.IRpcClient
        {
            object[] parameters = new object[] { account, password };
            System.Boolean returnData = (System.Boolean)client.Invoke(typeof(System.Boolean), "Login", invokeOption, parameters);
            return returnData;
        }
        ///<summary>
        ///登录
        ///</summary>
        public static async Task<System.Boolean> LoginAsync<TClient>(this TClient client, System.String account, System.String password, IInvokeOption invokeOption = default) where TClient :
        TouchSocket.Rpc.IRpcClient
        {
            object[] parameters = new object[] { account, password };
            return (System.Boolean)await client.InvokeAsync(typeof(System.Boolean), "Login", invokeOption, parameters);
        }
    }
}
使用代理扩展直接调用。
var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
    .SetRemoteIPHost("127.0.0.1:7789")
    .ConfigurePlugins(a =>
    {
        a.UseDmtpRpc();
    })
    .SetDmtpOption(new DmtpOption()
    {
        VerifyToken = "Rpc"//连接验证口令。
    }));
await client.ConnectAsync();
bool result = client.GetDmtpRpcActor().Login("123", "abc", InvokeOption.WaitInvoke);//Login是生成的代理扩展方法。可能需要额外添加命名空间。
client.GetDmtpRpcActor()的操作,内部还需要执行字典的查询操作。所以,如果为效率考虑的话,在连接稳定的前提下,可以保存好client.GetDmtpRpcActor()的返回值对象,直接执行Rpc操作。但是需要的注意的是,一旦重新连接,则该对象也需要重新获取。
三、反向Rpc
一般的rpc服务都是客户端发起,服务器响应。但是有时候也需要服务器发起,客户端响应,所以需要反向rpc。
3.1 定义、发布反向Rpc服务
实际上,Dmtp的全称(Duplex Message Transport Protocol双工消息传输协议),Duplex意为双工,则表明,当Dmtp客户端连接到服务以后,拥有与服务器同等的通讯权限与功能。所以客户端发布Rpc服务的步骤和服务器完全一致。即:当客户端和服务器建立连接以后,就不再区分谁是客户端,谁是服务器了。只关心,谁能提供服务,谁在调用服务。
下列就以简单的示例下,由客户端声明服务,服务器调用服务。
具体步骤:
- 在客户端项目中定义Rpc服务,名为
ReverseCallbackServer。 - 用DmtpRpc标记需要公开的公共方法。
 
public partial class ReverseCallbackServer : RpcServer
{
    [DmtpRpc(MethodInvoke = true)]//使用方法名作为调用键
    public string SayHello(string name)
    {
        return $"{name},hi";
    }
}
【客户端注册发布服务】
var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
    .ConfigureContainer(a =>
    {
        a.AddConsoleLogger();
    })
    .ConfigureContainer(a => 
    {
        a.AddRpcStore(store =>
        {
            store.RegisterServer<ReverseCallbackServer>();
        });
    })
    .ConfigurePlugins(a =>
    {
        a.UseDmtpRpc();
    })
    .SetRemoteIPHost("127.0.0.1:7789")
    .SetDmtpOption(new DmtpOption()
    {
        VerifyToken = "Rpc"//连接验证口令。
    }));
await client.ConnectAsync();
client.Logger.Info($"连接成功,Id={client.Id}");
3.2 调用反向Rpc
服务器回调客户端,最终必须通过服务器辅助类客户端(ITcpSessionClient的派生类),以TcpDmtpService为例,其辅助客户端为TcpDmtpSessionClient(或其接口:ITcpDmtpSessionClient)。
下列示例以TcpDmtpSessionClient为例,其余一致。
3.2.1 通过服务器直接获取
可以获取所有TcpDmtpSessionClient,进行广播式调用。
foreach (var item in service.GetClients())
{
    item.GetDmtpRpcActor().InvokeT<string>("SayHello", InvokeOption.WaitInvoke, "张三");
}
也可以先筛选Id,然后再调用。
var id = service.GetIds().FirstOrDefault(a => a.Equals("特定id"));
if (service.TryGetClient(id, out var SessionClient))
{
    SessionClient.GetDmtpRpcActor().InvokeT<string>("SayHello", InvokeOption.WaitInvoke, "张三");
}
3.2.2 通过调用上下文获取
例如:下列声明在服务器端的Rpc服务MyRpcServer,使其使用瞬时服务(也可以通过函数注入服务)。
上下文的Caller,即为服务器辅助类终端,进行强转即可。
使用该方式可以实现,当客户端调用服务器的Add接口的时候,服务器又回调客户端的SayHello接口。
partial class MyRpcServer : TransientRpcServer
{
    [DmtpRpc(MethodInvoke = true)]//使用函数名直接调用
    public int Add(int a, int b)
    {
        if (this.CallContext.Caller is ITcpDmtpSessionClient SessionClient)
        {
            SessionClient.GetDmtpRpcActor().InvokeT<string>("SayHello",InvokeOption.WaitInvoke,"张三");
        }
        int sum = a + b;
        return sum;
    }
}
反向Rpc也可以使用代理调用。所有用法和常规Rpc一致。
四、客户端互Call Rpc
除了Rpc,反向Rpc,DmtpRpc还支持客户端之间互Call Rpc。服务的定义与Rpc一样。
4.1 互Call RPC
客户端1调用客户端2的方法,需要知道对方的Id。然后和调用Rpc方法一致。然后使用下列函数调用即可。
var client1 = GetTcpDmtpClient();
var client2 = GetTcpDmtpClient();
client1.GetDmtpRpcActor().InvokeT<bool>(client2.Id,"Notice",InvokeOption.WaitInvoke,"Hello");
亦或者
var targetRpcClient = client1.CreateTargetDmtpRpcActor(client2.Id);
targetRpcClient.InvokeT<bool>("Notice", InvokeOption.WaitInvoke, "Hello");
使用上述的CreateTargetDmtpRpcActor(),获取到的targetRpcClient也能使用代理调用Rpc。
互Call Rpc也支持调用上下文。
客户端互Call的时候,每个请求,都需要服务支持路由,且同意路由,才可以被转发。所以服务器需要配置路由策略和添加允许转发的插件。
配置路由。
.ConfigureContainer(a =>
{
    a.AddDmtpRouteService();
    a.AddConsoleLogger();
})
同意转发路由数据。
internal class MyPlugin : PluginBase,IDmtpRoutingPlugin
{
    public async Task OnDmtpRouting(IDmtpActorObject client, PackageRouterEventArgs e)
    {
        if (e.RouterType == RouteType.Rpc)
        {
            e.IsPermitOperation = true;
            return;
        }
       await e.InvokeNext();
    }
}
五、调用配置
DmtpRpc支持单次调用配置。单次调用配置,就是在每次调用的时候,使用新建DmtpInvokeOption对象,然后在Invoke时,传入invokeOption即可。
其中详细介绍如下:
5.1 FeedbackType
FeedbackType是调用反馈类型,其枚举值分别有OnlySend、WaitSend、WaitInvoke。
OnlySend意为只发送Rpc请求,不进行任何等待。这在通知类调用时是非常快速的。WaitSend意为发送Rpc请求,并等待接收结果。即,返回时,仅表示对方收到了Rpc请求,但是具体执行如何,则不可知。这一般在不可靠协议中是有用的。WaitInvoke意为发送Rpc请求,并等待执行结果。即,返回时,表示对方已经执行了Rpc请求,如果有执行返回值,则携带返回值。如果执行过程发生异常,则会将异常返回。
5.2 SerializationType
SerializationType是序列化类型,其枚举值有FastBinary、Json、Xml、SystemBinary。其特点如下:
| FastBinary | Json | Xml | SystemBinary | 
|---|---|---|---|
| 序列化方式速度快,数据量小,但是兼容的数据格式也比较有限。仅支持基础类型、自定义实体类、数组、List、字典 | 兼容性好,可读性强,但是受字符串影响,性能不出众,且数据量受限制 | 兼容性一般,可读性强,同样受字符串影响,性能不出众,且数据量受限制 | 序列化速度快。但是兼容性低。且要求类必须一致,不然需要重新指定图根。 | 
5.2.1 配置默认序列化选择器
默认序列化选择器在初始化时,可以配置相关属性。例如:
- FastSerializerContext:快速序列化上下文属性
 - JsonSerializerSettings:Json序列化设置属性
 - SerializationBinder:系统二进制序列化绑定器
 
例如:
.ConfigurePlugins(a =>
{
   a.UseDmtpRpc()
       .SetSerializationSelector(new DefaultSerializationSelector()
       {
           //仅示例,实际使用时,请赋值有效值
           FastSerializerContext = default,
           JsonSerializerSettings = default,
           SerializationBinder = default,
       });
})
5.2.2 自定义序列化
自定义序列化
Dmtp除了上述的4中内置序列化,还支持自定义序列化。
首先,新建一个类,实现ISerializationConverter接口,然后实现相关方法。
下列将使用MemoryPack 序列化为例。
public class MemoryPackSerializationSelector : ISerializationSelector
{
    public object DeserializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, Type parameterType) where TByteBlock : IByteBlock
    {
        var len = byteBlock.ReadInt32();
        var span = byteBlock.ReadToSpan(len);
        return MemoryPackSerializer.Deserialize(parameterType, span);
    }
    public void SerializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, in object parameter) where TByteBlock : IByteBlock
    {
        var pos = byteBlock.Position;
        byteBlock.Seek(4, SeekOrigin.Current);
        var memoryPackWriter = new MemoryPackWriter<TByteBlock>(ref byteBlock, null);
        MemoryPackSerializer.Serialize(parameter.GetType(), ref memoryPackWriter, parameter);
        var newPos = byteBlock.Position;
        byteBlock.Position = pos;
        byteBlock.WriteInt32(memoryPackWriter.WrittenCount);
        byteBlock.Position = newPos;
    }
}
然后配置序列化器
.ConfigurePlugins(a =>
{
    a.UseDmtpRpc()
    .SetSerializationSelector(new MemoryPackSerializationSelector());
})
序列化器的配置,必须是调用端和响应端相同的配置。不然序列化不统一,则无法进行反序列化。
最后就是使用序列化。
在上述代码中,我们并没有判断SerializationType,所以在调用时无需特指,它都会以MemoryPack序列化工作。
但有时候我们希望能保留内置序列化类型,所以可以参考内置序列化选择器,然后把新加的序列化再做一次判断。
例如:
internal sealed class DefaultSerializationSelector : ISerializationSelector
{
    /// <summary>
    /// 根据指定的序列化类型反序列化字节块中的数据。
    /// </summary>
    /// <param name="byteBlock">包含序列化数据的字节块。</param>
    /// <param name="serializationType">指定的序列化类型。</param>
    /// <param name="parameterType">预期反序列化出的对象类型。</param>
    /// <returns>反序列化后的对象。</returns>
    /// <exception cref="RpcException">抛出当未识别序列化类型时。</exception>
    public object DeserializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, Type parameterType) where TByteBlock : IByteBlock
    {
        // 根据序列化类型选择不同的反序列化方式
        switch (serializationType)
        {
            case SerializationType.FastBinary:
                // 使用FastBinary格式进行反序列化
                return FastBinaryFormatter.Deserialize(ref byteBlock, parameterType);
            case SerializationType.SystemBinary:
                // 检查字节块是否为null
                if (byteBlock.ReadIsNull())
                {
                    // 如果为null,则返回该类型的默认值
                    return parameterType.GetDefault();
                }
                // 使用SystemBinary格式进行反序列化
                using (var block = byteBlock.ReadByteBlock())
                {
                    // 将字节块转换为流并进行反序列化
                    return SerializeConvert.BinaryDeserialize(block.AsStream());
                }
            case SerializationType.Json:
                // 检查字节块是否为null
                if (byteBlock.ReadIsNull())
                {
                    // 如果为null,则返回该类型的默认值
                    return parameterType.GetDefault();
                }
                // 使用Json格式进行反序列化
                return JsonConvert.DeserializeObject(byteBlock.ReadString(), parameterType);
            case SerializationType.Xml:
                // 检查字节块是否为null
                if (byteBlock.ReadIsNull())
                {
                    // 如果为null,则返回该类型的默认值
                    return parameterType.GetDefault();
                }
                // 使用Xml格式进行反序列化
                return SerializeConvert.XmlDeserializeFromBytes(byteBlock.ReadBytesPackage(), parameterType);
            case (SerializationType)4:
                {
                    var Length = byteBlock.ReadInt32();
                    var span = byteBlock.ReadToSpan(Length);
                    return MemoryPackSerializer.Deserialize(parameterType, span);
                }
            default:
                // 如果序列化类型未识别,则抛出异常
                throw new RpcException("未指定的反序列化方式");
        }
    }
    /// <summary>
    /// 序列化参数
    /// </summary>
    /// <param name="byteBlock">字节块引用,用于存储序列化后的数据</param>
    /// <param name="serializationType">序列化类型,决定了使用哪种方式序列化</param>
    /// <param name="parameter">待序列化的参数对象</param>
    /// <typeparam name="TByteBlock">字节块类型,必须实现IByteBlock接口</typeparam>
    public void SerializeParameter<TByteBlock>(ref TByteBlock byteBlock, SerializationType serializationType, in object parameter) where TByteBlock : IByteBlock
    {
        // 根据序列化类型选择不同的序列化方法
        switch (serializationType)
        {
            case SerializationType.FastBinary:
                {
                    // 使用FastBinaryFormatter进行序列化
                    FastBinaryFormatter.Serialize(ref byteBlock, parameter);
                    break;
                }
            case SerializationType.SystemBinary:
                {
                    // 参数为null时,写入空值标记
                    if (parameter is null)
                    {
                        byteBlock.WriteNull();
                    }
                    else
                    {
                        // 参数不为null时,标记并序列化参数
                        byteBlock.WriteNotNull();
                        using (var block = new ByteBlock(1024 * 64))
                        {
                            // 使用System.Runtime.Serialization.BinaryFormatter进行序列化
                            SerializeConvert.BinarySerialize(block.AsStream(), parameter);
                            // 将序列化后的字节块写入byteBlock
                            byteBlock.WriteByteBlock(block);
                        }
                    }
                    break;
                }
            case SerializationType.Json:
                {
                    // 参数为null时,写入空值标记
                    if (parameter is null)
                    {
                        byteBlock.WriteNull();
                    }
                    else
                    {
                        // 参数不为null时,标记并转换为JSON字符串
                        byteBlock.WriteNotNull();
                        byteBlock.WriteString(JsonConvert.SerializeObject(parameter));
                    }
                    break;
                }
            case SerializationType.Xml:
                {
                    // 参数为null时,写入空值标记
                    if (parameter is null)
                    {
                        byteBlock.WriteNull();
                    }
                    else
                    {
                        // 参数不为null时,标记并转换为Xml字节
                        byteBlock.WriteNotNull();
                        byteBlock.WriteBytesPackage(SerializeConvert.XmlSerializeToBytes(parameter));
                    }
                    break;
                }
            case (SerializationType)4:
                {
                    var pos = byteBlock.Position;
                    byteBlock.Seek(4, SeekOrigin.Current);
                    var memoryPackWriter = new MemoryPackWriter<TByteBlock>(ref byteBlock, null);
                    MemoryPackSerializer.Serialize(parameter.GetType(), ref memoryPackWriter, parameter);
                    var newPos = byteBlock.Position;
                    byteBlock.Position = pos;
                    byteBlock.WriteInt32(memoryPackWriter.WrittenCount);
                    byteBlock.Position = newPos;
                    break;
                }
            default:
                // 抛出异常,提示未指定的序列化方式
                throw new RpcException("未指定的序列化方式");
        }
    }
}
最后在使用时,因为序列化类型是枚举值,所以使用时需要强制转换一下。
var invokeOption = new DmtpInvokeOption()//调用配置
{
    FeedbackType = FeedbackType.WaitInvoke,//调用反馈类型
    SerializationType = (SerializationType)4,//序列化类型
    Timeout = 5000,//调用超时设置
    Token = tokenSource.Token//配置可取消令箭
};
5.3 Timeout
Timeout是超时时间,单位是毫秒。
5.4 CancellationToken
CancellationToken是取消令箭源,可用于取消Rpc的调用。
取消调用时,其取消消息可以传递到被调用端,所以,被调用端可以通过ICallContext(调用上下文)获取到Token。
var client = GetTcpDmtpClient();
//设置调用配置
var tokenSource = new CancellationTokenSource();//可取消令箭源,可用于取消Rpc的调用
var invokeOption = new DmtpInvokeOption()//调用配置
{
    FeedbackType = FeedbackType.WaitInvoke,//调用反馈类型
    SerializationType = SerializationType.FastBinary,//序列化类型
    Timeout = 5000,//调用超时设置
    Token = tokenSource.Token//配置可取消令箭
};
var sum = client.GetDmtpRpcActor().InvokeT<int>("Add", invokeOption, 10, 20);
client.Logger.Info($"调用Add方法成功,结果:{sum}");
5.5 Metadata 元数据
Metadata是字符串键值对,其作用类似http的headers,用于传递一些附加信息。
在请求时可以通过DmtpInvokeOption进行传参。
var invokeOption = new DmtpInvokeOption()//调用配置
{
    FeedbackType = FeedbackType.WaitInvoke,//调用反馈类型
    SerializationType = SerializationType.FastBinary,//序列化类型
    Timeout = 5000,//调用超时设置
    Metadata=new Metadata(){{"a","a"}}
};
var metadata = client.GetDmtpRpcActor().InvokeT<Metadata>("CallContextMetadata", invokeOption);
在接收可以通过IDmtpCallContext(调用上下文)获取到Metadata。
[DmtpRpc(MethodInvoke = true)]
public Metadata CallContextMetadata(IDmtpRpcCallContext callContext)
{
    return callContext.Metadata;
}
六、Rpc大数据传输
在Rpc中,并没有对传输的数据做限制,但是因为Rpc默认使用的固定包头适配器中,默认设置的可传递数据为10Mb,所以在Rpc中,用户可一次性传递的数据包大约为9.9Mb。所以,如果用户传递超出阈值的数据,适配器则会触发异常,而无法接收。但在实际上Rpc的使用中,大数据的传输也是很重要的一个环节,所以在此做了大数据的传输思路建议,希望能有效解决大家的麻烦。
6.1 设置适配器参数(推荐指数:⭐)
操作原理:在固定包头适配器中,默认限制了单次可发送数据包的最大值,所以可以修改此值实现目的。
该方法简单粗暴,能够解决一定程度的大数据问题,但并不建议这么做。
TouchSocketConfig config = new TouchSocketConfig()//配置
                .SetMaxPackageSize(1024 * 1024 * 10)
客户端必须同样设置。
6.2 Rpc嵌套Channel(推荐指数:⭐⭐⭐⭐⭐)
操作原理:先利用Rpc让客户端与服务器约定特定的Channel,然后后续数据通过Channel传递,最后由Rpc返回结果。
6.2.1 请求流数据
【Service端】
/// <summary>
/// 测试客户端请求,服务器响应大量流数据
/// </summary>
/// <param name="callContext"></param>
/// <param name="channelID"></param>
[Description("测试客户端请求,服务器响应大量流数据")]
[DmtpRpc]
public int RpcPullChannel(ICallContext callContext, int channelID)
{
    var size = 0;
    var package = 1024 * 64;
    if (callContext.Caller is TcpDmtpSessionClient SessionClient)
    {
        if (SessionClient.TrySubscribeChannel(channelID, out var channel))
        {
            for (var i = 0; i < 10; i++)
            {
                size += package;
                channel.Write(new byte[package]);
            }
            channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
        }
    }
    return size;
}
【Client端】
using var client = GetTcpDmtpClient();
ChannelStatus status = ChannelStatus.Default;
int size = 0;
var channel = client.CreateChannel();//创建通道
Task task = Task.Run(() =>//这里必须用异步
{
    using (channel)
    {
        foreach (var currentByteBlock in channel)
        {
            size += currentByteBlock.Length;//此处可以处理传递来的流数据
        }
        status = channel.Status;//最后状态
    }
});
int result = client.GetDmtpRpcActor().RpcPullChannel(channel.Id);//RpcPullChannel是代理方法,此处会阻塞至服务器全部发送完成。
await task;//等待异步接收完成
Console.WriteLine($"状态:{status},size={size}");
6.2.2 推送流数据
【Service端】
/// <summary>
/// "测试推送"
/// </summary>
/// <param name="callContext"></param>
/// <param name="channelID"></param>
[Description("测试客户端推送流数据")]
[DmtpRpc]
public int RpcPushChannel(ICallContext callContext, int channelID)
{
    int size = 0;
    if (callContext.Caller is TcpDmtpSessionClient SessionClient)
    {
        if (SessionClient.TrySubscribeChannel(channelID, out var channel))
        {
            foreach (var item in channel)
            {
                size += item.Length;//此处处理流数据
            }
        }
    }
    return size;
}
【Client端】
using var client = GetTcpDmtpClient();
ChannelStatus status = ChannelStatus.Default;
int size = 0;
int package = 1024;
var channel = client.CreateChannel();//创建通道
Task task = Task.Run(() =>//这里必须用异步
{
    for (int i = 0; i < 1024; i++)
    {
        size += package;
        channel.Write(new byte[package]);
    }
    channel.Complete();//必须调用指令函数,如Complete,Cancel,Dispose
    status = channel.Status;
});
int result = client.GetDmtpRpcActor().RpcPushChannel(channel.Id);//RpcPushChannel是代理方法,此处会阻塞至服务器全部完成。
await task;//等待异步接收完成
Console.WriteLine($"状态:{status},result={result}");
七、限制代理接口
默认情况下,代理生成的接口,是面向IRpcClient的,即:面向所有Rpc终端。但是有时候我们希望不同的终端,只能调用不同的方法。甚至有时候希望,不同的终端可以重载方法。所以如果不对生成的接口做限制,就可能发生下图问题。
关于代理代码生成接口限制,请看服务代理生成或者源服务代理生成
接下来就讲讲客户端如何实现接口限制。
首先按需求,声明多个继承IDmtpRpcActor的接口,此处有IRpcClient1与IRpcClient2两个。
然后新建类,命名为MyDmtpRpcActor,继承DmtpRpcActor,然后分别实现IRpcClient1与IRpcClient2两个接口。
interface IRpcClient1:IDmtpRpcActor
{
}
interface IRpcClient2 : IDmtpRpcActor
{
}
class MyDmtpRpcActor : DmtpRpcActor, IRpcClient1, IRpcClient2
{
    public MyDmtpRpcActor(IDmtpActor smtpActor) : base(smtpActor)
    {
    }
}
然后在UseDmtpRpc时,设置SetCreateDmtpRpcActor,这样获得的实际实例则会是MyDmtpRpcActor类型。
var client = new TcpDmtpClient();
await client.SetupAsync(new TouchSocketConfig()
    .ConfigurePlugins(a =>
    {
        a.UseDmtpRpc()
        .SetCreateDmtpRpcActor((actor)=>new MyDmtpRpcActor(actor));
    })
    .SetRemoteIPHost("127.0.0.1:7789")
    .SetDmtpOption(new DmtpOption()
    {
        VerifyToken = "Rpc"//连接验证口令。
    }));
await client.ConnectAsync();
最后在获得RpcActor时,就可以按接口获取。然后配合服务器代码接口约束,就可以实现我们所期望的功能。
IRpcClient1 rpcClient1= client.GetDmtpRpcActor<IRpcClient1>();
IRpcClient2 rpcClient2= client.GetDmtpRpcActor<IRpcClient2>();