跳到主要内容
版本:2.0.0

原始适配器

定义

命名空间:TouchSocket.Core
程序集:TouchSocket.Core.dll

一、说明

原始适配器则是直接从SingleStreamDataHandlingAdapter(或者NormalDataHandlingAdapter)继承,能够在第一时间,第一手接触到流式数据。可以自定实现数据的继续投递方式。但一般实现算法比较困难,因为所考虑的情况比较多。

例如:假设如下数据格式。

  • 第1个字节表示整个数据长度(包括数据类型和指令类型)
  • 第2字节表示数据类型。
  • 第3字节表示指令类型。
  • 后续字节表示其他数据。

其次,希望在发送时,只传入数据类型,指令类型和其他数据,而数据长度则由适配器自行封装。最后,希望在接收端每次能接收到一个完整的数据。

二、特点

  1. 能够第一时间,第一手接触到流式数据
  2. 希望能自定义投递数据的方式
  3. 本身不具备处理数据粘分包的能力,需要自己实现。

三、实现适配器

首先,创建类,继承自SingleStreamDataHandlingAdapter,然后实现对应属性,及方法。

然后,从原生适配器解封数据,需要考虑的情况比较多。在本示例中,需要考虑以下情况:

  1. 一次刚好接收一个数据。
  2. 一次刚好接收了多个数据。
  3. 一次接收了多个数据,但最后一个数据不完整。
  4. 一次未接收完一个数据。

所以,情况比较复杂,所以就必须自己做接收数据的缓存容器。

具体代码如下:

internal class MyDataHandleAdapter : SingleStreamDataHandlingAdapter
{
/// <summary>
/// 包剩余长度
/// </summary>
private byte m_surPlusLength;

/// <summary>
/// 临时包,此包仅当前实例储存
/// </summary>
private ByteBlock m_tempByteBlock;

public override bool CanSendRequestInfo => false;

public override bool CanSplicingSend => false;

protected override void PreviewReceived(ByteBlock byteBlock)
{
//收到从原始流式数据。

var buffer = byteBlock.Buffer;
var r = byteBlock.Len;
if (this.m_tempByteBlock == null)//如果没有临时包,则直接分包。
{
this.SplitPackage(buffer, 0, r);
}
else
{
if (this.m_surPlusLength == r)//接收长度正好等于剩余长度,组合完数据以后直接处理数据。
{
this.m_tempByteBlock.Write(buffer, 0, this.m_surPlusLength);
this.PreviewHandle(this.m_tempByteBlock);
this.m_tempByteBlock = null;
this.m_surPlusLength = 0;
}
else if (this.m_surPlusLength < r)//接收长度大于剩余长度,先组合包,然后处理包,然后将剩下的分包。
{
this.m_tempByteBlock.Write(buffer, 0, this.m_surPlusLength);
this.PreviewHandle(this.m_tempByteBlock);
this.m_tempByteBlock = null;
this.SplitPackage(buffer, this.m_surPlusLength, r);
}
else//接收长度小于剩余长度,无法处理包,所以必须先组合包,然后等下次接收。
{
this.m_tempByteBlock.Write(buffer, 0, r);
this.m_surPlusLength -= (byte)r;
}
}
}

protected override void PreviewSend(byte[] buffer, int offset, int length)
{
//在发送流式数据之前

if (length > byte.MaxValue)//超长判断
{
throw new OverlengthException("发送数据太长。");
}

//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请1K
//ByteBlock byteBlock = new ByteBlock(dataLen+1);//实际写法。
using (var byteBlock = new ByteBlock(1024))
{
byteBlock.Write((byte)length);//先写长度
byteBlock.Write(buffer, offset, length);//再写数据
this.GoSend(byteBlock.Buffer, 0, byteBlock.Len);
}
}

protected override void PreviewSend(IList<ArraySegment<byte>> transferBytes)
{
//使用拼接模式发送,在发送流式数据之前
}

protected override void PreviewSend(IRequestInfo requestInfo)
{
//使用对象发送,在发送流式数据之前
}

/// <summary>
/// 处理数据
/// </summary>
/// <param name="byteBlock"></param>
private void PreviewHandle(ByteBlock byteBlock)
{
try
{
this.GoReceived(byteBlock, null);
}
finally
{
byteBlock.Dispose();//在框架里面将内存块释放
}
}

/// <summary>
/// 分解包
/// </summary>
/// <param name="dataBuffer"></param>
/// <param name="index"></param>
/// <param name="r"></param>
private void SplitPackage(byte[] dataBuffer, int index, int r)
{
while (index < r)
{
var length = dataBuffer[index];
var recedSurPlusLength = r - index - 1;
if (recedSurPlusLength >= length)
{
var byteBlock = new ByteBlock(length);
byteBlock.Write(dataBuffer, index + 1, length);
this.PreviewHandle(byteBlock);
this.m_surPlusLength = 0;
}
else//半包
{
this.m_tempByteBlock = new ByteBlock(length);
this.m_surPlusLength = (byte)(length - recedSurPlusLength);
this.m_tempByteBlock.Write(dataBuffer, index + 1, recedSurPlusLength);
}
index += length + 1;
}
}
}

