面試官:CountDownLatch有了解過嗎?
前言
Java提供了一些非常好用的并發(fā)工具類,不需要我們重復(fù)造輪子,本節(jié)我們講解CountDownLatch,一起來看下吧!
CountDownLatch
首先我們來看下這玩意是干啥用的。CountDownLatch同樣的也是java.util.concurrent并發(fā)包下的工具類,通常我們會(huì)叫它是并發(fā)計(jì)數(shù)器,這個(gè)計(jì)數(shù)不是記12345,主要的使用場景是當(dāng)一個(gè)任務(wù)被拆分成多個(gè)子任務(wù)時(shí),需要等待子任務(wù)全部完成后,不然會(huì)阻塞線程,每完成一個(gè)任務(wù)計(jì)數(shù)器會(huì)-1,直到?jīng)]有。這個(gè)有點(diǎn)類似go語言中的的sync.WaitGroup。
廢話不多說,我們通過例子帶大家快速入門, 在這之前,還需給大家補(bǔ)充一下它的常用方法:
- public CountDownLatch(int count) {...}構(gòu)造函數(shù)。
- void await()是當(dāng)前線程等待直到鎖存儲(chǔ)器計(jì)到0,或者線程被中斷。
- boolean await(long timeout, TimeUnit unit)是當(dāng)前線程等待直到鎖存儲(chǔ)器計(jì)到0,或者線程被中斷, 如果為0返回true, 可以指定等待的超時(shí)時(shí)間。
- countDown()遞減鎖存器的計(jì)數(shù),如果到0則釋放所有等待的線程。
- getCount()獲取鎖存器的計(jì)數(shù)。
下面我們看下具體的使用:
public class CountDownLaunchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
IntStream.range(0, 10).forEach(i -> {
new Thread(() -> {
try {
Thread.sleep(2000);
System.out.println("worker ------> " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
}).start();
});
countDownLatch.await();
System.out.println("completed !");
}
}
時(shí)間輸出:
worker ------> 1
worker ------> 4
worker ------> 5
worker ------> 7
worker ------> 8
worker ------> 0
worker ------> 2
worker ------> 3
worker ------> 9
worker ------> 6
completed !
進(jìn)程已結(jié)束,退出代碼0
可以看到任務(wù)沒有完全結(jié)束之前,主線程是阻塞狀態(tài)。
源碼剖析
首先看下構(gòu)造函數(shù)。
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
這個(gè)sync有沒有很熟悉,這里又遇到了CAS,幾乎涉及到多線程的實(shí)現(xiàn)類都會(huì)有。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
countDown
首先在構(gòu)造函數(shù)中初始化狀態(tài),對(duì)應(yīng)的setState(count);, 其實(shí)它的底層實(shí)現(xiàn)就是依賴AQS。CountDownLatch主要有兩個(gè)方法一個(gè)是countDown一個(gè)是await,下面我們就來看下是如何實(shí)現(xiàn)的。
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared()方法的實(shí)現(xiàn)在countDownLatch,自旋操作判斷值是否為0,為0說明都執(zhí)行完了,之前說的遞減就是在這完成的,就會(huì)走到doReleaseShared也就是釋放操作。有想過為啥c==0 返回false嗎?可以回顧上一步操作if (tryReleaseShared)才會(huì)去doReleaseShared,也就是任務(wù)全部執(zhí)行完才會(huì)去釋放,釋放的過程其實(shí)是一個(gè)隊(duì)列去完成的。
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
doReleaseShared是`AbstractQueuedSynchronizer'的內(nèi)部方法。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
這個(gè)方法之前給大家講過,其實(shí)就是釋放鎖的操作??梢钥吹皆谶@里只喚醒了頭節(jié)點(diǎn)的后繼節(jié)點(diǎn),然后就返回了,為啥是后繼節(jié)點(diǎn),繼續(xù)看unparkSuccessor。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 后繼節(jié)點(diǎn)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
那么剩余的其它線程怎么去釋放呢?
await
再看下await(),同樣的也調(diào)用了內(nèi)部方法acquireSharedInterruptibly。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
重點(diǎn)在 doAcquireSharedInterruptibly。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 以共享模式添加到等待隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 返回前一個(gè)節(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
// 檢查并更新未能獲取的節(jié)點(diǎn)的狀態(tài)。如果線程應(yīng)該阻塞,則返回 true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 失敗就取消
if (failed)
cancelAcquire(node);
}
}