创建命名管道服务器
定义
定义
一、说明
NamedPipeService是命名管道系服务器基类,它不参与实际的数据交互,只是配置、激活、管理、注销、重建NamedPipeSocketClient类实例。而NamedPipeSocketClient是当**NamedPipeClient(客户端)**成功连接服务器以后,由服务器新建的一个实例类,后续的所有通信,也都是通过该实例完成的。
二、特点
- 简单易用。
- 异步执行。
- 内存池支持
- 高性能(实测服务器单客户端单线程,每秒可接收数据流量可达6.5GB/s)。
- 多管道名称监听(可以一次性监听多个管道名称)
- 适配器预处理,一键式解决分包、粘包、对象解析等(即适用于Tcp的一切适配器)。
- 超简单的同步发送、异步发送、接收等操作。
- 基于委托、插件驱动,让每一步都能执行AOP。
三、产品应用场景
- 所有本机IPC(进程通讯)基础使用场景:可跨平台、跨语言使用。
四、服务器架构
服务器在收到新客户端连接时,会创建一个NamedPipeSocketClient
的派生类实例,与客户端NamedPipeClient
一一对应,后续的数据通 信均由此实例负责。
NamedPipeSocketClient
在Service里面以字典映射。Id为键,NamedPipeSocketClient
本身为值。

