Dmtp基础功能
定义
命名空间:TouchSocket.Dmtp
程序集:TouchSocket.Dmtp.dll
一、连接
连接验证可以初步保证连接客户端的安全性。框架内部默认使用一个string类型的Token作为验证凭证。当然也允许服务器进行其他验证。具体如下:
1.1 Token验证
在服务器或客户端的配置上,设置VerifyToken,即可实现字符串Token验证。
var config = new TouchSocketConfig()
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Dmtp"
});
1.2 动态验证
使用插件,实现IDmtpHandshakingPlugin插件。然后可以自行判断一些信息,比如:IP地址、元数据等。
设置元数据。
var config = new TouchSocketConfig()
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Dmtp",
Metadata=new Metadata().Add("a","a")
});
internal class MyVerifyPlugin : PluginBase,IDmtpHandshakingPlugin
{
public async Task OnDmtpHandshaking(IDmtpActorObject client, DmtpVerifyEventArgs e)
{
if (e.Metadata["a"] != "a")
{
e.IsPermitOperation = false;//不允许连接
e.Message = "元数据不对";//同时返回消息
e.Handled = true;//表示该消息已在此处处理。
return;
}
if (e.Token == "Dmtp")
{
e.IsPermitOperation = true;
e.Handled = true;
return;
}
await e.InvokeNext();
}
}
1.4 跨语言
为使Dmtp服务器支持跨语言,Dmtp在设计之初就预留了跨语言连接的便利性。诚如Dmtp描述所示,其基础数据报文为Flags+Length+Data
。而框架内部的Handshake、Ping、Pong、Close等指令均是采用Json数据格式。但是及时如此,连接时的真正数据,还与其基础协议有关。具体如下:
以连接、操作TcpDmtpService
为例。其基础协议即为tcp
,则使用常规的tcp
客户端即可模拟链接。
using var tcpClient = new TcpClient();//创建一个普通的tcp客户端。
tcpClient.Received = (client, e) =>
{
//此处接收服务器返回的消息
var head = e.ByteBlock.ToArray(0,2);
e.ByteBlock.Seek(2, SeekOrigin.Begin);
var flags = e.ByteBlock.ReadUInt16(bigEndian: true);
var length = e.ByteBlock.ReadInt32(bigEndian: true);
var json = Encoding.UTF8.GetString(e.ByteBlock.Buffer, e.ByteBlock.Pos, e.ByteBlock.CanReadLen);
ConsoleLogger.Default.Info($"收到响应:flags={flags},length={length},json={json}");
return Task.CompletedTask;
};
//开始链接服务器
tcpClient.Connect("127.0.0.1:7789");
//以json的数据方式。
//其中Token、Metadata为连接的验证数据,分别为字符串、字符串字典类型。
//Id则表 示指定的默认id,字符串类型。
//Sign为本次请求的序号,一般在连接时指定一个大于0的任意数字即可。
var json = @"{""Token"":""Dmtp"",""Metadata"":{""a"":""a""},""Id"":null,""Sign"":1}";
//将json转为utf-8编码。
var jsonBytes = Encoding.UTF8.GetBytes(json);
using (var byteBlock = new ByteBlock())
{
//按照Head+Flags+Length+Data的格式。
byteBlock.Write(Encoding.ASCII.GetBytes("dm"));
byteBlock.Write(TouchSocketBitConverter.BigEndian.GetBytes((ushort)1));
byteBlock.Write(TouchSocketBitConverter.BigEndian.GetBytes((int)jsonBytes.Length));
byteBlock.Write(jsonBytes);
tcpClient.Send(byteBlock);
}
await Task.Delay(2000);
接收输出:
收到的Json字符串,会返回服务器最终修改的Token、Metadata。同时还包括分配或指定的Id。Sign会与请求时一致,表示这是同一组请求。Status等于1即为连接成功。其他值则可能在Message表明连接失败的原因。
收到响应:flags=2,length=124,json={"Token":"Dmtp","Metadata":{"a":"a"},"Id":"1","Message":null,"Sign":1,"Status":1}
其他Json请求,包括:
【请求连接】
- Token:连接令牌,字符串类型。
- Metadata:连接元数据,字典类型。
- Id:初始连接Id,字符串类型。
- Sign:请求序号,整数类型,应当每次请求递增。
- Status(仅返回时携带):连接状态,整数类型,为1时成功,其他状态可以获取Message。
- Message(仅返回时携带):连接消息。
{"Token":"Dmtp","Metadata":{"a":"a"},"Id":null,"Sign":1}
【Ping】
- Sign:请求序号,整数类型,应当每次请求递增。
- Status(仅返回时携带):连接状态,整数类型,为1时成功,其他状态可以获取Message。
- Message(仅返回时携带):连接消息。
- Route:是否路由,布尔类型,当TargetId不为空时应设为True。
- SourceId:源Id,字符串类型,当需要Ping其他客户端时,应该将该值设为自身Id,以保证返回的信息能够顺利路由回来。
- TargetId:目标Id,字符串类型,当需要Ping其他客户端时,应该将该值设为目标Id,以保证信息能够顺利送达。
{"Sign":0,"Status":0,"Message":null,"Route":false,"SourceId":null,"TargetId":null}
【Close】
Close报文直接采用utf8编码的字符串。
"close"
DMTP基础功能是比较容易跨语言的,但这并不意味着DMTP所有的功能都支持。其扩展功能,则需要实际的跨平台设计。不过依靠简单的Head+Flags+Length+Data的格式,也能自己实现很多的功能。
二、Id同步
在Dmtp中,存在于服务器的辅助客户端(SocketClient),与远程客户端(Client)是一一对应关系,其Id也完全一致。所以在任意一方修改Id(调用ResetId),都会同时修改远程Id。所以合理使用该操作,可以完成复用Id(重置Id)的需求。
三、发送数据
Dmtp提供协议发送数据,又叫协议扩展功能,就是对现有的Dmtp进行自定义的扩展协议。
使用起来是非常简单的,每个DmtpActor,都实现了Send方法接口。
第一个参数为ushort类型,使用者可以约定任意大于20数值。
client.Send(1000,Encoding.UTF8.GetBytes("RRQM"));
Flags不要使用小于20的,因为框架内部在使用。并且小于100的也最好不要使用,因为可能其他组件也在使用。
在接收方订阅IDmtpReceivedPlugin
,已经包含了协议参数,所以直接自行筛选即可。
internal class MyFlagsPlugin : PluginBase, IDmtpReceivedPlugin
{
public async Task OnDmtpReceived(IDmtpActorObject client, DmtpMessageEventArgs e)
{
if (e.DmtpMessage.ProtocolFlags == 1000)
{
//判断完协议以后,从 e.DmtpMessage.BodyByteBlock可以拿到实际的数据
string msg = e.DmtpMessage.BodyByteBlock.ToString();
return;
}
//flags不满足,调用下一个插件
await e.InvokeNext();
}
}
四、使用Channel发送数据
Channel是DmtpActor的一个传输通道。一般来说,DmtpActor是一个逻辑连接(例如:一个Tcp、Udp、NamedPipe等)。而Channel则是更加细分DmtpActor的一个传输通道。
换言之,一个DmtpActor可以有多个Channel。多个Channel可以同时进行传输。并且数据互不干扰。
4.1 特点
- 每个Channel都有独立的Id,所以多个Channel可以同时工作。
- 每个Channel在被创建后,都支持双向读写。
- 每个Channel都支持自定义限速和读写超时设定。
4.2 使用步骤
- 由一方DmtpActor创建一个Channel,并记住Id。
- 另一方DmtpActor使用这个Id订阅Channel。默认情况下,Channel在被创建时,接收端会触发IDmtpCreateChannelPlugin插件。此时可以直接订阅。
- 发送方使用Channel发送(Write)数据
- 接收方通过订阅的Channel,接收(Read)数据
下列以TcpDmtpClient发送、TcpDmtpService接收为例:
【创建客户端与服务器】
private static TcpDmtpClient GetTcpDmtpClient()
{
var client = new TouchSocketConfig()
.SetRemoteIPHost("127.0.0.1:7789")
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Channel"
})
.SetSendTimeout(0)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.Add<MyPlugin>();
})
.BuildWithTcpDmtpClient();
client.Logger.Info("连接成功");
return client;
}
private static TcpDmtpService GetTcpDmtpService()
{
var service = new TcpDmtpService();
var config = new TouchSocketConfig()//配置
.SetListenIPHosts(7789)
.SetSendTimeout(0)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.Add<MyPlugin>();
})
.SetDmtpOption(new DmtpOption()
{
VerifyToken = "Channel"//连接验证口令。
});
service.Setup(config);
service.Start();
service.Logger.Info("服务器成功启动");
return service;
}
【客户端创建Channel并写入数据】
Channel支持持续写入,且无长度限制。但是需要注意的是,在写入结束时,应该调用终止语句(下文详解),以结束写入并通知接收端。
static void Main(string[] args)
{
var service = GetTcpDmtpService();
var client = GetTcpDmtpClient();
var count = 1024 * 1;//测试1Gb数据
//1.创建通道,同时支持通道路由和元数据传递
using (var channel = client.CreateChannel())
{
//设置限速
//channel.MaxSpeed = 1024 * 1024;
ConsoleLogger.Default.Info($"通道创建成功,即将写入{count}Mb数据");
var bytes = new byte[1024 * 1024];
for (var i = 0; i < count; i++)
{
//2.持续写入数据
channel.Write(bytes);
}
//3.在写入完成后调用终止指令。例如:Complete、Cancel、HoldOn、Dispose等
channel.Complete();
ConsoleLogger.Default.Info("通道写入结束");
}
while (true)
{
BytePool.Default.Clear();
GC.Collect();
Console.ReadKey();
}
}
【服务器使用插件订阅Channel】
插件的主要作用是处理DmtpCreateChannel事件。在插件中,我们需要订阅Channel,然后接收Channel的数据,最后执行相应的操作。
不过这并不是必须的,如果你有其他逻辑能处理订阅,那么也可以不使用插件。例如:DmtpRpc大数据传输示例中,就使用了Rpc将创建的Channel的Id传递到响应端,然后响应端再处理订阅和读写。
internal class MyPlugin : PluginBase, IDmtpCreateChannelPlugin
{
private readonly ILog m_logger;
public MyPlugin(ILog logger)
{
this.m_logger = logger;
}
public async Task OnCreateChannel(IDmtpActorObject client, CreateChannelEventArgs e)
{
if (client.TrySubscribeChannel(e.ChannelId, out var channel))
{
//接收端也可以限速
//channel.MaxSpeed = 1024 * 1024;
using (channel)
{
long count = 0;
this.m_logger.Info("通道开始接收");
foreach (var byteBlock in channel)
{
//这里处理数据
count += byteBlock.Len;
this.m_logger.Info($"通道已接收:{count}字节");
}
this.m_logger.Info($"通道接收结束,状态={channel.Status},共接收{count / (1048576.0):0.00}Mb字节");
}
}
await e.InvokeNext();
}
}
4.3 终止语句
Channel的终止语句总共有4个:
- Complete:完成,表示写入结束,接收端会收到完成的状态。
- Cancel:取消,表示写入被取消,接收端会收到取消的状态。
- HoldOn:等待继续,表示写入被中断,但是可以恢复传输,接收端会跳出接收,但是可以重新进入接收。
- Dispose:释放,表示写入被释放,接收端会收到释放的状态。
其中Complete、Cancel、Dispose均为一次性终止语句,即调用以后,Channel将不可再被使用。如果想再使用Channel,则必须重新创建,重新订阅。
HoldOn为可恢复的终止语句,调用以后,Channel会进入等待状态,接收端会跳出接收,但是可以重新进入接收。这在多段数据传输传输时是有用的。
例如:
下列示例中,我们使用HoldOn来模拟多段数据传输。每次发送10段1024字节的数据,然后调用HoldOn,循环100次。
//1.创建通道,同时支持通道路由和元数据传递
using (var channel = client.CreateChannel())
{
//设置限速
//channel.MaxSpeed = 1024 * 1024;
ConsoleLogger.Default.Info($"通道创建成功,即将写入");
var bytes = new byte[1024];
for (var i = 0; i < 100; i++)//循环100次
{
for (int j = 0; j < 10; j++)
{
//2.持续写入数据
channel.Write(bytes);
}
//3.在某个阶段完成数据传输时,可以调用HoldOn
channel.HoldOn();
}
//4.在写入完成后调用终止指令。例如:Complete、Cancel、HoldOn、Dispose等
channel.Complete();
ConsoleLogger.Default.Info("通道写入结束");
}
这样接收端会收到100次10段1024字节的数据。
internal class MyPlugin : PluginBase, IDmtpCreateChannelPlugin
{
...
public async Task OnCreateChannel(IDmtpActorObject client, CreateChannelEventArgs e)
{
if (client.TrySubscribeChannel(e.ChannelId, out var channel))
{
//接收端也可以限速
//channel.MaxSpeed = 1024 * 1024;
using (channel)
{
this.m_logger.Info("通道开始接收");
//此判断主要是探测是否有Hold操作
while (channel.CanMoveNext)
{
long count = 0;
foreach (var byteBlock in channel)
{
//这里处理数据
count += byteBlock.Len;
this.m_logger.Info($"通道已接收:{count}字节");
}
this.m_logger.Info($"通道接收结束,状态={channel.Status},共接收{count / (1048576.0):0.00}Mb字节");
}
}
}
await e.InvokeNext();
}
}
4.3 终止语句短语
我们在调用终止指令时,同时可以传递一个字符串短语。
例如:
channel.Complete("我完成了");
4.4 传输限速
传输限速,可以限制Channel的传输速度。
限速操作不仅可以在发送端设置,还可以在接收端设置。但是一般建议在发送端设置。因为在接收端设置的话,可能会有一些不稳定因素。
//设置限速
channel.MaxSpeed = 1024 * 1024;
4.5 传输超时
Channel是基于写入和读取的异步机制。如果写入方迟迟没有数据写入,那读取方可能在一段时间的等待后超时。
或者如果读取方迟迟没有数据读取,那写入方在写入一部分数据后,会发生拥塞,如果时间较长,也会发生等待超时。
当然我们可以设置超时时间:
//设定读取超时时间
channel.Timeout = TimeSpan.FromSeconds(30);
4.6 注意事项
- Channel的传输是有序的。
- Channel支持所有的Dmtp组件,但要求是该组件必须基于可靠协议,例如:UdpDmtp则不支持。
- Channel拥有自己的拥塞控制,当发生拥塞的时候,不会阻塞底层协议。但是应该也要避免发生拥塞。
- Channel在使用完成后,必须显示释放,所以建议使用using关键字。
五、使用心跳
Dmtp组件自带了Ping
(或者PingAsync
)方法。这个方法强制要求被Ping
的一方无条件响应的。
所以,如果Ping
返回true
,则说明对方必在线。但是如果返回false
,则不一定代表对方不在线。因为有可能是Ping
超时,或者当前传输链路正在忙。
总之,使用者可以在业务中调用这个方法,来检测通讯是否通畅。
同时,库中基于此方法,封装了一个可用心跳插件。
其中可以配置心跳间隔和最大失败次数。
.ConfigurePlugins(a =>
{
a.UseDmtpHeartbeat()
.SetTick(TimeSpan.FromSeconds(3))
.SetMaxFailCount(3);
})
Ping
可以由客户端或服务器任意一端发起,然后要求对方回应。但是一般我们建议由客户端发起。由服务器响应即可。如果有特殊要求,则自行解决即可。
六、使用断线重连
.ConfigurePlugins(a =>
{
//使用重连
a.UseReconnection<TcpDmtpClient>()
.UsePolling()//使用轮询
.SetActionForCheck(async (c, i) =>//重新定义检活策略
{
//方法1,直接判断是否在握手状态。使用该方式,最好和心跳插件配合使用。因为如果直接断网,则检测不出来
//await Task.CompletedTask;//消除Task
//return c.IsHandshaked;//判断是否在握手状态
//方法2,直接ping,如果true,则客户端必在线。如果false,则客户端不一定不在线,原因是可能当前传输正在忙
if (await c.PingAsync())
{
return true;
}
//返回false时可以判断,如果最近活动时间不超过3秒,则猜测客户端确实在忙,所以跳过本次重连
else if (DateTime.Now - c.GetLastActiveTime() < TimeSpan.FromSeconds(3))
{
return null;
}
//否则,直接重连。
else
{
return false;
}
})
.SetTick(TimeSpan.FromSeconds(3));//每3秒检测一次
})