创建WebSocket服务器
定义
命名空间:TouchSocket.Http.WebSockets
程序集:TouchSocket.Http.dll
一、说明
WebSocket
是基于Http
协议的升级协议,所以应当挂载在http
服务器执行。
二、可配置项
三、支持插件接口
插件方法 | 功能 |
---|---|
IWebSocketHandshakingPlugin | 当收到握手请求之前,可以进行连接验证等 |
IWebSocketHandshakedPlugin | 当成功握手响应之后 |
IWebSocketReceivedPlugin | 当收到Websocket的数据报文 |
IWebSocketClosingPlugin | 当收到关闭请求时触发。如果对方直接断开连接,则此方法则不会触发。 |
IWebSocketClosedPlugin | 当WebSocket连接断开时触发,无论是否正常断开。但如果是断网等操作,可能不会立即执行,需要结合心跳操作和CheckClear插件来进行清理。 |
四、创建WebSocket服务
4.1 简单直接创建
可以使用WebSocket插件,直接指定一个特殊url
路由,来完全接收WebSocket连接。
然后通过插件来接收数据。(例:下列接收数据)
var service = new HttpService();
await service.SetupAsync(new TouchSocketConfig()//加载配置
.SetListenIPHosts(7789)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.UseWebSocket()//添加WebSocket功能
.SetWSUrl("/ws")//设置url直接可以连接。
.UseAutoPong();//当收到ping报文时自动回应pong
}));
await service.StartAsync();
service.Logger.Info("服务器已启动");
4.2 验证连接
可以对连接的Url
、Query
、Header
等参数进行验证,然后决定是否执行WebSocket
连接。
var service = new HttpService();
await service.SetupAsync(new TouchSocketConfig()//加载配置
.SetListenIPHosts(7789)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.UseWebSocket()//添加WebSocket功能
.SetVerifyConnection(VerifyConnection)
.UseAutoPong();//当收到ping报文时自动回应pong
}));
await service.StartAsync();
service.Logger.Info("服务器已启动");
/// <summary>
/// 验证websocket的连接
/// </summary>
/// <param name="client"></param>
/// <param name="context"></param>
/// <returns></returns>
private static bool VerifyConnection(IHttpSessionClient client, HttpContext context)
{
if (!context.Request.IsUpgrade())//如果不包含升级协议的header,就直接返回false。
{
return false;
}
if (context.Request.UrlEquals("/ws"))//以此连接,则直接可以连接
{
return true;
}
else if (context.Request.UrlEquals("/wsquery"))//以此连接,则需要传入token才可以连接
{
if (context.Request.Query.Get("token") == "123456")
{
return true;
}
else
{
context.Response
.SetStatus(403, "token不正确")
.Answer();
}
}
else if (context.Request.UrlEquals("/wsheader"))//以此连接,则需要从header传入token才可以连接
{
if (context.Request.Headers.Get("token") == "123456")
{
return true;
}
else
{
context.Response
.SetStatus(403, "token不正确")
.Answer();
}
}
return false;
}
4.3 通过WebApi创建
通过WebApi的方式会更加灵活,也能很方便的获得Http相关参数。还能实现多个Url的连接路由。
实现步骤:
- 配置
WebApi
相关,详情请看WebApi - 在插件中接收
WebSocket
连接。
var service = new HttpService();
await service.SetupAsync(new TouchSocketConfig()//加载配置
.SetListenIPHosts(7789)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
a.AddRpcStore(store =>
{
store.RegisterServer<MyApiServer>();//注册服务
});
})
.ConfigurePlugins(a =>
{
a.UseWebApi();
}));
await service.StartAsync();
service.Logger.Info("服务器已启动");
【接收连接 】
public class MyApiServer : RpcServer
{
private readonly ILog m_logger;
public MyApiServer(ILog logger)
{
this.m_logger = logger;
}
[Router("/[api]/[action]")]
[WebApi(HttpMethodType.GET)]
public async Task ConnectWS(IWebApiCallContext callContext)
{
if (callContext.Caller is HttpSessionClient sessionClient)
{
if (await sessionClient.SwitchProtocolToWebSocketAsync(callContext.HttpContext))
{
m_logger.Info("WS通 过WebApi连接");
var webSocket = sessionClient.WebSocket;
}
}
}
}
或者接收到连接后,直接使用WebSocket
接收。这样如果想直接在WebApi
返回数据也比较方便。
public class MyApiServer : RpcServer
{
private readonly ILog m_logger;
public MyApiServer(ILog logger)
{
this.m_logger = logger;
}
[Router("/[api]/[action]")]
[WebApi(HttpMethodType.GET)]
public async Task ConnectWS(IWebApiCallContext callContext)
{
if (callContext.Caller is HttpSessionClient sessionClient)
{
if (await sessionClient.SwitchProtocolToWebSocketAsync(callContext.HttpContext))
{
m_logger.Info("WS通过WebApi连接");
var webSocket = sessionClient.WebSocket;
webSocket.AllowAsyncRead = true;
while (true)
{
using (var tokenSource=new CancellationTokenSource(TimeSpan.FromSeconds(30)))
{
using (var receiveResult = await webSocket.ReadAsync(tokenSource.Token))
{
if (receiveResult.IsCompleted)
{
//webSocket已断开
return;
}
//webSocket数据 帧
var dataFrame = receiveResult.DataFrame;
//此处可以处理数据
}
}
}
}
}
}
}
4.4 通过Http上下文直接创建
使用上下文直接创建的优点在于能更加个性化的实现WebSocket
的连接。
class MyHttpPlugin : PluginBase, IHttpPlugin
{
public async Task OnHttpRequest(IHttpSessionClient client, HttpContextEventArgs e)
{
if (e.Context.Request.UrlEquals("/GetSwitchToWebSocket"))
{
var result = await client.SwitchProtocolToWebSocketAsync(e.Context);
return;
}
await e.InvokeNext();
}
}
4.5 创建基于Ssl的WebSocket服务
创建WSs
服务器时,其他配置不变,只需要在config
中配置SslOption
代码即可,放置了一个自制Ssl
证书,密码为“RRQMSocket”以供测试。使用配置非常方便。
var config = new TouchSocketConfig();
config.SetServiceSslOption(new ServiceSslOption() //Ssl配置,当为null的时候,相当于创建了ws服务器,当赋值的时候,相当于wss服务器。
{
Certificate = new X509Certificate2("RRQMSocket.pfx", "RRQMSocket"),
SslProtocols = SslProtocols.Tls12
});
五、接收消息
WebSocket服务器接收消息,目前有两种方式。第一种就是通过订阅IWebSocketReceivedPlugin
插件完全异步的接收消息。第二种就是调用WebSocket
,然后调用ReadAsync
方法异步阻塞式读取。
5.1 简单接收消息
【定义插件】
public class MyWebSocketPlugin : PluginBase, IWebSocketReceivedPlugin
{
private readonly ILog m_logger;
public MyWebSocketPlugin(ILog logger)
{
this.m_logger = logger;
}
public async Task OnWebSocketReceived(IWebSocket client, WSDataFrameEventArgs e)
{
switch (e.DataFrame.Opcode)
{
case WSDataType.Cont:
m_logger.Info($"收到中间数据,长度为:{e.DataFrame.PayloadLength}");
return;
case WSDataType.Text:
m_logger.Info(e.DataFrame.ToText());
if (!client.Client.IsClient)
{
client.SendAsync("我已收到");
}
return;
case WSDataType.Binary:
if (e.DataFrame.FIN)
{
m_logger.Info($"收到二进制数据,长度为:{e.DataFrame.PayloadLength}");
}
else
{
m_logger.Info($"收到未结束的二进制数据,长度为:{e.DataFrame.PayloadLength}");
}
return;
case WSDataType.Close:
{
m_logger.Info("远程请求断开");
client.Close("断开");
}
return;
case WSDataType.Ping:
break;
case WSDataType.Pong:
break;
default:
break;
}
await e.InvokeNext();
}
}
【使用】
var service = new HttpService();
await service.SetupAsync(new TouchSocketConfig()//加载配置
.SetListenIPHosts(7789)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.UseWebSocket()//添加WebSocket功能
.SetWSUrl("/ws");
a.Add<MyWebSocketPlugin>();//自定义插件。
}));
await service.StartAsync();
插件的所有函数,都是可能被并发执行的,所以应当做好线程安全。
5.2 WebSocket显式ReadAsync
WebSocket
显式ReadAsync
数据,实际上是直接通过WebSocket
直接循环读取数据。
一般的,如果WebSocket
是常规连接,则可能需要使用IWebSocketHandshakedPlugin
插件,在握手成功
的后直接读取数据。
如果连接是通过WebApi
、或者Http上下文
创建的连接,则可以直接使用ReadAsync
。
总之要能拿到WebSocket
对象,则可以直接使用。
和简单接收消息
相比,使用ReadAsync
接收消息可以直接访问代码上下文资源,这可能在处理中继数据
时是方便的。
class MyReadWebSocketPlugin : PluginBase, IWebSocketHandshakedPlugin
{
private readonly ILog m_logger;
public MyReadWebSocketPlugin(ILog logger)
{
this.m_logger = logger;
}
public async Task OnWebSocketHandshaked(IWebSocket client, HttpContextEventArgs e)
{
//当WebSocket想要使用ReadAsync时,需要设置此值为true
client.AllowAsyncRead = true;
//此处即表明websocket已连接
while (true)
{
using (var receiveResult = await client.ReadAsync(CancellationToken.None))
{
if (receiveResult.DataFrame == null)
{
break;
}
//判断是否为最后数据
//例如发送方发送了一个10Mb的数据,接收时可能会多次接收,所以需要此属性判断。
if (receiveResult.DataFrame.FIN)
{
if (receiveResult.DataFrame.IsText)
{
m_logger.Info($"WebSocket文本:{receiveResult.DataFrame.ToText()}");
}
}
}
}
//此处即表明websocket已断开连接
m_logger.Info("WebSocket断开连接");
await e.InvokeNext();
}
}
【使用】
private static HttpService CreateHttpService()
{
var service = new HttpService();
await service.SetupAsync(new TouchSocketConfig()//加载配置
.SetListenIPHosts(7789)
.ConfigureContainer(a =>
{
a.AddConsoleLogger();
})
.ConfigurePlugins(a =>
{
a.UseWebSocket()//添加WebSocket功能
.SetWSUrl("/ws")//设置url直接可以连接。
.UseAutoPong();//当收到ping报文时自动回应pong
a.Add<MyReadWebSocketPlugin>();
}));
await service.StartAsync();
service.Logger.Info("服务器已启动");
service.Logger.Info("直接连接地址=>ws://127.0.0.1:7789/ws");
return service;
}
ReadAsync
的方式是属于同步不阻塞的接收方式(和当下Aspnetcore模式一样)。他不会单独占用线程,只会阻塞当前Task
。所以可以大量使用,不需要考虑性能问题。同时,ReadAsync
的好处就是单线程访问上下文,这样在处理ws分包时是非常方便的。
使用该方式,会阻塞IWebSocketHandshakedPlugin
的插件传递。在收到WebSocket
消息的时候,不会再触发插件。
5.3 接收中继数据
WebSocket
协议本身是支持超大数据包的,但是这些包不会一次性接收,而是分多次接收的,同时会通过Opcode
来表明其为中继数据。
下面将演示接收文本数据。
【方法1】
使用ReadAsync
,在OnWebSocketHandshaked
组合消息。
internal class MyReadTextWebSocketPlugin : PluginBase, IWebSocketHandshakedPlugin
{
private readonly ILog m_logger;
public MyReadTextWebSocketPlugin(ILog logger)
{
this.m_logger = logger;
}
public async Task OnWebSocketHandshaked(IWebSocket client, HttpContextEventArgs e)
{
//当WebSocket想要使用ReadAsync时,需要设置此值为true
client.AllowAsyncRead = true;
//此处即表明websocket已连接
MemoryStream stream = default;//中继包缓存
var isText = false;//标识是否为文本
while (true)
{
using (var receiveResult = await client.ReadAsync(CancellationToken.None))
{
if (receiveResult.IsCompleted)
{
break;
}
var dataFrame = receiveResult.DataFrame;
var data = receiveResult.DataFrame.PayloadData;
switch (dataFrame.Opcode)
{
case WSDataType.Cont:
{
//收到的是中继包
if (dataFrame.FIN)//判断是否为最终包
{
//是
if (isText)//判断是否为文本
{
this.m_logger.Info($"WebSocket文本:{Encoding.UTF8.GetString(stream.ToArray())}");
}
else
{
this.m_logger.Info($"WebSocket二进制:{stream.Length}长度");
}
}
else
{
//否,继续缓存
//如果是非net6.0即以上,即:NetFramework平台使用。原因是stream不支持span写入
//var segment = data.AsSegment();
//stream.Write(segment.Array, segment.Offset, segment.Count);
//如果是net6.0以上,直接写入span即可
stream.Write(data.Span);
}
}
break;
case WSDataType.Text:
{
if (dataFrame.FIN)//判断是不是最后的包
{
//是,则直接输出
//说明上次并没有中继数据缓存,直接输出本次内容即可
this.m_logger.Info($"WebSocket文本:{dataFrame.ToText()}");
}
else
{
isText = true;
//否,则说明数据太大了,分中继包了。
//则,初始化缓存容器
stream ??= new MemoryStream();
//下面则是缓存逻辑
//如果是非net6.0即以上,即:NetFramework平台使用。原因是stream不支持span写入
//var segment = data.AsSegment();
//stream.Write(segment.Array, segment.Offset, segment.Count);
//如果是net6.0以上,直接写入span即可
stream.Write(data.Span);
}
}
break;
case WSDataType.Binary:
{
if (dataFrame.FIN)//判断是不是最后的包
{
//是,则直接输出
//说明上次并没有中继数据缓存,直接输出本次内容即可
this.m_logger.Info($"WebSocket二进制:{data.Length}长度");
}
else
{
isText = false;
//否,则说明数据太大了,分中继包了。
//则,初始化缓存容器
stream ??= new MemoryStream();
//下面则是缓存逻辑
//如果是非net6.0即以上,即:NetFramework平台使用。原因是stream不支持span写入
//var segment = data.AsSegment();
//stream.Write(segment.Array, segment.Offset, segment.Count);
//如果是net6.0以上,直接写入span即可
stream.Write(data.Span);
}
}
break;
case WSDataType.Close:
break;
case WSDataType.Ping:
break;
case WSDataType.Pong:
break;
default:
break;
}
}
}
//此处即表明websocket已断开连接
this.m_logger.Info("WebSocket断开连接");
await e.InvokeNext();
}
}
或者可以使用内置的扩展方法来进行一次性接收。
例如一次性接收文本:
var str = await client.ReadStringAsync();
一次性接收二进制数据:
using (MemoryStream stream=new MemoryStream())
{
var str = await client.ReadBinaryAsync(stream);
}
ReadStringAsync
或者ReadBinaryAsync
,都只接收对应的数据类型,如果收到非匹配数据则会抛出异常。
【方法2】
使用消息组合器。如果想在OnWebSocketReceived
进行合并消息,则可以使用该方法。
public class MyWebSocketPlugin : PluginBase, IWebSocketReceivedPlugin
{
public MyWebSocketPlugin(ILog logger)
{
this.m_logger = logger;
}
private readonly ILog m_logger;
public async Task OnWebSocketReceived(IWebSocket client, WSDataFrameEventArgs e)
{
switch (e.DataFrame.Opcode)
{
case WSDataType.Close:
{
this.m_logger.Info("远程请求断开");
await client.CloseAsync("断开");
}
return;
case WSDataType.Ping:
this.m_logger.Info("Ping");
await client.PongAsync();//收到ping时,一般需要响应pong
break;
case WSDataType.Pong:
this.m_logger.Info("Pong");
break;
default:
{
//其他报文,需要考虑中继包的情况。所以需要手动合并 WSDataType.Cont类型的包。
//或者使用消息合并器
//获取消息组合器
var messageCombinator = client.GetMessageCombinator();
try
{
//尝试组合
if (messageCombinator.TryCombine(e.DataFrame, out var webSocketMessage))
{
//组合成功,必须using释放模式
using (webSocketMessage)
{
//合并后的消息
var dataType = webSocketMessage.Opcode;
//合并后的完整消息
var data = webSocketMessage.PayloadData;
if (dataType == WSDataType.Text)
{
//按文本处理
}
else if (dataType == WSDataType.Binary)
{
//按字节处理
}
else
{
//可能是其他自定义协议
}
}
}
}
catch (Exception ex)
{
this.m_logger.Exception(ex);
messageCombinator.Clear();//当组合发生异常时,应该清空组合器数据
}
}
break;
}
await e.InvokeNext();
}
}
使用消息组合器,实际上是由框架缓存了数据,所以,整体数据不要太大,不然可能会有爆内存的风险。
六、回复、响应数据
要回复Websocket
消息,必须使用HttpSessionClient对象。如果要获取到所有的HttpSessionClient
对象,则 必须通过HttpService
对象获取。
所以,首先,得确保能访问到HttpService
对象。
如果是在HttpService
的创建过程中,你可以直接访问到HttpService
对象。
但是如果在插件中,或者其他地方,你需要使用其他方法来实现。
6.1 在插件中获取HttpService
对象
插件是支持依赖注入的,所以可以直接注入HttpService
对象。然后在插件中获取。
例如:
【注册服务】
var service = new HttpService();
await service.SetupAsync(new TouchSocketConfig()//加载配置
...
.ConfigureContainer(a =>
{
a.RegisterSingleton<IHttpService>(service);
...
}));
await service.StartAsync();
【获取服务】
public class MyWebSocketPlugin : PluginBase,...
{
private readonly IHttpService m_httpService;
public MyWebSocketPlugin(ILog logger,IHttpService httpService)
{
this.m_logger = logger;
this.m_httpService = httpService;
}
}
除了使用容器,还可以在插件触发时,通过WebSocket
获取HttpService
对象。
public class MyWebSocketPlugin : PluginBase, IWebSocketReceivedPlugin
{
...
public async Task OnWebSocketReceived(IWebSocket client, WSDataFrameEventArgs e)
{
if (client.Client is IHttpSessionClient httpSessionClient)
{
var service = httpSessionClient.Service as IHttpService
}
await e.InvokeNext();
}
}
6.2 获取SessionClient
(1)直接获取所有在线客户端
通过service.Clients
属性,获取当前在线的所有客户端。
var clients = service.Clients;
foreach (var item in clients)
{
if (item.Protocol == Protocol.WebSocket)//先判断是不是websocket协议
{
if (item.Id == "id")//再按指定id发送,或者直接广播发送
{
await item.WebSocket.SendAsync("hello");
}
}
}
由于HttpSessionClient
的生命周期是由框架控制的,所以最好尽量不要直接引用该实例,可以引用HttpSessionClient.Id
,然后再通过服务器查找。
(2)通过Id获取
先调用service.GetIds
方法,获取当前在线的所有客户端的Id,然后选择需要的Id,通过TryGetClient
方法,获取到想要的客户端。
string[] ids = service.GetIds();
if (service.TryGetClient(ids[0], out HttpSessionClient SessionClient))
{
}
6.3 发送文本类消息
await webSocket.SendAsync("Text");
6.4 发送二进制消息
await webSocket.SendAsync(new byte[10]);
6.5 直接发送自定义构建的数据帧
WSDataFrame frame=new WSDataFrame();
frame.Opcode= WSDataType.Text;
frame.FIN= true;
frame.RSV1= true;
frame.RSV2= true;
frame.RSV3= true;
frame.AppendText("I");
frame.AppendText("Love");
frame.AppendText("U");
await webSocket.SendAsync(frame);
此部分功能就需要你对WebSocket
有充分了解才可以操作。
6.6 发送Ping
await webSocket.PingAsync();
6.7 发送Pong
await webSocket.PongAsync();
6.8 发送大数据
发送大数据时,需要分包发送,可以使用SendAsync
的重载方法,设置FIN
标志。
// 发送 "hello" 消息 100 次,最后一次设置 FIN 标志
var i = 0;
while (true)
{
if (i++ == 100)
{
await sessionClient.WebSocket.SendAsync("hello", true);
break;
}
await sessionClient.WebSocket.SendAsync("hello", false);
}
6.9 关闭连接
await webSocket.CloseAsync("关闭");