五、可配置项
可配置项
SetMaxPackageSize
数据包最大值(单位:byte),默认1024×1024×10。该值会在适当时间,直接作用DataHandlingAdapter.MaxPackageSize。
SetPipeName
设置管道名称。
SetNamedPipeListenOptions
设置独立的管道监听,可以独立控制当前监听的个性化配置。
SetServerName
服务器标识名称,无实际使用意义。
SetMaxCount
最大可连接数,默认为10000
UseDelaySender(暂未实现)
使用延迟发送。众所周知,tcp数据报文为了发送效率,会默认启用延迟算法。但是这种设置,只能一定程度的缓解小数据发送效率低的问题,因为它为了保证多线程发送的有序性,在send函数中设置了线程同步,所以说,每调用一次send,实际上都是巨大的性能消耗(此处用iocp发送亦然)。所以,要解决该问题, 最终还是要将小数据,组合成大数据,这样才能更高效率的发送。所以,DelaySender正是负责此类工作的。
使用DelaySender,会一定程度的降低发送的及时性,但是降低程度并不高,简单来说:
- 如果一个包大于512kb,则不会延迟,直接发送。
- 如果发送第一个包,与第二个包的时间间隔小于一个线程池线程调度的时间(这个时间极短,一般来说会在10微秒左右),则会将这两个包压缩为一个包发送。
六、支持插件
插件方法 | 功能 |
---|---|
INamedPipeConnectingPlugin | 此时管道实际上已经完成连接,但是并没有启动接收,然后触发。 |
INamedPipeConnectedPlugin | 同意连接,且成功启动接收后触发 |
INamedPipeDisconnectingPlugin | 当客户端主动调用Close时触发 |
INamedPipeDisconnectedPlugin | 当客户端断开连接后触发 |
INamedPipeReceivingPlugin | 在收到原始数据时触发,所有的数据均在ByteBlock里面。 |
INamedPipeReceivedPlugin | 在收到适配器数据时触发,根据适配器类型,数据可能在ByteBlock或者IRequestInfo里面。 |
INamedPipeSendingPlugin | 当即将发送数据时,调用该方法在适配器之后,接下来即会发送数据。 |
IIdChangedPlugin | 当NamedPipeSocketClient的Id发生改变时触发。 |
七、创建NamedPipeService
7.1 简单创建
直接初始化NamedPipeService,会使用默认的NamedPipeSocketClient。 简单的处理逻辑可通过Connecting、Connected、Received等委托直接实现。
代码如下:
var service = new NamedPipeService();
service.Connecting = (client, e) => { return EasyTask.CompletedTask; };//有客户端正在连接
service.Connected = (client, e) => {return EasyTask.CompletedTask; };//有客户端成功连接
service.Disconnecting = (client, e) => {return EasyTask.CompletedTask; };//有客户端正在断开连接,只有当主动断开时才有效。
service.Disconnected = (client, e) => {return EasyTask.CompletedTask; };//有客户端断开连接
service.Received = (client, e) =>
{
//从客户端收到信息
string mes = Encoding.UTF8.GetString(e.ByteBlock.Buffer, 0, e.ByteBlock.Len);//注意:数据长 度是byteBlock.Len
client.Logger.Info($"已从{client.Id}接收到信息:{mes}");
client.Send(mes);//将收到的信息直接返回给发送方
//client.Send("id",mes);//将收到的信息返回给特定ID的客户端
//将收到的信息返回给在线的所有客户端。
//注意:这只是个设计建议,实际上群发应该使用生产者消费者的设计模式
//var ids = service.GetIds();
//foreach (var clientId in ids)
//{
// if (clientId != client.Id)//不给自己发
// {
// service.Send(clientId, mes);
// }
//}
return Task.CompletedTask;
};
service.Setup(new TouchSocketConfig()//载入配置
.SetPipeName("TouchSocketPipe")//设置命名管道名称
.ConfigureContainer(a =>
{
a.AddConsoleLogger();//添加一个控制台日志注入(注意:在maui中控制台日志不可用)
})
.ConfigurePlugins(a =>
{
//a.Add();//此处可以添加插件
}));
service.Start();//启动
Service.Start()方法并不会阻塞当前运行,所以当在控制台运行时,可能需要使用Console.ReadKey()等操作进行阻塞。
7.2 泛型创建
通过泛型创建服务器,可以实现很多有意思,且能重写一些有用的功能。下面就演示,如何通过泛型创建服务器。
代码如下:
(1)建立NamedPipeSocketClient
继承类。
public class MySocketClient : NamedPipeSocketClient
{
protected override async Task ReceivedData(ReceivedDataEventArgs e)
{
//此处逻辑单线程处理。
//此处处理数据,功能相当于Received委托。
string mes = Encoding.UTF8.GetString(e.ByteBlock.Buffer, 0, e.ByteBlock.Len);
Console.WriteLine($"已接收到信息:{mes}");
await base.ReceivedData(e);
}
}
(2)建立NamedPipeService
继承类。实际上如果业务不涉及服务器配置的话,可以省略该步骤,使用NamedPipeService的泛型直接创建。
public class MyService : NamedPipeService<MySocketClient>
{
protected override void LoadConfig(TouchSocketConfig config)
{
//此处加载配置,用户可以从配置中获取配置项。
base.LoadConfig(config);
}
protected override async Task OnConnecting(MySocketClient socketClient, ConnectingEventArgs e)
{
//此处逻辑会多线程处理。
//e.Id:对新连接的客户端进行ID初始化,默认情况下是按照设定的规则随机分配的。
//但是按照需求,您可以自定义设置,例如设置为其IP地址。但是需要注意的是id必须在生命周期内唯一。
//e.IsPermitOperation:指示是否允许该客户端链接。
await base.OnConnecting(socketClient, e);
}
}
(3)创建服务器(包含MyService)。
var service = new MyService();
service.Setup(new TouchSocketConfig()//载入配置
.SetPipeName("TouchSocketPipe")//设置命名管道名称
.ConfigureContainer(a =>//容器的配置顺序应该在最前面
{
a.AddConsoleLogger();//添加一个控制台日志注入(注意:在maui中控制台日志不可用)
})
.ConfigurePlugins(a =>
{
//a.Add();//此处可以添加插件
}));
service.Start();//启动
(4)创建服务器(不含MyService)。
var service = new NamedPipeService<MySocketClient>();
service.Setup(new TouchSocketConfig()//载入配置
.SetPipeName("TouchSocketPipe")//设置命名管道名称
.ConfigureContainer(a =>
{
a.AddConsoleLogger();//添加一个控制台日志注入(注意:在maui中控制台日志不可用)
})
.ConfigurePlugins(a =>
{
//a.Add();//此处可以添加插件
}));
service.Start();//启动
由上述代码可以看出,通过继承,可以更加灵活的实现扩展。但实际上,很多业务我们希望大家能通过插件完成。
八、配置监听
8.1 Config直接配置
服务器在配置监听时,有多种方式实现。其中最简单、最常见的配置方式就是通过Config直接配置。
var service = new NamedPipeService();
service.Setup(new TouchSocketConfig()//载入配置
.SetPipeName("TouchSocketPipe"));//设置命名管道名称
service.Start();//启动
8.2 直接添加监听配置
直接添加监听配置是更加个性化的监听配置,它可以单独控制指定监听地址的具体配置,例如:使用何种适配器等。
var service = new NamedPipeService();
service.Setup(new TouchSocketConfig()//载入配置
.SetPipeName("TouchSocketPipe")//设置默认命名管道名称
.SetNamedPipeListenOptions(list =>
{
//如果想实现多个命名管道的监听,即可这样设置,一直Add即可。
list.Add(new NamedPipeListenOption()
{
Adapter = () => new NormalDataHandlingAdapter(),
Name = "TouchSocketPipe2"//管道名称
});
list.Add(new NamedPipeListenOption()
{
Adapter = () => new NormalDataHandlingAdapter(),
Name = "TouchSocketPipe3"//管道名称
});
})
.ConfigureContainer(a =>//容器的配置顺序应该在最前面
{
a.AddConsoleLogger();//添加一个控制台日志注入(注意:在maui中控制台日志不可用)
})
.ConfigurePlugins(a =>
{
//a.Add();//此处可以添加插件
}));
service.Start();//启动
service.Logger.Info("服务器已启动");
SetPipeName
可以和SetNamedPipeListenOptions
可以同时使用,但是需要注意的是,Config的全局配置仅会对SetPipeName
单独生效的。SetNamedPipeListenOptions
的地址配置均是单独配置的。
8.3 动态添加、移除监听配置
服务器支持在运行时,动态添加,和移除监听配置,这极大的为灵活监听提供了方便,并且还不影响现有连接。可以轻量级的实现Stop操作。
var service = new NamedPipeService();
service.Setup(new TouchSocketConfig());
service.Start();//启动
service.AddListen(new NamedPipeListenOption()//在Service运行时,可以调用,直接添加监听
{
Name = "TouchSocketPipe4",//名称用于区分监听
Adapter = () => new FixedHeaderPackageAdapter(),//可以单独对当前地址监听,配置适配器,还有其他可配置项,都是单独对当前地址有效。
});
foreach (var item in service.Monitors)
{
service.RemoveListen(item);//在Service运行时,可以调用,直接移除现有监听
}
九、接收数据
在NamedPipeService中,接收数据的方式有很多种。多种方式可以组合使用。
9.1 Received委托处理
当使用NamedPipeService(非泛型)创 建服务器时,内部已经定义好了一个外置委托Received,可以通过该委托直接接收数据。
var service = new NamedPipeService();
service.Received = (client, e) =>
{
//从客户端收到信息
string mes = Encoding.UTF8.GetString(e.ByteBlock.Buffer, 0, e.ByteBlock.Len);
client.Logger.Info($"已从{client.Id}接收到信息:{mes}");
return EasyTask.CompletedTask;
};
service.Start("TouchSocketPipe");//启动
9.2 重写NamedPipeSocketClient处理
正如6.2所示,可以直接在MySocketClient的重写ReceivedData中直接处理数据。
9.3 插件处理
按照TouchSocket的设计理念,使用插件处理数据,是一项非常简单,且高度解耦的方式。步骤如下:
(1)声明插件
插件可以先继承PluginBase
,然后再实现需要的功能插件接口,可以按需选择泛型或者非泛型实现。
如果已经有继承类,直接实现IPlugin
接口即可。
class MyNamedPipePlugin : PluginBase, INamedPipeConnectedPlugin, INamedPipeDisconnectedPlugin, INamedPipeReceivedPlugin
{
private readonly ILog m_logger;
public MyNamedPipePlugin(ILog logger)
{
this.m_logger = logger;
}
public async Task OnNamedPipeConnected(INamedPipeClientBase client, ConnectedEventArgs e)
{
m_logger.Info("Connected");
await e.InvokeNext();
}
public async Task OnNamedPipeDisconnected(INamedPipeClientBase client, DisconnectEventArgs e)
{
m_logger.Info("Disconnected");
await e.InvokeNext();
}
public async Task OnNamedPipeReceived(INamedPipeClientBase client, ReceivedDataEventArgs e)
{
m_logger.Info(e.ByteBlock.ToString());
client.Send(e.ByteBlock);
await e.InvokeNext();
}
}
(2)创建使用插件处理的服务器
var service = new NamedPipeService();
service.Setup(new TouchSocketConfig()
.SetPipeName("TouchSocketPipe")//设置命名管道名称
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.Add<MyNamedPipePlugin>();
}));
service.Start();
十、AspNetCore中创建 (暂未实现)
首先建议安装TouchSocket.AspNetCore
或者TouchSocketPro.AspNetCore
,因为这个里面有很多可以直接使用的注入项。
在安装TouchSocket.AspNetCore
或者TouchSocketPro.AspNetCore
的同时,最好也安装TouchSocket
或者TouchSocketPro
。这样更新也及时一些。
在AspNetCore中使用NamedPipeService
,不应该像普通端一样,订阅Received。应该是通过插件,注入等方式实现。步骤如下:
- 注入
NamedPipeService
,并做好配置(和常规服务器配置一样)。 - 新建插件,处理收到的数据。
代码如下:
(1)声明插件
class MyNamedPipePlugin : PluginBase, INamedPipeConnectedPlugin, INamedPipeDisconnectedPlugin, INamedPipeReceivedPlugin
{
private readonly ILog m_logger;
public MyNamedPipePlugin(ILog logger)
{
this.m_logger = logger;
}
public async Task OnNamedPipeConnected(INamedPipeClientBase client, ConnectedEventArgs e)
{
m_logger.Info("Connected");
await e.InvokeNext();
}
public async Task OnNamedPipeDisconnected(INamedPipeClientBase client, DisconnectEventArgs e)
{
m_logger.Info("Disconnected");
await e.InvokeNext();
}
public async Task OnNamedPipeReceived(INamedPipeClientBase client, ReceivedDataEventArgs e)
{
m_logger.Info(e.ByteBlock.ToString());
client.Send(e.ByteBlock);
await e.InvokeNext();
}
}
(2)注入服务
public void ConfigureServices(IServiceCollection services)
{
var tcpService = services.AddNamedPipeService(config =>
{
config.SetPipeName("TouchSocketPipe")//设置命名管道名称
.UseAspNetCoreContainer(services)//使用和Aspnetcore一致的容器,可以做到注册类型的共享
.ConfigurePlugins(a =>
{
a.Add<MyNamedPipePlugin>();//此插件就可以处理接收数据
});
});
}
UseAspNetCoreContainer时,可以做到和Aspnetcore注册类型的共享。但是需要注意的是,IServiceProvider
并不是同一个实例,所以当使用单例注册时,可能无法做到单例共享。解决方法就是将单例类型,使用实例注册。这样就完全可以。其他生命周期不受影响。
然后在任意地方,也可获得服务。例如:在控制器中
此时,NamedPipeService 与整个AspNetCore是共享IOC容器的。即:NamedPipeService中的任何地方(例如:插件)也能获得AspNetCore已注册的服务。
十一、发送数据
按照架构图,每个客户端成功连接后,服务器都会创建一个派生自NamedPipeSocketClient的实例,通过该实例即可将数据发送至客户端。
11.1 如何获取NamedPipeSocketClient?
(1)直接获取所有在线客户端
通过service.GetClients
方法,获取当前在线的所有客户端。
var socketClients = service.GetClients();
由于NamedPipeSocketClient的生命周期是由框架控制的,所以最好尽量不要直接引用该实例,可以引用NamedPipeSocketClient.Id,然后再通过服务器查找。
(2)通过Id获取
先调用service.GetIds
方法,获取当前在线的所有客户端的Id,然后选择需要的Id,通过TryGetSocketClient方法,获取到想要的客户端。
var ids = service.GetIds();
if (service.TryGetSocketClient(ids.First(), out var socketClient))
{
}
11.2 发送
【同步发送】
NamedPipeSocketClient已经内置了三种同步发送方法,直接调用就可以发送,如果发送失败,则会立即抛出异常。
public virtual void Send(byte[] buffer);
public virtual void Send(ByteBlock byteBlock);
public virtual void Send(byte[] buffer, int offset, int length);
【异步发送】
NamedPipeClient已经内置了三种异步发送方法,直接调用就可以发送。如果发送失败,await就会触发异常。
public virtual Task SendAsync(byte[] buffer);
public virtual Task SendAsync(ByteBlock byteBlock);
public virtual Task SendAsync(byte[] buffer, int offset, int length);
通过上述方法发送的数据,都会经过适配器,如果想要直接发送,请使用DefaultSend。
11.3 通过NamedPipeService发送
通过Id发送数据。
public virtual void Send(string id, ByteBlock byteBlock);
public virtual void Send(string id, byte[] buffer, int offset, int length);
public virtual void Send(string id, byte[] buffer);
public virtual Task SendAsync(string id, ByteBlock byteBlock);
public virtual Task SendAsync(string id, byte[] buffer, int offset, int length);
public virtual Task SendAsync(string id, byte[] buffer);
框架不仅内置了Send
字节的发送,也扩展了字符串等常见数据的发送。而且还包括了TrySend
等不会抛出异常的发送方法。