跳到主要内容
版本:2.1

原始适配器

定义

命名空间:TouchSocket.Core
程序集:TouchSocket.Core.dll

一、说明

原始适配器则是直接从SingleStreamDataHandlingAdapter(或者NormalDataHandlingAdapter)继承,能够在第一时间,第一手接触到流式数据。可以自定实现数据的继续投递方式。但一般实现算法比较困难,因为所考虑的情况比较多。

例如:假设如下数据格式。

  • 第1个字节表示整个数据长度(包括数据类型和指令类型)
  • 第2字节表示数据类型。
  • 第3字节表示指令类型。
  • 后续字节表示其他数据。

其次,希望在发送时,只传入数据类型,指令类型和其他数据,而数据长度则由适配器自行封装。最后,希望在接收端每次能接收到一个完整的数据。

二、特点

  1. 能够第一时间,第一手接触到流式数据
  2. 希望能自定义投递数据的方式
  3. 本身不具备处理数据粘分包的能力,需要自己实现。

三、实现适配器

首先,创建类,继承自SingleStreamDataHandlingAdapter,然后实现对应属性,及方法。

然后,从原生适配器解封数据,需要考虑的情况比较多。在本示例中,需要考虑以下情况:

  1. 一次刚好接收一个数据。
  2. 一次刚好接收了多个数据。
  3. 一次接收了多个数据,但最后一个数据不完整。
  4. 一次未接收完一个数据。

所以,情况比较复杂,所以就必须自己做接收数据的缓存容器。

具体代码如下:

internal class MyDataHandleAdapter : SingleStreamDataHandlingAdapter
{
/// <summary>
/// 包剩余长度
/// </summary>
private byte m_surPlusLength;

/// <summary>
/// 临时包,此包仅当前实例储存
/// </summary>
private ByteBlock m_tempByteBlock;

public override bool CanSendRequestInfo => false;

public override bool CanSplicingSend => false;

protected override async Task PreviewReceivedAsync(ByteBlock byteBlock)
{
//收到从原始流式数据。

var buffer = byteBlock.TotalMemory.GetArray().Array;
var r = byteBlock.Length;
if (this.m_tempByteBlock == null)//如果没有临时包,则直接分包。
{
await this.SplitPackageAsync(buffer, 0, r);
}
else
{
if (this.m_surPlusLength == r)//接收长度正好等于剩余长度,组合完数据以后直接处理数据。
{
this.m_tempByteBlock.Write(new ReadOnlySpan<byte>(buffer, 0, this.m_surPlusLength));
await this.PreviewHandleAsync(this.m_tempByteBlock);
this.m_tempByteBlock = null;
this.m_surPlusLength = 0;
}
else if (this.m_surPlusLength < r)//接收长度大于剩余长度,先组合包,然后处理包,然后将剩下的分包。
{
this.m_tempByteBlock.Write(new ReadOnlySpan<byte>(buffer, 0, this.m_surPlusLength));
await this.PreviewHandleAsync(this.m_tempByteBlock);
this.m_tempByteBlock = null;
await this.SplitPackageAsync(buffer, this.m_surPlusLength, r);
}
else//接收长度小于剩余长度,无法处理包,所以必须先组合包,然后等下次接收。
{
this.m_tempByteBlock.Write(new ReadOnlySpan<byte>(buffer, 0, r));
this.m_surPlusLength -= (byte)r;
}
}
}


protected override async Task PreviewSendAsync(ReadOnlyMemory<byte> memory)
{
//在发送流式数据之前

var length = memory.Length;
if (length > byte.MaxValue)//超长判断
{
throw new OverlengthException("发送数据太长。");
}

//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请1K
//ByteBlock byteBlock = new ByteBlock(dataLen+1);//实际写法。
using (var byteBlock = new ByteBlock(1024))
{
byteBlock.WriteByte((byte)length);//先写长度
byteBlock.Write(memory.Span);//再写数据
await this.GoSendAsync(byteBlock.Memory);
}
}

protected override async Task PreviewSendAsync(IList<ArraySegment<byte>> transferBytes)
{
//使用拼接模式发送,在发送流式数据之前

var dataLen = 0;
foreach (var item in transferBytes)
{
dataLen += item.Count;
}
if (dataLen > byte.MaxValue)//超长判断
{
throw new OverlengthException("发送数据太长。");
}

//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请1K
//ByteBlock byteBlock = new ByteBlock(dataLen+1);//实际写法。

using (var byteBlock = new ByteBlock(1024))
{
byteBlock.WriteByte((byte)dataLen);//先写长度
foreach (var item in transferBytes)
{
byteBlock.Write(new ReadOnlySpan<byte>(item.Array, item.Offset, item.Count));//依次写入
}
await this.GoSendAsync(byteBlock.Memory);
}
}

protected override async Task PreviewSendAsync(IRequestInfo requestInfo)
{
//使用对象发送,在发送流式数据之前

if (requestInfo is MyClass myClass)
{
var data = myClass.Data ?? Array.Empty<byte>();
if (data.Length > byte.MaxValue)//超长判断
{
throw new OverlengthException("发送数据太长。");
}

//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请1K
//ByteBlock byteBlock = new ByteBlock(dataLen+1);//实际写法。
using (var byteBlock = new ByteBlock(1024))
{
byteBlock.WriteByte((byte)data.Length);//先写长度
byteBlock.WriteByte((byte)myClass.DataType);//然后数据类型
byteBlock.WriteByte((byte)myClass.OrderType);//然后指令类型
byteBlock.Write(data);//再写数据
await this.GoSendAsync(byteBlock.Memory);
}
}
}

/// <summary>
/// 处理数据
/// </summary>
/// <param name="byteBlock"></param>
private async Task PreviewHandleAsync(ByteBlock byteBlock)
{
try
{
await this.GoReceivedAsync(byteBlock, null);
}
finally
{
byteBlock.Dispose();//在框架里面将内存块释放
}
}

/// <summary>
/// 分解包
/// </summary>
/// <param name="dataBuffer"></param>
/// <param name="index"></param>
/// <param name="r"></param>
private async Task SplitPackageAsync(byte[] dataBuffer, int index, int r)
{
while (index < r)
{
var length = dataBuffer[index];
var recedSurPlusLength = r - index - 1;
if (recedSurPlusLength >= length)
{
var byteBlock = new ByteBlock(length);
byteBlock.Write(new ReadOnlySpan<byte>(dataBuffer, index + 1, length));
await this.PreviewHandleAsync(byteBlock);
this.m_surPlusLength = 0;
}
else//半包
{
this.m_tempByteBlock = new ByteBlock(length);
this.m_surPlusLength = (byte)(length - recedSurPlusLength);
this.m_tempByteBlock.Write(new ReadOnlySpan<byte>(dataBuffer, index + 1, recedSurPlusLength));
}
index += length + 1;
}
}
}

