Dmtp基础功能
定义
命名空间:TouchSocket.Dmtp
程序集:TouchSocket.Dmtp.dll
一、连接
连接验证可以初步保证连接客户端的安全性。框架内部默认使用一个string
类型的Token
作为验证凭证。当然也允许服务器进行其他验证。具体如下:
1.1 Token验证
在服务器或客户端的配置上,设置VerifyToken
,即可实现字符串Token
验证。
var config = new TouchSocketConfig()
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Dmtp"
});
1.2 动态验证
使用插件,实现IDmtpHandshakingPlugin插件。然后可以自行判断一些信息,例如元数据等。
如果是特定协议的Dmtp
,还可以判断一些特定信息,例如,在TcpDmtp
中,可以判断IP
地址等。
客户端,在连接时,可以设置元数据。
var config = new TouchSocketConfig()
.SetDmtpOption(new DmtpOption()
{
Metadata=new Metadata().Add("a","a")
});
服务端使用插件验证连接信息。
internal class MyVerifyPlugin : PluginBase, IDmtpHandshakingPlugin
{
public async Task OnDmtpHandshaking(IDmtpActorObject client, DmtpVerifyEventArgs e)
{
if (e.Metadata["a"] != "a")
{
e.IsPermitOperation = false;//不允许连接
e.Message = "元数据不对";//同时返回消息
e.Handled = true;//表示该消息已在此处处理。
return;
}
if (client is ITcpDmtpSessionClient sessionClient)
{
//在特定协议的情况下,可以获取特定信息
var ip = sessionClient.IP;
}
if (e.Token == "Dmtp")
{
e.IsPermitOperation = true;
e.Handled = true;
return;
}
await e.InvokeNext();
}
}
1.4 跨语言
为使Dmtp
支持跨语言,Dmtp
在设计之初就预留了跨语言连接的便利性。诚如Dmtp描述所示,其基础数据报文为Head+Flags+Length+Data
。而框架内部的Handshake
、Ping
、Pong
、Close
等指令均是采用Json
数据格式。但是即使如此,连接时的真正数据,还与其基础协议有关。具体如下:
以连接、操作TcpDmtpService
为例。其基础协议即为tcp
,则使用常规的tcp
客户端即可模拟链接。
using var tcpClient = new TcpClient();//创建一个普通的tcp客户端。
tcpClient.Received = (client, e) =>
{
//此处接收服务器返回的消息
var head = e.ByteBlock.ToArray(0, 2);
e.ByteBlock.Seek(2, SeekOrigin.Begin);
var flags = e.ByteBlock.ReadUInt16(EndianType.Big);
var length = e.ByteBlock.ReadInt32(EndianType.Big);
var json = e.ByteBlock.Span.ToString(Encoding.UTF8);
ConsoleLogger.Default.Info($"收到响应:flags={flags},length={length},json={json.Replace("\r\n", string.Empty).Replace(" ", string.Empty)}");
return Task.CompletedTask;
};
//开始链接服务器
await tcpClient.ConnectAsync("127.0.0.1:7789");
//以json的数据方式。
//其中Token、Metadata为连接的验证数据,分别为字符串、字符串字典类型。
//Id则表示指定的默认id,字符串类型。
//Sign为本次请求的序号,一般在连接时指定一个大于0的任意数字即可。
var json = @"{""Token"":""Dmtp"",""Metadata"":{""a"":""a""},""Id"":null,""Sign"":1}";
//将json转为utf-8编码。
var jsonBytes = Encoding.UTF8.GetBytes(json);
using (var byteBlock = new ByteBlock())
{
//按照Head+Flags+Length+Data的格式。
byteBlock.Write(Encoding.ASCII.GetBytes("dm"));
byteBlock.Write(TouchSocketBitConverter.BigEndian.GetBytes((ushort)1));
byteBlock.Write(TouchSocketBitConverter.BigEndian.GetBytes((int)jsonBytes.Length));
byteBlock.Write(jsonBytes);
await tcpClient.SendAsync(byteBlock.Memory);
}
await Task.Delay(2000);
接收输出:
收到的Json
字符串,会返回服务器最终修改的Token
、Metadata
。同时还包括分配或指定的Id
。Sign
会与请求时一致,表示这是同一组请求。Status
等于1
即为连接成功。其他值则可能在Message
表明连接失败的原因。
收到响应:flags=2,length=124,json={"Token":"Dmtp","Metadata":{"a":"a"},"Id":"1","Message":null,"Sign":1,"Status":1}
其他Json
请求,包括:
【请求连接】
- Token:连接令牌,字符串类型。
- Metadata:连接元数据,字典类型。
- Id:初始连接Id,字符串类型。
- Sign:请求序号,整数类型,应当每次请求递增。
- Status(仅返回时携带):连接状态,整数类型,为1时成功,其他状态可以获取Message。
- Message(仅返回时携带):连接消息。
{"Token":"Dmtp","Metadata":{"a":"a"},"Id":null,"Sign":1}
【Ping】
- Sign:请求序号,整数类型,应当每次请求递增。
- Status(仅返回时携带):连接状态,整数类型,为1时成功,其他状态可以获取Message。
- Message(仅返回时携带):连接消息。
- Route:是否路由,布尔类型,当TargetId不为空时应设为True。
- SourceId:源Id,字符串类型,当需要Ping其他客户端时,应该将该值设为自身Id,以保证返回的信息能够顺 利路由回来。
- TargetId:目标Id,字符串类型,当需要Ping其他客户端时,应该将该值设为目标Id,以保证信息能够顺利送达。
{"Sign":0,"Status":0,"Message":null,"Route":false,"SourceId":null,"TargetId":null}
【Close】
Close
报文直接采用utf8编码的字符串。
"close"
DMTP基础功能是比较容易跨语言的,但这并不意味着DMTP所有的功能都支持。其扩展功能,则需要实际的跨平台设计。不过依靠简单的Head+Flags+Length+Data的格式,也能自己实现很多的功能。
二、Id同步
在Dmtp中,存在于服务器的辅助客户端(SessionClient),与远程客户端(Client)是一一对应关系,其Id也完全一致。所以在任意一方修改Id(调用ResetId),都会同时修改远程Id。所以合理使用该操作,可以完成复用Id(重置Id)的需求。
三、发送数据
Dmtp提供协议发送数据,又叫协议扩展功能,就是对现有的Dmtp进行自定义的扩展协议。
使用起来是非常简单的,每个DmtpActor,都实现了Send方法接口。
第一个参数为ushort
类型,使用者可以约定任意大于20数值。
client.SendAsync(1000,Encoding.UTF8.GetBytes("RRQM"));
Flags
不要使用小于20的,因为框架内部在使用。并且小于100的也最好不要使用,因为可能其他组件也在使用。
在接收方订阅IDmtpReceivedPlugin
,已经包含了协议参数,所以直接自行筛选即可。
internal class MyFlagsPlugin : PluginBase, IDmtpReceivedPlugin
{
public async Task OnDmtpReceived(IDmtpActorObject client, DmtpMessageEventArgs e)
{
if (e.DmtpMessage.ProtocolFlags == 1000)
{
//判断完协议以后,从 e.DmtpMessage.BodyByteBlock可以拿到实际的数据
string msg = e.DmtpMessage.BodyByteBlock.ToString();
return;
}
//flags不满足,调用下一个插件
await e.InvokeNext();
}
}
四、使用Channel发送数据
Channel
是DmtpActor
的一个传输通道。一般来说,DmtpActor
是一个逻辑连接(例如:一个Tcp
、Udp
、NamedPipe
等)。而Channel
则是更加细分DmtpActor
的一个传输通道。
换言之,一个DmtpActor
可以有多个Channel。多个Channel
可以同时进行传输。并且数据互不干扰。
4.1 特点
- 每个
Channel
都有独立的Id,所以多个Channel
可以同时工作。 - 每个
Channel
在被创建后,都支持双向读写。 - 每个
Channel
都支持自定义限速和读写超时设定。
4.2 使用步骤
- 由一方
DmtpActor
创建一个Channel
,并记住Id。 - 另一方
DmtpActor
使用这个Id订阅Channel
。默认情况下,Channel
在被创建时,接收端会触发IDmtpCreatedChannelPlugin
插件。此时可以直接订阅。 - 发送方使用
Channel
发送(WriteAsync
)数据 - 接收方通过订阅的
Channel
,接收(Read)数据
下列以TcpDmtpClient
发送、TcpDmtpService
接收为 例:
【创建服务器】
var service = new TcpDmtpService();
var config = new TouchSocketConfig()//配置
.SetListenIPHosts(7789)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.Add<MyPlugin>();
})
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Channel"//连接验证口令。
});
await service.SetupAsync(config);
await service.StartAsync();
service.Logger.Info("服务器成功启动");
【创建客户端】
var client = await new TouchSocketConfig()
.SetRemoteIPHost("127.0.0.1:7789")
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Channel"
})
.SetSendTimeout(0)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
})
.BuildClientAsync<TcpDmtpClient>();//相当于创建客户端并配置,连接
client.Logger.Info("连接成功");
【MyPlugin插件】
internal class MyPlugin : PluginBase, IDmtpCreatedChannelPlugin
{
private readonly ILog m_logger;
public MyPlugin(ILog logger)
{
this.m_logger = logger;
}
public async Task OnDmtpCreatedChannel(IDmtpActorObject client, CreateChannelEventArgs e)
{
if (client.TrySubscribeChannel(e.ChannelId, out var channel))
{
//设定读取超时时间
//channel.Timeout = TimeSpan.FromSeconds(30);
using (channel)
{
this.m_logger.Info("通道开始接收");
//此判断主要是探测是否有Hold操作
while (channel.CanMoveNext)
{
long count = 0;
foreach (var byteBlock in channel)
{
//这里处理数据
count += byteBlock.Length;
this.m_logger.Info($"通道已接收:{count}字节");
}
this.m_logger.Info($"通道接收结束,状态={channel.Status},短语={channel.LastOperationMes},共接收{count / (1048576.0):0.00}Mb字节");
}
}
}
await e.InvokeNext();
}
}
【客户端创建Channel并写入数据】
Channel
支持持续写入,且无长度限制。但是需要注意的是,在写入结束时,应该调用终止语句(下文详解),以结束写入并通知接收端 。
var count = 1024 * 1;//测试1Gb数据
//1.创建通道,同时支持通道路由和元数据传递
using (var channel = client.CreateChannel())
{
//设置限速
//channel.MaxSpeed = 1024 * 1024;
ConsoleLogger.Default.Info($"通道创建成功,即将写入{count}Mb数据");
var bytes = new byte[1024 * 1024];
for (var i = 0; i < count; i++)
{
//2.持续写入数据
await channel.WriteAsync(bytes);
}
//3.在写入完成后调用终止指令。例如:Complete、Cancel、HoldOn、Dispose等
await channel.CompleteAsync("我完成了");
ConsoleLogger.Default.Info("通道写入结束");
}
【服务器使用插件订阅Channel】
插件的主要作用是处理OnDmtpCreatedChannel
事件。
在插件中,我们需要订阅Channel,然后接收Channel的数据,最后执行相应的操作。
不过这并不是必须的,如果你有其他逻辑能处理订阅,那么也可以不使用插件。例如:DmtpRpc大数据传输示例中,就使用了Rpc
将创建的Channel
的Id
传递到响应端,然后响应端再处理订阅和读写。
public async Task OnDmtpCreatedChannel(IDmtpActorObject client, CreateChannelEventArgs e)
{
if (client.TrySubscribeChannel(e.ChannelId, out var channel))
{
//设定读取超时时间
//channel.Timeout = TimeSpan.FromSeconds(30);
using (channel)
{
this.m_logger.Info("通道开始接收");
//此判断主要是探测是否有Hold操作
while (channel.CanMoveNext)
{
long count = 0;
foreach (var byteBlock in channel)
{
//这里处理数据
count += byteBlock.Length;
this.m_logger.Info($"通道已接收:{count}字节");
}
this.m_logger.Info($"通道接收结束,状态={channel.Status},短语={channel.LastOperationMes},共接收{count / (1048576.0):0.00}Mb字节");
}
}
}
await e.InvokeNext();
}
4.3 终止语句
Channel的终止语句总共有4个:
- Complete:完成,表示写入结束,接收端会收到完成的状态。
- Cancel:取消,表示写入被取消,接收端会收到取消的状态。
- HoldOn:等待继续,表示写入被中断,但是可以恢复传输,接收端会跳出接收,但是可以重新进入接收。
- Dispose:释放,表示写入被释放,接收端会收到释放的状态。
其中Complete
、Cancel
、Dispose
均为一次性终止语句,即调用以后,Channel
将不可再被使用。如果想再使用Channel
,则必须重新创建,重新订阅。
HoldOn
为可恢复的终止语句,调用以后,Channel
会进入等待状态,接收端会跳出接收,但是可以重新进入接收。这在多段数据传输传输时是有用的。
例如:
下列示例中,我们使用HoldOn
来模拟多段数据传输。每次发送10段1024字节的数据,然后调用HoldOn
,循环100次。
//HoldOn的使用,主要是解决同一个通道中,多个数据流传输的情况。
//1.创建通道,同时支持通道路由和元数据传递
using (var channel = client.CreateChannel())
{
//设置限速
//channel.MaxSpeed = 1024 * 1024;
ConsoleLogger.Default.Info($"通道创建成功,即将写入");
var bytes = new byte[1024];
for (var i = 0; i < 100; i++)//循环100次
{
for (var j = 0; j < 10; j++)
{
//2.持续写入数据
await channel.WriteAsync(bytes);
}
//3.在某个阶段完成数据传输时,可以调用HoldOn
await channel.HoldOnAsync("等一下下");
}
//4.在写入完成后调用终止指令。例如:Complete、Cancel、HoldOn、Dispose等
await channel.CompleteAsync("我完成了");
ConsoleLogger.Default.Info("通道写入结束");
}
这样接收端会收到100次10段1024字节的数据。
internal class MyPlugin : PluginBase, IDmtpCreatedChannelPlugin
{
...
public async Task OnDmtpCreatedChannel(IDmtpActorObject client, CreateChannelEventArgs e)
{
if (client.TrySubscribeChannel(e.ChannelId, out var channel))
{
//设定读取超时时间
//channel.Timeout = TimeSpan.FromSeconds(30);
using (channel)
{
this.m_logger.Info("通道开始接收");
//此判断主要是探测是否有Hold操作
while (channel.CanMoveNext)
{
long count = 0;
foreach (var byteBlock in channel)
{
//这里处理数据
count += byteBlock.Length;
this.m_logger.Info($"通道已接收:{count}字节");
}
this.m_logger.Info($"通道接收结束,状态={channel.Status},短语={channel.LastOperationMes},共接收{count / (1048576.0):0.00}Mb字节");
}
}
}
await e.InvokeNext();
}
}
4.3 终止语句短语
我们在调用终止指令时,同时可以传递一个字符串短语。
例如:
await channel.CompleteAsync("我完成了");
4.4 传输限速
传输限速,可以限制Channel
的传输速度。
限速操作仅可以在发送端设置。
//设置限速
channel.MaxSpeed = 1024 * 1024;
4.5 传输超时
Channel
是基于写入和读取的异步机制。如果写入方迟迟没有数据写入,那读取方可能在一段时间的等待后超时。
或者如果读取方迟迟没有数据读取,那写入方在写入一部分数据后,会发生拥塞,如果时间较长,也会发生等待超时。
当然我们可以设置超时时间:
//设定读取超时时间
channel.Timeout = TimeSpan.FromSeconds(30);
4.6 注意事项
Channel
的传输是有序的。Channel
支持所有的Dmtp
组件,但要求是该组件必须基于可靠协议,例如:UdpDmtp
则不支持。Channel
拥有自己的拥塞控制,当发生拥塞的时候,不会阻塞底层协议。但是应该也要避免发生拥塞。Channel
在使用完成后,必须显示释放,所以建议使用using关键字。
五、使用心跳
Dmtp
组件自带了Ping
(或者PingAsync
)方法。这个方法强制要求被Ping
的一方无条件响应的。
所以,如果Ping
返回true
,则说明对方必在线。但是如果返回false
,则不一定代表对方不在线。因为有可能是Ping
超时,或者当前传输链路正在忙。
总之,使用者可以在业务中调用这个方法,来检测通讯是否通畅。
同时,库中基于此方法,封装了一个可用心跳插件。
其中可以配置心跳间隔和最大失败次数。
.ConfigurePlugins(a =>
{
a.UseDmtpHeartbeat()
.SetTick(TimeSpan.FromSeconds(3))
.SetMaxFailCount(3);
})
Ping
可以由客户端或服务器任意一端发起,然后要求对方回应。但是一般我们建议由客户端发起。由服务器响应即可。如果有特殊要求,则自行解决即可。
六、使用断线重连
.ConfigurePlugins(a =>
{
a.Add<MyPlugin>();
//使用重连
a.UseDmtpReconnection<TcpDmtpClient>()
.UsePolling(TimeSpan.FromSeconds(3))//使用轮询,每3秒检测一次
.SetActionForCheck(async (c, i) =>//重新定义检活策略
{
//方法1,直接判断是否在握手状态。使用该方式,最好和心跳插件配合使用。因为如果直接断网,则检测不出来
//await Task.CompletedTask;//消除Task
//return c.Online;//判断是否在握手状态
//方法2,直接ping,如果true,则客户端必在线。如果false,则客户端不一定不在线,原因是可能当前传输正在忙
if (await c.PingAsync())
{
return true;
}
//返回false时可以判断,如果最近活动时间不超过3秒,则猜测客户端确实在忙,所以跳过本次重连
else if (DateTime.Now - c.GetLastActiveTime() < TimeSpan.FromSeconds(3))
{
return null;
}
//否则,直接重连。
else
{
return false;
}
});
})