自定义DmtpActor
定义
命名空间:TouchSocket.Dmtp
程序集:TouchSocket.Dmtp.dll
一、说明
DmtpActor是Dmtp协议的Actor,负责Dmtp协议的交互。规范的Actor可以做到一次开发,多种协议适用,比如DmtpRpcActor、DmtpFileTransferActor等。所以学习Dmtp的Actor开发,是进阶使用Dmtp的必要途径。
二、使用Actor开发的优点
实际上,使用协议扩展即可扩展Dmtp的功能,但是使用Actor开发,则更具优点:
- 很好地分离了协议的交互逻辑和协议的实现逻辑。可以做到一次开发,多种协议适用。
- 按照规范开发,可以非常好地支持客户端到服务器、服务器到客户端、客户端到客户端的交互。
- 更高等级的代码封装, 让程序更易维护。
三、实现
Actor通讯的原理,就是当DmtpActor(主通讯器)收到DmtpMessage后,会依次投递给所有的Dmtp插件,当某个插件通过ProtocolFlags判断正确后,由本插件支持的Actor处理数据。
下面我们将通过实现一个最简单的Rpc调用来熟悉Actor通讯。
基本流程如下:
3.1 声明Actor
首先新建一个接口ISimpleDmtpRpcActor,继承IActor。然后新建一个类SimpleDmtpRpcActor,实现ISimpleDmtpRpcActor接口。
声明m_invoke_Request和m_invoke_Response两个变量,用于判断ProtocolFlags能否在当前Actor处理。
声明TryFindMethod委托,用于通过方法名向外部插件查找Rpc方法。
/// <summary>
/// Actor contract of the minimal RPC sample: invokes a parameterless method by name.
/// </summary>
interface ISimpleDmtpRpcActor : IActor
{
/// <summary>
/// Invokes the method named <paramref name="methodName"/> without naming a target id.
/// In the implementation shown in this file the request is sent over the actor's own
/// connection with routing disabled.
/// </summary>
/// <param name="methodName">Name of the method to invoke.</param>
void Invoke(string methodName);
/// <summary>
/// Invokes the method named <paramref name="methodName"/> on the peer identified by
/// <paramref name="targetId"/>.
/// </summary>
/// <param name="targetId">Id of the peer that should execute the method.</param>
/// <param name="methodName">Name of the method to invoke.</param>
void Invoke(string targetId, string methodName);
}
/// <summary>
/// Minimal RPC actor layered on top of an <see cref="IDmtpActor"/>.
/// It handles two protocol flags (request and response), forwards routed packages to
/// their target actor when routing is allowed, and executes locally addressed requests
/// through the <see cref="TryFindMethod"/> lookup.
/// </summary>
/// <remarks>
/// Status codes written into <see cref="SimpleDmtpRpcPackage"/> by this class:
/// 1 = success, 2 = target id not found, 3 = routing refused, 4 = method not found,
/// 5 = the invocation threw.
/// </remarks>
class SimpleDmtpRpcActor : ISimpleDmtpRpcActor
{
    // Protocol flag identifying an invoke request handled by this actor.
    private readonly ushort m_invoke_Request = 1000;

    // Protocol flag identifying an invoke response handled by this actor.
    private readonly ushort m_invoke_Response = 1001;

    /// <summary>
    /// The underlying Dmtp actor used to send data, resolve other actors and wait for responses.
    /// </summary>
    public IDmtpActor DmtpActor { get; private set; }

    /// <summary>
    /// Lookup used to resolve a method name to an invocable <see cref="MethodModel"/>.
    /// A null result (or a null delegate) is reported to the caller as status 4.
    /// </summary>
    public Func<string, MethodModel> TryFindMethod { get; set; }

    /// <summary>
    /// Creates the actor on top of the given Dmtp actor.
    /// </summary>
    /// <param name="dmtpActor">The Dmtp actor this RPC actor communicates through.</param>
    public SimpleDmtpRpcActor(IDmtpActor dmtpActor)
    {
        this.DmtpActor = dmtpActor;
    }

