自定义DmtpActor
定义
一、说明
DmtpActor是Dmtp协议的Actor,负责Dmtp协议的交互。规范的Actor可以做到一次开发,多种协议适用,比如DmtpRpcActor、DmtpFileTransferActor等。所以学习Dmtp的Actor开发,是进阶使用Dmtp的必要途径。
二、使用Actor开发的优点
实际上,使用协议扩展即可扩展Dmtp的功能,但是使用Actor开发,则更具优点:
- 很好的分离了协议的交互逻辑和协议的实现逻辑。可以做到一次开发,多种协议适用。
- 按照规范开发,可以非常好地支持客户端到服务器、服务器到客户端、客户端到客户端的交互。
- 更高等级的代码封装,让程序更易维护。
 
三、实现
Actor通讯的原理是:当DmtpActor(主通讯器)收到DmtpMessage后,会依次投递给所有的Dmtp插件;当某个插件通过ProtocolFlags判断该消息属于自己后,再交由该插件所支持的Actor处理数据。
下面我们将通过实现一个最简单的Rpc调用来熟悉Actor通讯。
基本流程如下:
3.1 声明Actor
首先新建一个接口ISimpleDmtpRpcActor,继承IActor。然后新建一个类SimpleDmtpRpcActor,实现ISimpleDmtpRpcActor接口。
声明m_invoke_Request和m_invoke_Response两个变量,用于判断ProtocolFlags能否在当前Actor处理。
声明TryFindMethod委托,用于通过方法名向外部插件查找Rpc方法。
/// <summary>
/// Actor contract of the simple Rpc sample: invokes an Rpc method by its name.
/// </summary>
internal interface ISimpleDmtpRpcActor : IActor
{
    /// <summary>
    /// Invokes the Rpc method with the given name.
    /// </summary>
    /// <param name="methodName">Name of the Rpc method to invoke.</param>
    void Invoke(string methodName);

    /// <summary>
    /// Invokes the Rpc method with the given name on the endpoint identified by
    /// <paramref name="targetId"/>.
    /// </summary>
    /// <param name="targetId">Id of the endpoint that should execute the method.</param>
    /// <param name="methodName">Name of the Rpc method to invoke.</param>
    void Invoke(string targetId, string methodName);
}
/// <summary>
/// Minimal Rpc actor for the sample. It recognises two protocol flags (request and
/// response), executes requests addressed to this endpoint, forwards routed packages
/// to their target actor, and lets callers invoke a method by name while blocking
/// until the reply arrives.
/// </summary>
class SimpleDmtpRpcActor : ISimpleDmtpRpcActor
{
    // Capacity requested for every ByteBlock this actor creates for outgoing packages.
    private const int BufferCapacity = 1024 * 64;

    // How long PrivateInvoke waits for the matching response, in milliseconds.
    private const int InvokeTimeoutMilliseconds = 5000;

    // Protocol flags handled by this actor: one for requests, one for responses.
    private ushort m_invoke_Request = 1000;
    private ushort m_invoke_Response = 1001;

    /// <summary>
    /// The main Dmtp actor this Rpc actor sends and routes through.
    /// </summary>
    public IDmtpActor DmtpActor { get; private set; }

    /// <summary>
    /// Callback used to resolve an Rpc method by name. Returns null when the name is unknown.
    /// </summary>
    public Func<string, MethodModel> TryFindMethod { get; set; }

    /// <summary>
    /// Creates the Rpc actor on top of the given main actor.
    /// </summary>
    /// <param name="dmtpActor">Main Dmtp actor used for sending, routing and waiting.</param>
    public SimpleDmtpRpcActor(IDmtpActor dmtpActor)
    {
        this.DmtpActor = dmtpActor;
    }

    /// <summary>
    /// Offers a received message to this actor.
    /// </summary>
    /// <param name="message">The received Dmtp message.</param>
    /// <returns>
    /// true when the message carries one of this actor's two protocol flags (even if
    /// processing failed and was only logged); otherwise false.
    /// </returns>
    public async Task<bool> InputReceivedData(DmtpMessage message)
    {
        if (message.ProtocolFlags == this.m_invoke_Request)
        {
            await this.HandleRequestAsync(message);
            return true;
        }
        if (message.ProtocolFlags == this.m_invoke_Response)
        {
            await this.HandleResponseAsync(message);
            return true;
        }
        return false;
    }

