跳到主要内容
版本:2.0.0

自定义DmtpActor

定义

命名空间:TouchSocket.Dmtp
程序集:TouchSocket.Dmtp.dll

一、说明

DmtpActorDmtp协议的Actor,负责Dmtp协议的交互。规范的Actor可以做到一次开发,多种协议适用,比如DmtpRpcActorDmtpFileTransferActor等。所以学习Dmtp的Actor开发,是进阶使用Dmtp的必要途径。

二、使用Actor开发的优点

实际上,使用协议扩展即可扩展Dmtp的功能,但是使用Actor开发,则更具优点:

  1. 很好的分离了协议的交互逻辑和协议的实现逻辑。可以做到一次开发,多种协议适用。
  2. 按照规范的开发,可以非常好的支持客户端到服务器、服务器到客户端、客户端到客户端的交互。
  3. 更高等级的代码封装,让程序更易维护。

三、实现

Actor通讯的原理,就是当DmtpActor(主通讯器)收到DmtpMessage后,会依次投递给所有的Dmtp插件,当某个插件通过ProtocolFlag判断正确后,由本插件支持的Actor进行处理数据。

下面我们将通过实现一个最简单的Rpc调用来熟悉Actor通讯。

基本流程如下:

3.1 声明Actor

首先新建一个接口ISimpleDmtpRpcActor,继承IActor。然后新建一个类SimpleDmtpRpcActor,实现ISimpleDmtpRpcActor接口。

声明m_invoke_Requestm_invoke_Response两个变量,用于判断ProtocolFlags能否在当前Actor处理。

声明TryFindMethod委托,用于向外面插件通过方法名搜索Rpc方法。

interface ISimpleDmtpRpcActor : IActor
{
void Invoke(string methodName);
void Invoke(string targetId, string methodName);
}

class SimpleDmtpRpcActor : ISimpleDmtpRpcActor
{
private ushort m_invoke_Request = 1000;
private ushort m_invoke_Response = 1001;

public IDmtpActor DmtpActor { get; private set; }
public Func<string, MethodModel> TryFindMethod { get; set; }

public SimpleDmtpRpcActor(IDmtpActor dmtpActor)
{
this.DmtpActor = dmtpActor;
}

public async Task<bool> InputReceivedData(DmtpMessage message)
{
var byteBlock = message.BodyByteBlock;
if (message.ProtocolFlags == this.m_invoke_Request)
{
try
{
var rpcPackage = new SimpleDmtpRpcPackage();
rpcPackage.UnpackageRouter(byteBlock);
if (rpcPackage.Route && this.DmtpActor.AllowRoute)
{
if (await this.DmtpActor.TryRoute(new PackageRouterEventArgs(new RouteType("SimpleRpc"), rpcPackage)))
{
if (await this.DmtpActor.TryFindDmtpActor(rpcPackage.TargetId) is DmtpActor actor)
{
actor.Send(this.m_invoke_Request, byteBlock);
return true;
}
else
{
rpcPackage.Status = 2;
}
}
else
{
rpcPackage.Status = 3;
}

byteBlock.Reset();
rpcPackage.SwitchId();

rpcPackage.Package(byteBlock);
this.DmtpActor.Send(this.m_invoke_Response, byteBlock);
}
else
{
rpcPackage.UnpackageBody(byteBlock);
_ = Task.Factory.StartNew(this.InvokeThis, rpcPackage);
}
}
catch (Exception ex)
{
this.DmtpActor.Logger.Error(this, $"在protocol={message.ProtocolFlags}中发生错误。信息:{ex.Message}");
}
return true;
}
else if (message.ProtocolFlags == this.m_invoke_Response)
{
try
{
var rpcPackage = new SimpleDmtpRpcPackage();
rpcPackage.UnpackageRouter(byteBlock);
if (this.DmtpActor.AllowRoute && rpcPackage.Route)
{
if (await this.DmtpActor.TryFindDmtpActor(rpcPackage.TargetId) is DmtpActor actor)
{
actor.Send(this.m_invoke_Response, byteBlock);
}
}
else
{
rpcPackage.UnpackageBody(byteBlock);
this.DmtpActor.WaitHandlePool.SetRun(rpcPackage);
}
}
catch (Exception ex)
{
this.DmtpActor.Logger.Error(this, $"在protocol={message.ProtocolFlags}中发生错误。信息:{ex.Message}");
}
return true;
}
return false;
}

private void InvokeThis(object obj)
{
var package = (SimpleDmtpRpcPackage)obj;

var methodModel = this.TryFindMethod.Invoke(package.MethodName);
if (methodModel == null)
{
using (var byteBlock = new ByteBlock())
{
package.Status = 4;
package.SwitchId();
package.Package(byteBlock);
this.DmtpActor.Send(this.m_invoke_Response, byteBlock);
return;
}
}

try
{
methodModel.Method.Invoke(methodModel.Target, default);
using (var byteBlock = new ByteBlock())
{
package.Status = 1;
package.SwitchId();
package.Package(byteBlock);
this.DmtpActor.Send(this.m_invoke_Response, byteBlock);
return;
}
}
catch (Exception ex)
{
using (var byteBlock = new ByteBlock())
{
package.Status = 5;
package.Message = ex.Message;
package.SwitchId();
package.Package(byteBlock);
this.DmtpActor.Send(this.m_invoke_Response, byteBlock);
return;
}
}
}

private async Task<SimpleDmtpRpcActor> TryFindDmtpRpcActor(string targetId)
{
if (targetId == this.DmtpActor.Id)
{
return this;
}
if (await this.DmtpActor.TryFindDmtpActor(targetId) is DmtpActor dmtpActor)
{
if (dmtpActor.GetSimpleDmtpRpcActor() is SimpleDmtpRpcActor newActor)
{
return newActor;
}
}
return default;
}

public void Invoke(string methodName)
{
this.PrivateInvoke(default, methodName);
}
public async void Invoke(string targetId, string methodName)
{
if (string.IsNullOrEmpty(targetId))
{
throw new ArgumentException($"“{nameof(targetId)}”不能为 null 或空。", nameof(targetId));
}

if (string.IsNullOrEmpty(methodName))
{
throw new ArgumentException($"“{nameof(methodName)}”不能为 null 或空。", nameof(methodName));
}

if (this.DmtpActor.AllowRoute && await this.TryFindDmtpRpcActor(targetId) is SimpleDmtpRpcActor actor)
{
actor.Invoke(methodName);
return;
}

this.PrivateInvoke(targetId, methodName);
}

private void PrivateInvoke(string id, string methodName)
{
var package = new SimpleDmtpRpcPackage()
{
MethodName = methodName,
Route = id.HasValue(),
SourceId = this.DmtpActor.Id,
TargetId = id
};

var waitData = this.DmtpActor.WaitHandlePool.GetReverseWaitData(package);

try
{
using (var byteBlock = new ByteBlock())
{
package.Package(byteBlock);
this.DmtpActor.Send(this.m_invoke_Request, byteBlock);
}
switch (waitData.Wait(5000))
{
case WaitDataStatus.SetRunning:
var result = (SimpleDmtpRpcPackage)waitData.WaitResult;
result.CheckStatus();
return;
case WaitDataStatus.Overtime:
throw new TimeoutException();
case WaitDataStatus.Canceled:
break;
case WaitDataStatus.Default:
case WaitDataStatus.Disposed:
default:
throw new Exception("未知异常");
}
}
finally
{
this.DmtpActor.WaitHandlePool.Destroy(waitData);
}
}
}