    /// <summary>
    /// Offers a received message to this actor.
    /// </summary>
    /// <param name="message">The received Dmtp message.</param>
    /// <returns>
    /// <c>true</c> when the message carried one of this actor's protocol flags (even if
    /// processing failed and was only logged); otherwise <c>false</c>.
    /// </returns>
    public async Task<bool> InputReceivedData(DmtpMessage message)
    {
        var byteBlock = message.BodyByteBlock;
        if (message.ProtocolFlags == this.m_invoke_Request)
        {
            try
            {
                var rpcPackage = new SimpleDmtpRpcPackage();
                // Only the routing header is read first so the body can be forwarded untouched.
                rpcPackage.UnpackageRouter(byteBlock);
                if (rpcPackage.Route && this.DmtpActor.AllowRoute)
                {
                    if (await this.DmtpActor.TryRoute(new PackageRouterEventArgs(new RouteType("SimpleRpc"), rpcPackage)))
                    {
                        if (await this.DmtpActor.TryFindDmtpActor(rpcPackage.TargetId) is DmtpActor actor)
                        {
                            // Forward the original request bytes to the target actor.
                            actor.Send(this.m_invoke_Request, byteBlock);
                            return true;
                        }
                        else
                        {
                            rpcPackage.Status = 2;
                        }
                    }
                    else
                    {
                        rpcPackage.Status = 3;
                    }

                    // Routing failed: reuse the block to send an error response back to the requester.
                    byteBlock.Reset();
                    rpcPackage.SwitchId();
                    rpcPackage.Package(byteBlock);
                    this.DmtpActor.Send(this.m_invoke_Response, byteBlock);
                }
                else
                {
                    rpcPackage.UnpackageBody(byteBlock);
                    // Execute on another task so the receive path is not held up by the invocation.
                    _ = Task.Factory.StartNew(this.InvokeThis, rpcPackage);
                }
            }
            catch (Exception ex)
            {
                this.DmtpActor.Logger.Error(this, $"在protocol={message.ProtocolFlags}中发生错误。信息:{ex.Message}");
            }
            return true;
        }
        else if (message.ProtocolFlags == this.m_invoke_Response)
        {
            try
            {
                var rpcPackage = new SimpleDmtpRpcPackage();
                rpcPackage.UnpackageRouter(byteBlock);
                if (this.DmtpActor.AllowRoute && rpcPackage.Route)
                {
                    if (await this.DmtpActor.TryFindDmtpActor(rpcPackage.TargetId) is DmtpActor actor)
                    {
                        // Forward the response to the actor that issued the request.
                        actor.Send(this.m_invoke_Response, byteBlock);
                    }
                }
                else
                {
                    rpcPackage.UnpackageBody(byteBlock);
                    // Hands the response to the wait pool; PrivateInvoke waits on the matching wait data.
                    this.DmtpActor.WaitHandlePool.SetRun(rpcPackage);
                }
            }
            catch (Exception ex)
            {
                this.DmtpActor.Logger.Error(this, $"在protocol={message.ProtocolFlags}中发生错误。信息:{ex.Message}");
            }
            return true;
        }
        return false;
    }

    /// <summary>
    /// Executes a locally addressed request and sends exactly one response.
    /// </summary>
    /// <param name="obj">The <see cref="SimpleDmtpRpcPackage"/> describing the request.</param>
    private void InvokeThis(object obj)
    {
        var package = (SimpleDmtpRpcPackage)obj;
        try
        {
            // The lookup is inside the try so a failing or missing delegate produces a
            // response instead of leaving the caller to run into its timeout.
            var methodModel = this.TryFindMethod?.Invoke(package.MethodName);
            if (methodModel == null)
            {
                package.Status = 4;
            }
            else
            {
                methodModel.Method.Invoke(methodModel.Target, default);
                package.Status = 1;
            }
        }
        catch (Exception ex)
        {
            // Reflection wraps exceptions thrown by the target method; report the real cause.
            var cause = ex is System.Reflection.TargetInvocationException && ex.InnerException != null
                ? ex.InnerException
                : ex;
            package.Status = 5;
            package.Message = cause.Message;
        }

        // Source and target ids are swapped exactly once, after the outcome is known.
        using (var byteBlock = new ByteBlock())
        {
            package.SwitchId();
            package.Package(byteBlock);
            this.DmtpActor.Send(this.m_invoke_Response, byteBlock);
        }
    }