    // Handles a request package: executes it locally, forwards it to the target
    // actor, or answers with a routing failure status (2 = target not found,
    // 3 = routing refused). Errors are logged and never propagate.
    private async Task HandleRequestAsync(DmtpMessage message)
    {
        var byteBlock = message.BodyByteBlock;
        try
        {
            var rpcPackage = new SimpleDmtpRpcPackage();
            rpcPackage.UnpackageRouter(byteBlock);

            if (!(rpcPackage.Route && this.DmtpActor.AllowRoute))
            {
                // Addressed to this endpoint: run the method off the receive path.
                rpcPackage.UnpackageBody(byteBlock);
                _ = Task.Run(() => this.InvokeThisAsync(rpcPackage));
                return;
            }

            if (await this.DmtpActor.TryRoute(new PackageRouterEventArgs(new RouteType("SimpleRpc"), rpcPackage)))
            {
                if (await this.DmtpActor.TryFindDmtpActor(rpcPackage.TargetId) is DmtpActor actor)
                {
                    // Await the send so the body buffer is still valid while it is
                    // transmitted and a send failure reaches the catch block below.
                    await actor.SendAsync(this.m_invoke_Request, byteBlock);
                    return;
                }
                rpcPackage.Status = 2;
            }
            else
            {
                rpcPackage.Status = 3;
            }

            // Routing failed: reuse the buffer to send the failure back to the caller.
            byteBlock.Reset();
            rpcPackage.SwitchId();
            rpcPackage.Package(byteBlock);
            await this.DmtpActor.SendAsync(this.m_invoke_Response, byteBlock);
        }
        catch (Exception ex)
        {
            this.DmtpActor.Logger.Error(this, $"在protocol={message.ProtocolFlags}中发生错误。信息:{ex.Message}");
        }
    }

    // Handles a response package: forwards it to the target actor when routing
    // applies, otherwise completes the pending wait of the local caller.
    private async Task HandleResponseAsync(DmtpMessage message)
    {
        var byteBlock = message.BodyByteBlock;
        try
        {
            var rpcPackage = new SimpleDmtpRpcPackage();
            rpcPackage.UnpackageRouter(byteBlock);
            if (this.DmtpActor.AllowRoute && rpcPackage.Route)
            {
                if (await this.DmtpActor.TryFindDmtpActor(rpcPackage.TargetId) is DmtpActor actor)
                {
                    await actor.SendAsync(this.m_invoke_Response, byteBlock);
                }
            }
            else
            {
                rpcPackage.UnpackageBody(byteBlock);
                this.DmtpActor.WaitHandlePool.SetRun(rpcPackage);
            }
        }
        catch (Exception ex)
        {
            this.DmtpActor.Logger.Error(this, $"在protocol={message.ProtocolFlags}中发生错误。信息:{ex.Message}");
        }
    }

    // Executes the requested method on this endpoint and sends exactly one response.
    // Status written to the package: 1 = success, 4 = method not found,
    // 5 = lookup or invocation threw. This runs as a detached task, so nothing is
    // allowed to escape; a failure to send the response is logged instead.
    private async Task InvokeThisAsync(SimpleDmtpRpcPackage package)
    {
        try
        {
            try
            {
                var methodModel = this.TryFindMethod?.Invoke(package.MethodName);
                if (methodModel == null)
                {
                    package.Status = 4;
                }
                else
                {
                    methodModel.Method.Invoke(methodModel.Target, default);
                    package.Status = 1;
                }
            }
            catch (Exception ex)
            {
                // Reflection wraps the method's own exception; report the real cause.
                var cause = ex is System.Reflection.TargetInvocationException && ex.InnerException != null
                    ? ex.InnerException
                    : ex;
                package.Status = 5;
                package.Message = cause.Message;
            }

            using (var byteBlock = new ByteBlock(BufferCapacity))
            {
                // SwitchId is called once per response, whatever the outcome was.
                package.SwitchId();
                package.Package(byteBlock);
                await this.DmtpActor.SendAsync(this.m_invoke_Response, byteBlock);
            }
        }
        catch (Exception ex)
        {
            this.DmtpActor.Logger.Error(this, $"在protocol={this.m_invoke_Request}中发生错误。信息:{ex.Message}");
        }
    }