额外模型类

MethodModel用于储存反射调用相关。

class MethodModel
{
public MethodModel(MethodInfo method, object target)
{
this.Method = method;
this.Target = target;
}

public MethodInfo Method { get; private set; }
public object Target { get; private set; }
}

SimpleDmtpRpcPackage类,用于打包Rpc请求,关于打包的使用详情,可以看包序列化

class SimpleDmtpRpcPackage : WaitRouterPackage
{
protected override bool IncludedRouter => true;

public string MethodName { get; set; }

public override void PackageBody(in ByteBlock byteBlock)
{
base.PackageBody(byteBlock);
byteBlock.Write(this.MethodName);
}

public override void UnpackageBody(in ByteBlock byteBlock)
{
base.UnpackageBody(byteBlock);
this.MethodName = byteBlock.ReadString();
}

public void CheckStatus()
{
switch (this.Status)
{
case 0:
throw new TimeoutException();
case 1: return;
case 2: throw new Exception("没有找到目标Id");
case 3: throw new Exception("不允许路由");
case 4: throw new Exception("没找到Rpc");
case 5: throw new Exception($"其他异常:{this.Message}");
}
}
}

3.2 声明扩展

扩展方法是为IPluginsManagerIDmtpActorIDmtpActorObject等提供扩展功能的静态方法。

