Java 并發(fā)流程工具的實戰(zhàn)探索
在當(dāng)今數(shù)字化時代,軟件系統(tǒng)面臨著日益增長的高并發(fā)需求。無論是大型電商平臺在促銷活動時處理海量的訂單請求,還是在線游戲服務(wù)器同時承載眾多玩家的交互操作,高效的并發(fā)處理能力都成為了系統(tǒng)性能和穩(wěn)定性的關(guān)鍵因素。
Java作為一門廣泛應(yīng)用于企業(yè)級開發(fā)的編程語言,提供了豐富且強(qiáng)大的并發(fā)流程工具。這些工具猶如精巧的齒輪,相互配合,使開發(fā)者能夠在多線程環(huán)境下有條不紊地控制程序的執(zhí)行流程,確保各個線程之間協(xié)調(diào)運作,高效完成復(fù)雜的任務(wù)。
本文將踏上Java并發(fā)流程工具的實戰(zhàn)探索之旅。我們不僅會深入剖析這些工具的核心原理,更會通過實際代碼示例,詳細(xì)展示它們在不同應(yīng)用場景中的具體應(yīng)用。從簡單的線程同步控制,到復(fù)雜的多階段任務(wù)協(xié)調(diào),一步步揭開Java并發(fā)流程工具的神秘面紗,幫助讀者掌握在實際項目中運用這些工具優(yōu)化程序性能、提升系統(tǒng)可靠性的技巧。
一、CountDownLatch
1. 詳解CountDownLatch工作流程
筆者一般稱CountDownLatch為倒計時門閂,它主要用于需要某些條件下才能喚醒的需求場景,例如我們線程1必須等到線程2做完某些事,那么就可以設(shè)置一個CountDownLatch并將數(shù)值設(shè)置為1,一旦線程2完成業(yè)務(wù)邏輯后,將數(shù)值修改為0,此時線程1就會被喚醒:
2. 模擬等待工作完成
通過上述的描述可能有點抽象,我們直接通過幾個例子演示一下,我們現(xiàn)在有這樣一個需求,希望等待5個線程完成之后,打印輸出一句工作完成:
對應(yīng)的代碼示例如下,可以看到我們創(chuàng)建了數(shù)值為5的CountDownLatch ,一旦線程池里的線程完成工作后就調(diào)用countDown進(jìn)行扣減,一旦數(shù)值變?yōu)?,主線程await就會放行,執(zhí)行后續(xù)輸出:
int workerSize = 5;
CountDownLatch workCount = new CountDownLatch(workerSize);
ExecutorService threadPool = Executors.newFixedThreadPool(workerSize);
for (int i = 0; i < workerSize; i++) {
final int workerNum = i;
//5個工人輸出完成工作后,扣減倒計時門閂數(shù)
threadPool.submit(() -> {
log.info("worker[{}]完成手頭的工作", workerNum);
workCount.countDown();
});
}
try {
//阻塞當(dāng)前線程(主線程)往后走,只有倒計時門閂變?yōu)?之后才能繼續(xù)后續(xù)邏輯
log.info("等待worker工作完成");
workCount.await();
} catch (InterruptedException e) {
log.info("倒計時門閂阻塞失敗,失敗原因[{}]", e.getMessage(), e);
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
log.info("所有工人都完成手頭的工作了");
對應(yīng)的我們也給出輸出結(jié)果,可以看到主線程在線程池線程完成后才輸出:
3. 模擬運動員賽跑
實際上CountDownLatch可以讓多個線程進(jìn)行等待,我們不妨用線程模擬一下所有運動員就緒后,等待槍響后起跑的場景:
代碼如下,每當(dāng)運動員即線程池的線程準(zhǔn)備就緒,則調(diào)用await等待槍響,一旦所有運動員就緒之后,主線程調(diào)用countDown模擬槍響,然后運動員起跑:
public static void main(String[] args) {
log.info("百米跑比賽開始");
int playerNum = 3;
CountDownLatch gun = new CountDownLatch(1);
ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
for (int i = 0; i < playerNum; i++) {
final int playNo = i;
threadPool.submit(() -> {
log.info("[{}]號運動員已就緒", playNo);
try {
gun.await();
} catch (InterruptedException e) {
log.info("[{}]號運動員線程阻塞失敗,失敗原因[{}]", playNo, e.getMessage(), e);
}
log.info("[{}]號運動員已經(jīng)到達(dá)重點", playNo);
});
}
//按下槍 所有運動員起跑
gun.countDown();
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
log.info("百米賽跑已結(jié)束");
}
對應(yīng)的我們也給出相應(yīng)的輸出結(jié)果:
4. 從源碼角度分析CountDownLatch工作流程
我們以等待所有工人完成工作的例子進(jìn)行解析,實際上在CountDownLatch是通過state和一個抽象隊列即aqs完成多線程之間的流程調(diào)度,主線程調(diào)用await方法等待其他worker線程,如果其它worker線程沒有完成工作,那么CountDownLatch就會將其存入抽象隊列中。
一旦其他線程將state設(shè)置為0時,await對應(yīng)的線程就會從抽象隊列中釋放并喚醒:
對應(yīng)我們給出countDown的實現(xiàn),可以看到該方法底層就是將aqs隊列中的state進(jìn)行扣減:
public void countDown() {
sync.releaseShared(1);
}
//releaseShared內(nèi)部核心邏輯就是將state扣減1
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
//扣減state并通過cas修改賦值
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
而countDown本質(zhì)上就是查看這個state,如果state被扣減為0,則調(diào)用aqs底層doReleaseShared方法將隊列中等待線程喚醒:
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//查看是否扣減為0
if (tryReleaseShared(arg)) {
//如果是0則將當(dāng)前等待線程喚醒
doReleaseShared();
return true;
}
return false;
}
上文講解countDown涉及一些關(guān)于AQS的實用理解和設(shè)計,關(guān)于更多AQS的知識點,感興趣的讀者可以閱讀一下筆者的這篇文章:《AQS 源碼解析:原理與實踐》
二、Semaphore
1. 詳解Semaphore
信號量多用于限流的場景,例如我們希望單位時間內(nèi)只能有一個線程工作,我們就可以使用信號量,只有拿到線程的信號量才能工作,工作完成后釋放信號量,其余線程才能爭搶這個信號量并進(jìn)行進(jìn)一步的操作。 對應(yīng)我們給出下面這段代碼,可以看到生命信號量數(shù)值為6,每當(dāng)線程拿到3個信號量之后就會執(zhí)行業(yè)務(wù)操作,完成后調(diào)用release釋放3個令牌,讓其他線程繼續(xù)爭搶:
//設(shè)置可復(fù)用的信號量,令牌數(shù)為3
Semaphore semaphore = new Semaphore(6, true);
//創(chuàng)建5個線程
int workSize = 5;
ExecutorService executorService = Executors.newFixedThreadPool(workSize);
for (int i = 0; i < workSize; i++) {
executorService.submit(() -> {
try {
//拿3個令牌
semaphore.acquire(3);
log.info("進(jìn)行業(yè)務(wù)邏輯處理.......");
ThreadUtil.sleep(1000);
//釋放3個令牌
semaphore.release(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
while (!executorService.isTerminated()) {
}
對應(yīng)輸出結(jié)果如下,可以看到每個線程拿到令牌后都會休眠1秒,從輸出結(jié)果來看每秒只有兩個線程才工作,符合我們的限流需求:
2. 詳解Semaphore工作原理
Semaphore底層也是用到的aqs隊列,線程進(jìn)行資源獲取時也是通過查看state是否足夠,在明確足夠的情況下進(jìn)行state扣減,然后進(jìn)行工作。如果線程發(fā)現(xiàn)state數(shù)量不夠,那么就會被Semaphore存入aqs底層的抽象隊列中,直到state數(shù)量足夠后被喚醒:
對此我們給出Semaphore底層的acquire的邏輯可以看到,它會讀取state數(shù)值然后進(jìn)行扣減,如果剩余數(shù)量大于0則說明令牌獲取成功線程可以執(zhí)行后續(xù)邏輯,反之說明當(dāng)前令牌數(shù)不夠,外部邏輯會將該線程掛到等待隊列中,等待令牌足夠后將其喚醒:
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
//讀取可用的state
int available = getState();
//計算剩余的state
int remaining = available - acquires;
//如果小于0說明令牌數(shù)不足直接返回出去,讓外部將線程掛起,反之通過cas修改剩余數(shù),返回大于0的結(jié)果讓持有令牌的線程執(zhí)行后續(xù)邏輯
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
3. Semaphore使用注意事項
- 獲取和釋放的時候都可以指定數(shù)量,但是要保持一致。
- 公平性設(shè)置為true會更加合理
- 并不必須由獲取許可證的線程釋放許可證??梢允茿獲取,B釋放。
三、Condition
1. 詳解Condition
Condition即條件對象,不是很常用或者直接用到的對象,常用于線程等待喚醒操作,例如A線程需要等待某個條件的時候,我們可以通過condition.await()方法,A線程就會進(jìn)入阻塞狀態(tài)。
線程B執(zhí)行condition.signal()方法,則JVM就會從被阻塞線程中找到等待該condition的線程。線程A收到可執(zhí)行信號的時候,他的線程狀態(tài)就會變成Runnable可執(zhí)行狀態(tài)。
對此我們給出代碼示例,可以看到我們從ReentrantLock 中拿到一個Condition 對象,讓創(chuàng)建的線程進(jìn)入等待狀態(tài),隨后讓主線程調(diào)用condition 的signal將其喚醒:
private ReentrantLock lock = new ReentrantLock();
//條件對象,操控線程的等待和通知
private Condition condition = lock.newCondition();
public void waitCondition() throws InterruptedException {
lock.lock();
try {
log.info("等待達(dá)到條件后通知");
condition.await();
log.info("收到通知,開始執(zhí)行業(yè)務(wù)邏輯");
} finally {
lock.unlock();
log.info("執(zhí)行完成,釋放鎖");
}
}
public void notifyCondition() throws InterruptedException {
lock.lock();
try {
log.info("達(dá)到條件發(fā)起通知");
condition.signal();
log.info("發(fā)起通知結(jié)束");
} finally {
lock.unlock();
log.info("發(fā)起通知執(zhí)行完成,釋放鎖");
}
}
public static void main(String[] args) throws InterruptedException {
Main obj = new Main();
new Thread(() -> {
try {
obj.waitCondition();
//讓出CPU時間片,交給主線程發(fā)起通知
Thread.sleep(3000);
} catch (InterruptedException e) {
log.error("等待條件通知設(shè)置失敗,失敗原因 [{}]", e.getMessage(), e);
}
}).start();
//休眠3s喚醒等待線程
Thread.sleep(3000);
obj.notifyCondition();
}
對應(yīng)的我們也給出輸出結(jié)果:
2. 基于條件對象完成生產(chǎn)者、消費者模式
我們假設(shè)用一個隊列存放一波生產(chǎn)者生產(chǎn)的資源,當(dāng)資源滿了通知消費者消費。當(dāng)消費者消費空了,通知生產(chǎn)者生產(chǎn)。
所以這時候使用condition控制流程最合適(這也是阻塞的隊列內(nèi)部的實現(xiàn)),所以我們要定義兩個信號,分別為:
- 當(dāng)資源被耗盡,我們就使用資源未滿條件(notFull): 調(diào)用signal通知生產(chǎn)者消費,消費者調(diào)用await進(jìn)入等待。
- 當(dāng)資源被填滿,使用資源為空條件(notEmpty):將生產(chǎn)者用await方法掛起,消費者用signal喚醒消費告知非空。
很明顯生產(chǎn)者和消費者本質(zhì)上就是基于這兩個標(biāo)識分別標(biāo)志自己的等待時機(jī)和通知時機(jī),以生產(chǎn)者為例,即每生產(chǎn)一個資源后就可以調(diào)用notEmpty通知消費者消費,當(dāng)生產(chǎn)者速度過快,則用await等待未滿notFull條件阻塞:
首先我們給出生產(chǎn)者和消費者條件和資源隊列聲明,基于上述條件我們給出一個經(jīng)典的生產(chǎn)者和消費者模式的示例,我們首先給出生產(chǎn)者代碼,可以看到資源滿的時候調(diào)用notFull.await();將自己掛起等待未滿,生產(chǎn)資源后調(diào)用 notEmpty.signal();通知消費者消費。
對應(yīng)消費者示例代碼也是一樣,當(dāng)資源消費完全,調(diào)用notEmpty.await();等待不空,一旦消費定量資源調(diào)用notFull.signal();通知生產(chǎn)者生產(chǎn)。
最終代碼示例如下:
@Slf4j
public class ProducerMode {
//鎖
private static ReentrantLock lock = new ReentrantLock();
// 資源未滿
private Condition notFull = lock.newCondition();
//資源為空
private Condition notEmpty = lock.newCondition();
private Queue<Integer> queue = new PriorityQueue<>(10);
private int queueMaxSize = 10;
/**
* 生產(chǎn)者
*/
private class Producer extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
if (queueMaxSize == queue.size()) {
log.info("當(dāng)前隊列已滿,通知消費者消費");
//等待不滿條件觸發(fā)
notFull.await();
}
queue.offer(1);
log.info("生產(chǎn)者補(bǔ)貨,當(dāng)前隊列有 【{}】", queue.size());
//通知消費者隊列不空,可以消費
notEmpty.signal();
} catch (Exception e) {
log.error("生產(chǎn)者報錯,失敗原因 [{}]", e.getMessage(), e);
} finally {
lock.unlock();
}
}
}
}
/**
* 消費者
*/
private class Consumer extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
if (0 == queue.size()) {
log.info("當(dāng)前隊列已空,通知生產(chǎn)者補(bǔ)貨");
//等待不空條件達(dá)到
notEmpty.await();
}
queue.poll();
//通知消費者不滿了
notFull.signal();
log.info("消費者完成消費,當(dāng)前隊列還剩余 【{}】個元素", queue.size());
} catch (Exception e) {
log.error("生產(chǎn)者報錯,失敗原因 [{}]", e.getMessage(), e);
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ProducerMode mode = new ProducerMode();
Producer producer = mode.new Producer();
ProducerMode.Consumer consumer = mode.new Consumer();
producer.start();
consumer.start();
}
}
對應(yīng)的我們給出輸出結(jié)果:
四、CyclicBarrier
1. CyclicBarrier 原理和使用示例
CyclicBarrier 也就是循環(huán)柵欄對象,不是很常用,它主要用于等待線程數(shù)就緒后執(zhí)行公共邏輯的業(yè)務(wù)場景。 例如我們希望每湊齊5個線程后執(zhí)行后續(xù)邏輯,我們就可以說明CyclicBarrier 數(shù)值為5,然后每個線程到期后調(diào)用await等待其他線程就緒。
一旦到齊5個,CyclicBarrier 就會通知這些線程開始工作,對應(yīng)的代碼如下所示:
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
System.out.println("線程 " + Thread.currentThread().getName() + " 開始執(zhí)行任務(wù)");
try {
// 模擬執(zhí)行任務(wù)
Thread.sleep(1000);
System.out.println("線程 " + Thread.currentThread().getName() + " 到達(dá)屏障");
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有線程都到達(dá)屏障,一起繼續(xù)執(zhí)行");
}).start();
}
}
對應(yīng)的我們給出相應(yīng)輸出示例:
2. CyclicBarrier 與CountDownLatch區(qū)別(重點)
- CountDownLatch用戶事件即主要是業(yè)務(wù)流程上的控制并不是針對線程,CyclicBarrier 循環(huán)柵欄作用于線程,如上代碼必須等待線程到齊后觸發(fā)。
- 循環(huán)柵欄可重復(fù)使用,CountDownLatch則不能。
五、小結(jié)
通過本次對Java并發(fā)流程工具的實戰(zhàn)探索,我們對Java并發(fā)編程領(lǐng)域有了更為深入且全面的認(rèn)知。 從CountDownLatch到CyclicBarrier,再到Semaphore和Exchanger等工具,每一個都在多線程協(xié)作場景中有著獨特的用途。
- CountDownLatch如同倒計時器,能讓一組線程等待某個特定事件完成后再繼續(xù)執(zhí)行;
- CyclicBarrier則像聚會的召集者,使多個線程在特定點上匯聚,然后一起繼續(xù)前行;
- Semaphore猶如資源的守護(hù)者,精確控制著對有限資源的訪問;
- Exchanger為兩個線程之間的數(shù)據(jù)交換提供了安全高效的通道。
在實際的代碼實踐中,我們看到這些工具如何巧妙地解決多線程協(xié)作中復(fù)雜的同步和通信問題,極大地提高了程序的并發(fā)處理能力和性能。不僅如此,我們還學(xué)會了根據(jù)不同的業(yè)務(wù)場景,如任務(wù)并行化、資源管理、數(shù)據(jù)交換等,選擇最合適的并發(fā)流程工具,以實現(xiàn)最優(yōu)的解決方案。
然而,Java并發(fā)編程是一個廣闊且復(fù)雜的領(lǐng)域,這些工具在帶來便利的同時,也要求我們對線程安全、資源競爭等問題保持高度警惕。在使用過程中,必須深入理解其原理和潛在風(fēng)險,確保代碼的正確性和穩(wěn)定性。
希望本次的探索能為你在Java并發(fā)編程的道路上點亮一盞明燈,在未來面對各種并發(fā)挑戰(zhàn)時,你能夠熟練運用這些工具,編寫出高效、可靠且易于維護(hù)的多線程程序,為構(gòu)建更強(qiáng)大、更具競爭力的軟件系統(tǒng)奠定堅實的基礎(chǔ)。