四、使用适配器

private static TcpClient CreateClient()
{
var client = new TcpClient();
//载入配置
client.Setup(new TouchSocketConfig()
.SetRemoteIPHost("127.0.0.1:7789")
.SetTcpDataHandlingAdapter(() => new MyDataHandleAdapter())
.ConfigureContainer(a =>
{
a.AddConsoleLogger();//添加一个日志注入
}));

client.Connect();//调用连接,当连接不成功时,会抛出异常。
client.Logger.Info("客户端成功连接");
return client;
}

private static TcpService CreateService()
{
var service = new TcpService();
service.Received = (client, e) =>
{
//从客户端收到信息
var mes = Encoding.UTF8.GetString(e.ByteBlock.Buffer, 0, e.ByteBlock.Len);//注意:数据长度是byteBlock.Len
client.Logger.Info($"已从{client.Id}接收到信息:{mes}");
return EasyTask.CompletedTask;
};

service.Setup(new TouchSocketConfig()//载入配置
.SetListenIPHosts("tcp://127.0.0.1:7789", 7790)//同时监听两个地址
.SetTcpDataHandlingAdapter(() => new MyDataHandleAdapter())
.ConfigureContainer(a =>
{
a.AddConsoleLogger();//添加一个控制台日志注入(注意:在maui中控制台日志不可用)
})
.ConfigurePlugins(a =>
{
//a.Add();//此处可以添加插件
}));
service.Start();//启动
service.Logger.Info("服务器已启动");
return service;
}

五、可设置参数

属性描述默认值
MaxPackageSize适配器能接收的最大数据包长度1024*1024*1024字节
CanSendRequestInfo是否允许发送IRequestInfo对象false
CanSplicingSend拼接发送false
CacheTimeoutEnable是否启用缓存超时。true
CacheTimeout缓存超时时间。1秒
UpdateCacheTimeWhenRev是否在收到数据时,即刷新缓存时间。当设为true时,将弱化CacheTimeout的作用,只要一直有数据,则缓存不会过期。当设为false时,则在CacheTimeout的时效内。必须完成单个缓存的数据true

六、进阶用法

6.1 拼接发送

在上述案例中,发送数据时应当传入数据类型指令类型其他数据三个有效值,而在框架中,发送函数仅有Send(和重载),这无疑需要我们自己封装其他方法

假设以下情况需要实现:

数据类型有两种,分别为Up(1),Down(0)。指令类型有两种,分别为Go(1)、Hold(0)。数据类型和指令类型可以任意组合,且均可携带其他数据。

面对上述情况,我们可以封装以下函数使用:

public class MySocketClient : SimpleSocketClient
{
public void Up_Go_Send(byte[] data)
{
ByteBlock byteBlock = new ByteBlock();//内存池实现,可以直接new byte[].
byteBlock.Write((byte)1);
byteBlock.Write((byte)1);
byteBlock.Write(data);
try
{
this.Send(byteBlock);
}
finally
{
byteBlock.Dispose();
}
}
public void Down_Go_Send(byte[] data)
{
ByteBlock byteBlock = new ByteBlock();//内存池实现,可以直接new byte[].
byteBlock.Write((byte)0);
byteBlock.Write((byte)1);
byteBlock.Write(data);
try
{
this.Send(byteBlock);
}
finally
{
byteBlock.Dispose();
}
}
public void Up_Hold_Send(byte[] data)
{
ByteBlock byteBlock = new ByteBlock();//内存池实现,可以直接new byte[].
byteBlock.Write((byte)1);
byteBlock.Write((byte)0);
byteBlock.Write(data);
try
{
this.Send(byteBlock);
}
finally
{
byteBlock.Dispose();
}
}
public void Down_Hold_Send(byte[] data)
{
ByteBlock byteBlock = new ByteBlock();//内存池实现,可以直接new byte[].
byteBlock.Write((byte)0);
byteBlock.Write((byte)0);
byteBlock.Write(data);
try
{
this.Send(byteBlock);
}
finally
{
byteBlock.Dispose();
}
}
}

