Java并發(fā)編程:深入剖析CyclicBarrier源碼
引言
CyclicBarrier中文叫做循環(huán)柵欄,用來控制線程的執(zhí)行速率。
適用場景:一組線程在到達柵欄之前,需要相互等待,到達柵欄之后(滿足了特定條件),再一起執(zhí)行。
適用場景好像跟CountDownLatch一樣,前面介紹過CountDownLatch的適用場景,跟第二種場景很像,不過還是有點區(qū)別:
- CountDownLatch需要手動調(diào)用countDown()方法,這組線程才能一起執(zhí)行,而CyclicBarrier無需調(diào)用調(diào)用任何方法,線程會自動執(zhí)行。
- CountDownLatch只能使用一次,而CyclicBarrier可以循環(huán)使用。
再提一下CountDownLatch的兩個適用場景:
- 當前線程等待其他線程都執(zhí)行完成之后,再執(zhí)行。
- 所有線程滿足條件后,再一起執(zhí)行。
使用示例
CyclicBarrier常用的方法就一個await()方法,調(diào)用await()方法之后,會阻塞當前線程,直到柵欄前的所有線程都調(diào)用了await()方法,才會放行,并且一起執(zhí)行。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 一燈架構(gòu)
* @apiNote CyclicBarrier測試類
**/
@Slf4j
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
// 1. 創(chuàng)建一個線程池,用來執(zhí)行任務
ExecutorService executorService = Executors.newCachedThreadPool();
// 2. 創(chuàng)建一個循環(huán)柵欄,線程數(shù)是3
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
// 3. 提交9個任務,剛好可以循環(huán)3輪
for (int i = 0; i < 9; i++) {
// 4. 睡眠100ms再提交任務,避免并發(fā)提交
Thread.sleep(100);
executorService.execute(() -> {
try {
// 5. 睡眠1秒,模擬任務準備階段
Thread.sleep(1000);
log.info(Thread.currentThread().getName() + " 準備 " + cyclicBarrier.getNumberWaiting());
// 6. 阻塞當前任務,直到3個線程都到達柵欄
cyclicBarrier.await();
log.info(Thread.currentThread().getName() + " 執(zhí)行完成");
} catch (Exception e) {
}
});
}
// 7. 關(guān)閉線程池
executorService.shutdown();
}
}
輸出結(jié)果:
10:00:00.001 [pool-1-thread-1] INFO com.yideng.CyclicBarrierTest - pool-1-thread-1 準備 0
10:00:00.002 [pool-1-thread-2] INFO com.yideng.CyclicBarrierTest - pool-1-thread-2 準備 1
10:00:00.003 [pool-1-thread-3] INFO com.yideng.CyclicBarrierTest - pool-1-thread-3 準備 2
10:00:00.003 [pool-1-thread-3] INFO com.yideng.CyclicBarrierTest - pool-1-thread-3 執(zhí)行完成
10:00:00.003 [pool-1-thread-1] INFO com.yideng.CyclicBarrierTest - pool-1-thread-1 執(zhí)行完成
10:00:00.004 [pool-1-thread-2] INFO com.yideng.CyclicBarrierTest - pool-1-thread-2 執(zhí)行完成
10:00:00.010 [pool-1-thread-4] INFO com.yideng.CyclicBarrierTest - pool-1-thread-4 準備 0
10:00:00.011 [pool-1-thread-5] INFO com.yideng.CyclicBarrierTest - pool-1-thread-5 準備 1
10:00:01.003 [pool-1-thread-6] INFO com.yideng.CyclicBarrierTest - pool-1-thread-6 準備 2
10:00:01.004 [pool-1-thread-6] INFO com.yideng.CyclicBarrierTest - pool-1-thread-6 執(zhí)行完成
10:00:01.004 [pool-1-thread-4] INFO com.yideng.CyclicBarrierTest - pool-1-thread-4 執(zhí)行完成
10:00:01.004 [pool-1-thread-5] INFO com.yideng.CyclicBarrierTest - pool-1-thread-5 執(zhí)行完成
10:00:01.114 [pool-1-thread-7] INFO com.yideng.CyclicBarrierTest - pool-1-thread-7 準備 0
10:00:01.213 [pool-1-thread-8] INFO com.yideng.CyclicBarrierTest - pool-1-thread-8 準備 1
10:00:01.317 [pool-1-thread-9] INFO com.yideng.CyclicBarrierTest - pool-1-thread-9 準備 2
10:00:01.318 [pool-1-thread-9] INFO com.yideng.CyclicBarrierTest - pool-1-thread-9 執(zhí)行完成
10:00:01.318 [pool-1-thread-7] INFO com.yideng.CyclicBarrierTest - pool-1-thread-7 執(zhí)行完成
10:00:01.319 [pool-1-thread-8] INFO com.yideng.CyclicBarrierTest - pool-1-thread-8 執(zhí)行完成
示例中CyclicBarrier包含3個線程,提交9個任務,每3個任務為一組,調(diào)用await()方法后會相互等待,直到3個線程都調(diào)用了await()方法,然后放行,并且一起執(zhí)行,9個任務會循環(huán)3輪,從輸出結(jié)果中可以看出。
示例中g(shù)etNumberWaiting()方法可以查看CyclicBarrier中已經(jīng)等待的線程數(shù)。
看完了CyclicBarrier的使用方式,再看一下CyclicBarrier的源碼實現(xiàn)。
類屬性
public class CyclicBarrier {
/**
* 互斥鎖,用來保證線程安全
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* 柵欄條件操作
*/
private final Condition trip = lock.newCondition();
/**
* 柵欄初始線程數(shù)
*/
private final int parties;
/**
* 到達柵欄后的操作
*/
private final Runnable barrierCommand;
/**
* 柵欄前未到達的線程數(shù)
*/
private int count;
/**
* 當前循環(huán)輪數(shù)
*/
private Generation generation = new Generation();
private static class Generation {
boolean broken = false;
}
}
CyclicBarrier內(nèi)部使用了ReentrantLock來保證線程安全,又使用了Condition來實現(xiàn)線程的等待與喚醒操作。
初始化
CyclicBarrier初始化的可以指定線程數(shù)和到達柵欄后的操作。
/**
* 指定線程數(shù)
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
* 指定線程數(shù)和到達柵欄后的操作
* @param parties 線程數(shù)
* @param barrierAction 到達柵欄后的操作
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) {
throw new IllegalArgumentException();
}
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
比如到達柵欄后,關(guān)閉線程池:
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> executorService.shutdown());
看一下await()方法源碼。
await方法源碼
/**
* await方法入口
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* await方法核心邏輯
* @param timed 是否允許超時,false表示不允許
* @param nanos 超時時間
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
// 1. 加鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 2. 獲取當前循環(huán)輪數(shù)
final Generation g = generation;
if (g.broken) {
throw new BrokenBarrierException();
}
// 3. 如果當前線程已中斷,就打破柵欄
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 4. 計數(shù)器減一,如果計數(shù)器為零,表示所有線程都到達了柵欄
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
// 5. 如果初始化時指定了barrierCommand,就執(zhí)行
final Runnable command = barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction) {
breakBarrier();
}
}
}
for (; ; ) {
try {
// 6. 如果不允許超時,就阻塞當前線程
if (!timed) {
trip.await();
} else if (nanos > 0L) {
nanos = trip.awaitNanos(nanos);
}
} catch (InterruptedException ie) {
if (g == generation && !g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken) {
throw new BrokenBarrierException();
}
if (g != generation) {
return index;
}
// 7. 如果已超時,就打破柵欄
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 8. 釋放鎖
lock.unlock();
}
}
await()方法源碼很長,但是邏輯很簡單,主要分為以下四步:
- 加鎖,保證線程安全。
- 統(tǒng)計柵欄前等待的線程數(shù),如果所有線程都到達了柵欄,就執(zhí)行初始化時指定的barrierCommand。
- 如果線程沒有指定了超時時間,就直接阻塞當前線程。如果指定了超時時間,就等待直到超時,如果已超時,就打破柵欄。
- 釋放鎖
再看一下打破柵欄的源碼:
/**
* 打破柵欄
*/
private void breakBarrier() {
// 1. 設置當前循環(huán)輪數(shù)的狀態(tài)為已打破
generation.broken = true;
// 2. 重置線程數(shù)
count = parties;
// 3. 喚醒所有等待的線程
trip.signalAll();
}
其他常用方法
CyclicBarrier還有一些常用的方法:
/**
* 等待(帶超時時間)
* @param timeout 超時時間
* @param unit 時間單位
*/
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
...
}
/**
* 重置柵欄(當柵欄出現(xiàn)異常情況時使用)
*/
public void reset() {
...
}
總結(jié)
看完了CyclicBarrier的所有源碼,是不是覺得CyclicBarrier邏輯很簡單。
CyclicBarrier主要用來控制線程的執(zhí)行速率,初始化時指定線程數(shù),線程調(diào)用await()方法時會阻塞,直到到達的線程數(shù)等于初始線程數(shù),才會放行,并且一起執(zhí)行。與CountDownLatch區(qū)別是,CyclicBarrier可以循環(huán)執(zhí)行,而CountDownLatch只能執(zhí)行一次。