    /// <summary>
    /// Resolves the RPC actor registered for <paramref name="targetId"/>.
    /// </summary>
    /// <param name="targetId">Id of the actor to look for.</param>
    /// <returns>
    /// This instance when the id is its own, the RPC actor attached to the Dmtp actor
    /// found for the id, or <c>null</c> when none is found.
    /// </returns>
    private async Task<SimpleDmtpRpcActor> TryFindDmtpRpcActor(string targetId)
    {
        if (targetId == this.DmtpActor.Id)
        {
            return this;
        }
        if (await this.DmtpActor.TryFindDmtpActor(targetId) is DmtpActor dmtpActor)
        {
            if (dmtpActor.GetSimpleDmtpRpcActor() is SimpleDmtpRpcActor newActor)
            {
                return newActor;
            }
        }
        return default;
    }

    /// <summary>
    /// Invokes <paramref name="methodName"/> without a target id and blocks until a
    /// response arrives or the wait ends.
    /// </summary>
    /// <param name="methodName">Name of the method to invoke.</param>
    /// <exception cref="TimeoutException">No response arrived within 5000 ms.</exception>
    /// <exception cref="OperationCanceledException">The wait was canceled.</exception>
    public void Invoke(string methodName)
    {
        this.PrivateInvoke(default, methodName);
    }

    /// <summary>
    /// Invokes <paramref name="methodName"/> on the peer identified by <paramref name="targetId"/>.
    /// When routing is allowed and the target's RPC actor can be resolved directly, the call
    /// is issued through that actor; otherwise a routed request is sent.
    /// </summary>
    /// <param name="targetId">Id of the peer that should execute the method.</param>
    /// <param name="methodName">Name of the method to invoke.</param>
    /// <exception cref="ArgumentException">An argument is null or empty.</exception>
    /// <exception cref="TimeoutException">No response arrived within 5000 ms.</exception>
    /// <exception cref="OperationCanceledException">The wait was canceled.</exception>
    public void Invoke(string targetId, string methodName)
    {
        if (string.IsNullOrEmpty(targetId))
        {
            throw new ArgumentException($"“{nameof(targetId)}”不能为 null 或空。", nameof(targetId));
        }
        if (string.IsNullOrEmpty(methodName))
        {
            throw new ArgumentException($"“{nameof(methodName)}”不能为 null 或空。", nameof(methodName));
        }

        if (this.DmtpActor.AllowRoute)
        {
            // This method was previously 'async void', so its exceptions could never reach the
            // caller and it returned before the call finished. The interface is synchronous,
            // so the lookup is awaited by blocking. Task.Run keeps the lookup off any captured
            // synchronization context so blocking here cannot deadlock on it.
            var actor = Task.Run(() => this.TryFindDmtpRpcActor(targetId)).GetAwaiter().GetResult();
            if (actor != null)
            {
                actor.Invoke(methodName);
                return;
            }
        }
        this.PrivateInvoke(targetId, methodName);
    }

    /// <summary>
    /// Sends an invoke request and blocks for up to 5000 ms for the matching response.
    /// </summary>
    /// <param name="id">Target id; the package is marked for routing when it has a value.</param>
    /// <param name="methodName">Name of the method to invoke.</param>
    private void PrivateInvoke(string id, string methodName)
    {
        var package = new SimpleDmtpRpcPackage()
        {
            MethodName = methodName,
            Route = id.HasValue(),
            SourceId = this.DmtpActor.Id,
            TargetId = id
        };
        var waitData = this.DmtpActor.WaitHandlePool.GetReverseWaitData(package);
        try
        {
            using (var byteBlock = new ByteBlock())
            {
                package.Package(byteBlock);
                this.DmtpActor.Send(this.m_invoke_Request, byteBlock);
            }
            switch (waitData.Wait(5000))
            {
                case WaitDataStatus.SetRunning:
                    var result = (SimpleDmtpRpcPackage)waitData.WaitResult;
                    result.CheckStatus();
                    return;
                case WaitDataStatus.Overtime:
                    throw new TimeoutException();
                case WaitDataStatus.Canceled:
                    // A canceled wait means no result was received; it must not look like success.
                    throw new OperationCanceledException();
                case WaitDataStatus.Default:
                case WaitDataStatus.Disposed:
                default:
                    throw new Exception("未知异常");
            }
        }
        finally
        {
            this.DmtpActor.WaitHandlePool.Destroy(waitData);
        }
    }
}
额外模型类
MethodModel
用于储存反射调用相关。
/// <summary>
/// Pairs a reflected method with the object instance it is invoked on.
/// </summary>
class MethodModel
{
    private readonly MethodInfo m_method;
    private readonly object m_target;

