跳到主要内容
版本:3.0

Dmtp基础功能

定义

命名空间:TouchSocket.Dmtp
程序集:TouchSocket.Dmtp.dll

一、连接

连接验证可以初步保证连接客户端的安全性。框架内部默认使用一个string类型的Token作为验证凭证。当然也允许服务器进行其他验证。具体如下:

1.1 Token验证

在服务器或客户端的配置上,设置VerifyToken,即可实现字符串Token验证。

var config = new TouchSocketConfig()
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Dmtp"
});

1.2 动态验证

使用插件,实现IDmtpHandshakingPlugin插件。然后可以自行判断一些信息,例如元数据等。

如果是特定协议的Dmtp,还可以判断一些特定信息,例如,在TcpDmtp中,可以判断IP地址等。

客户端,在连接时,可以设置元数据。

var config = new TouchSocketConfig()
.SetDmtpOption(new DmtpOption()
{
Metadata=new Metadata().Add("a","a")
});

服务端使用插件验证连接信息。

internal class MyVerifyPlugin : PluginBase, IDmtpHandshakingPlugin
{
public async Task OnDmtpHandshaking(IDmtpActorObject client, DmtpVerifyEventArgs e)
{
if (e.Metadata["a"] != "a")
{
e.IsPermitOperation = false;//不允许连接
e.Message = "元数据不对";//同时返回消息
e.Handled = true;//表示该消息已在此处处理。
return;
}

if (client is ITcpDmtpSessionClient sessionClient)
{
//在特定协议的情况下,可以获取特定信息
var ip = sessionClient.IP;
}

if (e.Token == "Dmtp")
{
e.IsPermitOperation = true;
e.Handled = true;
return;
}

await e.InvokeNext();
}
}

1.4 跨语言

为使Dmtp支持跨语言,Dmtp在设计之初就预留了跨语言连接的便利性。诚如Dmtp描述所示,其基础数据报文为Head+Flags+Length+Data。而框架内部的HandshakePingPongClose等指令均是采用Json数据格式。但是即使如此,连接时的真正数据,还与其基础协议有关。具体如下:

以连接、操作TcpDmtpService为例。其基础协议即为tcp,则使用常规的tcp客户端即可模拟链接。

using var tcpClient = new TcpClient();//创建一个普通的tcp客户端。
tcpClient.Received = (client, e) =>
{
//此处接收服务器返回的消息

var head = e.ByteBlock.ToArray(0, 2);
e.ByteBlock.Seek(2, SeekOrigin.Begin);
var flags = e.ByteBlock.ReadUInt16(EndianType.Big);
var length = e.ByteBlock.ReadInt32(EndianType.Big);

var json = e.ByteBlock.Span.ToString(Encoding.UTF8);

ConsoleLogger.Default.Info($"收到响应:flags={flags},length={length},json={json.Replace("\r\n", string.Empty).Replace(" ", string.Empty)}");


return Task.CompletedTask;
};

//开始链接服务器
await tcpClient.ConnectAsync("127.0.0.1:7789");

//以json的数据方式。
//其中Token、Metadata为连接的验证数据,分别为字符串、字符串字典类型。
//Id则表示指定的默认id,字符串类型。
//Sign为本次请求的序号,一般在连接时指定一个大于0的任意数字即可。
var json = @"{""Token"":""Dmtp"",""Metadata"":{""a"":""a""},""Id"":null,""Sign"":1}";

//将json转为utf-8编码。
var jsonBytes = Encoding.UTF8.GetBytes(json);

using (var byteBlock = new ByteBlock())
{
//按照Head+Flags+Length+Data的格式。
byteBlock.Write(Encoding.ASCII.GetBytes("dm"));
byteBlock.Write(TouchSocketBitConverter.BigEndian.GetBytes((ushort)1));
byteBlock.Write(TouchSocketBitConverter.BigEndian.GetBytes((int)jsonBytes.Length));
byteBlock.Write(jsonBytes);

await tcpClient.SendAsync(byteBlock.Memory);
}

await Task.Delay(2000);

接收输出:

收到的Json字符串,会返回服务器最终修改的TokenMetadata。同时还包括分配或指定的IdSign会与请求时一致,表示这是同一组请求。Status等于1即为连接成功。其他值则可能在Message表明连接失败的原因。

收到响应:flags=2,length=124,json={"Token":"Dmtp","Metadata":{"a":"a"},"Id":"1","Message":null,"Sign":1,"Status":1}

其他Json请求,包括:

