跳到主要内容

Tcp实现限流服务

· 阅读需 7 分钟
若汝棋茗
Software Engineer Ⅱ

一、引言

在一个阳光明媚的下午,我正在更新博客文章。忽然收到了一条来自群友的私信。这位朋友是一名对网络编程感兴趣的大学生,最近正在研究Tcp连接数量限流和接收流量限流。噼里啪啦的连续问了一通。

看到这个问题后,我感到很高兴,因为这正是我之前准备探讨的一个话题。不过这次打算讨论另外一种新的方式来实现。于是,我决定写一篇博客文章来详细解释这一机制。

二、技术细节

以前写过一篇博客,介绍如何使用在TcpService实现TCP接收流量限流,但是当时写的非常简单,也没有详细介绍原理。

所以这里就先升级一下逻辑,然后详细介绍下原理,以及实现代码。

首先,Tcp服务器使用TouchSocket-TcpService

因为他支持插件实现逻辑,非常方便维护。

然后限流算法使用微软提供的System.Threading.RateLimiting库,用来实现流量控制。

算法支持:

  • 并发限制
  • 令牌桶限制
  • 固定时间窗口限制
  • 滑动时间窗口限制

三、实践应用

基本上实现了连接数量限流功能和流量限流功能。

当连接数超过限制,则拒绝连接。在等待一定时间后,可以再接收连接。

当某个客户端接收的数据流量超过限制,则延缓接收。

四、代码实现

4种限流的逻辑在使用时大同小异,具体的参数可以参考微软官方说明,下面示例仅以固定时间窗口限制为例。

4.1 连接限流

对于连接限流,可以考虑使用插件来做,这样限流策略就可以复用,而且逻辑清晰,也便于维护。

首先,新建一个插件类,然后实现 ITcpConnectingPlugin 接口,如下:

internal class LimitNumberOfConnectionsPlugin : PluginBase, ITcpConnectingPlugin
{
private readonly ILog m_logger;

public LimitNumberOfConnectionsPlugin(ILog logger)
{
this.m_logger = logger;

logger.Info($"限制连接插件生效");
}

public async Task OnTcpConnecting(ITcpSession client, ConnectingEventArgs e)
{
await e.InvokeNext();
}
}

然后我们的想法就是在OnTcpConnecting方法中,进行连接限流。

我们这里的逻辑非常简单,就是,对所有连接进行统一限流,如果连接数超过我们指定的最大连接数,就直接拒绝连接。

然后限流算法使用FixedWindowRateLimiter

那首先初始化限流器,然后参数如下:

  • PermitLimit:一个窗口中允许的最大允许计数器数。
  • QueueLimit:排队获取请求的最大累积允许计数。
  • Window:指定请求的时间范围。
private RateLimiter m_rateLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions()
{
PermitLimit = 2,
QueueLimit = 4,
Window = TimeSpan.FromSeconds(5)
});

然后实现OnTcpConnecting方法的具体限流逻辑。

如下:

var rateLimitLease = this.m_rateLimiter.AttemptAcquire(1);

if (!rateLimitLease.IsAcquired)
{
e.IsPermitOperation = false;//表示不许连接
e.Handled = true;//并且已经处理该消息。
this.m_logger.Warning($"IP={client.IP}的客户端,连接数达到设置阈值。已拒绝连接。");
return;
}

最后把新建的插件添加到服务器上。

.ConfigurePlugins(a =>
{
a.Add<LimitNumberOfConnectionsPlugin>();
})

4.2 接收流量限流

和连接限流基本思路一致,只不过在接收到超多数据时,应该延迟接收,而不是断开。基本流程如下:

首先,新建一个插件类,实现ITcpReceivedPlugin

internal class LimitNumberOfReceivePlugin : PluginBase, ITcpReceivedPlugin
{
private readonly ILog m_logger;

public LimitNumberOfReceivePlugin(ILog logger)
{
this.m_logger = logger;
}

public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
{
await e.InvokeNext();
}
}

然后只需要在OnTcpReceived中进行限流即可。

不过值得注意的是,和限制连接不同,限制流速,应该是只限制当前客户端的流量,而不是整个服务端。

所以,这里需要建立ITcpSessionRateLimiter的对应关系。

一般来说,我们可以使用字典,或者通过继承的方式来直接新增属性的方式来实现。但是考虑到扩展性和通用性,这里我们选择使用TouchSocket自带的扩展属性来实现。

首先,需要在插件内部声明一个扩展属性。确保ITcpSession在获取限流器时能得到与自身关联的限流器。

private static readonly DependencyProperty<RateLimiter> RateLimiterProperty = new DependencyProperty<RateLimiter>("RateLimiterProperty", OnCreateRateLimiter, false);

private static RateLimiter OnCreateRateLimiter(IDependencyObject @object)
{
var rateLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions()
{
AutoReplenishment = true,
PermitLimit = 10,
QueueLimit = 20000,
Window = TimeSpan.FromSeconds(5)
});

@object.SetValue(RateLimiterProperty, rateLimiter);
return rateLimiter;
}

然后,实现基本限流逻辑即可。

不过也要注意,请求的窗口大小,不能大于设置的最大数值。所以这块需要使用一个循环来处理。每次等待窗口时间,再重新计算。

public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
{
var rateLimiter = client.GetValue(RateLimiterProperty);

var byteBlock = e.ByteBlock;
var length = byteBlock.Length;

while (length > 0)
{
var step=Math.Min(maxNumber, length);
await rateLimiter.AcquireAsync(step);
length -= step;

this.m_logger.Info($"收到数据:{byteBlock.ReadToSpan(step).ToString(Encoding.UTF8)}");
}

await e.InvokeNext();
}

最后把新建的插件添加到服务器上。

.ConfigurePlugins(a =>
{
a.Add<LimitNumberOfReceivePlugin>();
})

五、总结

本文详细介绍了如何使用TouchSocket-TcpService和System.Threading.RateLimiting库实现TCP连接数量限流和接收流量限流。提供了完整的代码示例,帮助读者理解和实现这些功能。

六、参考资料

  1. TouchSocket官网
  2. System.Threading.RateLimiting库
  3. Announcing Rate Limiting for .NET

七、本文示例Demo