自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RPC框架編寫實(shí)踐-RPC常見限流方法的實(shí)現(xiàn)

開發(fā) 架構(gòu)
在微服務(wù)中, 雖然服務(wù)間的調(diào)用都是可信的, 但是服務(wù)端也需要堤防一些流量, 防止被意外的流量擊垮, 而通過限流可以防止問題的發(fā)生。此外, 使用不同的限流規(guī)則還能根據(jù)系統(tǒng)間不同服務(wù)的請(qǐng)求進(jìn)行限制, 解決某個(gè)函數(shù)被頻繁調(diào)用而拖垮整個(gè)系統(tǒng)的問題。

前記

在微服務(wù)中, 雖然服務(wù)間的調(diào)用都是可信的, 但是服務(wù)端也需要堤防一些流量, 防止被意外的流量擊垮, 而通過限流可以防止問題的發(fā)生。此外, 使用不同的限流規(guī)則還能根據(jù)系統(tǒng)間不同服務(wù)的請(qǐng)求進(jìn)行限制, 解決某個(gè)函數(shù)被頻繁調(diào)用而拖垮整個(gè)系統(tǒng)的問題。

NOTE: 雖然本文是在編寫RPC框架有感而發(fā), 但是也適用于常見的Web服務(wù)等有流量進(jìn)出的場(chǎng)景。

最新修訂見閱讀原文

1 限流的簡(jiǎn)介

1.1 限流的作用和場(chǎng)景

對(duì)于后端服務(wù)來說, 他們提供的服務(wù)都有一個(gè)極限的QPS(除代碼邏輯外,也跟機(jī)器配置有關(guān)), 當(dāng)服務(wù)端的壓力超過這個(gè)極限值的時(shí)候, 服務(wù)端的響應(yīng)性能就會(huì)快速的下降, 然后無法提供服務(wù), 所以服務(wù)端需要一個(gè)類似于可以限制請(qǐng)求數(shù)的功能, 使服務(wù)端能犧牲掉部分請(qǐng)求, 保證還能處理一定量的請(qǐng)求, 防止服務(wù)端出現(xiàn)壓力瓶頸,無法處理所有請(qǐng)求。

不過這個(gè)功能還需要盡量的智能, 在設(shè)計(jì)時(shí)可以根據(jù)流量場(chǎng)景不同來做有差別的限制, 使其在不影響其它請(qǐng)求的情況下, 實(shí)現(xiàn)部分請(qǐng)求的網(wǎng)絡(luò)流量整形, 達(dá)到減少系統(tǒng)資源消耗的效果, 常見的幾種需要做差別限制的場(chǎng)景如下:

場(chǎng)景 可能造成的影響 限流的作用
總體的API有大量的并發(fā)調(diào)用, 導(dǎo)致系統(tǒng)QPS超過設(shè)計(jì)值 機(jī)器可能會(huì)扛不住, 造成系統(tǒng)崩潰 減少進(jìn)入業(yè)務(wù)的流量, 保證QPS被限制在某個(gè)合理值, 其它請(qǐng)求會(huì)被丟棄
某個(gè)API耗時(shí)比較長(zhǎng), 其它API的QPS位于合理范圍內(nèi) 由于API耗時(shí)較長(zhǎng), 該API的調(diào)用次數(shù)變多的情況下, 會(huì)明顯消耗系統(tǒng)資源, 同時(shí)也可能造成數(shù)據(jù)競(jìng)爭(zhēng)的情況 針對(duì)性的限制耗時(shí)API, 防止該API引起系統(tǒng)崩潰
總體API的QPS位于合理范圍內(nèi), 但是有部分參數(shù)會(huì)引起較大的系統(tǒng)資源消耗 比如某個(gè)篩選參數(shù)造成查全表的情況, 此時(shí)可能造成數(shù)據(jù)庫處理能力下降,進(jìn)而造成后端服務(wù)無響應(yīng) 針對(duì)性的根據(jù)耗時(shí)API的參數(shù)進(jìn)行限制限制, 防止該API引起系統(tǒng)崩潰
總體API的QPS位于合理范圍內(nèi), 但某個(gè)API的某個(gè)參數(shù)被大多數(shù)人調(diào)用, 導(dǎo)致整個(gè)API無法提供服務(wù), 比如微博的話題功能, 如果有個(gè)爆炸性話題, 這個(gè)話題就會(huì)成為熱點(diǎn)參數(shù) 造成整個(gè)API無法使用, 嚴(yán)重時(shí)會(huì)造成整個(gè)服務(wù)不可用 通過對(duì)熱點(diǎn)參數(shù)的限制, 保證其它功能能正常使用

1.2 限流的組件

通過上述場(chǎng)景可以看到, 在這些場(chǎng)景中限流的作用是差不多的, 一般只涉及到兩個(gè)維度:

  • 時(shí)間:對(duì)某個(gè)時(shí)間窗口進(jìn)行限流
  • 資源:針對(duì)某個(gè)API或者某個(gè)API的參數(shù)進(jìn)行限流,達(dá)到保護(hù)后方對(duì)應(yīng)的資源。