【请求连接】

  • Token:连接令牌,字符串类型。
  • Metadata:连接元数据,字典类型。
  • Id:初始连接Id,字符串类型。
  • Sign:请求序号,整数类型,应当每次请求递增。
  • Status(仅返回时携带):连接状态,整数类型,为1时成功,其他状态可以获取Message。
  • Message(仅返回时携带):连接消息。
{"Token":"Dmtp","Metadata":{"a":"a"},"Id":null,"Sign":1}

【Ping】

  • Sign:请求序号,整数类型,应当每次请求递增。
  • Status(仅返回时携带):连接状态,整数类型,为1时成功,其他状态可以获取Message。
  • Message(仅返回时携带):连接消息。
  • Route:是否路由,布尔类型,当TargetId不为空时应设为True。
  • SourceId:源Id,字符串类型,当需要Ping其他客户端时,应该将该值设为自身Id,以保证返回的信息能够顺利路由回来。
  • TargetId:目标Id,字符串类型,当需要Ping其他客户端时,应该将该值设为目标Id,以保证信息能够顺利送达。
{"Sign":0,"Status":0,"Message":null,"Route":false,"SourceId":null,"TargetId":null}

【Close】

Close报文直接采用utf8编码的字符串。

"close"
备注

DMTP基础功能是比较容易跨语言的,但这并不意味着DMTP所有的功能都支持。其扩展功能,则需要实际的跨平台设计。不过依靠简单的Head+Flags+Length+Data的格式,也能自己实现很多的功能。

二、Id同步

在Dmtp中,存在于服务器的辅助客户端(SessionClient),与远程客户端(Client)是一一对应关系,其Id也完全一致。所以在任意一方修改Id(调用ResetId),都会同时修改远程Id。所以合理使用该操作,可以完成复用Id(重置Id)的需求。

三、发送数据

Dmtp提供协议发送数据,又叫协议扩展功能,就是对现有的Dmtp进行自定义的扩展协议。

使用起来是非常简单的,每个DmtpActor,都实现了Send方法接口。

第一个参数为ushort类型,使用者可以约定任意大于20数值

client.SendAsync(1000,Encoding.UTF8.GetBytes("RRQM"));
注意

Flags不要使用小于20的,因为框架内部在使用。并且小于100的也最好不要使用,因为可能其他组件也在使用。

接收方订阅IDmtpReceivedPlugin,已经包含了协议参数,所以直接自行筛选即可。

internal class MyFlagsPlugin : PluginBase, IDmtpReceivedPlugin
{
public async Task OnDmtpReceived(IDmtpActorObject client, DmtpMessageEventArgs e)
{
if (e.DmtpMessage.ProtocolFlags == 1000)
{
//判断完协议以后,从 e.DmtpMessage.BodyByteBlock可以拿到实际的数据
string msg = e.DmtpMessage.BodyByteBlock.ToString();

return;
}

//flags不满足,调用下一个插件
await e.InvokeNext();
}
}

四、使用Channel发送数据

ChannelDmtpActor的一个传输通道。一般来说,DmtpActor是一个逻辑连接(例如:一个TcpUdpNamedPipe等)。而Channel则是更加细分DmtpActor的一个传输通道。

换言之,一个DmtpActor可以有多个Channel。多个Channel可以同时进行传输。并且数据互不干扰。

4.1 特点

  1. 每个Channel都有独立的Id,所以多个Channel可以同时工作。
  2. 每个Channel在被创建后,都支持双向读写。
  3. 每个Channel都支持自定义限速和读写超时设定。

4.2 使用步骤

  1. 由一方DmtpActor创建一个Channel,并记住Id。
  2. 另一方DmtpActor使用这个Id订阅Channel。默认情况下,Channel在被创建时,接收端会触发IDmtpCreatedChannelPlugin插件。此时可以直接订阅。
  3. 发送方使用Channel发送(WriteAsync)数据
  4. 接收方通过订阅的Channel,接收(Read)数据

下列以TcpDmtpClient发送、TcpDmtpService接收为例:

【创建服务器】

var service = new TcpDmtpService();

var config = new TouchSocketConfig()//配置
.SetListenIPHosts(7789)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.Add<MyPlugin>();
})
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Channel"//连接验证口令。
});

await service.SetupAsync(config);
await service.StartAsync();
service.Logger.Info("服务器成功启动");

【创建客户端】

var client = await new TouchSocketConfig()
.SetRemoteIPHost("127.0.0.1:7789")
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Channel"
})
.SetSendTimeout(0)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
})
.BuildClientAsync<TcpDmtpClient>();//相当于创建客户端并配置,连接

client.Logger.Info("连接成功");

