JUC - CountDownLach原理分析
作者:一角錢(qián)技術(shù)
CountDownLatch 和 Semaphore 一樣都是共享模式下資源問(wèn)題,這些源碼實(shí)現(xiàn)AQS的模版方法,然后使用CAS+循環(huán)重試實(shí)現(xiàn)自己的功能。
CountDownLach閉鎖
背景
- CountDownLatch是在Java1.5被引入,跟它一起被引入的工具類(lèi)還有CyclicBarrier、Semaphore、ConcurrenthashMap和BlockingQueue。
- 在java.util.cucurrent包下。
概念
- CountDownLatch這個(gè)類(lèi)使一個(gè)線程等待其它線程各自執(zhí)行完畢后再執(zhí)行。
- 是通過(guò)一個(gè)計(jì)數(shù)器來(lái)實(shí)現(xiàn)的,計(jì)數(shù)器的初始值是線程的數(shù)量。每當(dāng)一個(gè)線程執(zhí)行完畢后,計(jì)數(shù)器的值就-1,當(dāng)計(jì)數(shù)器的值為0時(shí),表示所有線程都執(zhí)行完畢,然后在閉鎖上等待的線程就可以恢復(fù)工作來(lái)。
源碼
- countDownLatch類(lèi)中只提供了一個(gè)構(gòu)造器
- public CountDownLatch(int count) {
- if (count < 0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count);
- }
- 類(lèi)中有三個(gè)方法是最重要的
- // 調(diào)用await()方法的線程會(huì)被掛起,它會(huì)等待直到count值為0才繼續(xù)執(zhí)行
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }//和await()方法類(lèi)似,只不過(guò)等待一定的時(shí)間后count值還沒(méi)變?yōu)?的化就會(huì)繼續(xù)執(zhí)行
- public boolean await(long timeout, TimeUnit unit)
- throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }//將count值減1
- public void countDown() { sync.releaseShared(1);
- }
示例
普通示例:
- public class CountDownLatchTest {
- public static void main(String[] args) {
- final CountDownLatch latch = new CountDownLatch(2);
- System.out.println("主線程開(kāi)始執(zhí)行…… ……");
- //第一個(gè)子線程執(zhí)行
- ExecutorService es1 = Executors.newSingleThreadExecutor();
- es1.execute(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(3000);
- System.out.println("子線程:"+Thread.currentThread().getName()+"執(zhí)行");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- latch.countDown();
- }
- });
- es1.shutdown();
- //第二個(gè)子線程執(zhí)行
- ExecutorService es2 = Executors.newSingleThreadExecutor();
- es2.execute(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("子線程:"+Thread.currentThread().getName()+"執(zhí)行");
- latch.countDown();
- }
- });
- es2.shutdown();
- System.out.println("等待兩個(gè)線程執(zhí)行完畢…… ……");
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("兩個(gè)子線程都執(zhí)行完畢,繼續(xù)執(zhí)行主線程");
- }
- }
結(jié)果集:
- 主線程開(kāi)始執(zhí)行…… ……
- 等待兩個(gè)線程執(zhí)行完畢…… ……子線程:pool-1-thread-1執(zhí)行子線程:pool-2-thread-1執(zhí)行兩個(gè)子線程都執(zhí)行完畢,繼續(xù)執(zhí)行主線程
模擬并發(fā)示例:
- public class Parallellimit {
- public static void main(String[] args) {
- ExecutorService pool = Executors.newCachedThreadPool(); CountDownLatch cdl = new CountDownLatch(100);
- for (int i = 0; i < 100; i++) {
- CountRunnable runnable = new CountRunnable(cdl);
- pool.execute(runnable); } }} class CountRunnable implements Runnable {
- private CountDownLatch countDownLatch;
- public CountRunnable(CountDownLatch countDownLatch) {
- this.countDownLatch = countDownLatch;
- } @Override
- public void run() {
- try {
- synchronized (countDownLatch) { /*** 每次減少一個(gè)容量*/
- countDownLatch.countDown(); System.out.println("thread counts = " + (countDownLatch.getCount()));
- } countDownLatch.await();
- System.out.println("concurrency counts = " + (100 - countDownLatch.getCount()));
- } catch (InterruptedException e) {
- e.printStackTrace(); } }}
源碼分析
- public class CountDownLatch {
- //繼承AQS來(lái)實(shí)現(xiàn)他的模板方法(tryAcquireShared,tryReleaseShared)
- private static final class Sync extends AbstractQueuedSynchronizer { //計(jì)數(shù)個(gè)數(shù)Count
- Sync(int count) {
- setState(count); } int getCount() {
- return getState();
- } //AQS方法getState(),返回同步狀態(tài),這里指計(jì)數(shù)器值 protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- } //循環(huán)+cas重試 直到計(jì)數(shù)器為0 跳出,則release(實(shí)現(xiàn)aqs共享模式釋放方法)
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- } } } private final Sync sync; //實(shí)例化
- public CountDownLatch(int count) {
- if (count < 0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);
- } //帶有一個(gè)超時(shí)時(shí)間的awit public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- } public void countDown() { sync.releaseShared(1);
- } public long getCount() { return sync.getCount();
- }}
總結(jié)
CountDownLatch 和 Semaphore 一樣都是共享模式下資源問(wèn)題,這些源碼實(shí)現(xiàn)AQS的模版方法,然后使用CAS+循環(huán)重試實(shí)現(xiàn)自己的功能。在RT多個(gè)資源調(diào)用,或者執(zhí)行某種操作依賴(lài)其他操作完成下可以發(fā)揮這個(gè)計(jì)數(shù)器的作用。
責(zé)任編輯:姜華
來(lái)源:
今日頭條