限流可以保證在某段時(shí)間內(nèi)的某個(gè)資源的請(qǐng)求數(shù)量不會(huì)超過設(shè)計(jì)值, 達(dá)到保護(hù)系統(tǒng)的作用, 不過不同場(chǎng)景主要差別是限制的資源維度不一樣, 資源維度的變化從總體服務(wù)到某個(gè)API到某個(gè)API的某個(gè)參數(shù), 資源維度越來越細(xì), 而這個(gè)資源維度區(qū)分也就是我們要實(shí)現(xiàn)限流的第一步--流量匹配, 只要流量匹配了, 限流系統(tǒng)就可以開始工作了, 一般的限流系統(tǒng)流程圖如下(其中他服務(wù)核心代表微服務(wù)核心):

限流

流程圖中第一步是規(guī)則匹配, 它會(huì)通過一個(gè)函數(shù)把流量提取出來, 當(dāng)做Key, 這個(gè)Key等于某個(gè)資源, 然后判斷這個(gè)Key是否匹配到規(guī)則, 如果命中規(guī)則就開始執(zhí)行規(guī)則并結(jié)合這段規(guī)則和限流算法來判斷該流量是否限流, 如果限流就丟棄或者等待, 如果沒被限流, 就直接放行。

此外, 流程圖的最下層有一個(gè)很大的Backend, 它可以用來存儲(chǔ)規(guī)則以及存儲(chǔ)一些限流相關(guān)的計(jì)算變量。其中,限流相關(guān)的計(jì)算變量都是跟時(shí)間相關(guān)的, 且每次都要進(jìn)行讀寫, 最好的情況是放在內(nèi)存之中,不過它不能跟請(qǐng)求綁定在一起, 因?yàn)楦?dāng)前請(qǐng)求的生命周期不一樣, 不能在發(fā)送請(qǐng)求結(jié)束后就把變量回收了, 這些變量也需要有個(gè)容器可以存儲(chǔ), 供不同的請(qǐng)求讀寫, 但是在一個(gè)集群服務(wù)中, 每個(gè)機(jī)器都只存儲(chǔ)自己的計(jì)算變量則會(huì)導(dǎo)致多臺(tái)機(jī)器沒辦法共享數(shù)據(jù)而造成限流失敗。

比如針對(duì)某個(gè)用戶可以調(diào)用某個(gè)API的規(guī)則是一秒內(nèi)可以請(qǐng)求十次, 目前有十臺(tái)機(jī)器, 他們不會(huì)互相共享自己的限流計(jì)算變量, 那么在最壞的情況下, 用戶可以在1秒內(nèi)訪問100次請(qǐng)求而不被限流, 這樣是達(dá)不了限流的效果的, 所以限流必定是一個(gè)中心化的應(yīng)用。目前兩個(gè)比較主流的限流方案分別是網(wǎng)關(guān)限流和中間件限流, 網(wǎng)關(guān)限流場(chǎng)景下所有入站流量都會(huì)經(jīng)過網(wǎng)關(guān)這個(gè)單體, 然后由網(wǎng)關(guān)決定是否放行;而中間件限流則是把計(jì)算變量都存在某個(gè)中間件存儲(chǔ)中, 然后每個(gè)服務(wù)的限流組件都可以從中間件實(shí)時(shí)寫入和讀取數(shù)據(jù), 其中最常用的中間件是Redis, 因?yàn)镽edis的速度快, 能讓限流組件很快的判斷是否需要限流, 對(duì)機(jī)器的性能開銷占比也不是很多, 同時(shí)Redis支持的數(shù)據(jù)結(jié)構(gòu)和功能非常的多, 我們可以很容易的基于它來實(shí)現(xiàn)不同的限流算法。

至于限流的規(guī)則, 由于它只要寫入一次, 后面都是以讀為主, 所以在網(wǎng)關(guān)場(chǎng)景下都存在于內(nèi)存之中, 但在中間件場(chǎng)景下規(guī)則都是存在一個(gè)集中式存儲(chǔ)中, 如Etcd, 然后每個(gè)服務(wù)會(huì)同步集中式存儲(chǔ)的規(guī)則, 并寫入到自己的內(nèi)存中。

在實(shí)際的落地要選擇網(wǎng)關(guān)限流還是中間件限流主要還是取決于是服務(wù)的應(yīng)用場(chǎng)景, 比如接口外層都有加一層網(wǎng)關(guān), 那采用網(wǎng)關(guān)限流即可, 如果是內(nèi)部服務(wù)或者該服務(wù)的通信協(xié)議是自定義的, 則采用中間件方式, 有比較強(qiáng)的自定義性。

在使用Redis下的某些情況下(取決于搭建方式), 有可能造成數(shù)據(jù)不準(zhǔn)的情況, 但是限流的頻率是允許有些許誤差的, 比如限流的規(guī)則是1秒可以訪問100次, 但在某些時(shí)候只實(shí)現(xiàn)了1秒訪問110次也是沒太大關(guān)系的。

1.3 限流算法

