Java高并發(fā)編程基礎(chǔ)三大利器之CyclicBarrier
引言
前面一篇文章我們《Java高并發(fā)編程基礎(chǔ)三大利器之CountDownLatch》它有一個(gè)缺點(diǎn),就是它的計(jì)數(shù)器只能夠使用一次,也就是說(shuō)當(dāng)計(jì)數(shù)器(state)減到為 0的時(shí)候,如果 再有線程調(diào)用去 await() 方法,該線程會(huì)直接通過(guò),不會(huì)再起到等待其他線程執(zhí)行結(jié)果起到同步的作用。為了解決這個(gè)問(wèn)題CyclicBarrier就應(yīng)運(yùn)而生了。
什么是CyclicBarrier
CyclicBarrier是什么?把它拆開(kāi)來(lái)翻譯就是循環(huán)(Cycle)和屏障(Barrier)
它的主要作用其實(shí)和CountDownLanch差不多,都是讓一組線程到達(dá)一個(gè)屏障時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障會(huì)被打開(kāi),所有被屏障阻塞的線程才會(huì)繼續(xù)執(zhí)行,不過(guò)它是可以循環(huán)執(zhí)行的,這是它與CountDownLanch最大的不同。CountDownLanch是只有當(dāng)最后一個(gè)線程把計(jì)數(shù)器置為0的時(shí)候,其他阻塞的線程才會(huì)繼續(xù)執(zhí)行。學(xué)習(xí)CyclicBarrier之前建議先去看看這幾篇文章:
《Java高并發(fā)編程基礎(chǔ)三大利器之Semaphore》
《Java高并發(fā)編程基礎(chǔ)三大利器之CountDownLatch》
如何使用
我們首先先來(lái)看下關(guān)于使用CyclicBarrier的一個(gè)demo:比如游戲中有個(gè)關(guān)卡的時(shí)候,每次進(jìn)入下一關(guān)的時(shí)候都需要進(jìn)行加載一些地圖、特效背景音樂(lè)什么的只有全部加載完了才能夠進(jìn)行游戲:
- /**demo 來(lái)源https://blog.csdn.net/lstcui/article/details/107389371
- * 公眾號(hào)【java金融】
- */
- public class CyclicBarrierExample {
- static class PreTaskThread implements Runnable {
- private String task;
- private CyclicBarrier cyclicBarrier;
- public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
- this.task = task;
- this.cyclicBarrier = cyclicBarrier;
- }
- @Override
- public void run() {
- for (int i = 0; i < 4; i++) {
- Random random = new Random();
- try {
- Thread.sleep(random.nextInt(1000));
- System.out.println(String.format("關(guān)卡 %d 的任務(wù) %s 完成", i, task));
- cyclicBarrier.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- }
- public static void main(String[] args) {
- CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
- System.out.println("本關(guān)卡所有的前置任務(wù)完成,開(kāi)始游戲... ...");
- });
- new Thread(new PreTaskThread("加載地圖數(shù)據(jù)", cyclicBarrier)).start();
- new Thread(new PreTaskThread("加載人物模型", cyclicBarrier)).start();
- new Thread(new PreTaskThread("加載背景音樂(lè)", cyclicBarrier)).start();
- }
- }
- }
輸出結(jié)果如下:
我們可以看到每次游戲開(kāi)始都會(huì)等當(dāng)前關(guān)卡把游戲的人物模型,地圖數(shù)據(jù)、背景音樂(lè)加載完成后才會(huì)開(kāi)始進(jìn)行游戲。并且還是可以循環(huán)控制的。
源碼分析
結(jié)構(gòu)組成
- /** The lock for guarding barrier entry */
- private final ReentrantLock lock = new ReentrantLock();
- /** Condition to wait on until tripped */
- private final Condition trip = lock.newCondition();
- /** The number of parties */
- private final int parties;
- /* The command to run when tripped */
- private final Runnable barrierCommand;
- /** The current generation */
- private Generation generation = new Generation();
- lock:用于保護(hù)屏障入口的鎖
- trip :達(dá)到屏障并且不能放行的線程在trip條件變量上等待
- parties :柵欄開(kāi)啟需要的到達(dá)線程總數(shù)
- barrierCommand:最后一個(gè)線程到達(dá)屏障后執(zhí)行的回調(diào)任務(wù)
- generation:這是一個(gè)內(nèi)部類(lèi),通過(guò)它實(shí)現(xiàn)CyclicBarrier重復(fù)利用,每當(dāng)await達(dá)到最大次數(shù)的時(shí)候,就會(huì)重新new 一個(gè),表示進(jìn)入了下一個(gè)輪回。里面只有一個(gè)boolean型屬性,用來(lái)表示當(dāng)前輪回是否有線程中斷。
主要方法
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
- * Main barrier code, covering the various policies.
- */
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- //獲取barrier當(dāng)前的 “代”也就是當(dāng)前循環(huán)
- final Generation g = generation;
- if (g.broken)
- throw new BrokenBarrierException();
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- // 每來(lái)一個(gè)線程調(diào)用await方法都會(huì)進(jìn)行減1
- int index = --count;
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- // new CyclicBarrier 傳入 的barrierCommand, command.run()這個(gè)方法是同步的,如果耗時(shí)比較多的話,是否執(zhí)行的時(shí)候需要考慮下是否異步來(lái)執(zhí)行。
- if (command != null)
- command.run();
- ranAction = true;
- // 這個(gè)方法1. 喚醒所有阻塞的線程,2. 重置下count(count 每來(lái)一個(gè)線程都會(huì)進(jìn)行減1)和generation,以便于下次循環(huán)。
- nextGeneration();
- return 0;
- } finally {
- if (!ranAction)
- breakBarrier();
- }
- }
- // loop until tripped, broken, interrupted, or timed out
- for (;;) {
- try {
- // 進(jìn)入if條件,說(shuō)明是不帶超時(shí)的await
- if (!timed)
- // 當(dāng)前線程會(huì)釋放掉lock,然后進(jìn)入到trip條件隊(duì)列的尾部,然后掛起自己,等待被喚醒。
- trip.await();
- else if (nanos > 0L)
- //說(shuō)明當(dāng)前線程調(diào)用await方法時(shí) 是指定了 超時(shí)時(shí)間的!
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- //Node節(jié)點(diǎn)在 條件隊(duì)列內(nèi) 時(shí) 收到中斷信號(hào)時(shí) 會(huì)拋出中斷異常!
- //g == generation 成立,說(shuō)明當(dāng)前代并沒(méi)有變化。
- //! g.broken 當(dāng)前代如果沒(méi)有被打破,那么當(dāng)前線程就去打破,并且拋出異常..
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw ie;
- } else {
- // We're about to finish waiting even if we had not
- // been interrupted, so this interrupt is deemed to
- // "belong" to subsequent execution.
- //執(zhí)行到else有幾種情況?
- //1.代發(fā)生了變化,這個(gè)時(shí)候就不需要拋出中斷異常了,因?yàn)?nbsp;代已經(jīng)更新了,這里喚醒后就走正常邏輯了..只不過(guò)設(shè)置下 中斷標(biāo)記。
- //2.代沒(méi)有發(fā)生變化,但是代被打破了,此時(shí)也不用返回中斷異常,執(zhí)行到下面的時(shí)候會(huì)拋出 brokenBarrier異常。也記錄下中斷標(biāo)記位。
- Thread.currentThread().interrupt();
- }
- }
- //喚醒后,執(zhí)行到這里,有幾種情況?
- //1.正常情況,當(dāng)前barrier開(kāi)啟了新的一代(trip.signalAll())
- //2.當(dāng)前Generation被打破,此時(shí)也會(huì)喚醒所有在trip上掛起的線程
- //3.當(dāng)前線程trip中等待超時(shí),然后主動(dòng)轉(zhuǎn)移到 阻塞隊(duì)列 然后獲取到鎖 喚醒。
- if (g.broken)
- throw new BrokenBarrierException();
- //喚醒后,執(zhí)行到這里,有幾種情況?
- //1.正常情況,當(dāng)前barrier開(kāi)啟了新的一代(trip.signalAll())
- //2.當(dāng)前線程trip中等待超時(shí),然后主動(dòng)轉(zhuǎn)移到 阻塞隊(duì)列 然后獲取到鎖 喚醒。
- if (g != generation)
- return index;
- //喚醒后,執(zhí)行到這里,有幾種情況?
- //.當(dāng)前線程trip中等待超時(shí),然后主動(dòng)轉(zhuǎn)移到 阻塞隊(duì)列 然后獲取到鎖 喚醒。
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- lock.unlock();
- }
- }
小結(jié)
到了這里我們是不是可以知道為啥CyclicBarrier可以進(jìn)行循環(huán)計(jì)數(shù)?
- CyclicBarrier采用一個(gè)內(nèi)部類(lèi)Generation來(lái)維護(hù)當(dāng)前循環(huán),每一個(gè)await方法都會(huì)存儲(chǔ)當(dāng)前的generation,獲取到相同generation對(duì)象的屬于同一組,每當(dāng)count的次數(shù)耗盡就會(huì)重新new一個(gè)Generation并且重新設(shè)置count的值為parties,表示進(jìn)入下一次新的循環(huán)。
從這個(gè)await方法我們是不是可以知道只要有一個(gè)線程被中斷了,當(dāng)代的 generation的broken 就會(huì)被設(shè)置為true,所以會(huì)導(dǎo)致其他的線程也會(huì)被拋出BrokenBarrierException。相當(dāng)于一個(gè)失敗其他也必須失敗,感覺(jué)有“強(qiáng)一致性“的味道。
總結(jié)
- CountDownLanch是為計(jì)數(shù)器是設(shè)置一個(gè)值,當(dāng)多次執(zhí)行countdown后,計(jì)數(shù)器減為0的時(shí)候所有線程被喚醒,然后CountDownLanch失效,只能夠使用一次。
- CyclicBarrier是當(dāng)count為0時(shí)同樣喚醒全部線程,同時(shí)會(huì)重新設(shè)置count為parties,重新new一個(gè)generation來(lái)實(shí)現(xiàn)重復(fù)利用。
本文轉(zhuǎn)載自微信公眾號(hào)「java金融」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系java金融公眾號(hào)。