自定义DmtpActor
定义
命名空间:TouchSocket.Dmtp
程序集:TouchSocket.Dmtp.dll
一、说明
DmtpActor
是Dmtp
协议的Actor
,负责Dmtp
协议的交互。规范的Actor
可以做到一次开发,多种协议适用,比如DmtpRpcActor
、DmtpFileTransferActor
等。所以学习Dmtp的Actor
开发,是进阶使用Dmtp
的必要途径。
二、使用Actor开发的优点
实际上,使用协议扩展即可扩展Dmtp的功能,但是使用Actor开发,则更具优点:
- 很好的分离了协议的交互逻辑和协议的实现逻辑。可以做到一次开发,多种协议适用。
- 按照规范的开发,可以非常好的支持客户端到服务器、服务器到客户端、客户端到客户端的交互。
- 更高等级的代码封装,让程序更易维护。
三、实现
Actor
通讯的原理,就是当DmtpActor
(主通讯器)收到DmtpMessage
后,会依次投递给所有的Dmtp插件,当某个插件通过ProtocolFlag
判断正确后,由本插件支持的Actor
进行处理数据。
下面我们将通过实现一个最简单的Rpc调用来熟悉Actor通讯。
基本流程如下:
3.1 声明Actor
首先新建一个接口ISimpleDmtpRpcActor
,继承IActor
。然后新建一个类SimpleDmtpRpcActor
,实现ISimpleDmtpRpcActor
接口。
声明m_invoke_Request
和m_invoke_Response
两个变量,用于判断ProtocolFlags
能否在当前Actor处理。
声明TryFindMethod
委托,用于向外面插件通过方法名搜索Rpc方法。
interface ISimpleDmtpRpcActor : IActor
{
void Invoke(string methodName);
void Invoke(string targetId, string methodName);
}
class SimpleDmtpRpcActor : ISimpleDmtpRpcActor
{
private ushort m_invoke_Request = 1000;
private ushort m_invoke_Response = 1001;
public IDmtpActor DmtpActor { get; private set; }
public Func<string, MethodModel> TryFindMethod { get; set; }
public SimpleDmtpRpcActor(IDmtpActor dmtpActor)
{
this.DmtpActor = dmtpActor;
}
public async Task<bool> InputReceivedData(DmtpMessage message)
{
var byteBlock = message.BodyByteBlock;
if (message.ProtocolFlags == this.m_invoke_Request)
{
try
{
var rpcPackage = new SimpleDmtpRpcPackage();
rpcPackage.UnpackageRouter(byteBlock);
if (rpcPackage.Route && this.DmtpActor.AllowRoute)
{
if (await this.DmtpActor.TryRoute(new PackageRouterEventArgs(new RouteType("SimpleRpc"), rpcPackage)))
{
if (await this.DmtpActor.TryFindDmtpActor(rpcPackage.TargetId) is DmtpActor actor)
{
actor.SendAsync(this.m_invoke_Request, byteBlock);
return true;
}
else
{
rpcPackage.Status = 2;
}
}
else
{
rpcPackage.Status = 3;
}
byteBlock.Reset();
rpcPackage.SwitchId();
rpcPackage.Package(byteBlock);
this.DmtpActor.SendAsync(this.m_invoke_Response, byteBlock);
}
else
{
rpcPackage.UnpackageBody(byteBlock);
_ = Task.Factory.StartNew(this.InvokeThis, rpcPackage);
}
}
catch (Exception ex)
{
this.DmtpActor.Logger.Error(this, $"在protocol={message.ProtocolFlags}中发生错误。信息:{ex.Message}");
}
return true;
}
else if (message.ProtocolFlags == this.m_invoke_Response)
{
try
{
var rpcPackage = new SimpleDmtpRpcPackage();
rpcPackage.UnpackageRouter(byteBlock);
if (this.DmtpActor.AllowRoute && rpcPackage.Route)
{
if (await this.DmtpActor.TryFindDmtpActor(rpcPackage.TargetId) is DmtpActor actor)
{
actor.SendAsync(this.m_invoke_Response, byteBlock);
}
}
else
{
rpcPackage.UnpackageBody(byteBlock);
this.DmtpActor.WaitHandlePool.SetRun(rpcPackage);
}
}
catch (Exception ex)
{
this.DmtpActor.Logger.Error(this, $"在protocol={message.ProtocolFlags}中发生错误。信息:{ex.Message}");
}
return true;
}
return false;
}
private void InvokeThis(object obj)
{
var package = (SimpleDmtpRpcPackage)obj;
var methodModel = this.TryFindMethod.Invoke(package.MethodName);
if (methodModel == null)
{
using (var byteBlock = new ByteBlock())
{
package.Status = 4;
package.SwitchId();
package.Package(byteBlock);
this.DmtpActor.SendAsync(this.m_invoke_Response, byteBlock);
return;
}
}
try
{
methodModel.Method.Invoke(methodModel.Target, default);
using (var byteBlock = new ByteBlock())
{
package.Status = 1;
package.SwitchId();
package.Package(byteBlock);
this.DmtpActor.SendAsync(this.m_invoke_Response, byteBlock);
return;
}
}
catch (Exception ex)
{
using (var byteBlock = new ByteBlock())
{
package.Status = 5;
package.Message = ex.Message;
package.SwitchId();
package.Package(byteBlock);
this.DmtpActor.SendAsync(this.m_invoke_Response, byteBlock);
return;
}
}
}
private async Task<SimpleDmtpRpcActor> TryFindDmtpRpcActor(string targetId)
{
if (targetId == this.DmtpActor.Id)
{
return this;
}
if (await this.DmtpActor.TryFindDmtpActor(targetId) is DmtpActor dmtpActor)
{
if (dmtpActor.GetSimpleDmtpRpcActor() is SimpleDmtpRpcActor newActor)
{
return newActor;
}
}
return default;
}
public void Invoke(string methodName)
{
this.PrivateInvoke(default, methodName);
}
public async void Invoke(string targetId, string methodName)
{
if (string.IsNullOrEmpty(targetId))
{
throw new ArgumentException($"“{nameof(targetId)}”不能为 null 或空。", nameof(targetId));
}
if (string.IsNullOrEmpty(methodName))
{
throw new ArgumentException($"“{nameof(methodName)}”不能为 null 或空。", nameof(methodName));
}
if (this.DmtpActor.AllowRoute && await this.TryFindDmtpRpcActor(targetId) is SimpleDmtpRpcActor actor)
{
actor.Invoke(methodName);
return;
}
this.PrivateInvoke(targetId, methodName);
}
private void PrivateInvoke(string id, string methodName)
{
var package = new SimpleDmtpRpcPackage()
{
MethodName = methodName,
Route = id.HasValue(),
SourceId = this.DmtpActor.Id,
TargetId = id
};
var waitData = this.DmtpActor.WaitHandlePool.GetReverseWaitData(package);
try
{
using (var byteBlock = new ByteBlock())
{
package.Package(byteBlock);
this.DmtpActor.SendAsync(this.m_invoke_Request, byteBlock);
}
switch (waitData.Wait(5000))
{
case WaitDataStatus.SetRunning:
var result = (SimpleDmtpRpcPackage)waitData.WaitResult;
result.CheckStatus();
return;
case WaitDataStatus.Overtime:
throw new TimeoutException();
case WaitDataStatus.Canceled:
break;
case WaitDataStatus.Default:
case WaitDataStatus.Disposed:
default:
throw new Exception("未知异常");
}
}
finally
{
this.DmtpActor.WaitHandlePool.Destroy(waitData);
}
}
}
额外模型类
MethodModel
用于储存反射调用相关。
class MethodModel
{
public MethodModel(MethodInfo method, object target)
{
this.Method = method;
this.Target = target;
}
public MethodInfo Method { get; private set; }
public object Target { get; private set; }
}
SimpleDmtpRpcPackage
类,用于打包Rpc请求,关于打包的使用详情,可以看包序列化
class SimpleDmtpRpcPackage : WaitRouterPackage
{
protected override bool IncludedRouter => true;
public string MethodName { get; set; }
public override void PackageBody(in ByteBlock byteBlock)
{
base.PackageBody(byteBlock);
byteBlock.Write(this.MethodName);
}
public override void UnpackageBody(in ByteBlock byteBlock)
{
base.UnpackageBody(byteBlock);
this.MethodName = byteBlock.ReadString();
}
public void CheckStatus()
{
switch (this.Status)
{
case 0:
throw new TimeoutException();
case 1: return;
case 2: throw new Exception("没有找到目标Id");
case 3: throw new Exception("不允许路由");
case 4: throw new Exception("没找到Rpc");
case 5: throw new Exception($"其他异常:{this.Message}");
}
}
}
3.2 声明扩展
扩展方法是为IPluginsManager
、IDmtpActor
、IDmtpActorObject
等提供扩展功能的静态方法。
static class SimpleDmtpRpcExtension
{
#region DependencyProperty
/// <summary>
/// SimpleDmtpRpcActor
/// </summary>
public static readonly DependencyProperty<ISimpleDmtpRpcActor> SimpleDmtpRpcActorProperty =
DependencyProperty<ISimpleDmtpRpcActor>.Register("SimpleDmtpRpcActor", default);
#endregion DependencyProperty
#region 插件扩展
/// <summary>
/// 使用SimpleDmtpRpc插件
/// </summary>
/// <param name="pluginsManager"></param>
/// <returns></returns>
public static SimpleDmtpRpcFeature UseSimpleDmtpRpc(this IPluginsManager pluginsManager)
{
return pluginsManager.Add<SimpleDmtpRpcFeature>();
}
#endregion 插件扩展
/// <summary>
/// 从<see cref="DmtpActor"/>中获取<see cref="ISimpleDmtpRpcActor"/>
/// </summary>
/// <param name="smtpActor"></param>
/// <returns></returns>
public static ISimpleDmtpRpcActor GetSimpleDmtpRpcActor(this IDmtpActor smtpActor)
{
return smtpActor.GetValue(SimpleDmtpRpcActorProperty);
}
/// <summary>
/// 从<see cref="IDmtpActorObject"/>中获取<see cref="ISimpleDmtpRpcActor"/>,以实现Rpc调用功能。
/// </summary>
/// <param name="client"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static ISimpleDmtpRpcActor GetSimpleDmtpRpcActor(this IDmtpActorObject client)
{
var smtpRpcActor = client.DmtpActor.GetSimpleDmtpRpcActor();
if (smtpRpcActor is null)
{
throw new ArgumentNullException(nameof(smtpRpcActor), "SimpleRpcAcotr为空,请检查是否已启用UseSimpleDmtpRpc");
}
return smtpRpcActor;
}
/// <summary>
/// 向<see cref="DmtpActor"/>中设置<see cref="ISimpleDmtpRpcActor"/>
/// </summary>
/// <param name="smtpActor"></param>
/// <param name="smtpRpcActor"></param>
internal static void SetSimpleDmtpRpcActor(this IDmtpActor smtpActor, ISimpleDmtpRpcActor smtpRpcActor)
{
smtpActor.SetValue(SimpleDmtpRpcActorProperty, smtpRpcActor);
}
}