上面所說的都是一些簡(jiǎn)單的概念, 而限流的核心是在于限流算法的實(shí)現(xiàn), 常見的限流算法有以下幾種(由于大多數(shù)都限流backend默認(rèn)是Redis, 所以以可以在Redis運(yùn)行的lua代碼示例):

1.3.1 固定窗口

固定窗口的原理比較簡(jiǎn)單,就是將時(shí)間切分成若干個(gè)時(shí)間片,每個(gè)時(shí)間片內(nèi)固定處理若干個(gè)請(qǐng)求。比如限流規(guī)則是10秒內(nèi)最多處理5個(gè)請(qǐng)求, 那么就會(huì)有一個(gè)容器來統(tǒng)計(jì)這10秒內(nèi)的請(qǐng)求數(shù), 如果容器的統(tǒng)計(jì)數(shù)量大于5, 那么后續(xù)的請(qǐng)求都會(huì)被拒絕, 然后每隔10秒重置這個(gè)容器的統(tǒng)計(jì)。這種實(shí)現(xiàn)非常簡(jiǎn)單, 但不是非常嚴(yán)謹(jǐn), 假如限制規(guī)則是1秒限制100個(gè), 但在最壞的情況下, 在第一個(gè)窗口的0.5秒后到第二個(gè)窗口的0.5秒前的這個(gè)時(shí)間點(diǎn)共計(jì)會(huì)放行200個(gè)請(qǐng)求, 所以固定窗口只適用于一些要求不嚴(yán)格的場(chǎng)景, 通過下圖的左圖可以看到限流的流程, 通過下圖的右圖可以看到整個(gè)限流曲線不平滑。

固定窗口

固定窗口的實(shí)現(xiàn)很簡(jiǎn)單, 在Redis中的lua代碼如下:

  1. -- keys為傳入的命令, 其中keys[1]為限流的key  
  2. -- argv為傳入的參數(shù), ARGV[1]為窗口限制量, ARGV[2]為窗口時(shí)間  
  3. local count 
  4. local limit = ARGV[1] 
  5. count = redis.call("incr",KEYS[1]) 
  6. if tonumber(count) == 1 then 
  7.     -- 返回1代表是第一個(gè), 該key剛被創(chuàng)建, 需要設(shè)置過期時(shí)間 
  8.     redis.call("expire",KEYS[1], ARGV[2]]) 
  9. end 
  10. return count<limit  

1.3.2 滑動(dòng)窗口

滑動(dòng)窗口是固定窗口的改進(jìn)方法, 他是通過增加窗口數(shù)量使限流算法更順滑, 本身從一個(gè)窗口變?yōu)橐粋€(gè)先進(jìn)先出的隊(duì)列, 隊(duì)列的內(nèi)容是更加精細(xì)的窗口,比如原來是10秒一個(gè)窗口, 現(xiàn)在會(huì)改為1秒一個(gè)窗口, 然后每隔一秒鐘滑動(dòng)一個(gè)窗口。只寫入最新的窗口而讀取判斷時(shí)都是取最近10個(gè)窗口, 這樣就可以通過減小粒度來讓限流算法更加精細(xì), 可以看到波動(dòng)幅度會(huì)變小(取決于精細(xì)程度):

滑動(dòng)窗口的實(shí)現(xiàn)也是很簡(jiǎn)單的, 具體見:RPC框架編寫實(shí)踐--服務(wù)治理的基石, 在Redis可以采用Zset數(shù)據(jù)結(jié)構(gòu)進(jìn)行實(shí)現(xiàn), 這里就不做代碼示例了?;瑒?dòng)窗口是犧牲一定的內(nèi)存來讓限流變得平滑,窗口數(shù)量越多, 限流速率越精細(xì), 占用的內(nèi)存就越大, 同時(shí)獲取數(shù)據(jù)時(shí)都是獲取一批窗口的數(shù)據(jù), 相比于固定窗口來說,它的時(shí)間復(fù)雜度也會(huì)跟著變多(O(k))。

1.3.3 漏桶

漏桶的出現(xiàn)可以完美的解決參差不齊的速率限制問題, 漏桶算法的核心原理是進(jìn)入漏桶的請(qǐng)求量不限制, 但能漏出去的速率請(qǐng)求是恒定的, 這樣就能完美的控制請(qǐng)求的速率, 如果桶滿了, 在漏桶里的請(qǐng)求就會(huì)溢出去, 達(dá)到丟棄請(qǐng)求的目的, 如下圖, 整個(gè)請(qǐng)求的速率都是很平滑的, 沒有多少毛尖:

