阿里面試:說說自適應(yīng)限流?
限流想必大家都不陌生,它是一種控制資源訪問速率的策略,用于保護(hù)系統(tǒng)免受過載和崩潰的風(fēng)險(xiǎn)。限流可以控制某個(gè)服務(wù)、接口或系統(tǒng)在一段時(shí)間內(nèi)能夠處理的請求或數(shù)據(jù)量,以防止系統(tǒng)資源耗盡、性能下降或服務(wù)不可用。
常見的限流策略有以下幾種:
- 令牌桶算法:基于令牌桶的方式,限制每個(gè)單位時(shí)間內(nèi)允許通過的請求量,請求量超出限制的將被拒絕或等待。
- 漏桶算法:基于漏桶的方式,限制系統(tǒng)處理請求的速率,請求速率過快時(shí)將被限制或拒絕。
- 計(jì)數(shù)器算法:通過計(jì)數(shù)器記錄單位時(shí)間內(nèi)的請求次數(shù),并根據(jù)設(shè)定的閾值進(jìn)行限制。
通過合理的限流策略,可以保護(hù)系統(tǒng)免受惡意攻擊、突發(fā)流量和資源濫用的影響,確保系統(tǒng)穩(wěn)定和可靠運(yùn)行。在實(shí)際應(yīng)用中,限流常用于接口防刷、防止 DDoS 攻擊、保護(hù)關(guān)鍵服務(wù)等場景。
1.限流實(shí)現(xiàn)
在 Java 中,限流的實(shí)現(xiàn)方式有很多種,例如以下這些:
- 單機(jī)限流:使用 JUC 下的 Semaphore 限流,或一些常用的框架,例如 Google 的 Guava 框架進(jìn)行限流,但這種限流方式都是基于 JVM 層面的內(nèi)存級別的單臺機(jī)器限流。
- 組件限流:單機(jī)限流往往不適用于分布式系統(tǒng),而分布式系統(tǒng)可以通過組件 Sentinel、Hystrix 對整個(gè)集群進(jìn)行限流。
- 反向代理限流(Nginx 限流):通常在網(wǎng)關(guān)層的上游,我們會使用 Nginx(反向代理)一起來配合使用,也就是用戶請求會先到 Nginx(或 Nginx 集群),然后再將請求轉(zhuǎn)發(fā)給網(wǎng)關(guān),網(wǎng)關(guān)再調(diào)用其他的微服務(wù),從而實(shí)現(xiàn)整個(gè)流程的請求調(diào)用,因此 Nginx 限流也是分布式系統(tǒng)中常用的限流手段。
2.自適應(yīng)限流
所謂的自適應(yīng)限流是結(jié)合應(yīng)用的 Load、CPU 使用率、總體平均 RT、入口 QPS 和并發(fā)線程數(shù)等幾個(gè)維度的監(jiān)控指標(biāo),通過自適應(yīng)的流控策略,讓系統(tǒng)的入口流量和系統(tǒng)的負(fù)載達(dá)到一個(gè)平衡,讓系統(tǒng)盡可能跑在最大吞吐量的同時(shí)保證系統(tǒng)整體的穩(wěn)定性。
類似的實(shí)現(xiàn)思路還有很多,如,自適應(yīng)自旋鎖、還有 K8S 中根據(jù)負(fù)載進(jìn)行動態(tài)擴(kuò)容等。
3.實(shí)現(xiàn)思路
以 Sentinel 中的自適應(yīng)限流來說,它的實(shí)現(xiàn)思路是用負(fù)載(load1)作為啟動控制流量的值,而允許通過的流量由處理請求的能力,即請求的響應(yīng)時(shí)間以及當(dāng)前系統(tǒng)正在處理的請求速率來決定。
為什么要這樣設(shè)計(jì)?
長期以來,系統(tǒng)自適應(yīng)保護(hù)的思路是根據(jù)硬指標(biāo),即系統(tǒng)的負(fù)載 (load1) 來做系統(tǒng)過載保護(hù)。當(dāng)系統(tǒng)負(fù)載高于某個(gè)閾值,就禁止或者減少流量的進(jìn)入;當(dāng) load 開始好轉(zhuǎn),則恢復(fù)流量的進(jìn)入。這個(gè)思路給我們帶來了不可避免的兩個(gè)問題:
- load 是一個(gè)“果”,如果根據(jù) load 的情況來調(diào)節(jié)流量的通過率,那么就始終有延遲性。也就意味著通過率的任何調(diào)整,都會過一段時(shí)間才能看到效果。當(dāng)前通過率是使 load 惡化的一個(gè)動作,那么也至少要過 1 秒之后才能觀測到;同理,如果當(dāng)前通過率調(diào)整是讓 load 好轉(zhuǎn)的一個(gè)動作,也需要 1 秒之后才能繼續(xù)調(diào)整,這樣就浪費(fèi)了系統(tǒng)的處理能力。所以我們看到的曲線,總是會有抖動。
- 恢復(fù)慢。想象一下這樣的一個(gè)場景(真實(shí)),出現(xiàn)了這樣一個(gè)問題,下游應(yīng)用不可靠,導(dǎo)致應(yīng)用 RT 很高,從而 load 到了一個(gè)很高的點(diǎn)。過了一段時(shí)間之后下游應(yīng)用恢復(fù)了,應(yīng)用 RT 也相應(yīng)減少。這個(gè)時(shí)候,其實(shí)應(yīng)該大幅度增大流量的通過率;但是由于這個(gè)時(shí)候 load 仍然很高,通過率的恢復(fù)仍然不高。
TCP BBR 的思想給了我們一個(gè)很大的啟發(fā)。我們應(yīng)該根據(jù)系統(tǒng)能夠處理的請求,和允許進(jìn)來的請求,來做平衡,而不是根據(jù)一個(gè)間接的指標(biāo)(系統(tǒng) load)來做限流。最終我們追求的目標(biāo)是 在系統(tǒng)不被拖垮的情況下,提高系統(tǒng)的吞吐率,而不是 load 一定要到低于某個(gè)閾值。如果我們還是按照固有的思維,超過特定的 load 就禁止流量進(jìn)入,系統(tǒng) load 恢復(fù)就放開流量,這樣做的結(jié)果是無論我們怎么調(diào)參數(shù),調(diào)比例,都是按照果來調(diào)節(jié)因,都無法取得良好的效果。 所以,Sentinel 在系統(tǒng)自適應(yīng)限流的做法是,用 load1 作為啟動控制流量的值,而允許通過的流量由處理請求的能力,即請求的響應(yīng)時(shí)間以及當(dāng)前系統(tǒng)正在處理的請求速率來決定。
4.支持規(guī)則
Sentinel 是從單臺機(jī)器的總體 Load、RT、入口 QPS 和線程數(shù)四個(gè)維度監(jiān)控應(yīng)用數(shù)據(jù),讓系統(tǒng)盡可能跑在最大吞吐量的同時(shí)保證系統(tǒng)整體的穩(wěn)定性。
系統(tǒng)保護(hù)規(guī)則是應(yīng)用整體維度的,而不是資源維度的,并且僅對入口流量生效。入口流量指的是進(jìn)入應(yīng)用的流量(EntryType.IN),比如 Web 服務(wù)或 Dubbo 服務(wù)端接收的請求,都屬于入口流量。
注意:系統(tǒng)規(guī)則只對入口流量起作用(調(diào)用類型為 EntryType.IN),對出口流量無效。可通過 SphU.entry(res, entryType) 指定調(diào)用類型,如果不指定,默認(rèn)是 EntryType.OUT。
Sentinel 支持以下的閾值規(guī)則:
- Load(僅對 Linux/Unix-like 機(jī)器生效):當(dāng)系統(tǒng) load1 超過閾值,且系統(tǒng)當(dāng)前的并發(fā)線程數(shù)超過系統(tǒng)容量時(shí)才會觸發(fā)系統(tǒng)保護(hù)。系統(tǒng)容量由系統(tǒng)的 maxQps * minRt 計(jì)算得出。設(shè)定參考值一般是 CPU cores * 2.5。
- CPU usage(1.5.0+ 版本):當(dāng)系統(tǒng) CPU 使用率超過閾值即觸發(fā)系統(tǒng)保護(hù)(取值范圍 0.0-1.0)。
- RT:當(dāng)單臺機(jī)器上所有入口流量的平均 RT 達(dá)到閾值即觸發(fā)系統(tǒng)保護(hù),單位是毫秒。
- 線程數(shù):當(dāng)單臺機(jī)器上所有入口流量的并發(fā)線程數(shù)達(dá)到閾值即觸發(fā)系統(tǒng)保護(hù)。
- 入口 QPS:當(dāng)單臺機(jī)器上所有入口流量的 QPS 達(dá)到閾值即觸發(fā)系統(tǒng)保護(hù)。
5.設(shè)置自適應(yīng)限流
在 Sentinel 中,可以通過系統(tǒng)規(guī)則 -> 新增系統(tǒng)規(guī)則,設(shè)置閾值以實(shí)現(xiàn)自適應(yīng)限流功能,如下圖所示:
6.原理分析
先用經(jīng)典圖來鎮(zhèn)樓:
我們把系統(tǒng)處理請求的過程想象為一個(gè)水管,到來的請求是往這個(gè)水管灌水,當(dāng)系統(tǒng)處理順暢的時(shí)候,請求不需要排隊(duì),直接從水管中穿過,這個(gè)請求的RT是最短的;反之,當(dāng)請求堆積的時(shí)候,那么處理請求的時(shí)間則會變?yōu)椋号抨?duì)時(shí)間 + 最短處理時(shí)間。
推論一:如果我們能夠保證水管里的水量,能夠讓水順暢的流動,則不會增加排隊(duì)的請求;也就是說,這個(gè)時(shí)候的系統(tǒng)負(fù)載不會進(jìn)一步惡化。
我們用 T 來表示(水管內(nèi)部的水量),用 RT 來表示請求的處理時(shí)間,用P來表示進(jìn)來的請求數(shù),那么一個(gè)請求從進(jìn)入水管道到從水管出來,這個(gè)水管會存在 P * RT 個(gè)請求。換一句話來說,當(dāng) T ≈ QPS * Avg(RT) 的時(shí)候,我們可以認(rèn)為系統(tǒng)的處理能力和允許進(jìn)入的請求個(gè)數(shù)達(dá)到了平衡,系統(tǒng)的負(fù)載不會進(jìn)一步惡化。
接下來的問題是,水管的水位是可以達(dá)到了一個(gè)平衡點(diǎn),但是這個(gè)平衡點(diǎn)只能保證水管的水位不再繼續(xù)增高,但是還面臨一個(gè)問題,就是在達(dá)到平衡點(diǎn)之前,這個(gè)水管里已經(jīng)堆積了多少水。如果之前水管的水已經(jīng)在一個(gè)量級了,那么這個(gè)時(shí)候系統(tǒng)允許通過的水量可能只能緩慢通過,RT 會大,之前堆積在水管里的水會滯留;反之,如果之前的水管水位偏低,那么又會浪費(fèi)了系統(tǒng)的處理能力。
推論二:當(dāng)保持入口的流量使水管出來的流量達(dá)到最大值的時(shí)候,可以最大利用水管的處理能力。
然而,和 TCP BBR 的不一樣的地方在于,還需要用一個(gè)系統(tǒng)負(fù)載的值(load1)來激發(fā)這套機(jī)制啟動。
注:這種系統(tǒng)自適應(yīng)算法對于低 load 的請求,它的效果是一個(gè)“兜底”的角色。對于不是應(yīng)用本身造成的 load 高的情況(如其它進(jìn)程導(dǎo)致的不穩(wěn)定的情況),效果不明顯。
7.實(shí)現(xiàn)代碼
以 Sentinel 官方提供的自適應(yīng)限流代碼為例,我們可以再來了解一下它的具體使用:
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.demo.system;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;
/**
* @author jialiang.linjl
*/
public class SystemGuardDemo {
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 100;
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
tick();
initSystemRule();
for (int i = 0; i < threadCount; i++) {
Thread entryThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Entry entry = null;
try {
entry = SphU.entry("methodA", EntryType.IN);
pass.incrementAndGet();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
// ignore
}
} catch (BlockException e1) {
block.incrementAndGet();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
// ignore
}
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
}
}
});
entryThread.setName("working-thread");
entryThread.start();
}
}
private static void initSystemRule() {
SystemRule rule = new SystemRule();
// max load is 3
rule.setHighestSystemLoad(3.0);
// max cpu usage is 60%
rule.setHighestCpuUsage(0.6);
// max avg rt of all request is 10 ms
rule.setAvgRt(10);
// max total qps is 20
rule.setQps(20);
// max parallel working thread is 10
rule.setMaxThread(10);
SystemRuleManager.loadRules(Collections.singletonList(rule));
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + ", " + TimeUtil.currentTimeMillis() + ", total:"
+ oneSecondTotal + ", pass:"
+ oneSecondPass + ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
stop = true;
}
}
System.exit(0);
}
}
}