跳到主要内容
版本:2.1

创建TcpService

定义

命名空间:TouchSocket.Sockets
程序集:TouchSocket.dll

一、说明

TcpService是Tcp系服务器基类,它不参与实际的数据交互,只是配置、激活、管理、注销、重建SessionClient类实例。而SessionClient是当**TcpClient(客户端)**成功连接服务器以后,由服务器新建的一个实例类,后续的所有通信,也都是通过该实例完成的。

二、特点

  • 简单易用。
  • IOCP多线程。
  • 内存池支持
  • 高性能(实测服务器单客户端单线程,每秒可接收200w条8字节的信息,接收数据流量可达3GB/s)。
  • 多地址监听(可以一次性监听多个IP及端口)
  • 适配器预处理,一键式解决分包粘包、对象解析(如HTTP,Json)等。
  • 超简单的同步发送、异步发送、接收等操作。
  • 基于委托、插件驱动,让每一步都能执行AOP。

2.1 吞吐量性能测试

如下图所示,使用最简单数据接收,不做任何处理。数据吞吐量可达3Gb。 测试Demo示例

2.2 连接性能测试

如下图所示,使用最简单连接测试,不做任何处理。建立1000本地连接仅需0.1秒。 测试Demo示例

三、产品应用场景

  • 所有Tcp基础使用场景:可跨平台、跨语言使用。
  • 自定义协议解析场景:可解析任意数据格式的TCP数据报文。

四、服务器架构

服务器在收到新客户端连接时,会创建一个SessionClient的派生类实例,与客户端TcpClient一一对应,后续的数据通信均由此实例负责。

SessionClient在Service里面以字典映射。ID为键,SessionClient本身为值。

五、可配置项

可配置项

SetMaxPackageSize

数据包最大值(单位:byte),默认1024×1024×10。该值会在适当时间,直接作用DataHandlingAdapter.MaxPackageSize。

SetGetDefaultNewId

配置初始Id的分配策略。

SetListenIPHosts

设置统一的监听IP和端口号组,可以一次性设置多个地址。

SetListenOptions

设置独立的监听IP和端口号,可以独立控制当前地址监听的个性化配置。

SetServerName

服务器标识名称,无实际使用意义。

SetBacklogProperty

Tcp半连接挂起连接队列的最大长度。默认为30

SetMaxCount

最大可连接数,默认为10000

SetServiceSslOption

Ssl配置,为Null时则不启用。

UseNoDelay

设置Socket的NoDelay属性,默认false。

UseReuseAddress

启用端口复用。该配置可在服务器、或客户端在监听端口时,运行监听同一个端口。可以一定程度缓解端口来不及释放的问题。

SetSendTimeout

设置发送超时时间,默认0ms,即禁用该配置。

六、支持插件

插件方法功能
ITcpConnectingPlugin此时Socket实际上已经完成连接,但是并没有启动接收,然后触发。
ITcpConnectedPlugin同意连接,且成功启动接收后触发
ITcpClosingPlugin当客户端主动调用Close时触发
ITcpClosedPlugin当客户端断开连接后触发
ITcpReceivingPlugin在收到原始数据时触发,所有的数据均在ByteBlock里面。
ITcpReceivedPlugin在收到适配器数据时触发,根据适配器类型,数据可能在ByteBlock或者IRequestInfo里面。
ITcpSendingPlugin当即将发送数据时,调用该方法在适配器之后,接下来即会发送数据。
IIdChangedPlugin当SessionClient的Id发生改变时触发。

七、创建TcpService

7.1 简单创建

直接初始化TcpService,会使用默认的SessionClient。 简单的处理逻辑可通过ConnectingConnectedReceived等委托直接实现。

代码如下:

var service = new TcpService();
service.Connecting = (client, e) => { return EasyTask.CompletedTask; };//有客户端正在连接
service.Connected = (client, e) => { return EasyTask.CompletedTask; };//有客户端成功连接
service.Closing = (client, e) => { return EasyTask.CompletedTask; };//有客户端正在断开连接,只有当主动断开时才有效。
service.Closed = (client, e) => { return EasyTask.CompletedTask; };//有客户端断开连接
service.Received = async (client, e) =>
{
//从客户端收到信息
var mes = e.ByteBlock.Span.ToString(Encoding.UTF8);
client.Logger.Info($"已从{client.Id}接收到信息:{mes}");
};