为什么要拼接发送??

在示例代码中不难看出,封装的函数将发送数据进行了Write操作(相当于Copy),这无疑是消耗性能的。只是在该案例中,复制的数据最大为255,感觉优化效果甚微,倘若我们需要发送的数据是1Mb,那就相当于在封装数据时,因为前两个字节的存在而复制1Mb的数据(冤死了),然后在适配器中还需要因为数据包头,再复制一次。

优化

所以我们在封装时,可以使用分片发送,但是同时也需要适配器支持。不然内部会出错。

protected override void PreviewSend(IList<ArraySegment<byte>> transferBytes)
{
int dataLen = 0;
foreach (var item in transferBytes)
{
dataLen += item.Count;
}
if (dataLen > byte.MaxValue)//超长判断
{
throw new OverlengthException("发送数据太长。");
}

//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请1K
//ByteBlock byteBlock = new ByteBlock(dataLen+1);//实际写法。

using (ByteBlock byteBlock = new ByteBlock(1024))
{
byteBlock.Write((byte)dataLen);//先写长度
foreach (var item in transferBytes)
{
byteBlock.Write(item.Array, item.Offset, item.Count);//依次写入
}
this.GoSend(byteBlock.Buffer, 0, byteBlock.Len);
}
}

重新封装函数。。。

public void Up_Go_SplicingSend(byte[] data)
{
List<TransferByte> transferBytes = new List<TransferByte>();
transferBytes.Add(new TransferByte(new byte[] { 1}));
transferBytes.Add(new TransferByte(new byte[] { 1}));
transferBytes.Add(new TransferByte(data));
this.Send(transferBytes);
}
public void Down_Go_SplicingSend(byte[] data)
{
List<TransferByte> transferBytes = new List<TransferByte>();
transferBytes.Add(new TransferByte(new byte[] { 0 }));
transferBytes.Add(new TransferByte(new byte[] { 1 }));
transferBytes.Add(new TransferByte(data));
this.Send(transferBytes);
}
public void Up_Hold_SplicingSend(byte[] data)
{
List<TransferByte> transferBytes = new List<TransferByte>();
transferBytes.Add(new TransferByte(new byte[] { 1 }));
transferBytes.Add(new TransferByte(new byte[] { 0 }));
transferBytes.Add(new TransferByte(data));
this.Send(transferBytes);
}
public void Down_Hold_SplicingSend(byte[] data)
{
List<TransferByte> transferBytes = new List<TransferByte>();
transferBytes.Add(new TransferByte(new byte[] { 0 }));
transferBytes.Add(new TransferByte(new byte[] { 0 }));
transferBytes.Add(new TransferByte(data));
this.Send(transferBytes);
}

6.2 对象发送

有时候,我们希望能够直接把一个数据对象直接进行发送,可是流式数据包只能发送byte[],那么我们如何发送一个对象呢?

可以重写实现PreviewSend(IRequestInfo requestInfo)函数。实现将对象转为byte[],然后调用GoSend发送。

首先声明对象

class MyClass : IRequestInfo
{
public OrderType OrderType { get; set; }
public DataType DataType { get; set; }

public byte[] Data { get; set; }
}

enum DataType : byte
{
Down = 0,
Up = 1
}

enum OrderType : byte
{
Hold = 0,
Go = 1
}

然后重写PreviewSend(IRequestInfo requestInfo)函数

protected override void PreviewSend(IRequestInfo requestInfo)
{
//使用对象发送,在发送流式数据之前

if (requestInfo is MyClass myClass)
{
var data = myClass.Data ?? Array.Empty<byte>();
if (data.Length > byte.MaxValue)//超长判断
{
throw new OverlengthException("发送数据太长。");
}

//从内存池申请内存块,因为此处数据绝不超过255,所以避免内存池碎片化,每次申请1K
//ByteBlock byteBlock = new ByteBlock(dataLen+1);//实际写法。
using (var byteBlock = new ByteBlock(1024))
{
byteBlock.Write((byte)data.Length);//先写长度
byteBlock.Write((byte)myClass.DataType);//然后数据类型
byteBlock.Write((byte)myClass.OrderType);//然后指令类型
byteBlock.Write(data);//再写数据
this.GoSend(byteBlock.Buffer, 0, byteBlock.Len);
}
}
}
提示

发送对象功能,可以发送多种类型的数据对象,只要做好适配器实现即可。

本文示例Demo