從圖中可以看到, 漏桶的原理很像一個(gè)FIFO隊(duì)列, 然后有個(gè)定時(shí)器會(huì)以恒定的速率把請(qǐng)求取出來, 使用Python代碼實(shí)現(xiàn)如下:

  1. import asyncio 
  2.  
  3. # 假設(shè)容量只有10 
  4. import time 
  5.  
  6. leaky_bucket: asyncio.Queue = asyncio.Queue(10) 
  7. loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() 
  8.  
  9.  
  10. async def demo_request(cnt: int) -> None: 
  11.     """模仿請(qǐng)求""" 
  12.     msg: str = f"I'm mock request:{cnt}" 
  13.     future: asyncio.Future = asyncio.Future() 
  14.     try: 
  15.         leaky_bucket.put_nowait(future) 
  16.     except asyncio.QueueFull: 
  17.         # 代表桶滿了, 溢出來, 該請(qǐng)求要提前拋棄 
  18.         print(f"Fail Request:{msg}"
  19.     else
  20.         # 等待放行 
  21.         await future 
  22.         print(time.time(), msg) 
  23.  
  24.  
  25. def timer() -> None: 
  26.     """定時(shí)放行請(qǐng)求""" 
  27.     try: 
  28.         # 放行該請(qǐng)求 
  29.         future: asyncio.Future = leaky_bucket.get_nowait() 
  30.         future.set_result(True
  31.     except asyncio.QueueEmpty: 
  32.         pass 
  33.     # 一秒執(zhí)行一次 
  34.     loop.call_later(1, timer) 
  35.  
  36. timer() 
  37. # 模擬并發(fā)12個(gè)請(qǐng)求 
  38. loop.run_until_complete(asyncio.gather(*[demo_request(i) for i in range(12)])) 

但是, 這樣實(shí)現(xiàn)的漏桶算法依然需要占用一些空間用來存儲(chǔ)等待放行的請(qǐng)求, 直到放行才被釋放。為了解決空間占用的問題, 可以采用GCRA算法, 它從另外一個(gè)角度看起來跟漏桶算法很像(GRRA應(yīng)該被認(rèn)為是計(jì)量器實(shí)現(xiàn)的漏桶版本, 而不是上面所說的隊(duì)列形漏桶), 但很省空間占用, 因?yàn)闊o論漏桶多大, 它的空間占用都是恒定的, 只需要存漏水速率(可以認(rèn)為是該時(shí)間段可以放行的請(qǐng)求量)以及桶目前的容量即可。

使用GCRA算法之所以能這樣省空間, 主要還是它是基于虛擬調(diào)度實(shí)現(xiàn)的, 它只需要存一個(gè)漏水速率,然后每次有請(qǐng)求進(jìn)來時(shí)判斷現(xiàn)在可否可以漏水, 如果可以就放行, 如果不可以則判斷桶是否滿, 滿則拋棄請(qǐng)求, 沒滿則讓請(qǐng)求等待, 直到可以放行為止。常見的GCRA限流實(shí)現(xiàn)一般都考慮使用redis-cell, 它的使用方法如下:

  1. # 第一個(gè)參數(shù)為命令, 第二個(gè)參數(shù)是要限流的key, 第三個(gè)參數(shù)是桶容量, 第四第五綜合起來為漏桶速率, 第五個(gè)為每次漏多少 
  2.  
  3. # 第一次請(qǐng)求放行, 可以發(fā)現(xiàn)容量變多了一個(gè) 
  4. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  5. 1) "0"     # 0表示允許, 1表示拒絕 
  6. 2) "3"     # 漏桶容量(會(huì)比輸入的多1) 
  7. 3) "2"     # 漏斗剩余空間 
  8. 4) "-1"    # 如果拒絕, 需要多長(zhǎng)時(shí)間后重試, 單位秒 
  9. 5) "10"    # 多少時(shí)間后,漏桶完全空了, 單位秒 
  10.  
  11. # 第二次請(qǐng)求被放行, 但是漏斗已經(jīng)被占了一個(gè)空位 
  12. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  13. 1) "0" 
  14. 2) "3" 
  15. 3) "1" 
  16. 4) "-1" 
  17. 5) "18" 
  18. # 第三次請(qǐng)求被放行, 但是漏斗已經(jīng)被占了兩個(gè)個(gè)空位 
  19. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  20. 1) "0" 
  21. 2) "3" 
  22. 3) "0" 
  23. 4) "-1" 
  24. 5) "27" 
  25. # 第四次請(qǐng)求不被放行, 但是漏斗沒有空位了 
  26. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  27. 1) "1" 
  28. 2) "3" 
  29. 3) "0" 
  30. 4) "6" 
  31. 5) "26" 

從命令可以看出, 即使漏斗中還有數(shù)據(jù)沒漏出去, 返回值得第一個(gè)也還是0, 表示放行, 這樣并不是一個(gè)完善的GCRA。為了實(shí)現(xiàn)一個(gè)完備的GCRA, 我們需要額外的在代碼判斷漏桶是否全空, 如果放行且桶不是全空, 則需要在代碼判斷多久后才能會(huì)為空, 這個(gè)時(shí)間也就是請(qǐng)求的等待放行時(shí)間, 在等待這段時(shí)間后才能放行請(qǐng)求, 如果不是放行, 則直接丟棄請(qǐng)求即可。

不過, 如果不做任何判斷, 直接使用返回值的第一個(gè)值來判斷是否放行請(qǐng)求, 那這個(gè)實(shí)現(xiàn)就很像下面所說的令牌桶的實(shí)現(xiàn)。

1.3.4 令牌桶

