Java實(shí)現(xiàn)系統(tǒng)限流
限流是保障系統(tǒng)高可用的方式之一,也是大廠高頻面試題,如果面試官問一句,“如何實(shí)現(xiàn)每秒鐘1000個(gè)請(qǐng)求的限流?”,你要是分分鐘給他寫上幾種限流方案,那豈不香哉,哈哈!話不多說,我來列幾種常用限流實(shí)現(xiàn)方式。
1、Guava RateLimiter
Guava是Java領(lǐng)域很優(yōu)秀的開源項(xiàng)目,包含了日常開發(fā)常用的集合、String、緩存等, 其中RateLimiter是常用限流工具。
RateLimiter是基于令牌桶算法實(shí)現(xiàn)的,如果每秒10個(gè)令牌,內(nèi)部實(shí)現(xiàn),會(huì)每100ms生產(chǎn)1個(gè)令牌。
使用Guava RateLimiter,如下:
(1) 引入pom依賴:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
(2) 代碼:
public class GuavaRateLimiterTest {
//比如每秒生產(chǎn)10個(gè)令牌,相當(dāng)于每100ms生產(chǎn)1個(gè)令牌
private RateLimiter rateLimiter = RateLimiter.create(10);
/**
* 模擬執(zhí)行業(yè)務(wù)方法
*/
public void exeBiz() {
if (rateLimiter.tryAcquire(1)) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getName() + ":執(zhí)行業(yè)務(wù)邏輯");
} else {
System.out.println("線程" + Thread.currentThread().getName() + ":被限流");
}
}
public static void main(String[] args) throws InterruptedException {
GuavaRateLimiterTest limiterTest = new GuavaRateLimiterTest();
Thread.sleep(500);//等待500ms,讓limiter生產(chǎn)一些令牌
//模擬瞬間生產(chǎn)100個(gè)線程請(qǐng)求
for (int i = 0; i < 100; i++) {
new Thread(limiterTest::exeBiz).start();
}
}
}
2、滑窗計(jì)數(shù)
打個(gè)比方,某接口每秒允許100個(gè)請(qǐng)求,設(shè)置一個(gè)滑窗,窗口中有10個(gè)格子,每個(gè)格子占100ms,每100ms移動(dòng)一次。滑動(dòng)窗口的格子劃分的越多,滑動(dòng)窗口的滾動(dòng)就越平滑,限流的統(tǒng)計(jì)就會(huì)越精確。
代碼如下:
/**
* 滑窗計(jì)數(shù)器
*/
public class SliderWindowRateLimiter implements Runnable {
//每秒允許的最大訪問數(shù)
private final long maxVisitPerSecond;
//將每秒時(shí)間劃分N個(gè)塊
private final int block;
//每個(gè)塊存儲(chǔ)的數(shù)量
private final AtomicLong[] countPerBlock;
//滑動(dòng)窗口劃到了哪個(gè)塊兒,可以理解為滑動(dòng)窗口的起始下標(biāo)位置
private volatile int index;
//目前總的數(shù)量
private AtomicLong allCount;
/**
* 構(gòu)造函數(shù)
*
* @param block,每秒鐘劃分N個(gè)窗口
* @param maxVisitPerSecond 每秒最大訪問數(shù)量
*/
public SliderWindowRateLimiter(int block, long maxVisitPerSecond) {
this.block = block;
this.maxVisitPerSecond = maxVisitPerSecond;
countPerBlock = new AtomicLong[block];
for (int i = 0; i < block; i++) {
countPerBlock[i] = new AtomicLong();
}
allCount = new AtomicLong(0);
}
/**
* 判斷是否超過最大允許數(shù)量
*
* @return
*/
public boolean isOverLimit() {
return currentQPS() > maxVisitPerSecond;
}
/**
* 獲取目前總的訪問數(shù)
*
* @return
*/
public long currentQPS() {
return allCount.get();
}
/**
* 請(qǐng)求訪問進(jìn)來,判斷是否可以執(zhí)行業(yè)務(wù)邏輯
*/
public void visit() {
countPerBlock[index].incrementAndGet();
allCount.incrementAndGet();
if (isOverLimit()) {
System.out.println(Thread.currentThread().getName() + "被限流" + ",currentQPS:" + currentQPS() + ",index:" + index);
} else {
System.out.println(Thread.currentThread().getName() + "執(zhí)行業(yè)務(wù)邏輯" + ",currentQPS:" + currentQPS() + ",index:" + index);
}
}
/**
* 定時(shí)執(zhí)行器,
* 每N毫秒滑塊移動(dòng)一次,然后再設(shè)置下新滑塊的初始化數(shù)字0,然后新的請(qǐng)求會(huì)落到新的滑塊上
* 同時(shí)總數(shù)減掉新滑塊上的數(shù)字,并且重置新的滑塊上的數(shù)量
*/
@Override
public void run() {
index = (index + 1) % block;
long val = countPerBlock[index].getAndSet(0);
allCount.addAndGet(-val);
}
public static void main(String[] args) {
SliderWindowRateLimiter sliderWindowRateLimiter = new SliderWindowRateLimiter(10, 100);
//固定的速率移動(dòng)滑塊
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(sliderWindowRateLimiter, 100, 100, TimeUnit.MILLISECONDS);
//模擬不同速度的請(qǐng)求
new Thread(() -> {
while (true) {
sliderWindowRateLimiter.visit();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//模擬不同速度的請(qǐng)求
new Thread(() -> {
while (true) {
sliderWindowRateLimiter.visit();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
3、信號(hào)量
利用Semaphore,每隔固定速率,釋放Semaphore的資源。線程獲取到資源,則執(zhí)行業(yè)務(wù)代碼。
代碼如下:
public class SemaphoreOne {
private static Semaphore semaphore = new Semaphore(10);
public static void bizMethod() throws InterruptedException {
if (!semaphore.tryAcquire()) {
System.out.println(Thread.currentThread().getName() + "被拒絕");
return;
}
System.out.println(Thread.currentThread().getName() + "執(zhí)行業(yè)務(wù)邏輯");
Thread.sleep(500);//模擬處理業(yè)務(wù)邏輯需要1秒
semaphore.release();
}
public static void main(String[] args) {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
semaphore.release(10);
System.out.println("釋放所有鎖");
}
}, 1000, 1000);
for (int i = 0; i < 10000; i++) {
try {
Thread.sleep(10);//模擬每隔10ms就有1個(gè)請(qǐng)求進(jìn)來
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
try {
SemaphoreOne.bizMethod();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
4、令牌桶
令牌桶算法:一個(gè)存放固定容量令牌的桶,按照固定速率往桶里添加令牌,如有剩余容量則添加,沒有則放棄。如果有請(qǐng)求進(jìn)來,則需要先從桶里獲取令牌,當(dāng)桶里沒有令牌可取時(shí),則拒絕任務(wù)。
令牌桶的優(yōu)點(diǎn)是:可以改變添加令牌的速率,一旦提高速率,則可以處理突發(fā)流量。
代碼如下:
public class TokenBucket {
/**
* 定義的桶
*/
public class Bucket {
//容量
int capacity;
//速率,每秒放多少
int rateCount;
//目前token個(gè)數(shù)
AtomicInteger curCount = new AtomicInteger(0);
public Bucket(int capacity, int rateCount) {
this.capacity = capacity;
this.rateCount = rateCount;
}
public void put() {
if (curCount.get() < capacity) {
System.out.println("目前數(shù)量==" + curCount.get() + ", 我還可以繼續(xù)放");
curCount.addAndGet(rateCount);
}
}
public boolean get() {
if (curCount.get() >= 1) {
curCount.decrementAndGet();
return true;
}
return false;
}
}
@Test
public void testTokenBucket() throws InterruptedException {
Bucket bucket = new Bucket(5, 2);
//固定線程,固定的速率往桶里放數(shù)據(jù),比如每秒N個(gè)
ScheduledThreadPoolExecutor scheduledCheck = new ScheduledThreadPoolExecutor(1);
scheduledCheck.scheduleAtFixedRate(() -> {
bucket.put();
}, 0, 1, TimeUnit.SECONDS);
//先等待一會(huì)兒,讓桶里放點(diǎn)token
Thread.sleep(6000);
//模擬瞬間10個(gè)線程進(jìn)來拿token
for (int i = 0; i < 10; i++) {
new Thread(() -> {
if (bucket.get()) {
System.out.println(Thread.currentThread() + "獲取到了資源");
} else {
System.out.println(Thread.currentThread() + "被拒絕");
}
}).start();
}
//等待,往桶里放token
Thread.sleep(3000);
//繼續(xù)瞬間10個(gè)線程進(jìn)來拿token
for (int i = 0; i < 10; i++) {
new Thread(() -> {
if (bucket.get()) {
System.out.println(Thread.currentThread() + "獲取到了資源");
} else {
System.out.println(Thread.currentThread() + "被拒絕");
}
}).start();
}
}
}
5、總結(jié)
本文主要介紹了幾種限流方法:Guava RateLimiter、簡(jiǎn)單計(jì)數(shù)、滑窗計(jì)數(shù)、信號(hào)量、令牌桶,當(dāng)然,限流算法還有漏桶算法、nginx限流等等。我所寫的這些方法只是個(gè)人在實(shí)際項(xiàng)目總使用過的,或者是早年參加阿里筆試時(shí)寫過的方式。