【MyPlugin插件】

internal class MyPlugin : PluginBase, IDmtpCreatedChannelPlugin
{
private readonly ILog m_logger;

public MyPlugin(ILog logger)
{
this.m_logger = logger;
}

public async Task OnDmtpCreatedChannel(IDmtpActorObject client, CreateChannelEventArgs e)
{
if (client.TrySubscribeChannel(e.ChannelId, out var channel))
{
//设定读取超时时间
//channel.Timeout = TimeSpan.FromSeconds(30);
using (channel)
{
this.m_logger.Info("通道开始接收");

//此判断主要是探测是否有Hold操作
while (channel.CanMoveNext)
{
long count = 0;
foreach (var byteBlock in channel)
{
//这里处理数据
count += byteBlock.Length;
this.m_logger.Info($"通道已接收:{count}字节");
}

this.m_logger.Info($"通道接收结束,状态={channel.Status},短语={channel.LastOperationMes},共接收{count / (1048576.0):0.00}Mb字节");
}
}
}

await e.InvokeNext();
}
}

【客户端创建Channel并写入数据】

Channel支持持续写入,且无长度限制。但是需要注意的是,在写入结束时,应该调用终止语句(下文详解),以结束写入并通知接收端。

var count = 1024 * 1;//测试1Gb数据

//1.创建通道,同时支持通道路由和元数据传递
using (var channel = client.CreateChannel())
{
//设置限速
//channel.MaxSpeed = 1024 * 1024;

ConsoleLogger.Default.Info($"通道创建成功,即将写入{count}Mb数据");
var bytes = new byte[1024 * 1024];
for (var i = 0; i < count; i++)
{
//2.持续写入数据
await channel.WriteAsync(bytes);
}

//3.在写入完成后调用终止指令。例如:Complete、Cancel、HoldOn、Dispose等
await channel.CompleteAsync("我完成了");
ConsoleLogger.Default.Info("通道写入结束");
}

【服务器使用插件订阅Channel】

插件的主要作用是处理OnDmtpCreatedChannel事件。

在插件中,我们需要订阅Channel,然后接收Channel的数据,最后执行相应的操作。

不过这并不是必须的,如果你有其他逻辑能处理订阅,那么也可以不使用插件。例如:DmtpRpc大数据传输示例中,就使用了Rpc将创建的ChannelId传递到响应端,然后响应端再处理订阅和读写。

public async Task OnDmtpCreatedChannel(IDmtpActorObject client, CreateChannelEventArgs e)
{
if (client.TrySubscribeChannel(e.ChannelId, out var channel))
{
//设定读取超时时间
//channel.Timeout = TimeSpan.FromSeconds(30);
using (channel)
{
this.m_logger.Info("通道开始接收");

//此判断主要是探测是否有Hold操作
while (channel.CanMoveNext)
{
long count = 0;
foreach (var byteBlock in channel)
{
//这里处理数据
count += byteBlock.Length;
this.m_logger.Info($"通道已接收:{count}字节");
}

this.m_logger.Info($"通道接收结束,状态={channel.Status},短语={channel.LastOperationMes},共接收{count / (1048576.0):0.00}Mb字节");
}
}
}

await e.InvokeNext();
}

4.3 终止语句

Channel的终止语句总共有4个:

  • Complete:完成,表示写入结束,接收端会收到完成的状态。
  • Cancel:取消,表示写入被取消,接收端会收到取消的状态。
  • HoldOn:等待继续,表示写入被中断,但是可以恢复传输,接收端会跳出接收,但是可以重新进入接收。
  • Dispose:释放,表示写入被释放,接收端会收到释放的状态。

其中CompleteCancelDispose均为一次性终止语句,即调用以后,Channel将不可再被使用。如果想再使用Channel,则必须重新创建,重新订阅。

HoldOn为可恢复的终止语句,调用以后,Channel会进入等待状态,接收端会跳出接收,但是可以重新进入接收。这在多段数据传输传输时是有用的。

例如:

下列示例中,我们使用HoldOn来模拟多段数据传输。每次发送10段1024字节的数据,然后调用HoldOn,循环100次。

//HoldOn的使用,主要是解决同一个通道中,多个数据流传输的情况。