static class SimpleDmtpRpcExtension
{
#region DependencyProperty

/// <summary>
/// SimpleDmtpRpcActor
/// </summary>
public static readonly DependencyProperty<ISimpleDmtpRpcActor> SimpleDmtpRpcActorProperty =
DependencyProperty<ISimpleDmtpRpcActor>.Register("SimpleDmtpRpcActor", default);

#endregion DependencyProperty

#region 插件扩展

/// <summary>
/// 使用SimpleDmtpRpc插件
/// </summary>
/// <param name="pluginsManager"></param>
/// <returns></returns>
public static SimpleDmtpRpcFeature UseSimpleDmtpRpc(this IPluginsManager pluginsManager)
{
return pluginsManager.Add<SimpleDmtpRpcFeature>();
}
#endregion 插件扩展

/// <summary>
/// 从<see cref="DmtpActor"/>中获取<see cref="ISimpleDmtpRpcActor"/>
/// </summary>
/// <param name="smtpActor"></param>
/// <returns></returns>
public static ISimpleDmtpRpcActor GetSimpleDmtpRpcActor(this IDmtpActor smtpActor)
{
return smtpActor.GetValue(SimpleDmtpRpcActorProperty);
}

/// <summary>
/// 从<see cref="IDmtpActorObject"/>中获取<see cref="ISimpleDmtpRpcActor"/>,以实现Rpc调用功能。
/// </summary>
/// <param name="client"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static ISimpleDmtpRpcActor GetSimpleDmtpRpcActor(this IDmtpActorObject client)
{
var smtpRpcActor = client.DmtpActor.GetSimpleDmtpRpcActor();
if (smtpRpcActor is null)
{
throw new ArgumentNullException(nameof(smtpRpcActor), "SimpleRpcAcotr为空,请检查是否已启用UseSimpleDmtpRpc");
}
return smtpRpcActor;
}

/// <summary>
/// 向<see cref="DmtpActor"/>中设置<see cref="ISimpleDmtpRpcActor"/>
/// </summary>
/// <param name="smtpActor"></param>
/// <param name="smtpRpcActor"></param>
internal static void SetSimpleDmtpRpcActor(this IDmtpActor smtpActor, ISimpleDmtpRpcActor smtpRpcActor)
{
smtpActor.SetValue(SimpleDmtpRpcActorProperty, smtpRpcActor);
}
}

3.3 声明功能插件

功能插件是为Actor提供创建时机,和必要的配置信息。如果有需要,还可以抛出插件信息(例如文件传输 )。

class SimpleDmtpRpcFeature : PluginBase, IDmtpHandshakingPlugin, IDmtpReceivedPlugin
{
readonly Dictionary<string, MethodModel> m_pairs = new Dictionary<string, MethodModel>();
public async Task OnDmtpHandshaking(IDmtpActorObject client, DmtpVerifyEventArgs e)
{
var actor = new SimpleDmtpRpcActor(client.DmtpActor)
{
TryFindMethod = this.TryFindMethod
};
client.DmtpActor.SetSimpleDmtpRpcActor(actor);
await e.InvokeNext();
}

private MethodModel TryFindMethod(string methodName)
{
if (this.m_pairs.TryGetValue(methodName, out var methodModel))
{
return methodModel;
}
return default;
}

public void RegisterRpc(object server)
{
if (server is null)
{
throw new ArgumentNullException(nameof(server));
}

foreach (var item in server.GetType().GetMethods(BindingFlags.Default | BindingFlags.Instance | BindingFlags.Public))
{
m_pairs.Add(item.Name, new MethodModel(item, server));
}
}

public async Task OnDmtpReceived(IDmtpActorObject client, DmtpMessageEventArgs e)
{
if (client.DmtpActor.GetSimpleDmtpRpcActor() is SimpleDmtpRpcActor actor)
{
if (await actor.InputReceivedData(e.DmtpMessage))
{
e.Handled = true;
return;
}
}
await e.InvokeNext();
}
}

四、使用

使用非常简单,基本和现有的功能插件类似。

【服务器】

private static TcpDmtpService GetTcpDmtpService()
{
var service = new TcpDmtpService();

var config = new TouchSocketConfig()//配置
.SetListenIPHosts(new IPHost[] { new IPHost(7789) })
.ConfigureContainer(a =>
{
a.AddConsoleLogger();

a.AddDmtpRouteService();//添加路由策略
})
.ConfigurePlugins(a =>
{
a.UseSimpleDmtpRpc()
.RegisterRpc(new MyServer());
})
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Dmtp"//连接验证口令。
});

service.Setup(config);
service.Start();
service.Logger.Info("服务器成功启动");
return service;
}

【客户端】

private static TcpDmtpClient GetTcpDmtpClient()
{
var client = new TouchSocketConfig()
.SetRemoteIPHost("127.0.0.1:7789")
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Dmtp"//连接验证口令。
})
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.UseSimpleDmtpRpc();

a.UseDmtpHeartbeat()//使用Dmtp心跳
.SetTick(TimeSpan.FromSeconds(3))
.SetMaxFailCount(3);
})
.BuildWithTcpDmtpClient();

client.Logger.Info("连接成功");
return client;
}
static void Main(string[] args)
{
var service = GetTcpDmtpService();
var client = GetTcpDmtpClient();

while (true)
{
string methodName = Console.ReadLine();
var actor = client.GetSimpleDmtpRpcActor();

try
{
actor.Invoke(methodName);
Console.WriteLine("调用成功");
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}

本文示例Demo