    // Resolves the Rpc actor for targetId: this instance when the id is our own,
    // the Rpc actor attached to the found Dmtp actor otherwise, or null.
    private async Task<SimpleDmtpRpcActor> TryFindDmtpRpcActor(string targetId)
    {
        if (targetId == this.DmtpActor.Id)
        {
            return this;
        }
        if (await this.DmtpActor.TryFindDmtpActor(targetId) is DmtpActor dmtpActor)
        {
            if (dmtpActor.GetSimpleDmtpRpcActor() is SimpleDmtpRpcActor newActor)
            {
                return newActor;
            }
        }
        return default;
    }

    /// <summary>
    /// Invokes the named method on the directly connected peer and blocks until the
    /// reply arrives or the wait times out.
    /// </summary>
    /// <param name="methodName">Name of the Rpc method to invoke.</param>
    /// <exception cref="TimeoutException">No reply arrived in time.</exception>
    /// <exception cref="OperationCanceledException">The wait was cancelled.</exception>
    /// <exception cref="Exception">The remote side reported a failure status.</exception>
    public void Invoke(string methodName)
    {
        this.PrivateInvoke(default, methodName);
    }

    /// <summary>
    /// Invokes the named method on the endpoint identified by <paramref name="targetId"/>
    /// and blocks until the reply arrives or the wait times out.
    /// </summary>
    /// <remarks>
    /// This method is deliberately synchronous (the interface returns void): as an
    /// async void method its exceptions could not be caught by the caller and it
    /// would return before the call had completed.
    /// </remarks>
    /// <param name="targetId">Id of the endpoint that should execute the method.</param>
    /// <param name="methodName">Name of the Rpc method to invoke.</param>
    /// <exception cref="ArgumentException">An argument is null or empty.</exception>
    /// <exception cref="TimeoutException">No reply arrived in time.</exception>
    /// <exception cref="OperationCanceledException">The wait was cancelled.</exception>
    /// <exception cref="Exception">The remote side reported a failure status.</exception>
    public void Invoke(string targetId, string methodName)
    {
        if (string.IsNullOrEmpty(targetId))
        {
            throw new ArgumentException($"“{nameof(targetId)}”不能为 null 或空。", nameof(targetId));
        }
        if (string.IsNullOrEmpty(methodName))
        {
            throw new ArgumentException($"“{nameof(methodName)}”不能为 null 或空。", nameof(methodName));
        }

        if (this.DmtpActor.AllowRoute)
        {
            // The lookup runs through Task.Run so that blocking here cannot deadlock
            // on a captured synchronization context.
            var actor = Task.Run(() => this.TryFindDmtpRpcActor(targetId)).GetAwaiter().GetResult();
            if (actor != null)
            {
                actor.Invoke(methodName);
                return;
            }
        }
        this.PrivateInvoke(targetId, methodName);
    }

    // Sends the request and blocks until the matching response is signalled.
    // A null/empty id means "no routing" (Route is derived from id.HasValue()).
    private void PrivateInvoke(string id, string methodName)
    {
        var package = new SimpleDmtpRpcPackage()
        {
            MethodName = methodName,
            Route = id.HasValue(),
            SourceId = this.DmtpActor.Id,
            TargetId = id
        };
        var waitData = this.DmtpActor.WaitHandlePool.GetReverseWaitData(package);
        try
        {
            // Wait for the send to finish before the buffer is disposed.
            RunSync(async () =>
            {
                using (var byteBlock = new ByteBlock(BufferCapacity))
                {
                    package.Package(byteBlock);
                    await this.DmtpActor.SendAsync(this.m_invoke_Request, byteBlock);
                }
            });

            switch (waitData.Wait(InvokeTimeoutMilliseconds))
            {
                case WaitDataStatus.SetRunning:
                    var result = (SimpleDmtpRpcPackage)waitData.WaitResult;
                    result.CheckStatus();
                    return;
                case WaitDataStatus.Overtime:
                    throw new TimeoutException();
                case WaitDataStatus.Canceled:
                    // A cancelled wait produced no result, so it must not look like success.
                    throw new OperationCanceledException();
                case WaitDataStatus.Default:
                case WaitDataStatus.Disposed:
                default:
                    throw new Exception("未知异常");
            }
        }
        finally
        {
            this.DmtpActor.WaitHandlePool.Destroy(waitData);
        }
    }