    /// <summary>
    /// Stores the method and the instance it belongs to.
    /// </summary>
    /// <param name="method">The reflected method.</param>
    /// <param name="target">The instance passed as the invocation target.</param>
    public MethodModel(MethodInfo method, object target)
    {
        this.m_method = method;
        this.m_target = target;
    }

    /// <summary>
    /// The reflected method supplied at construction.
    /// </summary>
    public MethodInfo Method => this.m_method;

    /// <summary>
    /// The target instance supplied at construction.
    /// </summary>
    public object Target => this.m_target;
}
SimpleDmtpRpcPackage类,用于打包Rpc请求。关于打包的使用详情,可以参阅“包序列化”。
/// <summary>
/// Wire package of the minimal RPC sample: the router/wait fields of
/// <see cref="WaitRouterPackage"/> plus the name of the method to invoke.
/// </summary>
class SimpleDmtpRpcPackage : WaitRouterPackage
{
    /// <summary>
    /// Always <c>true</c> for this package type.
    /// </summary>
    protected override bool IncludedRouter => true;

    /// <summary>
    /// Name of the method to invoke.
    /// </summary>
    public string MethodName { get; set; }

    /// <summary>
    /// Writes the base body followed by <see cref="MethodName"/>.
    /// </summary>
    /// <param name="byteBlock">Block to write into.</param>
    public override void PackageBody(in ByteBlock byteBlock)
    {
        base.PackageBody(byteBlock);
        byteBlock.Write(this.MethodName);
    }

    /// <summary>
    /// Reads the base body followed by <see cref="MethodName"/>, mirroring <see cref="PackageBody"/>.
    /// </summary>
    /// <param name="byteBlock">Block to read from.</param>
    public override void UnpackageBody(in ByteBlock byteBlock)
    {
        base.UnpackageBody(byteBlock);
        this.MethodName = byteBlock.ReadString();
    }

    /// <summary>
    /// Converts the status of a response into an exception; returns normally only for status 1.
    /// </summary>
    /// <exception cref="TimeoutException">Status is 0.</exception>
    /// <exception cref="Exception">Status is any value other than 0 or 1.</exception>
    public void CheckStatus()
    {
        switch (this.Status)
        {
            case 0:
                throw new TimeoutException();
            case 1:
                return;
            case 2:
                throw new Exception("没有找到目标Id");
            case 3:
                throw new Exception("不允许路由");
            case 4:
                throw new Exception("没找到Rpc");
            case 5:
                throw new Exception($"其他异常:{this.Message}");
            default:
                // An unrecognised status previously fell through and was treated as success.
                throw new Exception($"未知状态:{this.Status}");
        }
    }
}
3.2 声明扩展
扩展方法是为IPluginsManager、IDmtpActor、IDmtpActorObject等提供扩展功能的静态方法。
/// <summary>
/// Extension methods that register the simple RPC plugin and attach or retrieve the
/// <see cref="ISimpleDmtpRpcActor"/> stored on a Dmtp actor.
/// </summary>
static class SimpleDmtpRpcExtension
{
    #region DependencyProperty
    /// <summary>
    /// Dependency property under which the <see cref="ISimpleDmtpRpcActor"/> is stored.
    /// </summary>
    public static readonly DependencyProperty<ISimpleDmtpRpcActor> SimpleDmtpRpcActorProperty =
        DependencyProperty<ISimpleDmtpRpcActor>.Register("SimpleDmtpRpcActor", default);
    #endregion DependencyProperty

    #region Plugin extensions
    /// <summary>
    /// Adds the <see cref="SimpleDmtpRpcFeature"/> plugin to the plugin manager.
    /// </summary>
    /// <param name="pluginsManager">The plugin manager to add the plugin to.</param>
    /// <returns>The value returned by the plugin manager for the added plugin.</returns>
    public static SimpleDmtpRpcFeature UseSimpleDmtpRpc(this IPluginsManager pluginsManager)
    {
        return pluginsManager.Add<SimpleDmtpRpcFeature>();
    }
    #endregion Plugin extensions