漏桶能很好的控制速率, 使其變得平滑, 但是它沒辦法應(yīng)對(duì)突發(fā)流量, 比如我們把規(guī)則定義為10秒內(nèi)可以請(qǐng)求10次, 對(duì)于漏桶來說, 它會(huì)控制為1秒放行一個(gè)請(qǐng)求, 如果同時(shí)收到10個(gè)請(qǐng)求時(shí)它則會(huì)分開10秒放行每個(gè)請(qǐng)求。然而10秒內(nèi)可以請(qǐng)求10次的含義是10秒內(nèi)總共可以請(qǐng)求10次, 也就是允許在這10秒內(nèi)的某個(gè)瞬間同時(shí)放行10個(gè)請(qǐng)求, 對(duì)于這個(gè)問題可以使用令牌桶來解決, 令牌桶和漏桶很像, 只是漏桶控制的是請(qǐng)求, 令牌桶控制的是令牌發(fā)放速度。

令牌桶算法規(guī)定每個(gè)請(qǐng)求需要從桶里拿到并消耗一個(gè)令牌才可以放行, 拿不到則會(huì)被拋棄, 同時(shí)令牌桶本身會(huì)以恒定的速率產(chǎn)生令牌, 直到桶滿為止, 這樣就可以保證限流的平緩, 同時(shí)又能應(yīng)對(duì)突發(fā)請(qǐng)求, 令牌桶的原理圖和限流曲線圖如下, 其中限流曲線圖表示初始時(shí)桶里面放滿了令牌, 所以放行的請(qǐng)求很多, 隨著令牌被逐漸消耗并消耗光了, 限流的曲率會(huì)穩(wěn)定在一條線上, 也就是令牌的生產(chǎn)速率:

同樣的, 在實(shí)現(xiàn)令牌桶時(shí)為了減少空間的占用, 也會(huì)使用虛擬調(diào)度方法, 只存一個(gè)時(shí)間和容量到內(nèi)存中, 每次收到請(qǐng)求時(shí)都會(huì)根據(jù)請(qǐng)求的時(shí)間和在內(nèi)存中的時(shí)間差值再乘以速率計(jì)算這段時(shí)間應(yīng)該產(chǎn)生的令牌數(shù)量并存到內(nèi)存中, 然后再判斷是否有足夠的令牌來判斷是否放行請(qǐng)求, 具體的Redis lua代碼實(shí)現(xiàn)如下:

  1. local key = KEYS[1] -- key 
  2. local current_time = redis.call('TIME')[1] -- redis時(shí)間戳 
  3. local interval_per_token = tonumber(ARGV[1]) --每個(gè)單位產(chǎn)生多少個(gè)token 
  4. local max_token = tonumber(ARGV[2]) -- 桶最大的量 
  5. local init_token = tonumber(ARGV[3]) -- 桶初始量 
  6. local tokens 
  7. -- 上次請(qǐng)求時(shí)保留的桶數(shù)據(jù) 
  8. local bucket = redis.call("hmget"key"last_time""last_token"
  9. local last_time= bucket[1] 
  10. local last_token = bucket[2] 
  11. if last_time == false or last_token == false then 
  12.     -- 如果沒數(shù)據(jù), 則代表該資源是第一次訪問, 進(jìn)行初始化 
  13.     tokens = init_token 
  14.     redis.call('hset'key'last_time'current_time
  15. else 
  16.     -- 算出間隔時(shí)間 
  17.     local this_interval = current_time - tonumber(last_time) 
  18.     if this_interval > 1 then 
  19.         -- 算出該時(shí)間應(yīng)該產(chǎn)生的令牌 
  20.         local tokens_to_add = math.floor(this_interval * interval_per_token) 
  21.         -- 算出真實(shí)可以擁有的令牌 
  22.         tokens = math.min(last_token + tokens_to_add, max_token) 
  23.         -- 保存數(shù)據(jù) 
  24.         redis.call('hset'key'last_time'current_time
  25.     else 
  26.         tokens = tonumber(last_token) 
  27.     end 
  28. end 
  29. if tokens < 1 then 
  30.     -- 令牌不夠消費(fèi) 
  31.     redis.call('hset'key'last_token', tokens) 
  32.     return -1 
  33. else 
  34.     -- 消費(fèi)令牌并返回令牌數(shù), 代表可以消費(fèi) 
  35.     tokens = tokens - 1 
  36.     redis.call('hset'key'last_token', tokens) 
  37.     return tokens 
  38. end 

2.具體實(shí)現(xiàn)

上面說完了算法實(shí)現(xiàn)后, 接下來來看看該如何結(jié)合算法進(jìn)行實(shí)現(xiàn), 由于代碼會(huì)隨時(shí)更新,具體源碼更新見:https://github.com/so1n/rap/tree/master/rap/server/plugin/processor/limit

項(xiàng)目的代碼結(jié)構(gòu)如下, 在常見的后端服務(wù)中需要占用空間少, 然后速度盡量快點(diǎn)限流組件, 所以一般只用漏桶或者令牌桶且基于Redis的實(shí)現(xiàn), 這里就不會(huì)去實(shí)現(xiàn)窗口相關(guān)的限流了:

  1. ├── backend  # 算法 
  2. │   ├── base.py  # 封裝的協(xié)議 
  3. │   └── redis.py  # 基于redis當(dāng)做banckend的算法實(shí)現(xiàn) 
  4. ├── core.py  # 核心判斷代碼, 實(shí)際上是一個(gè)中間流量處理 
  5. ├── rule.py  # 規(guī)則聲明 
  6. └── util.py  # 其它小代碼 

首先是rule.py里的規(guī)則類, 它主要是聲明了限流速率, 初始化token數(shù)量, 最多的tokens數(shù)量以及停用時(shí)間, 其中停用時(shí)間是用來防止惡意用戶頻繁刷新, 它的邏輯是當(dāng)漏桶已經(jīng)滿了或者令牌桶沒有令牌的時(shí)候, 限流組件會(huì)在停用時(shí)間內(nèi)不再提供服務(wù)。

然后就是backend.base.py, 它是一個(gè)限流算法的統(tǒng)一封裝, 代碼如下:

  1. from typing import Any, Coroutine, Union 
  2.  
  3. from rap.server.plugin.processor.limit.rule import Rule 
  4.  
  5.  
  6. class BaseLimitBackend(object): 
  7.     def can_requests(self, key: str, ruleRule, token_num: int = 1) -> Union[bool, Coroutine[AnyAny, bool]]: 
  8.         raise NotImplementedError 
  9.  
  10.     def expected_time(self, key: str, ruleRule) -> Union[float, Coroutine[AnyAnyfloat]]: 
  11.         raise NotImplementedError 

這個(gè)類它聲明了兩個(gè)方法, 一個(gè)是can_request, 它會(huì)根據(jù)算法來判斷是否放行, 如果需要等待, 則會(huì)在這個(gè)方法里進(jìn)行等待, 直到到時(shí)間后才返回放行標(biāo)記, 其中can_request還內(nèi)嵌了一個(gè)block_time的邏輯;另外一個(gè)是expected_time, 用來獲取下次可用的時(shí)間, 具體的實(shí)現(xiàn)以RedisCellBackend為例子, 它是一個(gè)子類。

它的最上層實(shí)現(xiàn)是BaseLimitBackend, 然后就是繼承于BaseLimitBackend的BaseRedisBackend, 這個(gè)組件Redis限流算法的基礎(chǔ)實(shí)現(xiàn), 主要是實(shí)現(xiàn)了一個(gè)停用時(shí)間的邏輯, 當(dāng)發(fā)現(xiàn)不放行請(qǐng)求的時(shí)候, 會(huì)啟用停用邏輯, 以停用后續(xù)相同key的請(qǐng)求:

  1. class BaseRedisBackend(BaseLimitBackend, ABC): 
  2.     def __init__(self, redis: Union[StrictRedis, StrictRedisCluster]): 
  3.         # 初始化Redis模塊 
  4.         self._redis: "Union[StrictRedis, StrictRedisCluster]" = redis 
  5.  
  6.     async def _block_time_handle(self, key: str, ruleRule, func: Callable[..., Awaitable[bool]]) -> bool: 
  7.         """處理block_time邏輯""" 
  8.         block_time_key: str = f"{key}:block_time" 
  9.         bucket_block_time: Optional[int] = rule.block_time 
  10.  
  11.         if bucket_block_time is not None and await self._redis.exists(block_time_key): 
  12.             # 啟用block_time邏輯, 且key已經(jīng)存在, 那么直接返回False告訴該請(qǐng)求應(yīng)該被拒絕  
  13.             return False 
  14.  
  15.         # 執(zhí)行正真的判斷是否限流邏輯 
  16.         can_requests: bool = await func() 
  17.  
  18.         if not can_requests and bucket_block_time is not None: 
  19.             # 啟用block_time邏輯且被限流時(shí), 正式啟用block time邏輯  
  20.             await self._redis.set(block_time_key, bucket_block_time, ex=bucket_block_time) 
  21.  
  22.         return can_requests 

接著就是繼承于BaseRedisBackend的BaseRedisCellBackend, 它主要是提供一個(gè)命令調(diào)用的封裝以及獲取還有多久后才能請(qǐng)求的封裝:

  1. class BaseRedisCellBackend(BaseRedisBackend): 
  2.     ""
  3.     use redis-cell module 
  4.     learn more:https://github.com/brandur/redis-cell 
  5.  
  6.     input: CL.THROTTLE user123 15 30 60 1 
  7.         # param  |  desc 
  8.         # user123 key 
  9.         # 15 maxburst 
  10.         # 30 token 
  11.         # 60 seconds 
  12.         # 1 apply 1token 
  13.     output
  14.         1) (integer) 0        # is allowed 
  15.         2) (integer) 16       # total bucket num 
  16.         3) (integer) 15       # the remaining limit of the key
  17.         4) (integer) -1       # the number of seconds until the user should retry, 
  18.                               #   and always -1 if the action was allowed. 
  19.         5) (integer) 2        # The number of seconds until the limit will reset to its maximum capacity 
  20.     ""
  21.  
  22.     async def _call_cell(self, key: str, ruleRule, token_num: int = 1) -> List[int]: 
  23.         """調(diào)用redis_cell""" 
  24.         result: List[int] = await self._redis.execute_command( 
  25.             "CL.THROTTLE"keyrule.max_token - 1, rule.gen_token, int(rule.total_second), token_num 
  26.         ) 
  27.         return result 
  28.  
  29.     def expected_time(self, key: str, ruleRule) -> Union[float, Coroutine[AnyAnyfloat]]: 
  30.         """獲取下次可請(qǐng)求時(shí)間""" 
  31.         async def _expected_time() -> float
  32.             block_time_key: str = key + ":block_time" 
  33.             block_time = await self._redis.get(block_time_key) 
  34.             if block_time: 
  35.                 return await self._redis.ttl(block_time_key) 
  36.  
  37.             result: List[int] = await self._call_cell(keyrule, 0) 
  38.             return float(max(result[3], 0)) 
  39.  
  40.         return _expected_time() 

