兩種基于時間窗口的限流器的簡單實(shí)現(xiàn)
之前開發(fā)的一款基于OpenTelemetry的Tracing組件需要使用基于速率限制(Rate Limiting)的跟蹤采樣策略,本想使用現(xiàn)有的解決方案,比如System.Threading.RateLimiting命名空間下的RateLimiter。大體看了RateLimiter的三種實(shí)現(xiàn)(固定窗口、滑動窗口和令牌桶),覺得過于相對復(fù)雜了點(diǎn),代碼還涉及到鎖,而且提供的功能我也不太需要,于是嘗試實(shí)現(xiàn)一種簡單且無鎖解決方案。
一、滑動時間窗口
我為RateLimiter定義了如下這個簡單的IRateLimiter接口,唯一的無參方法TryAcquire利用返回的布爾值確定當(dāng)前是否超出設(shè)定的速率限制。我只提供的兩種基于時間窗口的實(shí)現(xiàn),如下所示的基于“滑動時間窗口”的實(shí)現(xiàn)類型SliddingWindowRateLimiter,我們在構(gòu)造的時候指定時間窗口和閾值。SliddingWindowRateLimiter采用一種“討巧”的實(shí)現(xiàn),它直接利用了BoundedChannel<DateTimeOffset>對象,我們將指定的閾值作為它的最大容量。
public interface IRateLimiter
{
bool TryAcquire();
}
public sealed class SliddingWindowRateLimiter: IRateLimiter
{
private readonly TimeSpan _window;
private readonly ChannelReader<DateTimeOffset> _reader;
private readonly ChannelWriter<DateTimeOffset> _writer;
public SliddingWindowRateLimiter(TimeSpan window, int permit)
{
_window = window;
var options = new BoundedChannelOptions (permit)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = true
};
var channel = Channel
.CreateBounded<DateTimeOffset>(options);
_reader = channel.Reader;
_writer = channel.Writer;
Task.Factory.StartNew(
Trim,TaskCreationOptions.LongRunning);
}
public bool TryAcquire()
=> _writer.TryWrite(DateTimeOffset.UtcNow);
private void Trim()
{
if (!_reader.TryPeek(out var timestamp))
{
Task.Delay(_window).Wait();
Trim();
}
else
{
var delay = _window
- (DateTimeOffset.UtcNow - timestamp);
if (delay > TimeSpan.Zero)
{
Task.Delay(delay).Wait();
Trim();
}
else
{
var valueTask = _reader.ReadAsync();
if (!valueTask.IsCompleted)
_ = valueTask.Result;
Trim();
}
}
}
}
在實(shí)現(xiàn)的TryAcquire方法中,我們試著將當(dāng)前時間戳寫入這個Channel,并將寫入的結(jié)果(成功或者失?。┳鳛榉祷刂怠榱俗孋hannel中只包含指定時間窗口的時間戳,我們利用一個LongRuning的Task執(zhí)行Trim方法對過期的時間戳進(jìn)行“裁剪”。Trim會調(diào)用ChannelReader的TRyPeek方法,如果返回False,意味著Channel為空,此時會等待一段窗口時間再進(jìn)行“裁剪”。如果提取出來時間戳在Now-Window與當(dāng)前時間之間,意味著Channel里面的時間戳均在設(shè)定的窗口內(nèi),此時同樣需要等待,等待時間為Window - (Now - Timestamp);只有在提取的時間超出窗口范圍,我們才需要將其從Channel中移除。
var limiter = new SliddingWindowRateLimiter(
TimeSpan.FromSeconds(2),2);
var index = 0;
await Task.WhenAll( Enumerable.Range(1, 100)
.Select(_ => Task.Run(() => {
while (true)
{
if (limiter.TryAcquire())
{
Console.WriteLine(
$"[{DateTimeOffset.Now}]{Interlocked.Increment(ref index)}");
}
}
})));
我們在上面的演示程序中使用這個SliddingWindowRateLimiter,設(shè)定的限速規(guī)則為 2/2s。我們創(chuàng)建了100個Task并發(fā)地調(diào)用這個SliddingWindowRateLimiter,并將它返回True時的時間戳顯示出來,具體輸出如下所示。
圖片
二、固定時間窗口
如下這個FixedWindowRateLimiter類型是針對“固定窗口”的實(shí)現(xiàn),字段_windowTicks和_permit同樣表示時間窗口的時長(這里我們使用Int64類型的Ticks屬性)和閾值。_nextWindowStartTimeTicks表示下一次固定窗口的起始時間,這個需要動態(tài)調(diào)整,為了確保只有一個線程能夠修改它,我們定義了_windowReseting這個“信號量”。_count是一個計數(shù)器,我們使用它確定是否“超速”。
public sealed class FixedWindowRateLimiter : IRateLimiter
{
private readonly long _windowTicks;
private readonly int _permit;
private long _nextWindowStartTimeTicks;
private volatile int _count = 0;
public FixedWindowRateLimiter(TimeSpan window, int permit)
{
_windowTicks = window.Ticks;
_permit = permit;
_nextWindowStartTimeTicks
= DateTimeOffset.UtcNow.Add(window).Ticks;
}
public bool TryAcquire()
{
// 超出時間窗口,重置計數(shù)器,并調(diào)整下一個時間窗口的開始時間
var now = DateTimeOffset.UtcNow.Ticks;
var nextWindowStartTimeTicks = nextWindowStartTimeTicks;
if (now >= nextWindowStartTimeTicks
&& Interlocked.CompareExchange(
ref _nextWindowStartTimeTicks
, now + _windowTicks, nextWindowStartTimeTicks)
== nextWindowStartTimeTicks)
{
Interlocked.Exchange(ref _count, 1);
return true;
}
return _count < _permit
&& Interlocked.Increment(ref _count) <= _permit;
}
}
在實(shí)現(xiàn)的TryAcquire方法中,我們先確定當(dāng)前時間是否超過了設(shè)定的“下一個窗口開始時間”,如果是則調(diào)用Interlocked.CompareExchange方法修改__nextWindowStartTimeTicks字段。成功修改__nextWindowStartTimeTicks的線程會調(diào)整窗口開始時間,并重置計數(shù)器_count為1,并返回True。如果計數(shù)器大于等于設(shè)定閾值,方法返回False。否則我們讓計數(shù)器+1,如果該值<=閾值,返回True,否則返回False。
IRateLimiter limiter = new FixedWindowRateLimiter(
window: TimeSpan.FromSeconds(2), permit: 2);
var index = 0;
await Task.WhenAll( Enumerable.Range(1, 100)
.Select(_ => Task.Run(() => {
while (true)
{
if (limiter.TryAcquire())
{
Console.WriteLine(
$"[{DateTimeOffset.Now}]{Interlocked.Increment(ref index)}");
}
}
})));
將FixedWindowRateLimiter應(yīng)用到上面的演示程序,依然能得到我們希望的輸出結(jié)果。