await service.SetupAsync(new TouchSocketConfig()//载入配置
.SetListenIPHosts("tcp://127.0.0.1:7789", 7790)//可以同时监听两个地址
.ConfigureContainer(a =>//容器的配置顺序应该在最前面
{
a.AddConsoleLogger();//添加一个控制台日志注入(注意:在maui中控制台日志不可用)
})
.ConfigurePlugins(a =>
{
//a.Add();//此处可以添加插件
}));

await service.StartAsync();//启动
温馨提示

Service.StartAsync()方法并不会阻塞当前运行,所以当在控制台运行时,可能需要使用Console.ReadKey()等操作进行阻塞。

7.2 泛型创建

通过泛型创建服务器,可以实现很多有意思,且能重写一些有用的功能。下面就演示,如何通过泛型创建服务器。

代码如下:

(1)建立SessionClient继承类。

public sealed class MySessionClient : TcpSessionClient
{
protected override async Task OnTcpReceived(ReceivedDataEventArgs e)
{
//此处逻辑单线程处理。

//此处处理数据,功能相当于Received委托。
var mes = e.ByteBlock.Span.ToString(Encoding.UTF8);
Console.WriteLine($"已接收到信息:{mes}");

await base.OnTcpReceived(e);
}
}

(2)建立TcpService继承类。实际上如果业务不涉及服务器配置的话,可以省略该步骤,使用TcpService的泛型直接创建。

public class MyService : TcpService<MySessionClient>
{
protected override void LoadConfig(TouchSocketConfig config)
{
//此处加载配置,用户可以从配置中获取配置项。
base.LoadConfig(config);
}

protected override MySessionClient NewClient()
{
return new MySessionClient();
}

protected override async Task OnTcpConnecting(MySessionClient socketClient, ConnectingEventArgs e)
{
//此处逻辑会多线程处理。

//e.Id:对新连接的客户端进行ID初始化,默认情况下是按照设定的规则随机分配的。
//但是按照需求,您可以自定义设置,例如设置为其IP地址。但是需要注意的是id必须在生命周期内唯一。

//e.IsPermitOperation:指示是否允许该客户端链接。

await base.OnTcpConnecting(socketClient, e);
}
}

(3)创建服务器。

MyService service = new MyService();

await service.SetupAsync(new TouchSocketConfig()//载入配置
.SetListenIPHosts("tcp://127.0.0.1:7789")
.ConfigureContainer(a =>//容器的配置顺序应该在最前面
{
a.AddConsoleLogger();//添加一个控制台日志注入(注意:在maui中控制台日志不可用)
})
.ConfigurePlugins(a =>
{
//a.Add();//此处可以添加插件
}));

await service.StartAsync();//启动
建议

由上述代码可以看出,通过继承,可以更加灵活的实现扩展。但实际上,很多业务我们希望大家能通过插件完成。

八、配置监听

所有配置监听的项,都是从IPHost类创建而来。

IPHost支持以下格式创建:

  • 端口:直接按int入参,该操作一般在监听Ipv4时使用。
  • IPv4:按"127.0.0.1:7789"入参。
  • IPv6:按"[*::*]:7789"入参。

8.1 Config直接配置

服务器在配置监听时,有多种方式实现。其中最简单、最常见的配置方式就是通过Config直接配置。

TcpService service = new TcpService();

await service.SetupAsync(new TouchSocketConfig()//载入配置
.SetListenIPHosts("tcp://127.0.0.1:7789", 7790));

await service.StartAsync();//启动

8.2 直接添加监听配置

直接添加监听配置是更加个性化的监听配置,它可以单独控制指定监听地址的具体配置,例如:是否启用Ssl加密、使用何种适配器等。

var service = new TcpService();

await service.SetupAsync(new TouchSocketConfig()//载入配置
.SetListenOptions(option =>
{
option.Add(new TcpListenOption()
{
IpHost = "127.0.0.1:7789",
Name = "server1",//名称用于区分监听
ServiceSslOption = null,//可以针对当前监听,单独启用ssl加密
Adapter = () => new NormalDataHandlingAdapter(),//可以单独对当前地址监听,配置适配器
//还有其他可配置项,都是单独对当前地址有效。
});

option.Add(new TcpListenOption()
{
IpHost = 7790,
Name = "server2",//名称用于区分监听
ServiceSslOption = null,//可以针对当前监听,单独启用ssl加密
Adapter = () => new FixedHeaderPackageAdapter(),//可以单独对当前地址监听,配置适配器
//还有其他可配置项,都是单独对当前地址有效。
});
}));