    // Runs an asynchronous action to completion from synchronous code. The action is
    // started through Task.Run, and GetResult rethrows its original exception.
    private static void RunSync(Func<Task> asyncAction)
    {
        Task.Run(asyncAction).GetAwaiter().GetResult();
    }
}
额外模型类
MethodModel用于储存反射调用相关。
/// <summary>
/// Pairs a reflected method with the object instance it is invoked on.
/// </summary>
class MethodModel
{
    /// <summary>
    /// Stores the method and its invocation target as given; no validation is performed.
    /// </summary>
    /// <param name="method">The reflected method.</param>
    /// <param name="target">The instance the method is invoked on.</param>
    public MethodModel(MethodInfo method, object target)
    {
        Target = target;
        Method = method;
    }

    /// <summary>The instance the method is invoked on.</summary>
    public object Target { get; }

    /// <summary>The reflected method.</summary>
    public MethodInfo Method { get; }
}
SimpleDmtpRpcPackage类,用于打包Rpc请求,关于打包的使用详情,可以看包序列化
/// <summary>
/// Router-aware wait package of the sample Rpc; in addition to the base package
/// body it carries the name of the method to invoke.
/// </summary>
class SimpleDmtpRpcPackage : WaitRouterPackage
{
    /// <summary>Name of the Rpc method this package refers to.</summary>
    public string MethodName { get; set; }

    // This package always reports that router information is included.
    protected override bool IncludedRouter
    {
        get { return true; }
    }

    /// <summary>
    /// Writes the base body, then the method name.
    /// </summary>
    public override void PackageBody(in ByteBlock byteBlock)
    {
        base.PackageBody(byteBlock);
        byteBlock.Write(this.MethodName);
    }

    /// <summary>
    /// Reads the base body, then the method name (same order as <see cref="PackageBody"/>).
    /// </summary>
    public override void UnpackageBody(in ByteBlock byteBlock)
    {
        base.UnpackageBody(byteBlock);
        this.MethodName = byteBlock.ReadString();
    }

    /// <summary>
    /// Translates the status of a reply into an exception. Status 1 means success;
    /// 0 is reported as a timeout; 2-5 are reported as failures. Any other value
    /// returns without throwing.
    /// </summary>
    /// <exception cref="TimeoutException">Status is 0.</exception>
    /// <exception cref="Exception">Status is 2, 3, 4 or 5.</exception>
    public void CheckStatus()
    {
        var status = this.Status;
        if (status == 1)
        {
            return;
        }
        if (status == 0)
        {
            throw new TimeoutException();
        }
        if (status == 2)
        {
            throw new Exception("没有找到目标Id");
        }
        if (status == 3)
        {
            throw new Exception("不允许路由");
        }
        if (status == 4)
        {
            throw new Exception("没找到Rpc");
        }
        if (status == 5)
        {
            throw new Exception($"其他异常:{this.Message}");
        }
    }
}
3.2 声明扩展
扩展方法是为IPluginsManager、IDmtpActor、IDmtpActorObject等提供扩展功能的静态方法。
/// <summary>
/// Extension methods that enable the simple Rpc plugin and attach/retrieve the
/// <see cref="ISimpleDmtpRpcActor"/> stored on a Dmtp actor.
/// </summary>
static class SimpleDmtpRpcExtension
{
    #region DependencyProperty

    /// <summary>
    /// Dependency property under which the <see cref="ISimpleDmtpRpcActor"/> is stored.
    /// </summary>
    public static readonly DependencyProperty<ISimpleDmtpRpcActor> SimpleDmtpRpcActorProperty =
        DependencyProperty<ISimpleDmtpRpcActor>.Register("SimpleDmtpRpcActor", default);

    #endregion DependencyProperty

    #region Plugin extension

