服務(wù)限流,我有六種實(shí)現(xiàn)方式…
哈嘍大家好啊,我是Hydra,今天來(lái)和大家聊聊服務(wù)的限流。
服務(wù)限流,是指通過(guò)控制請(qǐng)求的速率或次數(shù)來(lái)達(dá)到保護(hù)服務(wù)的目的,在微服務(wù)中,我們通常會(huì)將它和熔斷、降級(jí)搭配在一起使用,來(lái)避免瞬時(shí)的大量請(qǐng)求對(duì)系統(tǒng)造成負(fù)荷,來(lái)達(dá)到保護(hù)服務(wù)平穩(wěn)運(yùn)行的目的。下面就來(lái)看一看常見(jiàn)的6種限流方式,以及它們的實(shí)現(xiàn)與使用。
固定窗口算法
固定窗口算法通過(guò)在單位時(shí)間內(nèi)維護(hù)一個(gè)計(jì)數(shù)器,能夠限制在每個(gè)固定的時(shí)間段內(nèi)請(qǐng)求通過(guò)的次數(shù),以達(dá)到限流的效果。
算法實(shí)現(xiàn)起來(lái)也比較簡(jiǎn)單,可以通過(guò)構(gòu)造方法中的參數(shù)指定時(shí)間窗口大小以及允許通過(guò)的請(qǐng)求數(shù)量,當(dāng)請(qǐng)求進(jìn)入時(shí)先比較當(dāng)前時(shí)間是否超過(guò)窗口上邊界,未越界且未超過(guò)計(jì)數(shù)器上限則可以放行請(qǐng)求。
@Slf4j
public class FixedWindowRateLimiter {
// 時(shí)間窗口大小,單位毫秒
private long windowSize;
// 允許通過(guò)請(qǐng)求數(shù)
private int maxRequestCount;
// 當(dāng)前窗口通過(guò)的請(qǐng)求計(jì)數(shù)
private AtomicInteger count=new AtomicInteger(0);
// 窗口右邊界
private long windowBorder;
public FixedWindowRateLimiter(long windowSize,int maxRequestCount){
this.windowSize = windowSize;
this.maxRequestCount = maxRequestCount;
windowBorder = System.currentTimeMillis()+windowSize;
}
public synchronized boolean tryAcquire(){
long currentTime = System.currentTimeMillis();
if (windowBorder < currentTime){
log.info("window reset");
do {
windowBorder += windowSize;
}while(windowBorder < currentTime);
count=new AtomicInteger(0);
}
if (count.intValue() < maxRequestCount){
count.incrementAndGet();
log.info("tryAcquire success");
return true;
}else {
log.info("tryAcquire fail");
return false;
}
}
}
進(jìn)行測(cè)試,允許在1000毫秒內(nèi)通過(guò)5個(gè)請(qǐng)求:
void test() throws InterruptedException {
FixedWindowRateLimiter fixedWindowRateLimiter
= new FixedWindowRateLimiter(1000, 5);
for (int i = 0; i < 10; i++) {
if (fixedWindowRateLimiter.tryAcquire()) {
System.out.println("執(zhí)行任務(wù)");
}else{
System.out.println("被限流");
TimeUnit.MILLISECONDS.sleep(300);
}
}
}
運(yùn)行結(jié)果:
固定窗口算法的優(yōu)點(diǎn)是實(shí)現(xiàn)簡(jiǎn)單,但是可能無(wú)法應(yīng)對(duì)突發(fā)流量的情況,比如每秒允許放行100個(gè)請(qǐng)求,但是在0.9秒前都沒(méi)有請(qǐng)求進(jìn)來(lái),這就造成了在0.9秒到1秒這段時(shí)間內(nèi)要處理100個(gè)請(qǐng)求,而在1秒到1.1秒間可能會(huì)再進(jìn)入100個(gè)請(qǐng)求,這就造成了要在0.2秒內(nèi)處理200個(gè)請(qǐng)求,這種流量激增就可能導(dǎo)致后端服務(wù)出現(xiàn)異常。
滑動(dòng)窗口算法
滑動(dòng)窗口算法在固定窗口的基礎(chǔ)上,進(jìn)行了一定的升級(jí)改造。它的算法的核心在于將時(shí)間窗口進(jìn)行了更精細(xì)的分片,將固定窗口分為多個(gè)小塊,每次僅滑動(dòng)一小塊的時(shí)間。
并且在每個(gè)時(shí)間段內(nèi)都維護(hù)了單獨(dú)的計(jì)數(shù)器,每次滑動(dòng)時(shí),都減去前一個(gè)時(shí)間塊內(nèi)的請(qǐng)求數(shù)量,并再添加一個(gè)新的時(shí)間塊到末尾,當(dāng)時(shí)間窗口內(nèi)所有小時(shí)間塊的計(jì)數(shù)器之和超過(guò)了請(qǐng)求閾值時(shí),就會(huì)觸發(fā)限流操作。
看一下算法的實(shí)現(xiàn),核心就是通過(guò)一個(gè)int類(lèi)型的數(shù)組循環(huán)使用來(lái)維護(hù)每個(gè)時(shí)間片內(nèi)獨(dú)立的計(jì)數(shù)器:
@Slf4j
public class SlidingWindowRateLimiter {
// 時(shí)間窗口大小,單位毫秒
private long windowSize;
// 分片窗口數(shù)
private int shardNum;
// 允許通過(guò)請(qǐng)求數(shù)
private int maxRequestCount;
// 各個(gè)窗口內(nèi)請(qǐng)求計(jì)數(shù)
private int[] shardRequestCount;
// 請(qǐng)求總數(shù)
private int totalCount;
// 當(dāng)前窗口下標(biāo)
private int shardId;
// 每個(gè)小窗口大小,毫秒
private long tinyWindowSize;
// 窗口右邊界
private long windowBorder;
public SlidingWindowRateLimiter(long windowSize, int shardNum, int maxRequestCount) {
this.windowSize = windowSize;
this.shardNum = shardNum;
this.maxRequestCount = maxRequestCount;
shardRequestCount = new int[shardNum];
tinyWindowSize = windowSize/ shardNum;
windowBorder=System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
if (currentTime > windowBorder){
do {
shardId = (++shardId) % shardNum;
totalCount -= shardRequestCount[shardId];
shardRequestCount[shardId]=0;
windowBorder += tinyWindowSize;
}while (windowBorder < currentTime);
}
if (totalCount < maxRequestCount){
log.info("tryAcquire success,{}",shardId);
shardRequestCount[shardId]++;
totalCount++;
return true;
}else{
log.info("tryAcquire fail,{}",shardId);
return false;
}
}
}
進(jìn)行一下測(cè)試,對(duì)第一個(gè)例子中的規(guī)則進(jìn)行修改,每1秒允許100個(gè)請(qǐng)求通過(guò)不變,在此基礎(chǔ)上再把每1秒等分為10個(gè)0.1秒的窗口。
void test() throws InterruptedException {
SlidingWindowRateLimiter slidingWindowRateLimiter
= new SlidingWindowRateLimiter(1000, 10, 10);
TimeUnit.MILLISECONDS.sleep(800);
for (int i = 0; i < 15; i++) {
boolean acquire = slidingWindowRateLimiter.tryAcquire();
if (acquire){
System.out.println("執(zhí)行任務(wù)");
}else{
System.out.println("被限流");
}
TimeUnit.MILLISECONDS.sleep(10);
}
}
查看運(yùn)行結(jié)果:
程序啟動(dòng)后,在先休眠了一段時(shí)間后再發(fā)起請(qǐng)求,可以看到在0.9秒到1秒的時(shí)間窗口內(nèi)放行了6個(gè)請(qǐng)求,在1秒到1.1秒內(nèi)放行了4個(gè)請(qǐng)求,隨后就進(jìn)行了限流,解決了在固定窗口算法中相鄰時(shí)間窗口內(nèi)允許通過(guò)大量請(qǐng)求的問(wèn)題。
滑動(dòng)窗口算法通過(guò)將時(shí)間片進(jìn)行分片,對(duì)流量的控制更加精細(xì)化,但是相應(yīng)的也會(huì)浪費(fèi)一些存儲(chǔ)空間,用來(lái)維護(hù)每一塊時(shí)間內(nèi)的單獨(dú)計(jì)數(shù),并且還沒(méi)有解決固定窗口中可能出現(xiàn)的流量激增問(wèn)題。
漏桶算法
為了應(yīng)對(duì)流量激增的問(wèn)題,后續(xù)又衍生出了漏桶算法,用專(zhuān)業(yè)一點(diǎn)的詞來(lái)說(shuō),漏桶算法能夠進(jìn)行流量整形和流量控制。
漏桶是一個(gè)很形象的比喻,外部請(qǐng)求就像是水一樣不斷注入水桶中,而水桶已經(jīng)設(shè)置好了最大出水速率,漏桶會(huì)以這個(gè)速率勻速放行請(qǐng)求,而當(dāng)水超過(guò)桶的最大容量后則被丟棄。
看一下代碼實(shí)現(xiàn):
@Slf4j
public class LeakyBucketRateLimiter {
// 桶的容量
private int capacity;
// 桶中現(xiàn)存水量
private AtomicInteger water=new AtomicInteger(0);
// 開(kāi)始漏水時(shí)間
private long leakTimeStamp;
// 水流出的速率,即每秒允許通過(guò)的請(qǐng)求數(shù)
private int leakRate;
public LeakyBucketRateLimiter(int capacity,int leakRate){
this.capacity=capacity;
this.leakRate=leakRate;
}
public synchronized boolean tryAcquire(){
// 桶中沒(méi)有水,重新開(kāi)始計(jì)算
if (water.get()==0){
log.info("start leaking");
leakTimeStamp = System.currentTimeMillis();
water.incrementAndGet();
return water.get() < capacity;
}
// 先漏水,計(jì)算剩余水量
long currentTime = System.currentTimeMillis();
int leakedWater= (int) ((currentTime-leakTimeStamp)/1000 * leakRate);
log.info("lastTime:{}, currentTime:{}. LeakedWater:{}",leakTimeStamp,currentTime,leakedWater);
// 可能時(shí)間不足,則先不漏水
if (leakedWater != 0){
int leftWater = water.get() - leakedWater;
// 可能水已漏光,設(shè)為0
water.set(Math.max(0,leftWater));
leakTimeStamp=System.currentTimeMillis();
}
log.info("剩余容量:{}",capacity-water.get());
if (water.get() < capacity){
log.info("tryAcquire success");
water.incrementAndGet();
return true;
}else {
log.info("tryAcquire fail");
return false;
}
}
}
進(jìn)行一下測(cè)試,先初始化一個(gè)漏桶,設(shè)置桶的容量為3,每秒放行1個(gè)請(qǐng)求,在代碼中每500毫秒嘗試請(qǐng)求1次:
void test() throws InterruptedException {
LeakyBucketRateLimiter leakyBucketRateLimiter
=new LeakyBucketRateLimiter(3,1);
for (int i = 0; i < 15; i++) {
if (leakyBucketRateLimiter.tryAcquire()) {
System.out.println("執(zhí)行任務(wù)");
}else {
System.out.println("被限流");
}
TimeUnit.MILLISECONDS.sleep(500);
}
}
查看運(yùn)行結(jié)果,按規(guī)則進(jìn)行了放行:
但是,漏桶算法同樣也有缺點(diǎn),不管當(dāng)前系統(tǒng)的負(fù)載壓力如何,所有請(qǐng)求都得進(jìn)行排隊(duì),即使此時(shí)服務(wù)器的負(fù)載處于相對(duì)空閑的狀態(tài),這樣會(huì)造成系統(tǒng)資源的浪費(fèi)。由于漏桶的缺陷比較明顯,所以在實(shí)際業(yè)務(wù)場(chǎng)景中,使用的比較少。
令牌桶算法
令牌桶算法是基于漏桶算法的一種改進(jìn),主要在于令牌桶算法能夠在限制服務(wù)調(diào)用的平均速率的同時(shí),還能夠允許一定程度內(nèi)的突發(fā)調(diào)用。
它的主要思想是系統(tǒng)以恒定的速度生成令牌,并將令牌放入令牌桶中,當(dāng)令牌桶中滿了的時(shí)候,再向其中放入的令牌就會(huì)被丟棄。而每次請(qǐng)求進(jìn)入時(shí),必須從令牌桶中獲取一個(gè)令牌,如果沒(méi)有獲取到令牌則被限流拒絕。
假設(shè)令牌的生成速度是每秒100個(gè),并且第一秒內(nèi)只使用了70個(gè)令牌,那么在第二秒可用的令牌數(shù)量就變成了130,在允許的請(qǐng)求范圍上限內(nèi),擴(kuò)大了請(qǐng)求的速率。當(dāng)然,這里要設(shè)置桶容量的上限,避免超出系統(tǒng)能夠承載的最大請(qǐng)求數(shù)量。
Guava中的RateLimiter就是基于令牌桶實(shí)現(xiàn)的,可以直接拿來(lái)使用,先引入依賴:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
進(jìn)行測(cè)試,設(shè)置每秒產(chǎn)生5個(gè)令牌:
void acquireTest(){
RateLimiter rateLimiter=RateLimiter.create(5);
for (int i = 0; i < 10; i++) {
double time = rateLimiter.acquire();
log.info("等待時(shí)間:{}s",time);
}
}
運(yùn)行結(jié)果:
可以看到,每200ms左右產(chǎn)生一個(gè)令牌并放行請(qǐng)求,也就是1秒放行5個(gè)請(qǐng)求,使用RateLimiter能夠很好的實(shí)現(xiàn)單機(jī)的限流。
那么再回到我們前面提到的突發(fā)流量情況,令牌桶是怎么解決的呢?RateLimiter中引入了一個(gè)預(yù)消費(fèi)的概念。在源碼中,有這么一段注釋?zhuān)?/p>
* <p>It is important to note that the number of permits requested <i>never</i> affects the
* throttling of the request itself (an invocation to {@code acquire(1)} and an invocation to {@code
* acquire(1000)} will result in exactly the same throttling, if any), but it affects the throttling
* of the <i>next</i> request. I.e., if an expensive task arrives at an idle RateLimiter, it will be
* granted immediately, but it is the <i>next</i> request that will experience extra throttling,
* thus paying for the cost of the expensive task.
大意就是,申請(qǐng)令牌的數(shù)量不同不會(huì)影響這個(gè)申請(qǐng)令牌這個(gè)動(dòng)作本身的響應(yīng)時(shí)間,acquire(1)和acquire(1000)這兩個(gè)請(qǐng)求會(huì)消耗同樣的時(shí)間返回結(jié)果,但是會(huì)影響下一個(gè)請(qǐng)求的響應(yīng)時(shí)間。
如果一個(gè)消耗大量令牌的任務(wù)到達(dá)空閑的RateLimiter,會(huì)被立即批準(zhǔn)執(zhí)行,但是當(dāng)下一個(gè)請(qǐng)求進(jìn)來(lái)時(shí),將會(huì)額外等待一段時(shí)間,用來(lái)支付前一個(gè)請(qǐng)求的時(shí)間成本。
至于為什么要這么做,通過(guò)舉例來(lái)引申一下。當(dāng)一個(gè)系統(tǒng)處于空閑狀態(tài)時(shí),突然來(lái)了1個(gè)需要消耗100個(gè)令牌的任務(wù),那么白白等待100秒是毫無(wú)意義的浪費(fèi)資源行為,那么可以先允許它執(zhí)行,并對(duì)后續(xù)請(qǐng)求進(jìn)行限流時(shí)間上的延長(zhǎng),以此來(lái)達(dá)到一個(gè)應(yīng)對(duì)突發(fā)流量的效果。
看一下具體的代碼示例:
void acquireMultiTest(){
RateLimiter rateLimiter=RateLimiter.create(1);
for (int i = 0; i <3; i++) {
int num = 2 * i + 1;
log.info("獲取{}個(gè)令牌", num);
double cost = rateLimiter.acquire(num);
log.info("獲取{}個(gè)令牌結(jié)束,耗時(shí){}ms",num,cost);
}
}
運(yùn)行結(jié)果:
可以看到,在第二次請(qǐng)求時(shí)需要3個(gè)令牌,但是并沒(méi)有等3秒后才獲取成功,而是在等第一次的1個(gè)令牌所需要的1秒償還后,立即獲得了3個(gè)令牌得到了放行。同樣,第三次獲取5個(gè)令牌時(shí)等待的3秒是償還的第二次獲取令牌的時(shí)間,償還完成后立即獲取5個(gè)新令牌,而并沒(méi)有等待全部重新生成完成。
除此之外RateLimiter還具有平滑預(yù)熱功能,下面的代碼就實(shí)現(xiàn)了在啟動(dòng)3秒內(nèi),平滑提高令牌發(fā)放速率到每秒5個(gè)的功能:
void acquireSmoothly(){
RateLimiter rateLimiter=RateLimiter.create(5,3, TimeUnit.SECONDS);
long startTimeStamp = System.currentTimeMillis();
for (int i = 0; i < 15; i++) {
double time = rateLimiter.acquire();
log.info("等待時(shí)間:{}s, 總時(shí)間:{}ms"
,time,System.currentTimeMillis()-startTimeStamp);
}
}
查看運(yùn)行結(jié)果:
可以看到,令牌發(fā)放時(shí)間從最開(kāi)始的500ms多逐漸縮短,在3秒后達(dá)到了200ms左右的勻速發(fā)放。
總的來(lái)說(shuō),基于令牌桶實(shí)現(xiàn)的RateLimiter功能還是非常強(qiáng)大的,在限流的基礎(chǔ)上還可以把請(qǐng)求平均分散在各個(gè)時(shí)間段內(nèi),因此在單機(jī)情況下它是使用比較廣泛的限流組件。
中間件限流
前面討論的四種方式都是針對(duì)單體架構(gòu),無(wú)法跨JVM進(jìn)行限流,而在分布式、微服務(wù)架構(gòu)下,可以借助一些中間件進(jìn)行限。Sentinel是Spring Cloud Alibaba中常用的熔斷限流組件,為我們提供了開(kāi)箱即用的限流方法。
使用起來(lái)也非常簡(jiǎn)單,在service層的方法上添加@SentinelResource注解,通過(guò)value指定資源名稱,blockHandler指定一個(gè)方法,該方法會(huì)在原方法被限流、降級(jí)、系統(tǒng)保護(hù)時(shí)被調(diào)用。
@Service
public class QueryService {
public static final String KEY="query";
@SentinelResource(value = KEY,
blockHandler ="blockHandlerMethod")
public String query(String name){
return "begin query,name="+name;
}
public String blockHandlerMethod(String name, BlockException e){
e.printStackTrace();
return "blockHandlerMethod for Query : " + name;
}
}
配置限流規(guī)則,這里使用直接編碼方式配置,指定QPS到達(dá)1時(shí)進(jìn)行限流:
@Component
public class SentinelConfig {
@PostConstruct
private void init(){
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule(QueryService.KEY);
rule.setCount(1);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
}
在application.yml中配置sentinel的端口及dashboard地址:
spring:
application:
name: sentinel-test
cloud:
sentinel:
transport:
port: 8719
dashboard: localhost:8088
啟動(dòng)項(xiàng)目后,啟動(dòng)sentinel-dashboard:
java -Dserver.port=8088 -jar sentinel-dashboard-1.8.0.jar
在瀏覽器打開(kāi)dashboard就可以看見(jiàn)我們?cè)O(shè)置的流控規(guī)則:
進(jìn)行接口測(cè)試,在超過(guò)QPS指定的限制后,則會(huì)執(zhí)行blockHandler()方法中的邏輯:
Sentinel在微服務(wù)架構(gòu)下得到了廣泛的使用,能夠提供可靠的集群流量控制、服務(wù)斷路等功能。在使用中,限流可以結(jié)合熔斷、降級(jí)一起使用,成為有效應(yīng)對(duì)三高系統(tǒng)的三板斧,來(lái)保證服務(wù)的穩(wěn)定性。
網(wǎng)關(guān)限流
網(wǎng)關(guān)限流也是目前比較流行的一種方式,這里我們介紹采用Spring Cloud的gateway組件進(jìn)行限流的方式。
在項(xiàng)目中引入依賴,gateway的限流實(shí)際使用的是Redis加lua腳本的方式實(shí)現(xiàn)的令牌桶,因此還需要引入redis的相關(guān)依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
對(duì)gateway進(jìn)行配置,主要就是配一下令牌的生成速率、令牌桶的存儲(chǔ)量上限,以及用于限流的鍵的解析器。這里設(shè)置的桶上限為2,每秒填充1個(gè)令牌:
spring:
application:
name: gateway-test
cloud:
gateway:
routes:
- id: limit_route
uri: lb://sentinel-test
predicates:
- Path=/sentinel-test/**
filters:
- name: RequestRateLimiter
args:
# 令牌桶每秒填充平均速率
redis-rate-limiter.replenishRate: 1
# 令牌桶上限
redis-rate-limiter.burstCapacity: 2
# 指定解析器,使用spEl表達(dá)式按beanName從spring容器中獲取
key-resolver: "#{@pathKeyResolver}"
- StripPrefix=1
redis:
host: 127.0.0.1
port: 6379
我們使用請(qǐng)求的路徑作為限流的鍵,編寫(xiě)對(duì)應(yīng)的解析器:
@Slf4j
@Component
public class PathKeyResolver implements KeyResolver {
public Mono<String> resolve(ServerWebExchange exchange) {
String path = exchange.getRequest().getPath().toString();
log.info("Request path: {}",path);
return Mono.just(path);
}
}
啟動(dòng)gateway,使用jmeter進(jìn)行測(cè)試,設(shè)置請(qǐng)求間隔為500ms,因?yàn)槊棵肷梢粋€(gè)令牌,所以后期達(dá)到了每?jī)蓚€(gè)請(qǐng)求放行1個(gè)的限流效果,在被限流的情況下,http請(qǐng)求會(huì)返回429狀態(tài)碼。
除了上面的根據(jù)請(qǐng)求路徑限流外,我們還可以靈活設(shè)置各種限流的維度,例如根據(jù)請(qǐng)求header中攜帶的用戶信息、或是攜帶的參數(shù)等等。當(dāng)然,如果不想用gateway自帶的這個(gè)Redis的限流器的話,我們也可以自己實(shí)現(xiàn)RateLimiter接口來(lái)實(shí)現(xiàn)一個(gè)自己的限流工具。
gateway實(shí)現(xiàn)限流的關(guān)鍵是spring-cloud-gateway-core包中的RedisRateLimiter類(lèi),以及META-INF/scripts中的request-rate-limiter.lua這個(gè)腳本,如果有興趣可以看一下具體是如何實(shí)現(xiàn)的。
總結(jié)
總的來(lái)說(shuō),要保證系統(tǒng)的抗壓能力,限流是一個(gè)必不可少的環(huán)節(jié),雖然可能會(huì)造成某些用戶的請(qǐng)求被丟棄,但相比于突發(fā)流量造成的系統(tǒng)宕機(jī)來(lái)說(shuō),這些損失一般都在可以接受的范圍之內(nèi)。前面也說(shuō)過(guò),限流可以結(jié)合熔斷、降級(jí)一起使用,多管齊下,保證服務(wù)的可用性與健壯性。