await service.StartAsync();//启动
温馨提示

SetListenIPHosts可以和SetListenOptions可以同时使用,但是需要注意的是,Config的全局配置仅会对SetListenIPHosts单独生效的。SetListenOptions的地址配置均是单独配置的。

8.3 动态添加、移除监听配置

服务器支持在运行时,动态添加,和移除监听配置,这极大的为灵活监听提供了方便,并且还不影响现有连接。可以轻量级的实现Stop操作。

TcpService service = new TcpService();
await service.SetupAsync(new TouchSocketConfig());
await service.StartAsync();//启动

service.AddListen(new TcpListenOption()//在Service运行时,可以调用,直接添加监听
{
IpHost = 7791,
Name = "server3",//名称用于区分监听
ServiceSslOption = null,//可以针对当前监听,单独启用ssl加密
Adapter = () => new FixedHeaderPackageAdapter(),//可以单独对当前地址监听,配置适配器
//还有其他可配置项,都是单独对当前地址有效。
});

foreach (var item in service.Monitors)
{
service.RemoveListen(item);//在Service运行时,可以调用,直接移除现有监听
}

九、接收数据

在TcpService中,接收数据的方式有很多种。多种方式可以组合使用。

9.1 Received委托处理

当使用TcpService(非泛型)创建服务器时,内部已经定义好了一个外置委托Received,可以通过该委托直接接收数据。

var service = new TcpService();
service.Received = (client, e) =>
{
//从客户端收到信息
var mes = e.ByteBlock.Span.ToString(Encoding.UTF8);
client.Logger.Info($"已从{client.Id}接收到信息:{mes}");
return EasyTask.CompletedTask;
};

await service.SetupAsync(new TouchSocketConfig()//载入配置
.SetListenIPHosts("tcp://127.0.0.1:7789", 7790)//同时监听两个地址
.ConfigureContainer(a =>//容器的配置顺序应该在最前面
{
a.AddConsoleLogger();//添加一个控制台日志注入(注意:在maui中控制台日志不可用)
}));
await service.StartAsync();//启动

9.2 重写TcpSessionClient处理

正如6.2所示,可以直接在MySessionClient的重写OnTcpReceived中直接处理数据。

9.3 插件处理

按照TouchSocket的设计理念,使用插件处理数据,是一项非常简单,且高度解耦的方式。步骤如下:

(1)声明插件

插件可以先继承PluginBase,然后再实现需要的功能插件接口,可以按需选择泛型或者非泛型实现。

如果已经有继承类,直接实现IPlugin接口即可。

public class MyPlugin : PluginBase, ITcpReceivedPlugin
{
public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
{
//这里处理数据接收
//根据适配器类型,e.ByteBlock与e.RequestInfo会呈现不同的值,具体看文档=》适配器部分。
ByteBlock byteBlock = e.ByteBlock;
IRequestInfo requestInfo = e.RequestInfo;

//e.Handled = true;//表示该数据已经被本插件处理,无需再投递到其他插件。

await e.InvokeNext();//如果本插件无法处理当前数据,请将数据转至下一个插件。
}
}

(2)创建使用插件处理的服务器

var service = new TcpService();
await service.SetupAsync(new TouchSocketConfig()
.SetListenIPHosts(7789)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.Add<MyPlugin>();
}));
await service.StartAsync();
注意

当接收数据时,ByteBlock与RequestInfo的值会根据适配器类型不同而不同。并且,当数据存于ByteBlock时,其实际的数据长度是ByteBlock.Length(Len)。而不是ByteBlock.Buffer.Length

9.4 异步阻塞接收

异步阻塞接收,即使用await的方式接收数据。其特点是能在代码上下文中,直接获取到收到的数据。

只是在服务器使用异步阻塞时,建议直接在Connected触发时相关使用。

下列将以插件为例:

class TcpServiceReceiveAsyncPlugin : PluginBase, ITcpConnectedPlugin
{
public async Task OnTcpConnected(ITcpSession client, ConnectedEventArgs e)
{
if (client is ITcpSessionClient sessionClient)
{
//receiver可以复用,不需要每次接收都新建
using (var receiver = sessionClient.CreateReceiver())
{
while (true)
{
//receiverResult每次接收完必须释放
using (var receiverResult = await receiver.ReadAsync(CancellationToken.None))
{
//收到的数据,此处的数据会根据适配器投递不同的数据。
var byteBlock = receiverResult.ByteBlock;
var requestInfo = receiverResult.RequestInfo;

if (receiverResult.IsCompleted)
{
//断开连接了
Console.WriteLine($"断开信息:{receiverResult.Message}");
return;
}

//如果数据是从ByteBlock投递
if (byteBlock != null)
{
Console.WriteLine(byteBlock.Span.ToString(Encoding.UTF8));
}

//如果是适配器信息,则可以直接处理requestInfo;
}
}
}
}

await e.InvokeNext();
}
}

在异步阻塞接收时,当接收的数据不满足解析条件时,还可以缓存起来,下次一起处理。

例如:下列将演示接收字符串,当没有发现“\r\n”时,将缓存数据,直到发现重要字符。

其中,CacheModeMaxCacheSize是启用缓存的重要属性。byteBlock.Seek则是将已读取的数据游标移动至指定位置。

class TcpServiceReceiveAsyncPlugin : PluginBase, ITcpConnectedPlugin
{
public async Task OnTcpConnected(ITcpSession client, ConnectedEventArgs e)
{
if (client is ITcpSessionClient sessionClient)
{
//receiver可以复用,不需要每次接收都新建
using (var receiver = sessionClient.CreateReceiver())
{
receiver.CacheMode = true;
receiver.MaxCacheSize = 1024 * 1024;

var rn = Encoding.UTF8.GetBytes("\r\n");
while (true)
{
//receiverResult每次接收完必须释放
using (var receiverResult = await receiver.ReadAsync(CancellationToken.None))
{
//收到的数据,此处的数据会根据适配器投递不同的数据。
var byteBlock = receiverResult.ByteBlock;
var requestInfo = receiverResult.RequestInfo;

if (receiverResult.IsCompleted)
{
//断开连接了
Console.WriteLine($"断开信息:{receiverResult.Message}");
return;
}

//在CacheMode下,byteBlock将不可能为null

var index = 0;
while (true)
{
var r = byteBlock.Span.Slice(index).IndexOf(rn);
if (r < 0)
{
break;
}

var str = byteBlock.Span.Slice(index, r).ToString(Encoding.UTF8);
Console.WriteLine(str);

index += rn.Length;
index += r;
}

byteBlock.Seek(index);
}
}
}
}

await e.InvokeNext();
}
}
提示

异步阻塞接收,在等待接收数据时,不会阻塞线程资源,所以即使大量使用,也不会影响性能。

十、发送数据

按照架构图,每个客户端成功连接后,服务器都会创建一个派生自TcpSessionClient的实例,并将其存以生成的Id为键,存在一个字典中。

所以,service提供了一下原生方法,可以通过id直接将数据发送至指定客户端。

//原生
public Task SendAsync(string id, ReadOnlyMemory<byte> memory);
public Task SendAsync(string id, IRequestInfo requestInfo);

例如:

await service.SendAsync("id",Encoding.UTF8.GetBytes("hello"));

亦或者,可以先用id查到对应的TcpSessionClient,然后用其提供的方法直接发送。

例如:

//尝试性获取
if (service.TryGetClient("id", out var sessionClient))
{
await sessionClient.SendAsync("hello");
}
//直接获取,如果id不存在,则会抛出异常
var sessionClient = service.GetClient("id");
await sessionClient.SendAsync("hello");
注意

由于TcpSessionClient的生命周期是由框架控制的,所以最好尽量不要直接引用该实例,可以引用TcpSessionClient.Id,然后再通过服务器查找。

注意

所有的发送,框架内部实际上只实现了异步发送,但是为了兼容性,仍然保留了同步发送的扩展。但是强烈建议如有可能,请务必使用异步发送来提高效率

提示

框架不仅内置了字节的发送,也扩展了字符串等常见数据的发送。而且还包括了TrySend等不会抛出异常的发送方法。

本文示例Demo