    /// <summary>
    /// Adds the <see cref="SimpleDmtpRpcFeature"/> plugin.
    /// </summary>
    /// <param name="pluginsManager">The plugin manager to add the plugin to.</param>
    /// <returns>The plugin instance returned by the plugin manager.</returns>
    public static SimpleDmtpRpcFeature UseSimpleDmtpRpc(this IPluginsManager pluginsManager)
    {
        return pluginsManager.Add<SimpleDmtpRpcFeature>();
    }

    #endregion Plugin extension

    /// <summary>
    /// Gets the <see cref="ISimpleDmtpRpcActor"/> stored on the given actor.
    /// </summary>
    /// <param name="smtpActor">The Dmtp actor to read from.</param>
    /// <returns>The value of <see cref="SimpleDmtpRpcActorProperty"/> on the actor.</returns>
    public static ISimpleDmtpRpcActor GetSimpleDmtpRpcActor(this IDmtpActor smtpActor)
    {
        return smtpActor.GetValue(SimpleDmtpRpcActorProperty);
    }

    /// <summary>
    /// Gets the <see cref="ISimpleDmtpRpcActor"/> of the given client so that Rpc
    /// calls can be made through it.
    /// </summary>
    /// <param name="client">The object that owns the Dmtp actor.</param>
    /// <returns>The Rpc actor; never null.</returns>
    /// <exception cref="ArgumentNullException">
    /// No Rpc actor is attached, typically because UseSimpleDmtpRpc was not enabled.
    /// </exception>
    public static ISimpleDmtpRpcActor GetSimpleDmtpRpcActor(this IDmtpActorObject client)
    {
        var smtpRpcActor = client.DmtpActor.GetSimpleDmtpRpcActor();
        if (smtpRpcActor is null)
        {
            // Exception type kept for compatibility with existing callers; only the
            // misspelled actor name in the message was corrected.
            throw new ArgumentNullException(nameof(smtpRpcActor), "SimpleDmtpRpcActor为空,请检查是否已启用UseSimpleDmtpRpc");
        }
        return smtpRpcActor;
    }

    /// <summary>
    /// Stores the given <see cref="ISimpleDmtpRpcActor"/> on the actor.
    /// </summary>
    /// <param name="smtpActor">The Dmtp actor to write to.</param>
    /// <param name="smtpRpcActor">The Rpc actor to store.</param>
    internal static void SetSimpleDmtpRpcActor(this IDmtpActor smtpActor, ISimpleDmtpRpcActor smtpRpcActor)
    {
        smtpActor.SetValue(SimpleDmtpRpcActorProperty, smtpRpcActor);
    }
}
3.3 声明功能插件
功能插件为Actor提供创建时机和必要的配置信息。如果有需要,还可以抛出插件信息(例如文件传输)。
/// <summary>
/// Plugin that creates a <see cref="SimpleDmtpRpcActor"/> for every connecting
/// client, feeds received messages to it, and keeps the registry of Rpc methods.
/// </summary>
class SimpleDmtpRpcFeature : PluginBase, IDmtpConnectingPlugin, IDmtpReceivedPlugin
{
    // Registered Rpc methods, keyed by method name.
    readonly Dictionary<string, MethodModel> m_pairs = new Dictionary<string, MethodModel>();

    /// <summary>
    /// Creates the Rpc actor for the connecting client, attaches it to the client's
    /// Dmtp actor, then continues with the next plugin.
    /// </summary>
    public async Task OnDmtpConnecting(IDmtpActorObject client, DmtpVerifyEventArgs e)
    {
        var actor = new SimpleDmtpRpcActor(client.DmtpActor)
        {
            TryFindMethod = this.TryFindMethod
        };
        client.DmtpActor.SetSimpleDmtpRpcActor(actor);
        await e.InvokeNext();
    }

    // Looks up a registered method by name; returns null when the name is unknown.
    private MethodModel TryFindMethod(string methodName)
    {
        if (this.m_pairs.TryGetValue(methodName, out var methodModel))
        {
            return methodModel;
        }
        return default;
    }

