原始适配器
定义
命名空间:TouchSocket.Core
程序集:TouchSocket.Core.dll
一、说明
原始适配器则是直接从SingleStreamDataHandlingAdapter(或者NormalDataHandlingAdapter)
继承,能够在第一时间,第一手接触到流式数据。可以自定实现数据的继续投递方式。但一般实现算法比较困难,因为所考虑的情况比较多。
例如:假设如下数据格式。
- 第1个字节表示整个数据长度(包括数据类型和指令类型)
- 第2字节表示数据类型。
- 第3字节表示指令类型。
- 后续字节表示其他数据。
其次,希望在发送时,只传入数据类型,指令类型和其他数据,而数据长度则由适配器自行封装。最后,希望在接收端每次能接收到一个完整的数据。
二、特点
- 能够第一时间,第一手接触到流式数据
- 希望能自定义投递数据的方式
- 本身不具备处理数据粘分包的能力,需要自己实现。
三、实现适配器
首先,创建类,继承自SingleStreamDataHandlingAdapter
,然后实现对应属性,及方法。
然后,从原生适配器解封数据,需要考虑的情况比较多。在本示例中,需要考虑以下情况:
- 一次刚好接收一个数据。
- 一次刚好接收了多个数据。
- 一次接收了多个数据,但最后一个数据不完整。
- 一次未接收完一个数据。
所以,情况比较复杂,所以就必须自己做接收数据的缓存容器。
具体代码如下:
internal class MyDataHandleAdapter : SingleStreamDataHandlingAdapter
{
/// <summary>
/// 包剩余长度
/// </summary>
private byte m_surPlusLength;
/// <summary>
/// 临时包,此包仅当前实例储存
/// </summary>
private ByteBlock m_tempByteBlock;
public override bool CanSendRequestInfo => false;
public override bool CanSplicingSend => false;
protected override async Task PreviewReceivedAsync(ByteBlock byteBlock)
{
//收到从原始流式数据。
var buffer = byteBlock.TotalMemory.GetArray().Array;
var r = byteBlock.Length;
if (this.m_tempByteBlock == null)//如果没有临时包,则直接分包。
{
await this.SplitPackageAsync(buffer, 0, r);
}
else
{
if (this.m_surPlusLength == r)//接收长度正好等于剩余长度,组合完数据以后直接处理数据。
{
this.m_tempByteBlock.Write(new ReadOnlySpan<byte>(buffer, 0, this.m_surPlusLength));
await this.PreviewHandleAsync(this.m_tempByteBlock);
this.m_tempByteBlock = null;
this.m_surPlusLength = 0;
}
else if (this.m_surPlusLength < r)//接收长度大于剩余长度,先组合包,然后处理包,然后将剩下的分包。
{
this.m_tempByteBlock.Write(new ReadOnlySpan<byte>(buffer, 0, this.m_surPlusLength));
await this.PreviewHandleAsync(this.m_tempByteBlock);
this.m_tempByteBlock = null;
await this.SplitPackageAsync(buffer, this.m_surPlusLength, r);
}
else//接收长度小于剩余长度,无法处理包,所以必须先组合包,然后等下次接收。
{
this.m_tempByteBlock.Write(new ReadOnlySpan<byte>(buffer, 0, r));
this.m_surPlusLength -= (byte)r;
}
}
}
protected override async Task PreviewSendAsync(ReadOnlyMemory<byte> memory)
{
//在发送流式数据之前
var length = memory.Length;
if (length > byte.MaxValue)//超长判断
{
throw new OverlengthException("发送数据太长。");
}
//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请1K
//ByteBlock byteBlock = new ByteBlock(dataLen+1);//实际写法。
using (var byteBlock = new ByteBlock(1024))
{
byteBlock.WriteByte((byte)length);//先写长度
byteBlock.Write(memory.Span);//再写数据
await this.GoSendAsync(byteBlock.Memory);
}
}
protected override async Task PreviewSendAsync(IList<ArraySegment<byte>> transferBytes)
{
//使用拼接模式发送,在发送流式数据之前
var dataLen = 0;
foreach (var item in transferBytes)
{
dataLen += item.Count;
}
if (dataLen > byte.MaxValue)//超长判断
{
throw new OverlengthException("发送数据太长。");
}
//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请1K
//ByteBlock byteBlock = new ByteBlock(dataLen+1);//实际写法。
using (var byteBlock = new ByteBlock(1024))
{
byteBlock.WriteByte((byte)dataLen);//先写长度
foreach (var item in transferBytes)
{
byteBlock.Write(new ReadOnlySpan<byte>(item.Array, item.Offset, item.Count));//依次写入
}
await this.GoSendAsync(byteBlock.Memory);
}
}
protected override async Task PreviewSendAsync(IRequestInfo requestInfo)
{
//使用对象发送,在发送流式数据之前
if (requestInfo is MyClass myClass)
{
var data = myClass.Data ?? Array.Empty<byte>();
if (data.Length > byte.MaxValue)//超长判断
{
throw new OverlengthException("发送数据太长。");
}
//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请1K
//ByteBlock byteBlock = new ByteBlock(dataLen+1);//实际写法。
using (var byteBlock = new ByteBlock(1024))
{
byteBlock.WriteByte((byte)data.Length);//先写长度
byteBlock.WriteByte((byte)myClass.DataType);//然后数据类型
byteBlock.WriteByte((byte)myClass.OrderType);//然后指令类型
byteBlock.Write(data);//再写数据
await this.GoSendAsync(byteBlock.Memory);
}
}
}
/// <summary>
/// 处理数据
/// </summary>
/// <param name="byteBlock"></param>
private async Task PreviewHandleAsync(ByteBlock byteBlock)
{
try
{
await this.GoReceivedAsync(byteBlock, null);
}
finally
{
byteBlock.Dispose();//在框架里面将内存块释放
}
}
/// <summary>
/// 分解包
/// </summary>
/// <param name="dataBuffer"></param>
/// <param name="index"></param>
/// <param name="r"></param>
private async Task SplitPackageAsync(byte[] dataBuffer, int index, int r)
{
while (index < r)
{
var length = dataBuffer[index];
var recedSurPlusLength = r - index - 1;
if (recedSurPlusLength >= length)
{
var byteBlock = new ByteBlock(length);
byteBlock.Write(new ReadOnlySpan<byte>(dataBuffer, index + 1, length));
await this.PreviewHandleAsync(byteBlock);
this.m_surPlusLength = 0;
}
else//半包
{
this.m_tempByteBlock = new ByteBlock(length);
this.m_surPlusLength = (byte)(length - recedSurPlusLength);
this.m_tempByteBlock.Write(new ReadOnlySpan<byte>(dataBuffer, index + 1, recedSurPlusLength));
}
index += length + 1;
}
}
}