四、使用适配器

private static async Task<TcpClient> CreateClient()
{
var client = new TcpClient();
//载入配置
await client.SetupAsync(new TouchSocketConfig()
.SetRemoteIPHost("127.0.0.1:7789")
.SetTcpDataHandlingAdapter(() => new MyDataHandleAdapter())
.ConfigureContainer(a =>
{
a.AddConsoleLogger();//添加一个日志注入
}));

await client.ConnectAsync();//调用连接,当连接不成功时,会抛出异常。
client.Logger.Info("客户端成功连接");
return client;
}

private static async Task<TcpService> CreateService()
{
var service = new TcpService();
service.Received = (client, e) =>
{
//从客户端收到信息
var mes = e.ByteBlock.Span.ToString(Encoding.UTF8);
client.Logger.Info($"已从{client.Id}接收到信息:{mes}");
return Task.CompletedTask;
};

await service.SetupAsync(new TouchSocketConfig()//载入配置
.SetListenIPHosts("tcp://127.0.0.1:7789", 7790)//同时监听两个地址
.SetTcpDataHandlingAdapter(() => new MyDataHandleAdapter())
.ConfigureContainer(a =>
{
a.AddConsoleLogger();//添加一个控制台日志注入(注意:在maui中控制台日志不可用)
})
.ConfigurePlugins(a =>
{
//a.Add();//此处可以添加插件
}));
await service.StartAsync();//启动
service.Logger.Info("服务器已启动");
return service;
}

五、可设置参数

属性描述默认值
MaxPackageSize适配器能接收的最大数据包长度1024*1024*1024字节
CanSendRequestInfo是否允许发送IRequestInfo对象false
CanSplicingSend拼接发送false
CacheTimeoutEnable是否启用缓存超时。true
CacheTimeout缓存超时时间。1秒
UpdateCacheTimeWhenRev是否在收到数据时,即刷新缓存时间。当设为true时,将弱化CacheTimeout的作用,只要一直有数据,则缓存不会过期。当设为false时,则在CacheTimeout的时效内。必须完成单个缓存的数据true

本文示例Demo