    /// <summary>
    /// Gets the <see cref="ISimpleDmtpRpcActor"/> stored on the given Dmtp actor.
    /// </summary>
    /// <param name="smtpActor">The Dmtp actor to read from.</param>
    /// <returns>The value stored under <see cref="SimpleDmtpRpcActorProperty"/>.</returns>
    public static ISimpleDmtpRpcActor GetSimpleDmtpRpcActor(this IDmtpActor smtpActor)
    {
        return smtpActor.GetValue(SimpleDmtpRpcActorProperty);
    }

    /// <summary>
    /// Gets the <see cref="ISimpleDmtpRpcActor"/> of a client so RPC calls can be made through it.
    /// </summary>
    /// <param name="client">The object exposing the Dmtp actor.</param>
    /// <returns>The RPC actor; never <c>null</c>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="client"/> is null, or no RPC actor is available on it.
    /// </exception>
    public static ISimpleDmtpRpcActor GetSimpleDmtpRpcActor(this IDmtpActorObject client)
    {
        if (client is null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        // A missing DmtpActor is reported through the same diagnostic as a missing RPC actor
        // instead of surfacing as a NullReferenceException.
        var smtpRpcActor = client.DmtpActor?.GetSimpleDmtpRpcActor();
        if (smtpRpcActor is null)
        {
            throw new ArgumentNullException(nameof(smtpRpcActor), "SimpleRpcActor为空,请检查是否已启用UseSimpleDmtpRpc");
        }
        return smtpRpcActor;
    }

    /// <summary>
    /// Stores an <see cref="ISimpleDmtpRpcActor"/> on the given Dmtp actor.
    /// </summary>
    /// <param name="smtpActor">The Dmtp actor to store the value on.</param>
    /// <param name="smtpRpcActor">The RPC actor to store.</param>
    internal static void SetSimpleDmtpRpcActor(this IDmtpActor smtpActor, ISimpleDmtpRpcActor smtpRpcActor)
    {
        smtpActor.SetValue(SimpleDmtpRpcActorProperty, smtpRpcActor);
    }
}
3.3 声明功能插件
功能插件为Actor提供创建时机和必要的配置信息。如果有需要,还可以抛出插件信息(例如文件传输)。
/// <summary>
/// Plugin that creates a <see cref="SimpleDmtpRpcActor"/> for each client during the
/// handshake, offers received messages to it, and holds the registered RPC methods.
/// </summary>
class SimpleDmtpRpcFeature : PluginBase, IDmtpHandshakingPlugin, IDmtpReceivedPlugin
{
    // Registered RPC methods, keyed by method name.
    readonly Dictionary<string, MethodModel> m_pairs = new Dictionary<string, MethodModel>();

    /// <summary>
    /// Creates the RPC actor for the client, attaches it to the client's Dmtp actor and
    /// continues the plugin chain.
    /// </summary>
    /// <param name="client">The client performing the handshake.</param>
    /// <param name="e">Handshake event arguments.</param>
    public async Task OnDmtpHandshaking(IDmtpActorObject client, DmtpVerifyEventArgs e)
    {
        var actor = new SimpleDmtpRpcActor(client.DmtpActor)
        {
            TryFindMethod = this.TryFindMethod
        };
        client.DmtpActor.SetSimpleDmtpRpcActor(actor);
        await e.InvokeNext();
    }

    /// <summary>
    /// Looks up a registered method by name.
    /// </summary>
    /// <param name="methodName">Name received from the caller; may be null.</param>
    /// <returns>The registered model, or <c>null</c> when the name is null or unknown.</returns>
    private MethodModel TryFindMethod(string methodName)
    {
        // Dictionary lookups throw on a null key; a null name is simply "not found".
        if (methodName != null && this.m_pairs.TryGetValue(methodName, out var methodModel))
        {
            return methodModel;
        }
        return default;
    }

