CyclicBarrier:人齊了,老司機(jī)就發(fā)車了!
上一篇咱講了 CountDownLatch 可以解決多個(gè)線程同步的問題,相比于 join 來說它的應(yīng)用范圍更廣,不僅可以應(yīng)用在線程上,還可以應(yīng)用在線程池上。然而 CountDownLatch 卻是一次性的計(jì)數(shù)器,以王者農(nóng)藥來說,咱們不可能一場(chǎng)團(tuán)戰(zhàn)就決定比賽的輸贏,所以在某些場(chǎng)景下,咱們是需要重復(fù)使用某個(gè)等待功能的,這就是我們今天要介紹的另一個(gè)主角——CyclicBarrier。
CyclicBarrier
CyclicBarrier 翻譯為中文是循環(huán)(Cyclic)柵欄(Barrier)的意思,它的大概含義是實(shí)現(xiàn)一個(gè)可循環(huán)利用的屏障。
CyclicBarrier 作用是讓一組線程相互等待,當(dāng)達(dá)到一個(gè)共同點(diǎn)時(shí),所有之前等待的線程再繼續(xù)執(zhí)行,且 CyclicBarrier 功能可重復(fù)使用。
舉個(gè)栗子
比如磊哥要坐班車回老家,因?yàn)橹型静辉试S上、下乘客,所以營(yíng)運(yùn)的公司為了收益最大化,就會(huì)等人滿之后再發(fā)車。像這種等人坐滿就發(fā)一班車的場(chǎng)景,就是 CyclicBarrier 所擅長(zhǎng)的,因?yàn)樗梢灾貜?fù)使用(不像 CountDownLatch 那樣只能用一次)。
CyclicBarrier VS CountDownLatch
CountDownLatch:一個(gè)或者多個(gè)線程,等待另外 N 個(gè)線程完成某個(gè)事情之后才能執(zhí)行。
CountDownLatch 就像玩王者農(nóng)藥開局的加載一樣,所有人要等待其他人都加載 100% 之后才能開始游戲。
CyclicBrrier:N 個(gè)線程相互等待,直到有足夠數(shù)量的線程都到達(dá)屏障點(diǎn)之后,之前等待的線程就可以繼續(xù)執(zhí)行了。
CyclicBrrier 就像老司機(jī)開車一樣,如果車上還有空余的座位,那么所有人都得等著,直到座位被坐滿之后,老司機(jī)才會(huì)發(fā)車。
CyclicBarrier使用
- import java.util.Date;
- import java.util.Random;
- import java.util.concurrent.*;
- public class CyclicBarrierExample {
- public static void main(String[] args) {
- // 創(chuàng)建 CyclicBarrier
- final CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
- @Override
- public void run() {
- System.out.println("人滿了,準(zhǔn)備發(fā)車:" + new Date());
- }
- });
- // 線程調(diào)用的任務(wù)
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- // 生成隨機(jī)數(shù) 1-3
- int randomNumber = new Random().nextInt(3) + 1;
- // 進(jìn)入任務(wù)
- System.out.println(String.format("我是:%s 再走:%d 秒就到車站了,現(xiàn)在時(shí)間:%s",
- Thread.currentThread().getName(), randomNumber, new Date()));
- try {
- // 模擬執(zhí)行
- TimeUnit.SECONDS.sleep(randomNumber);
- // 調(diào)用 CyclicBarrier
- cyclicBarrier.await();
- // 任務(wù)執(zhí)行
- System.out.println(String.format("線程:%s 上車,時(shí)間:%s",
- Thread.currentThread().getName(), new Date()));
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- };
- // 創(chuàng)建線程池
- ExecutorService threadPool = Executors.newFixedThreadPool(10);
- // 執(zhí)行任務(wù) 1
- threadPool.submit(runnable);
- // 執(zhí)行任務(wù) 2
- threadPool.submit(runnable);
- // 執(zhí)行任務(wù) 3
- threadPool.submit(runnable);
- // 執(zhí)行任務(wù) 4
- threadPool.submit(runnable);
- // 等待所有任務(wù)執(zhí)行完終止線程池
- threadPool.shutdown();
- }
- }
以上代碼執(zhí)行結(jié)果如下:
從上述結(jié)果可以看出:當(dāng) CyclicBarrier 的計(jì)數(shù)器設(shè)置為 2 時(shí),線程 2 和 線程 3 都到屏障點(diǎn)之后,老司機(jī)才會(huì)發(fā)第一波車,再 2s 之后,線程 1 和線程 4 也同時(shí)進(jìn)入了屏障點(diǎn),這時(shí)候老司機(jī)又可以再發(fā)一波車了。
實(shí)現(xiàn)原理
我們先來看下 CyclicBarrier 的類圖:
由上圖可知 CyclicBarrier 是基于獨(dú)占鎖 ReentrantLock 實(shí)現(xiàn)的,其底層也是基于 AQS 的。
在 CyclicBarrier 類的內(nèi)部有一個(gè)計(jì)數(shù)器 count,當(dāng) count 不為 0 時(shí),每個(gè)線程在到達(dá)屏障點(diǎn)會(huì)先調(diào)用 await 方法將自己阻塞,此時(shí)計(jì)數(shù)器會(huì)減 1,直到計(jì)數(shù)器減為 0 的時(shí)候,所有因調(diào)用 await 方法而被阻塞的線程就會(huì)被喚醒繼續(xù)執(zhí)行。當(dāng) count 計(jì)數(shù)器變成 0 之后,就會(huì)進(jìn)入下一輪阻塞,此時(shí) parties(parties 是在 new CyclicBarrier(parties) 時(shí)設(shè)置的值)會(huì)將它的值賦值給 count 從而實(shí)現(xiàn)復(fù)用。
常用方法
CyclicBarrier(parties):初始化相互等待的線程數(shù)量的構(gòu)造方法。
CyclicBarrier(parties,Runnable barrierAction):初始化相互等待的線程數(shù)量以及屏障線程的構(gòu)造方法,當(dāng) CyclicBarrier 的計(jì)數(shù)器變?yōu)?0 時(shí),會(huì)執(zhí)行 barrierAction 構(gòu)造方法。
getParties():獲取 CyclicBarrier 打開屏障的線程數(shù)量,也稱為方數(shù)。
getNumberWaiting():獲取正在CyclicBarrier上等待的線程數(shù)量。
await():在 CyclicBarrier 上進(jìn)行阻塞等待,直到發(fā)生以下情形之一:在 CyclicBarrier 上等待的線程數(shù)量達(dá)到 parties,則所有線程被釋放,繼續(xù)執(zhí)行;
- 當(dāng)前線程被中斷,則拋出 InterruptedException 異常,并停止等待,繼續(xù)執(zhí)行;
- 其他等待的線程被中斷,則當(dāng)前線程拋出 BrokenBarrierException 異常,并停止等待,繼續(xù)執(zhí)行;
- 其他等待的線程超時(shí),則當(dāng)前線程拋出 BrokenBarrierException 異常,并停止等待,繼續(xù)執(zhí)行;
- 其他線程調(diào)用 CyclicBarrier.reset() 方法,則當(dāng)前線程拋出 BrokenBarrierException 異常,并停止等待,繼續(xù)執(zhí)行。
await(timeout,TimeUnit):在CyclicBarrier上進(jìn)行限時(shí)的阻塞等待,直到發(fā)生以下情形之一:
- 在 CyclicBarrier 上等待的線程數(shù)量達(dá)到 parties,則所有線程被釋放,繼續(xù)執(zhí)行;
- 當(dāng)前線程被中斷,則拋出 InterruptedException 異常,并停止等待,繼續(xù)執(zhí)行;
- 當(dāng)前線程等待超時(shí),則拋出 TimeoutException 異常,并停止等待,繼續(xù)執(zhí)行;
- 其他等待的線程被中斷,則當(dāng)前線程拋出 BrokenBarrierException 異常,并停止等待,繼續(xù)執(zhí)行;
- 其他等待的線程超時(shí),則當(dāng)前線程拋出 BrokenBarrierException 異常,并停止等待,繼續(xù)執(zhí)行;
- 其他線程調(diào)用 CyclicBarrier.reset() 方法,則當(dāng)前線程拋出 BrokenBarrierException 異常,并停止等待,繼續(xù)執(zhí)行。
isBroken():獲取是否破損標(biāo)志位 broken 的值,此值有以下幾種情況:
- CyclicBarrier 初始化時(shí),broken=false,表示屏障未破損;
- 如果正在等待的線程被中斷,則 broken=true,表示屏障破損;
- 如果正在等待的線程超時(shí),則 broken=true,表示屏障破損;
- 如果有線程調(diào)用 CyclicBarrier.reset() 方法,則 broken=false,表示屏障回到未破損狀態(tài)。
reset():使得CyclicBarrier回歸初始狀態(tài),直觀來看它做了兩件事:
- 如果有正在等待的線程,則會(huì)拋出 BrokenBarrierException 異常,且這些線程停止等待,繼續(xù)執(zhí)行。
- 將是否破損標(biāo)志位 broken 置為 false。
總結(jié)
CyclicBrrier 是通過獨(dú)占鎖 ReentrantLock 實(shí)現(xiàn)計(jì)數(shù)器的原子性更新的,CyclicBrrier 最常用的是 await() 方法,使用此方法會(huì)將計(jì)數(shù)器 -1,并判斷當(dāng)前的計(jì)數(shù)器是否為 0,如果不為 0 就會(huì)阻塞等待,并計(jì)時(shí)器為 0 之后,才能繼續(xù)執(zhí)行剩余任務(wù)。CyclicBrrier 相比于 CountDownLatch 來說,它的優(yōu)勢(shì)在于可以重復(fù)使用。
參考 & 鳴謝
- blog.csdn.net/qq_39241239/article/details/87030142
- blog.csdn.net/zzg1229059735/article/details/61191679
- www.cnblogs.com/yaochunhui/p/13494689.html