    /// <summary>
    /// Registers the public instance methods of <paramref name="server"/> as Rpc
    /// methods, keyed by method name.
    /// </summary>
    /// <remarks>
    /// Methods declared on <see cref="object"/> (ToString, Equals, GetHashCode,
    /// GetType) are skipped: they are not Rpc endpoints, and because every object
    /// has them they made any second registration fail with a duplicate key.
    /// </remarks>
    /// <param name="server">The object whose methods are registered.</param>
    /// <exception cref="ArgumentNullException"><paramref name="server"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// A method name is already registered (for example overloaded methods).
    /// </exception>
    public void RegisterRpc(object server)
    {
        if (server is null)
        {
            throw new ArgumentNullException(nameof(server));
        }
        foreach (var item in server.GetType().GetMethods(BindingFlags.Default | BindingFlags.Instance | BindingFlags.Public))
        {
            if (item.DeclaringType == typeof(object))
            {
                continue;
            }
            this.m_pairs.Add(item.Name, new MethodModel(item, server));
        }
    }

    /// <summary>
    /// Offers the received message to the client's Rpc actor; when the actor does
    /// not claim it (or no Rpc actor is attached), the next plugin is invoked.
    /// </summary>
    public async Task OnDmtpReceived(IDmtpActorObject client, DmtpMessageEventArgs e)
    {
        if (client.DmtpActor.GetSimpleDmtpRpcActor() is SimpleDmtpRpcActor actor)
        {
            if (await actor.InputReceivedData(e.DmtpMessage))
            {
                // The message belonged to the Rpc actor; do not pass it on.
                return;
            }
        }
        await e.InvokeNext();
    }
}
四、使用
使用非常简单,基本和现有的功能插件类似。
【服务器】
/// <summary>
/// Creates, configures and starts the sample Dmtp server listening on port 7789
/// with the simple Rpc plugin enabled and a MyServer instance registered.
/// </summary>
/// <returns>The started service.</returns>
private static TcpDmtpService GetTcpDmtpService()
{
    var service = new TcpDmtpService();
    var config = new TouchSocketConfig() // configuration
           .SetListenIPHosts(new IPHost[] { new IPHost(7789) })
           .ConfigureContainer(a =>
           {
               a.AddConsoleLogger();
               a.AddDmtpRouteService(); // add the route policy
           })
           .ConfigurePlugins(a =>
           {
               a.UseSimpleDmtpRpc()
               .RegisterRpc(new MyServer());
           })
           .SetDmtpOption(new DmtpOption()
           {
                VerifyToken = "Dmtp" // connection verification token
           });

    // This method is synchronous (its caller uses the returned service directly),
    // so "await" is not available here; wait for the startup tasks to complete.
    service.SetupAsync(config).GetAwaiter().GetResult();
    service.StartAsync().GetAwaiter().GetResult();

    service.Logger.Info("服务器成功启动");
    return service;
}
【客户端】
/// <summary>
/// Builds the sample Dmtp client for 127.0.0.1:7789 with the simple Rpc and
/// heartbeat plugins enabled.
/// </summary>
/// <returns>The client produced by BuildWithTcpDmtpClient.</returns>
private static TcpDmtpClient GetTcpDmtpClient()
{
    var config = new TouchSocketConfig()
        .SetRemoteIPHost("127.0.0.1:7789")
        .SetDmtpOption(new DmtpOption()
        {
            VerifyToken = "Dmtp" // connection verification token
        })
        .ConfigureContainer(container =>
        {
            container.AddConsoleLogger();
        })
        .ConfigurePlugins(plugins =>
        {
            plugins.UseSimpleDmtpRpc();

            // Dmtp heartbeat: tick every 3 seconds, at most 3 failures.
            plugins.UseDmtpHeartbeat()
                .SetTick(TimeSpan.FromSeconds(3))
                .SetMaxFailCount(3);
        });

    var client = config.BuildWithTcpDmtpClient();
    client.Logger.Info("连接成功");
    return client;
}
/// <summary>
/// Starts the sample server and client, then reads method names from the console
/// and invokes each one through the client's Rpc actor until input ends.
/// </summary>
static void Main(string[] args)
{
    var service = GetTcpDmtpService();
    var client = GetTcpDmtpClient();
    while (true)
    {
        string methodName = Console.ReadLine();
        if (methodName == null)
        {
            // ReadLine returns null at end of input (closed or redirected stdin);
            // without this check the loop would spin forever.
            break;
        }

        var actor = client.GetSimpleDmtpRpcActor();
        try
        {
            actor.Invoke(methodName);
            Console.WriteLine("调用成功");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}