//1.创建通道,同时支持通道路由和元数据传递
using (var channel = client.CreateChannel())
{
//设置限速
//channel.MaxSpeed = 1024 * 1024;

ConsoleLogger.Default.Info($"通道创建成功,即将写入");
var bytes = new byte[1024];

for (var i = 0; i < 100; i++)//循环100次
{
for (var j = 0; j < 10; j++)
{
//2.持续写入数据
await channel.WriteAsync(bytes);
}
//3.在某个阶段完成数据传输时,可以调用HoldOn
await channel.HoldOnAsync("等一下下");
}
//4.在写入完成后调用终止指令。例如:Complete、Cancel、HoldOn、Dispose等
await channel.CompleteAsync("我完成了");

ConsoleLogger.Default.Info("通道写入结束");
}

这样接收端会收到100次10段1024字节的数据。

internal class MyPlugin : PluginBase, IDmtpCreatedChannelPlugin
{
...

public async Task OnDmtpCreatedChannel(IDmtpActorObject client, CreateChannelEventArgs e)
{
if (client.TrySubscribeChannel(e.ChannelId, out var channel))
{
//设定读取超时时间
//channel.Timeout = TimeSpan.FromSeconds(30);
using (channel)
{
this.m_logger.Info("通道开始接收");

//此判断主要是探测是否有Hold操作
while (channel.CanMoveNext)
{
long count = 0;
foreach (var byteBlock in channel)
{
//这里处理数据
count += byteBlock.Length;
this.m_logger.Info($"通道已接收:{count}字节");
}

this.m_logger.Info($"通道接收结束,状态={channel.Status},短语={channel.LastOperationMes},共接收{count / (1048576.0):0.00}Mb字节");
}
}
}

await e.InvokeNext();
}
}

4.3 终止语句短语

我们在调用终止指令时,同时可以传递一个字符串短语。

例如:

await channel.CompleteAsync("我完成了");

4.4 传输限速

传输限速,可以限制Channel的传输速度。

限速操作仅可以在发送端设置。

//设置限速
channel.MaxSpeed = 1024 * 1024;

4.5 传输超时

Channel是基于写入和读取的异步机制。如果写入方迟迟没有数据写入,那读取方可能在一段时间的等待后超时。

或者如果读取方迟迟没有数据读取,那写入方在写入一部分数据后,会发生拥塞,如果时间较长,也会发生等待超时。

当然我们可以设置超时时间:

//设定读取超时时间
channel.Timeout = TimeSpan.FromSeconds(30);

4.6 注意事项

  1. Channel的传输是有序的。
  2. Channel支持所有的Dmtp组件,但要求是该组件必须基于可靠协议,例如:UdpDmtp则不支持。
  3. Channel拥有自己的拥塞控制,当发生拥塞的时候,不会阻塞底层协议。但是应该也要避免发生拥塞。
  4. Channel在使用完成后,必须显示释放,所以建议使用using关键字。

Channel示例Demo

五、使用心跳

Dmtp组件自带了Ping(或者PingAsync)方法。这个方法强制要求被Ping的一方无条件响应的。

所以,如果Ping返回true,则说明对方必在线。但是如果返回false,则不一定代表对方不在线。因为有可能是Ping超时,或者当前传输链路正在忙。

总之,使用者可以在业务中调用这个方法,来检测通讯是否通畅。

同时,库中基于此方法,封装了一个可用心跳插件。

其中可以配置心跳间隔最大失败次数

.ConfigurePlugins(a =>
{
a.UseDmtpHeartbeat()
.SetTick(TimeSpan.FromSeconds(3))
.SetMaxFailCount(3);
})
提示

Ping可以由客户端或服务器任意一端发起,然后要求对方回应。但是一般我们建议由客户端发起。由服务器响应即可。如果有特殊要求,则自行解决即可。

六、使用断线重连

.ConfigurePlugins(a =>
{
a.Add<MyPlugin>();

//使用重连
a.UseDmtpReconnection<TcpDmtpClient>()
.UsePolling(TimeSpan.FromSeconds(3))//使用轮询,每3秒检测一次
.SetActionForCheck(async (c, i) =>//重新定义检活策略
{
//方法1,直接判断是否在握手状态。使用该方式,最好和心跳插件配合使用。因为如果直接断网,则检测不出来
//await Task.CompletedTask;//消除Task
//return c.Online;//判断是否在握手状态

//方法2,直接ping,如果true,则客户端必在线。如果false,则客户端不一定不在线,原因是可能当前传输正在忙
if (await c.PingAsync())
{
return true;
}
//返回false时可以判断,如果最近活动时间不超过3秒,则猜测客户端确实在忙,所以跳过本次重连
else if (DateTime.Now - c.GetLastActiveTime() < TimeSpan.FromSeconds(3))
{
return null;
}
//否则,直接重连。
else
{
return false;
}
});
})

Dmtp基础功能示例Demo

DmtpChannel示例Demo