最后就是真正的對(duì)外使用的限流組件實(shí)現(xiàn), 這個(gè)實(shí)現(xiàn)是基于漏桶算法的, 它繼承于BaseRedisCellBackend(另外一個(gè)繼承于BaseRedisCellBackend的實(shí)現(xiàn)是基于令牌桶算法的, 可以通過源碼了解), 可以看到非常的簡(jiǎn)單, 本質(zhì)上是基于redis-cell的返回判斷是否放行。

  1. class RedisCellBackend(BaseRedisCellBackend): 
  2.  
  3.     def can_requests(self, key: str, ruleRule, token_num: int = 1) -> Union[bool, Coroutine[AnyAny, bool]]: 
  4.         """通過redis-cell判斷是否可以請(qǐng)求,以及是否需要休眠等待, 如果需要?jiǎng)t休眠固定的時(shí)間后再放行""" 
  5.         async def _can_requests() -> bool: 
  6.             result: List[int] = await self._call_cell(keyrule, token_num) 
  7.             can_requests: bool = result[0] == 0 
  8.             if can_requests and result[4]: 
  9.                 await asyncio.sleep(result[4]) 
  10.             return can_requests 
  11.  
  12.         return self._block_time_handle(keyrule, _can_requests) 

了解完算法的實(shí)現(xiàn)后, 接下來就是核心的判斷邏輯, 具體見注釋:

  1. class LimitProcessor(BaseProcessor): 
  2.     def __init__(self, backend: BaseLimitBackend, rule_list: List[Tuple[RULE_FUNC_TYPE, Rule]]): 
  3.         """初始化規(guī)則和算法邏輯, 這里的規(guī)則之所以是使用傳參的方式是僅供參考, 后續(xù)整個(gè)框架的配置都會(huì)抽離成一個(gè)config, 供其它組件調(diào)用""" 
  4.         self._backend: BaseLimitBackend = backend 
  5.         self._rule_list: List[Tuple[RULE_FUNC_TYPE, Rule]] = rule_list 
  6.  
  7.     async def process_request(self, request: Request) -> Request: 
  8.         # not limit client event 
  9.         if request.msg_type == constant.CLIENT_EVENT: 
  10.             # 屏蔽event請(qǐng)求 
  11.             return request 
  12.  
  13.         for func, rule in self._rule_list: 
  14.             # 獲取該請(qǐng)求的key 
  15.             if inspect.iscoroutinefunction(func): 
  16.                 key, is_ignore_limit = await func(request)  # type: ignore 
  17.             else
  18.                 key, is_ignore_limit = func(request) 
  19.             if is_ignore_limit: 
  20.                 # 如果該請(qǐng)求不應(yīng)該限流, 直接跳過限流邏輯 
  21.                 return request 
  22.             if key
  23.                 # 匹配到key, 進(jìn)入限流邏輯 
  24.                 break 
  25.         else
  26.             raise TooManyRequest() 
  27.  
  28.         # 通過backend判斷是否限流 
  29.         key = f"rap:processor:{self.__class__.__name__}:{key}" 
  30.         can_requests: Union[bool, Awaitable[bool]] = self._backend.can_requests(keyrule
  31.         if inspect.isawaitable(can_requests): 
  32.             can_requests = await can_requests  # type: ignore 
  33.         if not can_requests: 
  34.             # 如果被限流, 返回異常, 并告知要何時(shí)后才可以再次請(qǐng)求 
  35.             expected_time: Union[float, Awaitable[float]] = self._backend.expected_time(keyrule
  36.             if inspect.isawaitable(expected_time): 
  37.                 expected_time = await expected_time  # type: ignore 
  38.             raise TooManyRequest(extra_msg=f"expected time: {expected_time}"
  39.         return request 

至此, 整個(gè)限流邏輯實(shí)現(xiàn)完畢, 本章內(nèi)容完。

3.其它碎碎念

3.1.熱點(diǎn)參數(shù)實(shí)現(xiàn)

由于大部分的限流實(shí)現(xiàn)的backend都只要依賴于Redis, 所以代碼倉里面只有Redis一種類型的backend,但是有一些限流實(shí)現(xiàn)需要依賴于一些特殊的backend,比如熱點(diǎn)參數(shù)限流, 還有蜜罐之類的場(chǎng)景。

以熱點(diǎn)參數(shù)限流場(chǎng)景為例子, 熱點(diǎn)參數(shù)是一個(gè)寫大于讀的應(yīng)用場(chǎng)景, 而且跟時(shí)間強(qiáng)相關(guān), 所以選用時(shí)序數(shù)據(jù)庫做backend,之前選用過Graphite當(dāng)做backend, 具體實(shí)現(xiàn)如圖后端服務(wù)會(huì)把每次請(qǐng)求參數(shù)都記錄到時(shí)序數(shù)據(jù)庫中, 并使用一個(gè)定時(shí)腳本每隔一段時(shí)間把最近的熱點(diǎn)參數(shù)數(shù)據(jù)拉取到緩存中, 供后端服務(wù)的限流組件判斷是否該放行。其中, 這個(gè)間隔一般控制在1秒左右, 所以這是一個(gè)近實(shí)時(shí)的實(shí)現(xiàn), 具體的實(shí)現(xiàn)圖如下:

當(dāng)請(qǐng)求進(jìn)來的時(shí)候, 限流中間件會(huì)通過異步的方法把數(shù)據(jù)記錄到時(shí)序數(shù)據(jù)庫中, 比如一個(gè)請(qǐng)求為http://127.0.0.1:80?q=1&b=2,中間件就會(huì)發(fā)送一個(gè)以{prefix}.hot_param.b=2&b=2為key, value為1的標(biāo)準(zhǔn)Statsd的count類型數(shù)據(jù)到Statsd組件中。

這個(gè)Key采用標(biāo)準(zhǔn)的Statsd命令, 以.分割有三個(gè)值, 第一個(gè)是前綴它與業(yè)務(wù)相關(guān), 如業(yè)務(wù)名, 函數(shù)名,namespace等等; 第二個(gè)是代表是熱點(diǎn)參數(shù)的業(yè)務(wù);第三個(gè)是參數(shù)Key, 這里以&為分割號(hào), 然后按Key順序排序,重新拼接為一個(gè)字符串, 這樣即使請(qǐng)求時(shí)順序不一致也能識(shí)別到時(shí)同種請(qǐng)求。

Statsd組件收到了數(shù)據(jù)后會(huì)自行進(jìn)行統(tǒng)計(jì), 統(tǒng)計(jì)一個(gè)時(shí)間區(qū)間都數(shù)據(jù)并寫入到Graphite中, 然后通過定時(shí)腳本使用Graphite API拉取統(tǒng)計(jì)次數(shù)大于條件的數(shù)據(jù)寫入到Redis緩存中, 其中Statsd組件的時(shí)間區(qū)間和定時(shí)腳本的定時(shí)時(shí)間都會(huì)控制在一秒左右, 所以這是一個(gè)近實(shí)時(shí)的實(shí)現(xiàn), 在計(jì)算性能消耗和實(shí)現(xiàn)效果直接做取舍。

數(shù)據(jù)寫入到Redis后, 這個(gè)寫入數(shù)據(jù)和統(tǒng)計(jì)的異步流程就結(jié)束了, 中間件在記錄數(shù)據(jù)后, 會(huì)通過Redis判斷是否是熱點(diǎn)參數(shù), 并根據(jù)規(guī)則判斷是否放行, 到了這里就跟上面的限流流程差不多了。

3.2.限流算法拓展

 

限流算法不止是用于算法, 也可以用于別的地方,比如有一些游戲活動(dòng),體力值滿時(shí)為5, 然后玩家每次出發(fā)活動(dòng)會(huì)減少1個(gè)體力值, 然后可以使用限流算法每隔一個(gè)固定時(shí)間則增加一個(gè)體力值等等。在遇到業(yè)務(wù)需求有跟時(shí)間相關(guān)且像上述所說的體力值會(huì)恢復(fù)的情況時(shí), 可以往限流算法思考。

 

責(zé)任編輯:武曉燕 來源: 博海拾貝diary
相關(guān)推薦

2022-08-15 08:01:35

微服務(wù)框架RPC

2012-10-10 09:14:50

PHPRPCPHP框架

2023-01-18 08:32:13

2014-09-02 10:43:45

RedisRPC

2024-01-02 12:17:44

Go傳統(tǒng)遠(yuǎn)程

2022-02-14 21:17:21

RPC框架協(xié)議

2012-11-23 14:26:40

IBMdW

2011-04-06 09:39:49

mysql5存儲(chǔ)

2011-02-17 09:45:40

云計(jì)算RPC框架

2021-04-21 08:01:31

Googleprotobuf嵌入式系統(tǒng)

2024-07-02 10:40:35

2021-11-15 14:02:27

RPCSpringBootRabbitMQ

2018-08-02 15:24:05

RPCJava微服務(wù)

2022-01-10 17:18:26

框架 RPC架構(gòu)

2017-01-13 10:51:13

RPC模型解析

2020-11-02 08:19:18

RPC框架Java

2020-10-20 17:35:42

srpcRPC語言

2023-03-06 07:28:57

RPC框架序列化

2024-05-31 08:45:24

2019-08-21 08:44:52

RPC框架Java
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)