    /// <summary>
    /// Registers the public instance methods of <paramref name="server"/> as RPC methods,
    /// keyed by method name.
    /// </summary>
    /// <param name="server">The object whose methods are exposed.</param>
    /// <exception cref="ArgumentNullException"><paramref name="server"/> is null.</exception>
    /// <exception cref="ArgumentException">A method name is already registered.</exception>
    public void RegisterRpc(object server)
    {
        if (server is null)
        {
            throw new ArgumentNullException(nameof(server));
        }
        foreach (var item in server.GetType().GetMethods(BindingFlags.Default | BindingFlags.Instance | BindingFlags.Public))
        {
            // Methods that originate from System.Object (ToString, Equals, GetHashCode, GetType,
            // including overrides) are not RPC endpoints. Registering them also made every
            // second RegisterRpc call fail on duplicate keys.
            if (item.GetBaseDefinition().DeclaringType == typeof(object))
            {
                continue;
            }
            m_pairs.Add(item.Name, new MethodModel(item, server));
        }
    }

    /// <summary>
    /// Offers the received message to the client's RPC actor; continues the plugin chain
    /// only when the actor did not handle it.
    /// </summary>
    /// <param name="client">The client that received the message.</param>
    /// <param name="e">Event arguments carrying the message.</param>
    public async Task OnDmtpReceived(IDmtpActorObject client, DmtpMessageEventArgs e)
    {
        if (client.DmtpActor.GetSimpleDmtpRpcActor() is SimpleDmtpRpcActor actor)
        {
            if (await actor.InputReceivedData(e.DmtpMessage))
            {
                return;
            }
        }
        await e.InvokeNext();
    }
}
四、使用
使用非常简单,基本和现有的功能插件类似。
【服务器】
/// <summary>
/// Builds, configures and starts the sample Dmtp service listening on port 7789 with the
/// simple RPC plugin and a <c>MyServer</c> instance registered.
/// </summary>
/// <returns>The started service.</returns>
private static TcpDmtpService GetTcpDmtpService()
{
    var service = new TcpDmtpService();

    var config = new TouchSocketConfig() // configuration
        .SetListenIPHosts(new IPHost[] { new IPHost(7789) })
        .ConfigureContainer(container =>
        {
            container.AddConsoleLogger();
            container.AddDmtpRouteService(); // add the routing policy
        })
        .ConfigurePlugins(plugins =>
        {
            var rpcFeature = plugins.UseSimpleDmtpRpc();
            rpcFeature.RegisterRpc(new MyServer());
        })
        .SetDmtpOption(new DmtpOption()
        {
            VerifyToken = "Dmtp" // connection verification token
        });

    service.Setup(config);
    service.Start();
    service.Logger.Info("服务器成功启动");
    return service;
}
【客户端】
/// <summary>
/// Builds the sample Dmtp client targeting 127.0.0.1:7789 with the simple RPC plugin
/// and a heartbeat plugin (3 second tick, 3 allowed failures).
/// </summary>
/// <returns>The client produced by <c>BuildWithTcpDmtpClient</c>.</returns>
private static TcpDmtpClient GetTcpDmtpClient()
{
    var config = new TouchSocketConfig()
        .SetRemoteIPHost("127.0.0.1:7789")
        .SetDmtpOption(new DmtpOption()
        {
            VerifyToken = "Dmtp" // connection verification token
        })
        .ConfigureContainer(container =>
        {
            container.AddConsoleLogger();
        })
        .ConfigurePlugins(plugins =>
        {
            plugins.UseSimpleDmtpRpc();

            // Dmtp heartbeat
            var heartbeat = plugins.UseDmtpHeartbeat();
            heartbeat.SetTick(TimeSpan.FromSeconds(3))
                .SetMaxFailCount(3);
        });

    var client = config.BuildWithTcpDmtpClient();
    client.Logger.Info("连接成功");
    return client;
}
/// <summary>
/// Starts the sample service and client, then invokes each method name read from the
/// console until the input stream ends.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    var service = GetTcpDmtpService();
    var client = GetTcpDmtpClient();
    while (true)
    {
        string methodName = Console.ReadLine();
        if (methodName is null)
        {
            // ReadLine returns null once standard input is closed or redirected input is
            // exhausted. Without this check the loop would spin forever invoking a null name.
            break;
        }

        var actor = client.GetSimpleDmtpRpcActor();
        try
        {
            actor.Invoke(methodName);
            Console.WriteLine("调用成功");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}