Java并發(fā)之CountDownLatch、CyclicBarrier和Semaphore
這次說一下 JUC 中的同步器三個主要的成員:CountDownLatch、CyclicBarrier 和 Semaphore(不知道有沒有初學者覺得這三個的名字不太好記)。這三個是 JUC 中較為常用的同步器,通過它們可以方便地實現(xiàn)很多線程之間協(xié)作的功能。(下面的代碼出自 JDK 文檔)
CountDownLatch
直譯過來就是倒計數(shù)(CountDown)門閂(Latch)。倒計數(shù)不用說,門閂的意思顧名思義就是阻止前進。在這里就是指 CountDownLatch.await() 方法在倒計數(shù)為0之前會阻塞當前線程。
作用
CountDownLatch 的作用和 Thread.join() 方法類似,可用于一組線程和另外一組線程的協(xié)作。例如,主線程在做一項工作之前需要一系列的準備工作,只有這些準備工作都完成,主線程才能繼續(xù)它的工作。這些準備工作彼此獨立,所以可以并發(fā)執(zhí)行以提高速度。在這個場景下就可以使用 CountDownLatch 協(xié)調(diào)線程之間的調(diào)度了。在直接創(chuàng)建線程的年代(Java 5.0 之前),我們可以使用 Thread.join()。在 JUC 出現(xiàn)后,因為線程池中的線程不能直接被引用,所以就必須使用 CountDownLatch 了。
示例
下面的這個例子可以理解為 F1 賽車的維修過程,只有 startSignal (可以表示停車,可能名字不太貼合)命令下達之后,維修工才開始干活,只有等所有工人完成工作之后,賽車才能繼續(xù)。
- class Driver { // ...
- void main() throws InterruptedException {
- CountDownLatch startSignal = new CountDownLatch(1);
- CountDownLatch doneSignal = new CountDownLatch(N);
- for (int i = 0; i < N; ++i) // create and start threads
- new Thread(new Worker(startSignal, doneSignal)).start();
- doSomethingElse(); // don't let run yet
- startSignal.countDown(); // let all threads proceed
- doSomethingElse();
- doneSignal.await(); // wait for all to finish
- }
- }
- class Worker implements Runnable {
- private final CountDownLatch startSignal;
- private final CountDownLatch doneSignal;
- Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
- this.startSignal = startSignal;
- this.doneSignal = doneSignal;
- }
- public void run() {
- try {
- startSignal.await();
- doWork();
- doneSignal.countDown();
- } catch (InterruptedException ex) {} // return;
- }
- void doWork() { ... }
- }
當 startSignal.await() 會阻塞線程,當 startSignal.countDown() 被調(diào)用之后,所有 Worker 線程開始執(zhí)行 doWork() 方法,所以 Worker。doWork() 是幾乎同時開始執(zhí)行的。當 Worker.doWork() 執(zhí)行完畢后,調(diào)用 doneSignal.countDown(),在所有 Worker 線程執(zhí)行完畢之后,主線程繼續(xù)執(zhí)行。
CyclicBarrier
CyclicBarrier 翻譯過來叫循環(huán)柵欄、循環(huán)障礙什么的(還是有點別扭的。所以還是別翻譯了,只可意會不可言傳啊)。它主要的方法就是一個:await()。await() 方法沒被調(diào)用一次,計數(shù)便會減少1,并阻塞住當前線程。當計數(shù)減至0時,阻塞解除,所有在此 CyclicBarrier 上面阻塞的線程開始運行。在這之后,如果再次調(diào)用 await() 方法,計數(shù)就又會變成 N-1,新一輪重新開始,這便是 Cyclic 的含義所在。
CyclicBarrier 的使用并不難,但需要主要它所相關(guān)的異常。除了常見的異常,CyclicBarrier.await() 方法會拋出一個獨有的 BrokenBarrierException。這個異常發(fā)生在當某個線程在等待本 CyclicBarrier 時被中斷或超時或被重置時,其它同樣在這個 CyclicBarrier 上等待的線程便會受到 BrokenBarrierException。意思就是說,同志們,別等了,有個小伙伴已經(jīng)掛了,咱們?nèi)绻^續(xù)等有可能會一直等下去,所有各回各家吧。
CyclicBarrier.await() 方法帶有返回值,用來表示當前線程是第幾個到達這個 Barrier 的線程。
和 CountDownLatch 一樣,CyclicBarrier 同樣可以可以在構(gòu)造函數(shù)中設(shè)定總計數(shù)值。與 CountDownLatch 不同的是,CyclicBarrier 的構(gòu)造函數(shù)還可以接受一個 Runnable,會在 CyclicBarrier 被釋放時執(zhí)行。
- “NOTE: CyclicBarrier 的功能也可以由 CountDownLatch 來實現(xiàn)
示例
CyclicBarrier 的應(yīng)用(當然,這個例子換成 CountDownLatch 也是可以實現(xiàn)的,很簡單,就不說怎么寫了)
- class Solver {
- final int N;
- final float[][] data;
- final CyclicBarrier barrier;
- class Worker implements Runnable {
- int myRow;
- Worker(int row) { myRow = row; }
- public void run() {
- while (!done()) {
- processRow(myRow);
- try {
- barrier.await();
- } catch (InterruptedException ex) {
- return;
- } catch (BrokenBarrierException ex) {
- return;
- }
- }
- }
- }
- public Solver(float[][] matrix) {
- data = matrix;
- N = matrix.length;
- barrier = new CyclicBarrier(N, new Runnable() {
- public void run() {
- mergeRows(...);
- }
- });
- for (int i = 0; i < N; ++i)
- new Thread(new Worker(i)).start();
- waitUntilDone();
- }
- }
CyclicBarrier 和 CountDownLatch 在用法上的不同
CountDownLatch 適用于一組線程和另一個主線程之間的工作協(xié)作。一個主線程等待一組工作線程的任務(wù)完畢才繼續(xù)它的執(zhí)行是使用 CountDownLatch 的主要場景;CyclicBarrier 用于一組或幾組線程,比如一組線程需要在一個時間點上達成一致,例如同時開始一個工作。另外,CyclicBarrier 的循環(huán)特性和構(gòu)造函數(shù)所接受的 Runnable 參數(shù)也是 CountDownLatch 所不具備的。
Semaphore
Semaphore 直譯是信號量,可能稱它是許可量更容易理解。當然,因為在計算機科學中這個名字由來已久,所以不能亂改。它的功能比較好理解,就是通過構(gòu)造函數(shù)設(shè)定一個數(shù)量的許可,然后通過 acquire 方法獲得許可,release 方法釋放許可。它還有 tryAcquire 和 acquireUninterruptibly 方法,可以根據(jù)自己的需要選擇
示例:Semaphore 控制資源訪問
- class Pool {
- private static final int MAX_AVAILABLE = 100;
- private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
- public Object getItem() throws InterruptedException {
- available.acquire();
- return getNextAvailableItem();
- }
- public void putItem(Object x) {
- if (markAsUnused(x))
- available.release();
- }
- // Not a particularly efficient data structure; just for demo
- protected Object[] items = ... whatever kinds of items being managed
- protected boolean[] used = new boolean[MAX_AVAILABLE];
- protected synchronized Object getNextAvailableItem() {
- for (int i = 0; i < MAX_AVAILABLE; ++i) {
- if (!used[i]) {
- used[i] = true;
- return items[i];
- }
- }
- return null; // not reached
- }
- protected synchronized boolean markAsUnused(Object item) {
- for (int i = 0; i < MAX_AVAILABLE; ++i) {
- if (item == items[i]) {
- if (used[i]) {
- used[i] = false;
- return true;
- } else
- return false;
- }
- }
- return false;
- }
- }
上面這個示例中 Semaphore 的用法沒什么可多講的。需要留言的是這里面有兩個同步方法,不過對吞吐應(yīng)該沒什么影響,因為主要是對一個 boolean 數(shù)組做一下 O(n) 的操作,而且每個循環(huán)里面的操作很簡單,所以速度很快。不過不知道 JUC 里面線程池的控制是怎么做的,本人不才,還沒看過那塊源代碼,得空看看,有知道的也可以說說。
最后一句話總結(jié)
CountDownLatch 是能使一組線程等另一組線程都跑完了再繼續(xù)跑;CyclicBarrier 能夠使一組線程在一個時間點上達到同步,可以是一起開始執(zhí)行全部任務(wù)或者一部分任務(wù)。同時,它是可以循環(huán)使用的;Semaphore 是只允許一定數(shù)量的線程同時執(zhí)行一段任務(wù)。