盤點并發(fā)編程中幾個實用的線程同步技術
01、背景介紹
在前幾篇文章中,我們講到了線程池實現(xiàn)原理、阻塞隊列技術等核心組件,其實 JDK 給開發(fā)者還提供了比synchronized更加高級的線程同步組件,比如 CountDownLatch、CyclicBarrier、Semaphore、Exchanger 等并發(fā)工具類。
下面我們一起來了解一下這些常用的并發(fā)工具類!
02、常用并發(fā)工具類
2.1、CountDownLatch
CountDownLatch是 JDK5 之后加入的一種并發(fā)流程控制工具類,它允許一個或多個線程一直等待,直到其他線程運行完成后再執(zhí)行。
它的工作原理主要是通過一個計數(shù)器來實現(xiàn),初始化的時候需要指定線程的數(shù)量;每當一個線程完成了自己的任務,計數(shù)器的值就相應得減 1;當計數(shù)器到達 0 時,表示所有的線程都已經(jīng)執(zhí)行完畢,處于等待的線程就可以恢復繼續(xù)執(zhí)行任務。
根據(jù)CountDownLatch的工作原理,它的應用場景一般可以劃分為兩種:
場景一:某個線程需要在其他 n 個線程執(zhí)行完畢后,再繼續(xù)執(zhí)行
場景二:多個工作線程等待某個線程的命令,同時執(zhí)行同一個任務
下面我們先來看下兩個簡單的示例。
示例1:某個線程等待 n 個工作線程
比如某項任務,先采用多線程去執(zhí)行,最后需要在主線程中進行匯總處理,這個時候CountDownLatch就可以發(fā)揮作用了,具體應用如下!
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
// 采用 10 個工作線程去執(zhí)行任務
final int threadCount = 10;
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override
public void run() {
// 執(zhí)行具體任務
System.out.println("thread name:" + Thread.currentThread().getName() + ",執(zhí)行完畢!");
// 計數(shù)器減 1
countDownLatch.countDown();
}
}).start();
}
// 阻塞等待 10 個工作線程執(zhí)行完畢
countDownLatch.await();
System.out.println("所有任務線程已執(zhí)行完畢,準備進行結果匯總");
}
}
運行結果如下:
thread name:Thread-0,執(zhí)行完畢!
thread name:Thread-2,執(zhí)行完畢!
thread name:Thread-1,執(zhí)行完畢!
thread name:Thread-3,執(zhí)行完畢!
thread name:Thread-4,執(zhí)行完畢!
thread name:Thread-5,執(zhí)行完畢!
thread name:Thread-6,執(zhí)行完畢!
thread name:Thread-7,執(zhí)行完畢!
thread name:Thread-8,執(zhí)行完畢!
thread name:Thread-9,執(zhí)行完畢!
所有任務線程執(zhí)行完畢,準備進行結果匯總
示例2:n 個工作線程等待某個線程
比如田徑賽跑,10 個同學準備開跑,但是需要等工作人員發(fā)出槍聲才允許開跑,使用CountDownLatch可以實現(xiàn)這一功能,具體應用如下!
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
// 使用一個計數(shù)器
CountDownLatch countDownLatch = new CountDownLatch(1);
final int threadCount = 10;
// 采用 10 個工作線程去執(zhí)行任務
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
// 阻塞等待計數(shù)器為 0
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 發(fā)起某個服務請求,省略
System.out.println("thread name:" + Thread.currentThread().getName() + ",開始執(zhí)行!");
}
}).start();
}
Thread.sleep(1000);
System.out.println("thread name:" + Thread.currentThread().getName() + " 準備開始!");
// 將計數(shù)器減 1,運行完成后為 0
countDownLatch.countDown();
}
}
運行結果如下:
thread name:main 準備開始!
thread name:Thread-0,開始執(zhí)行!
thread name:Thread-1,開始執(zhí)行!
thread name:Thread-2,開始執(zhí)行!
thread name:Thread-3,開始執(zhí)行!
thread name:Thread-5,開始執(zhí)行!
thread name:Thread-6,開始執(zhí)行!
thread name:Thread-8,開始執(zhí)行!
thread name:Thread-7,開始執(zhí)行!
thread name:Thread-4,開始執(zhí)行!
thread name:Thread-9,開始執(zhí)行!
從上面的示例可以很清晰的看到,CountDownLatch類似于一個倒計數(shù)器,當計數(shù)器為 0 的時候,調(diào)用await()方法的線程會被解除等待狀態(tài),然后繼續(xù)執(zhí)行。
CountDownLatch類的主要方法,有以下幾個:
- public CountDownLatch(int count):核心構造方法,初始化的時候需要指定線程數(shù)
- countDown():每調(diào)用一次,計數(shù)器值 -1,直到 count 被減為 0,表示所有線程全部執(zhí)行完畢
- await():等待計數(shù)器變?yōu)?0,即等待所有異步線程執(zhí)行完畢,否則一直阻塞
- await(long timeout, TimeUnit unit):支持指定時間內(nèi)的等待,避免永久阻塞,await()的一個重載方法
從以上的分析可以得出,當計數(shù)器為 1 的時候,即由一個線程來通知其他線程,效果等同于對象的wait()和notifyAll();當計時器大于 1 的時候,可以實現(xiàn)多個工作線程完成任務后通知一個或者多個等待線程繼續(xù)工作,CountDownLatch可以看成是一種進階版的等待/通知機制,在實際中應用比較多見。
2.2、CyclicBarrier
CyclicBarrier從字面上很容易理解,表示可循環(huán)使用的屏障,它真正的作用是讓一組線程到達一個屏障時被阻塞,直到滿足要求的線程數(shù)都到達屏障時,屏障才會解除,此時所有被屏障阻塞的線程就可以繼續(xù)執(zhí)行。
下面我們還是先看一個簡單的示例,以便于更好的理解這個工具類。
public class CyclicBarrierTest {
public static void main(String[] args) {
// 設定參與線程的個數(shù)為 5
int threadCount = 5;
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount, new Runnable() {
@Override
public void run() {
System.out.println("所有的線程都已經(jīng)準備就緒...");
}
});
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("thread name:" + Thread.currentThread().getName() + ",已達到屏障!");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("thread name:" + Thread.currentThread().getName() + ",阻塞解除,繼續(xù)執(zhí)行!");
}
}).start();
}
}
}
輸出結果:
thread name:Thread-0,已達到屏障!
thread name:Thread-1,已達到屏障!
thread name:Thread-2,已達到屏障!
thread name:Thread-3,已達到屏障!
thread name:Thread-4,已達到屏障!
所有的線程都已經(jīng)準備就緒...
thread name:Thread-4,阻塞解除,繼續(xù)執(zhí)行!
thread name:Thread-0,阻塞解除,繼續(xù)執(zhí)行!
thread name:Thread-3,阻塞解除,繼續(xù)執(zhí)行!
thread name:Thread-1,阻塞解除,繼續(xù)執(zhí)行!
thread name:Thread-2,阻塞解除,繼續(xù)執(zhí)行!
從上面的示例可以很清晰的看到,CyclicBarrier中設定的線程數(shù)相當于一個屏障,當所有的線程數(shù)達到時,此時屏障就會解除,線程繼續(xù)執(zhí)行剩下的邏輯。
CyclicBarrier類的主要方法,有以下幾個:
- public CyclicBarrier(int parties):構造方法,parties參數(shù)表示參與線程的個數(shù)
- public CyclicBarrier(int parties, Runnable barrierAction):核心構造方法,barrierAction參數(shù)表示線程到達屏障時的回調(diào)方法
- public void await():核心方法,每個線程調(diào)用await()方法告訴CyclicBarrier我已經(jīng)到達了屏障,然后當前線程被阻塞,直到屏障解除,繼續(xù)執(zhí)行剩下的邏輯
從以上的示例中,可以看到CyclicBarrier與CountDownLatch有很多的相似之處,都能夠實現(xiàn)線程之間的等待,但是它們的側重點不同:
- CountDownLatch一般用于一個或多個線程,等待其他的線程執(zhí)行完任務后再執(zhí)行
- CyclicBarrier一般用于一組線程等待至某個狀態(tài),當狀態(tài)解除之后,這一組線程再繼續(xù)執(zhí)行
- CyclicBarrier中的計數(shù)器可以反復使用,而CountDownLatch用完之后只能重新初始化
2.3、Semaphore
Semaphore通常我們把它稱之為信號計數(shù)器,它可以保證同一時刻最多有 N 個線程能訪問某個資源,比如同一時刻最多允許 10 個用戶訪問某個服務,同一時刻最多創(chuàng)建 100 個數(shù)據(jù)庫連接等等。
Semaphore可以用于控制并發(fā)的線程數(shù),實際應用場景非常的廣,比如流量控制、服務限流等等。
下面我們看一個簡單的示例。
public class SemaphoreTest {
public static void main(String[] args) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 同一時刻僅允許最多3個線程獲取許可
final Semaphore semaphore = new Semaphore(3);
// 初始化 5 個線程生成
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
// 如果超過了許可數(shù)量,其他線程將在此等待
semaphore.acquire();
System.out.println(format.format(new Date()) + " thread name:" + Thread.currentThread().getName() + " 獲取許可,開始執(zhí)行任務");
// 假設執(zhí)行某項任務的耗時
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 使用完后釋放許可
semaphore.release();
}
}
}).start();
}
}
}
輸出結果:
2023-11-22 17:32:01 thread name:Thread-0 獲取許可,開始執(zhí)行任務
2023-11-22 17:32:01 thread name:Thread-1 獲取許可,開始執(zhí)行任務
2023-11-22 17:32:01 thread name:Thread-2 獲取許可,開始執(zhí)行任務
2023-11-22 17:32:03 thread name:Thread-4 獲取許可,開始執(zhí)行任務
2023-11-22 17:32:03 thread name:Thread-3 獲取許可,開始執(zhí)行任務
從上面的示例可以很清晰的看到,同一時刻前 3 個線程獲得了許可優(yōu)先執(zhí)行, 2 秒過后許可被釋放,剩下的 2 個線程獲取釋放的許可繼續(xù)執(zhí)行。
Semaphore類的主要方法,有以下幾個:
- public Semaphore(int permits):構造方法,permits參數(shù)表示同一時間能訪問某個資源的線程數(shù)量
- acquire():獲取一個許可,在獲取到許可之前或者被其他線程調(diào)用中斷之前,線程將一直處于阻塞狀態(tài)
- tryAcquire(long timeout, TimeUnit unit):表示在指定時間內(nèi)嘗試獲取一個許可,如果獲取成功,返回true;反之false
- release():釋放一個許可,同時喚醒一個獲取許可不成功的阻塞線程。
通過permits參數(shù)的設定,可以實現(xiàn)限制多個線程同時訪問服務的效果,當permits參數(shù)為 1 的時候,表示同一時刻只有一個線程能訪問服務,相當于一個互斥鎖,效果等同于synchronized。
使用Semaphore的時候,通常需要先調(diào)用acquire()或者tryAcquire()獲取許可,然后通過try ... finally模塊在finally中釋放許可。
例如如下方式,嘗試在 3 秒內(nèi)獲取許可,如果沒有獲取就退出,防止程序一直阻塞。
// 嘗試 3 秒內(nèi)獲取許可
if(semaphore.tryAcquire(3, TimeUnit.SECONDS)){
try {
// ...業(yè)務邏輯
} finally {
// 釋放許可
semaphore.release();
}
}
2.4、Exchanger
Exchanger從字面上很容易理解表示交換,它主要用途在兩個線程之間進行數(shù)據(jù)交換,注意也只能在兩個線程之間進行數(shù)據(jù)交換。
Exchanger提供了一個exchange()同步交換方法,當兩個線程調(diào)用exchange()方法時,無論調(diào)用時間先后,會互相等待線程到達exchange()方法同步點,此時兩個線程進行交換數(shù)據(jù),將本線程產(chǎn)出數(shù)據(jù)傳遞給對方。
簡單的示例如下。
public class ExchangerTest {
public static void main(String[] args) {
// 交換同步器
Exchanger<String> exchanger = new Exchanger<>();
// 線程1
new Thread(new Runnable() {
@Override
public void run() {
try {
String value = "A";
System.out.println("thread name:" + Thread.currentThread().getName() + " 原數(shù)據(jù):" + value);
String newValue = exchanger.exchange(value);
System.out.println("thread name:" + Thread.currentThread().getName() + " 交換后的數(shù)據(jù):" + newValue);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 線程2
new Thread(new Runnable() {
@Override
public void run() {
try {
String value = "B";
System.out.println("thread name:" + Thread.currentThread().getName() + " 原數(shù)據(jù):" + value);
String newValue = exchanger.exchange(value);
System.out.println("thread name:" + Thread.currentThread().getName() + " 交換后的數(shù)據(jù):" + newValue);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
輸出結果:
thread name:Thread-0 原數(shù)據(jù):A
thread name:Thread-1 原數(shù)據(jù):B
thread name:Thread-0 交換后的數(shù)據(jù):B
thread name:Thread-1 交換后的數(shù)據(jù):A
從上面的示例可以很清晰的看到,當線程Thread-0和Thread-1都到達了exchange()方法的同步點時,進行了數(shù)據(jù)交換。
Exchanger類的主要方法,有以下幾個:
- exchange(V x):等待另一個線程到達此交換點,然后將給定的對象傳送給該線程,并接收該線程的對象,除非當前線程被中斷,否則一直阻塞等待
- exchange(V x, long timeout, TimeUnit unit):表示在指定的時間內(nèi)等待另一個線程到達此交換點,如果超時會自動退出并拋超時異常
如果多個線程調(diào)用exchange()方法,數(shù)據(jù)交換可能會出現(xiàn)混亂,因此實際上Exchanger應用并不多見。
03、小結
本文主要圍繞 Java 多線程中常見的并發(fā)工具類進行了簡單的用例介紹,這些工具類都可以實現(xiàn)線程同步的效果,底層原理實現(xiàn)主要是基于 AQS 隊列式同步器來實現(xiàn),關于 AQS 我們會在后期的文章中再次介紹。
本文篇幅稍有所長,內(nèi)容難免有所遺漏,歡迎大家留言指出!
04、參考
1.https://www.cnblogs.com/xrq730/p/4869671.html
2.https://zhuanlan.zhihu.com/p/97055716