萬字圖解 Java 并發(fā)框架:Fork/Join、CountDownLatch、Semaphore、CyclicBarrier
Fork/Join 框架
Chaya:碼哥,什么是 Fork/Join 框架?
Fork/Join 是 Java 7 引入的并行計(jì)算框架,核心思想是 **"分而治之"**。它通過以下特性解決復(fù)雜計(jì)算問題:
- 自動(dòng)任務(wù)拆分:將大任務(wù)遞歸拆分為子任務(wù)
- 工作竊取算法(Work-Stealing):最大化線程利用率
- 輕量級(jí)線程管理:基于
ForkJoinPool
的優(yōu)化線程池
Fork 就是把一個(gè)大任務(wù)切分為若干子任務(wù)并行的執(zhí)行,Join 就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個(gè)大任務(wù)的結(jié) 果。
比如計(jì)算 1+2+…+10000,可以分割成 10 個(gè)子任務(wù),每個(gè)子任務(wù)分別對(duì) 1000 個(gè)數(shù)進(jìn)行求和, 最終匯總這 10 個(gè)子任務(wù)的結(jié)果。
Fork/Join 的運(yùn)行流程圖如下:
圖片
工作竊取算法
Chaya:“碼哥。有任務(wù)要拆分,那必然會(huì)出現(xiàn)分配不均勻的情況?要如何實(shí)現(xiàn)負(fù)載均衡呢?”
這個(gè)問題問得好,Chaya 小姐姐。
我們?cè)O(shè)計(jì)一個(gè)工作竊取算法(Work-Stealing)來解決這個(gè)問題。每個(gè)工作線程維護(hù)一個(gè)雙端隊(duì)列(Deque):
- 頭部:執(zhí)行自己拆分出的任務(wù)(LIFO)
- 尾部:竊取其他線程的任務(wù)(FIFO)
圖片
工作竊取算法的優(yōu)點(diǎn):充分利用線程進(jìn)行并行計(jì)算,減少了線程間的競(jìng)爭(zhēng)。
工作竊取算法的缺點(diǎn):在某些情況下還是存在競(jìng)爭(zhēng),比如雙端隊(duì)列里只有一個(gè)任務(wù)時(shí)。并 且該算法會(huì)消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列。
任務(wù)拆分流程
圖片
使用場(chǎng)景
場(chǎng)景 1:大規(guī)模數(shù)據(jù)處理(并行排序)
需求:對(duì) 10 億條數(shù)據(jù)排序,要求內(nèi)存可控且充分利用多核性能。
代碼實(shí)現(xiàn):
public class ParallelMergeSort extends RecursiveAction {
privatefinalint[] array;
privatefinalint start;
privatefinalint end;
privatestaticfinalint THRESHOLD = 1_000_000; // 拆分閾值
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
Arrays.sort(array, start, end); // 小任務(wù)直接排序
return;
}
int mid = (start + end) >>> 1;
invokeAll(
new ParallelMergeSort(array, start, mid),
new ParallelMergeSort(array, mid, end)
);
merge(array, start, mid, end); // 合并結(jié)果
}
// 生產(chǎn)級(jí)優(yōu)化:復(fù)用臨時(shí)數(shù)組減少內(nèi)存分配
private void merge(int[] array, int start, int mid, int end) {
int[] temp = ThreadLocalRandom.current().ints().toArray();
// ... 合并邏輯 ...
}
}
// 使用方式
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
int[] data = loadHugeData();
pool.invoke(new ParallelMergeSort(data, 0, data.length));
性能優(yōu)化點(diǎn):
- 合理設(shè)置
THRESHOLD
(通過壓測(cè)確定最佳值) - 避免在遞歸中頻繁創(chuàng)建臨時(shí)數(shù)組
- 使用
ThreadLocalRandom
保證線程安全
場(chǎng)景 2:金融計(jì)算(蒙特卡洛模擬)
需求:快速計(jì)算期權(quán)定價(jià),要求高精度且低延遲。
代碼實(shí)現(xiàn):
public class MonteCarloTask extends RecursiveTask<Double> {
privatefinalint iterations;
privatestaticfinalint THRESHOLD = 10_000;
@Override
protected Double compute() {
if (iterations <= THRESHOLD) {
return calculateSync(); // 同步計(jì)算
}
MonteCarloTask left = new MonteCarloTask(iterations / 2);
MonteCarloTask right = new MonteCarloTask(iterations / 2);
left.fork();
double rightResult = right.compute();
double leftResult = left.join(); // 注意順序:先計(jì)算再join
return (leftResult + rightResult) / 2;
}
private double calculateSync() {
double sum = 0;
for (int i = 0; i < iterations; i++) {
sum += randomSimulation();
}
return sum / iterations;
}
}
// 生產(chǎn)級(jí)調(diào)用(指定超時(shí))
ForkJoinPool pool = new ForkJoinPool(4);
MonteCarloTask task = new MonteCarloTask(1_000_000);
pool.submit(task);
try {
double result = task.get(5, TimeUnit.SECONDS); // 嚴(yán)格超時(shí)控制
} catch (TimeoutException e) {
task.cancel(true);
// 降級(jí)策略...
}
ForkJoinPool 生產(chǎn)級(jí)配置
自定義線程工廠
public class NamedForkJoinThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
privatefinal String namePrefix;
privatefinal AtomicInteger counter = new AtomicInteger(1);
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) {};
thread.setName(namePrefix + "-" + counter.getAndIncrement());
thread.setPriority(Thread.NORM_PRIORITY);
thread.setDaemon(false); // 生產(chǎn)環(huán)境必須為非守護(hù)線程
return thread;
}
}
與其他并發(fā)框架對(duì)比
圖片
Fork/Join 適用場(chǎng)景:
- 遞歸可分治的問題(排序、遍歷、數(shù)學(xué)計(jì)算)
- 嚴(yán)格低延遲要求的計(jì)算任務(wù)
- 需要自動(dòng)負(fù)載均衡的大規(guī)模數(shù)據(jù)處理
CountDownLatch
CountDownLatch 是一個(gè)同步工具類,它允許一個(gè)或多個(gè)線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。
例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有框架服務(wù)之后執(zhí)行。
假如有這樣一個(gè)需求:處理 10 萬條數(shù)據(jù),分片并行處理,全部完成后觸發(fā)匯總操作。
public class BatchProcessor {
privatestaticfinalint BATCH_SIZE = 1000;
privatefinal ExecutorService executor = Executors.newFixedThreadPool(8);
public void process(List<Data> allData) {
int total = allData.size();
CountDownLatch latch = new CountDownLatch(total / BATCH_SIZE);
for (int i = 0; i < total; i += BATCH_SIZE) {
List<Data> batch = allData.subList(i, Math.min(i+BATCH_SIZE, total));
executor.submit(() -> {
try {
processBatch(batch);
} finally {
latch.countDown(); // 確保計(jì)數(shù)減少
}
});
}
try {
if (!latch.await(5, TimeUnit.MINUTES)) {
thrownew TimeoutException("Batch processing timeout");
}
generateSummaryReport();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
shutdownNow();
}
}
private void processBatch(List<Data> batch) { /* ... */ }
}
CountDownLatch 的構(gòu)造函數(shù)接收一個(gè) int 類型的參數(shù)作為計(jì)數(shù)器,如果你想等待 N 個(gè)點(diǎn)完 成,這里就傳入 N。
當(dāng)我們調(diào)用 CountDownLatch 的 countDown 方法時(shí),N 就會(huì)減 1,CountDownLatch 的 await 方法 會(huì)阻塞當(dāng)前線程,直到 N 變成零。
用在多個(gè)線程時(shí),只需要把這個(gè) CountDownLatch 的引用傳遞到線程里即可。
如果有某個(gè)線程處理得比較慢,我們不可能讓主線程一直等待,所以可以使 用另外一個(gè)帶指定時(shí)間的 await 方法——await(long time,TimeUnit unit)
,這個(gè)方法等待特定時(shí) 間后,就會(huì)不再阻塞當(dāng)前線程。
實(shí)現(xiàn)原理
CountDownLatch
的核心實(shí)現(xiàn)原理是基于 AQS
,AQS
全稱 AbstractQueuedSynchronizer
,是 java.util.concurrent
中提供的一種高效且可擴(kuò)展的同步機(jī)制;
它是一種提供了原子式管理同步狀態(tài)、阻塞和喚醒線程功能以及隊(duì)列模型的簡(jiǎn)單框架。
除了 CountDownLatch
工具類,JDK 當(dāng)中的 Semaphore
、ReentrantLock
等工具類都是基于 AQS
來實(shí)現(xiàn)的。下面我們用 CountDownLatch
來分析一下 AQS
的實(shí)現(xiàn)。
圖片
CountDownLatch
的源碼實(shí)現(xiàn),發(fā)現(xiàn)其實(shí)它的代碼實(shí)現(xiàn)非常簡(jiǎn)單,算上注釋也才 300+ 行代碼,如果去掉注釋的話代碼不到 100 行,大部分方法實(shí)現(xiàn)都是調(diào)用的 Sync
這個(gè)靜態(tài)內(nèi)部類的實(shí)現(xiàn),而 Sync
就是繼承自 AbstractQueuedSynchronizer
。
CountDownLatch 的 UML 類圖如下:
圖片
核心代碼如下。
private staticfinalclass Sync extends AbstractQueuedSynchronizer {
Sync(int count) { setState(count); } // 初始化計(jì)數(shù)器
// 嘗試獲取共享鎖:當(dāng) state=0 時(shí)返回 1(成功),否則返回 -1(失?。? protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 嘗試釋放共享鎖:CAS 遞減 state,直到變?yōu)?0
protected boolean tryReleaseShared(int releases) {
for (;;) { // 自旋保證原子性
int c = getState();
if (c == 0) returnfalse; // 已經(jīng)釋放完畢
int nextc = c - 1;
if (compareAndSetState(c, nextc)) // CAS 更新
return nextc == 0; // 返回是否觸發(fā)喚醒
}
}
}
Sync
重寫了 AQS
中的 tryAcquireShared
和 tryReleaseShared
兩個(gè)方法。
當(dāng)調(diào)用 CountDownLatch
的 awit()
方法時(shí),會(huì)調(diào)用內(nèi)部類 Sync
的 acquireSharedInterruptibly()
方法,在這個(gè)方法中會(huì)調(diào)用 tryAcquireShared
方法,這個(gè)方法就是 Sync
重寫的 AQS
中的方法;
調(diào)用 countDown()
方法原理基本類似。
await() 方法實(shí)現(xiàn)
在調(diào)用 await()
方法時(shí),會(huì)直接調(diào)用 AQS
類的 acquireSharedInterruptibly
方法,在 acquireSharedInterruptibly
方法內(nèi)部會(huì)繼續(xù)調(diào)用 Sync
實(shí)現(xiàn)類中的 tryAcquireShared
方法,在 tryAcquireShared
方法中判斷 state
變量值是否為 0
。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1); // 進(jìn)入 AQS 核心邏輯
}
// AQS 中的實(shí)現(xiàn)
public final void acquireSharedInterruptibly(int arg) {
if (Thread.interrupted()) throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 檢查 state 是否為 0
doAcquireSharedInterruptibly(arg); // 進(jìn)入阻塞隊(duì)列
}
doAcquireSharedInterruptibly 關(guān)鍵步驟。
圖片
關(guān)鍵點(diǎn):
- 節(jié)點(diǎn)入隊(duì):通過
addWaiter
方法將線程封裝為SHARED
模式節(jié)點(diǎn)加入隊(duì)列尾部 - 自旋檢查:循環(huán)判斷前驅(qū)節(jié)點(diǎn)是否是頭節(jié)點(diǎn)(公平性保證)
- 阻塞控制:調(diào)用
LockSupport.park()
掛起線程,響應(yīng)中斷。
countDown() 方法
當(dāng)執(zhí)行 CountDownLatch
的 countDown()
方法,將計(jì)數(shù)器減一,也就是將 state
值減一,當(dāng)減到 0 的時(shí)候,等待隊(duì)列中的線程被釋放。是調(diào)用 AQS
的 releaseShared()
方法來實(shí)現(xiàn)的。
public void countDown() {
sync.releaseShared(1); // 觸發(fā)釋放操作
}
// AQS 中的實(shí)現(xiàn)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // CAS 遞減 state
doReleaseShared(); // 喚醒后續(xù)節(jié)點(diǎn)
return true;
}
return false;
}
CyclicBarrier
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。
它要做的事情是,讓一 組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì) 開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。
現(xiàn)實(shí)生活中我們經(jīng)常會(huì)遇到這樣的情景,在進(jìn)行某個(gè)活動(dòng)前需要等待人全部都齊了才開始。
例如吃飯時(shí)要等全家人都上座了才動(dòng)筷子,旅游時(shí)要等全部人都到齊了才出發(fā),比賽時(shí)要等運(yùn)動(dòng)員都上場(chǎng)后才開始。
CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一個(gè)柵欄,因?yàn)樗臇艡冢˙arrier)可以重復(fù)使用(Cyclic)。
圖片
CyclicBarrier 是 Java 并發(fā)包中的可重用同步屏障,其特性包括:
- 多階段協(xié)同:支持多次
await()
的同步點(diǎn) - 柵欄動(dòng)作(Barrier Action):當(dāng)所有線程抵達(dá)屏障時(shí)觸發(fā)
- 自動(dòng)重置:每次所有線程通過屏障后自動(dòng)復(fù)位
- 中斷處理:可響應(yīng)線程中斷并傳播異常
如何使用
構(gòu)造方法
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
解析:
- parties:傳入一個(gè)計(jì)數(shù)器值,用來配置可以阻塞多少個(gè)線程的。
- 第二個(gè)構(gòu)造方法有一個(gè) Runnable 參數(shù),這個(gè)對(duì)象可以在計(jì)數(shù)器值減到 0 后,發(fā)起一次調(diào)用。
例如:下面代碼就會(huì)在計(jì)數(shù)器減到 0 后,打印出"回環(huán)屏障退出"。
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("回環(huán)屏障退出"));
場(chǎng)景 :多階段分布式計(jì)算匯總
需求:實(shí)時(shí)計(jì)算商品庫存分布(本地倉 + 區(qū)域倉 + 全國倉),需三階段統(tǒng)計(jì)數(shù)據(jù)匯總。
public class InventoryComputeService {
// 構(gòu)建 3 線程等待 CyclicBarrier
privatefinalint PARTIES = 3;
privatefinal CyclicBarrier barrier = new CyclicBarrier(PARTIES, this::mergeData);
// 保存最后的結(jié)果
privatevolatile Map<String, Integer> result = new ConcurrentHashMap<>();
public void compute() {
// 異步執(zhí)行 3 個(gè)任務(wù),執(zhí)行完成調(diào)用 barrier.await();,當(dāng)所有任務(wù)完成后會(huì)執(zhí)行 mergeData
List<CompletableFuture<Void>> tasks = new ArrayList<>();
tasks.add(computeLocalStock());
tasks.add(computeRegionalStock());
tasks.add(computeNationalStock());
// 本次3 個(gè)任務(wù)任何一個(gè)計(jì)算出現(xiàn)異常的話,重置 barrier
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
.exceptionally(ex -> {
barrier.reset(); // 異常處理
returnnull;
});
}
private CompletableFuture<Void> computeLocalStock() {
// 異步線程
return CompletableFuture.runAsync(() -> {
try {
// 模擬計(jì)算耗時(shí)
result.put("local", calculate(Region.LOCAL));
// 執(zhí)行完成,調(diào)用 await
barrier.await(5, TimeUnit.SECONDS); // 超時(shí)控制
} catch (Exception e) {
handleException(e);
}
});
}
// computeRegionalStock/computeNationalStock 同理...
private void mergeData() {
lock.lock();
try {
System.out.println("各區(qū)域最終庫存合并結(jié)果: " + result);
} finally {
lock.unlock();
result.clear(); // 清空狀態(tài)為下次計(jì)算準(zhǔn)備
}
}
}
代碼核心解釋
構(gòu)造方法創(chuàng)建等待三個(gè)線程執(zhí)行完成的 CyclicBarrier,CyclicBarrier
與 CountDownLatch
最大的區(qū)別是 CountDownLatch
一次性的,CyclicBarrier
是可循環(huán)利用的。
private final CyclicBarrier barrier = new CyclicBarrier(PARTIES, this::mergeData);
當(dāng)三個(gè)線程都執(zhí)行完成,會(huì)調(diào)用 mergeData
方法統(tǒng)計(jì)結(jié)果。
圖片
實(shí)現(xiàn)原理
核心數(shù)據(jù)結(jié)構(gòu)
public class CyclicBarrier {
privatefinal ReentrantLock lock = new ReentrantLock();
privatefinal Condition trip = lock.newCondition();
privatefinalint parties; // 需要同步的線程數(shù)
privatefinal Runnable barrierCommand; // 柵欄動(dòng)作
private Generation generation = new Generation(); // 當(dāng)前代
privatestaticclass Generation {
boolean broken = false; // 柵欄是否破裂
}
// 掛起線程數(shù)計(jì)數(shù)器(每次循環(huán)遞減)
privateint count;
}
CyclicBarrier 狀態(tài)流轉(zhuǎn)
圖片
await 實(shí)現(xiàn)
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
thrownew Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 回環(huán)屏障使用完畢或重置后,都會(huì)生成一個(gè)新的generation,這個(gè)對(duì)象可以用來讓線程退出回環(huán)屏障
final Generation g = generation;
// 每個(gè)進(jìn)入的線程,都使計(jì)數(shù)器減1,當(dāng)計(jì)數(shù)器歸零后進(jìn)入下面的if判斷
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果實(shí)例化時(shí)傳入了Runnable對(duì)象,則在這里調(diào)用它的run()方法
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 里面做了喚醒所有等待線程的操作,線程是在下面的自旋中掛起的
nextGeneration();
return0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
// 此處省略的線程被interrupt的try catch
// 根據(jù)是否傳入等待時(shí)間來判斷調(diào)用哪一個(gè)方法
if (!timed) {
// condition的await()方法,這里會(huì)暫時(shí)釋放鎖
trip.await();
} elseif (nanos > 0L) {
nanos = trip.awaitNanos(nanos);
}
// 計(jì)數(shù)器歸零后,線程退出自旋
if (g != generation) {
return index;
}
}
} finally {
lock.unlock();
}
}
上面代碼中的trip
就是一個(gè) Condition 對(duì)象,是CyclicBarrier
的一個(gè)成員變量??偨Y(jié)一下 doWait()方法,其實(shí)做的事情還是比較簡(jiǎn)單的。
線程進(jìn)入 doWait(), 先搶占到鎖 lock 鎖對(duì)象,并執(zhí)行計(jì)數(shù)器遞減 1 的操作。 遞減后的計(jì)數(shù)器值不為 0,則將自己掛起在 Condition 隊(duì)列中。
遞減后的計(jì)數(shù)器值為 0,則調(diào)用 signalAll()喚醒所有在條件隊(duì)列中的線程,并創(chuàng)建新的 generation 對(duì)象,讓線程可以退出回環(huán)屏障。
核心方法流程圖如下。
圖片
Semaphore
Semaphore
,它是一個(gè)信號(hào)量,主要作用是用來控制并發(fā)中同一個(gè)時(shí)刻執(zhí)行的線程數(shù)量,可以用來做限流器,或者流程控制器。
在創(chuàng)建的時(shí)候會(huì)指定好它有多少個(gè)信號(hào)量,比如 Semaphre semaphore = new Semaphore(2)
,就只有 2 個(gè)信號(hào)量。
核心功能是控制同時(shí)訪問特定資源的線程數(shù)量,具有以下特性:
- 許可管理:通過
acquire()
/release()
操作許可數(shù)量 - 公平性選擇:支持公平/非公平兩種模式
- 可中斷:支持帶超時(shí)的許可獲取
- 動(dòng)態(tài)調(diào)整:運(yùn)行時(shí)修改許可數(shù)量
這個(gè)信號(hào)量可以比作是車道,每一個(gè)時(shí)刻每條車道只能允許一輛汽車通過,你可以理解為高速收費(fèi)站上的收費(fèi)口,每個(gè)收費(fèi)口任意一時(shí)刻只能允許一輛汽車通行。
畫個(gè)圖來講解一下:
圖片
如何使用
接口限流(突發(fā)流量控制)。
public class ApiRateLimiter {
// 生產(chǎn)級(jí)配置:許可數(shù) = QPS閾值 * 響應(yīng)時(shí)間(秒)
privatestaticfinal Semaphore SEMAPHORE = new Semaphore(500);
privatestaticfinal Timer METRIC_TIMER = new Timer(true);
static {
// 監(jiān)控線程:每10秒打印許可使用率
METRIC_TIMER.schedule(new TimerTask() {
public void run() {
double usage = (SEMAPHORE.availablePermits() / 500.0) * 100;
log.info("API許可使用率: {0}%", 100 - usage);
}
}, 10_000, 10_000);
}
public Response handleRequest(Request request) {
if (!SEMAPHORE.tryAcquire(50, TimeUnit.MILLISECONDS)) { // 非阻塞獲取
thrownew BizException(429, "請(qǐng)求過于頻繁");
}
try {
return doBusinessLogic(request); // 核心業(yè)務(wù)邏輯
} finally {
SEMAPHORE.release(); // 確保釋放許可
}
}
}
生產(chǎn)級(jí)要點(diǎn):
- 使用
tryAcquire
替代acquire
避免線程阻塞 - 通過
finally
保證許可釋放 - 集成監(jiān)控上報(bào)(Prometheus + Grafana).
實(shí)現(xiàn)原理
Semaphore
有兩種模式,公平模式和非公平模式,分別對(duì)應(yīng)兩個(gè)內(nèi)部類為 FairSync
、NonfairSync
,這兩個(gè)子類繼承了 Sync
,都是基于之前講解過的 AQS
來實(shí)現(xiàn)的。
核心數(shù)據(jù)結(jié)構(gòu)
畫個(gè)圖來說明一下內(nèi)部的結(jié)構(gòu)如下:
圖片
Semaphore
的公平模式依賴于 FairSync
公平同步器來實(shí)現(xiàn),非公平模式依賴于 NonfairSync
非公平同步器來實(shí)現(xiàn)。
其中 FairSync
、NonfairSync
繼承自 Sync
,而 Sync
又繼承自 AQS
,這些同步器的底層都是依賴于 AQS
提供的機(jī)制來實(shí)現(xiàn)的。
public class Semaphore implements java.io.Serializable {
privatefinal Sync sync;
abstractstaticclass Sync extends AbstractQueuedSynchronizer {
Sync(int permits) { setState(permits); }
final int getPermits() { return getState(); }
// 非公平嘗試獲取許可
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
// 釋放許可
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) thrownew Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
returntrue;
}
}
}
// 公平模式實(shí)現(xiàn)
staticfinalclass FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) // 檢查是否有等待線程
return -1;
// ...與非公平模式相同
}
}
}
}
所以掌握 AQS 很重要啊家人們,AQS 是模板方法模式的經(jīng)典運(yùn)用。
這里的 Semaphore
實(shí)現(xiàn)的思路跟我們之前講過的 ReentrantLock
非常的相似,包括內(nèi)部類的結(jié)構(gòu)都是一樣的,也是有公平和非公平兩種模式。
只是不同的是 Semaphore
是共享鎖,支持多個(gè)線程同時(shí)操作;然而 ReentrantLock
是互斥鎖,同一個(gè)時(shí)刻只允許一個(gè)線程操作。
公平模式 acquire
公平模式,Semaphore.acquire
方法源碼直接是調(diào)用 FairSync
的 acquireSharedInterruptibly
,也就是進(jìn)入了 AQS 的 acquireSharedInterruptibly
的模板方法里面了。
java.util.concurrent.Semaphore#acquire()
源碼如下。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
跳入 AQS
的 acquireSharedInterruptibly
方法。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
// Semaphore.FairSync 子類實(shí)現(xiàn) tryAcquireShared
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}
這個(gè)方法定義了一個(gè)模板流程:
- 先調(diào)用子類的
tryAcquireShared
方法獲取共享鎖,也就是獲取信號(hào)量。 - 如果獲取信號(hào)量成功,即返回值大于等于 0,則直接返回。
- 如果獲取失敗,返回值小于 0,則調(diào)用 AQS 的
doAcquireSharedInterruptibly
方法,進(jìn)入 AQS 的等待隊(duì)列里面,等待別人釋放資源之后它再去獲取。
這里我們畫個(gè)圖理解一下:
圖片
Semaphore.FairSync 子類實(shí)現(xiàn) tryAcquireShared
protected int tryAcquireShared(int acquires) {
for (;;) {
// 這里作為公平模式,首先判斷一下AQS等待隊(duì)列里面
// 有沒有人在等待獲取信號(hào)量,如果有人排隊(duì)了,自己就不去獲取了
if (hasQueuedPredecessors())
return -1;
// 獲取剩余的信號(hào)量資源
int available = getState();
// 剩余資源減去我需要的資源,是否小于0
// 如果小于0則說明資源不夠了
// 如果大于等于0,說明資源是足夠我使用的
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
上面的源碼就是獲取信號(hào)量的核心流程了:
- 首先判斷一下 AQS 等待隊(duì)列里面是否有人在排隊(duì),如果是,則自己不嘗試獲取資源了,乖乖的去排隊(duì)
- 如果沒有人在排隊(duì),獲取一下當(dāng)前剩余的信號(hào)量
available
,然后減去自己需要的信號(hào)量acquires
,得到減去后的結(jié)果remaining
- 如果 remaining 小于 0,直接返回
remaining
,說明資源不夠,獲取失敗了,這個(gè)時(shí)候就會(huì)進(jìn)入 AQS 等待隊(duì)列等待。 - 如果
remaining
大于等于 0,則執(zhí)行 CAS 操作compareAndSetState
競(jìng)爭(zhēng)資源,如果成功了,說明自己獲取信號(hào)量成功,如果失敗了同樣進(jìn)入 AQS 等待隊(duì)列。
我們畫一下公平模式 FairSync
的 tryAcquireShared
流程圖,以及整個(gè)公平模式的 acquire 方法的流程圖:
圖片
公平模式 release
看完獲取,我們緊接著來看下釋放,這里 Semaphore
的 release
方法直接調(diào)用 Sync
的 releaseShared
方法:
public void release() {
sync.releaseShared(1);
}
繼續(xù)來分析 releaseShared
方法,進(jìn)入到 AQS 的 releaseShard
釋放資源的模板方法:
public final boolean releaseShared(int arg) {
// 1. 調(diào)用子類的tryReleaseShared釋放資源
if (tryReleaseShared(arg)) {
// 釋放資源成功,調(diào)用doReleaseShared喚醒等待隊(duì)列中等待資源的線程
doReleaseShared();
return true;
}
return false;
}
這里的模板流程有:
- 調(diào)用子類的
tryReleaseShared
去釋放資源,即釋放信號(hào)量 - 如果釋放成功了,則調(diào)用
doReleaseShared
喚醒 AQS 中等待資源的線程,將資源傳播下去,如果釋放失敗,即返回小于等于 0,則直接返回。 - 所以,這里除了 AQS 的核心模板流程之外,具體釋放邏輯就是
Sync
的tryReleaseShared
方法的源碼了,我們繼續(xù)來查看:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
// 這里就是將釋放的信號(hào)量資源加回去而已
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 嘗試CAS設(shè)置資源,成功直接返回,失敗則進(jìn)入下一循環(huán)重試
if (compareAndSetState(current, next))
return true;
}
}
釋放資源的流程圖如下:
圖片
Exchanger
Exchanger
(交換者)是一個(gè)用于線程間協(xié)作的工具類。Exchanger 用于進(jìn)行線程間的數(shù)據(jù)交 換。
它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn),兩個(gè)線程可以交換彼此的數(shù)據(jù)。這兩個(gè)線程通過 exchange 方法交換數(shù)據(jù),如果第一個(gè)線程先執(zhí)行 exchange()方法,它會(huì)一直等待第二個(gè)線程也 執(zhí)行 exchange 方法,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn) 出來的數(shù)據(jù)傳遞給對(duì)方。
圖片
使用場(chǎng)景
這個(gè)玩意的使用場(chǎng)景很少很少……大家對(duì)她有個(gè)了解即可,大可不必深入。
因?yàn)榇嬖诤芏嗑窒扌浴?/span>
- 僅限兩個(gè)線程
- 超過兩個(gè)線程使用同一
Exchanger
會(huì)導(dǎo)致未定義行為。 - 替代方案:使用
CyclicBarrier
或Phaser
實(shí)現(xiàn)多線程同步。
- 阻塞風(fēng)險(xiǎn)
- 若一方線程未到達(dá)同步點(diǎn),另一線程會(huì)永久阻塞。
- 解決方案:使用帶超時(shí)的
exchange(V x, long timeout, TimeUnit unit)
。
- 性能瓶頸
- 頻繁交換大數(shù)據(jù)對(duì)象會(huì)導(dǎo)致內(nèi)存和 CPU 開銷。
- 優(yōu)化建議:交換輕量級(jí)對(duì)象(如引用或標(biāo)識(shí)符),而非完整數(shù)據(jù)。
- 不適用于分布式系統(tǒng)
Exchanger
僅限單 JVM 內(nèi)的線程通信。- 替代方案:消息隊(duì)列(如 Kafka)或 RPC 框架(如 gRPC)。
Exchanger
在多種并發(fā)編程場(chǎng)景中都非常有用。例如,在遺傳算法中,可以使用Exchanger
來實(shí)現(xiàn)個(gè)體之間的信息交換;在管道設(shè)計(jì)中,可以使用Exchanger
來傳遞數(shù)據(jù)塊或任務(wù);在游戲中,可以使用Exchanger
來實(shí)現(xiàn)玩家之間的物品交易等。
如下代碼,用 Exchanger 實(shí)現(xiàn)兩個(gè)線程將交換彼此持有的字符串?dāng)?shù)據(jù):
import java.util.concurrent.Exchanger;
publicclass ExchangerExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)Exchanger對(duì)象
Exchanger<String> exchanger = new Exchanger<>();
// 創(chuàng)建一個(gè)線程,它將使用"Hello"與另一個(gè)線程交換數(shù)據(jù)
Thread producer = new Thread(() -> {
try {
String producedData = "Hello";
String consumerData = exchanger.exchange(producedData);
System.out.println("生產(chǎn)者線程交換后得到的數(shù)據(jù): " + consumerData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "生產(chǎn)者線程");
// 創(chuàng)建一個(gè)線程,它將使用"World"與另一個(gè)線程交換數(shù)據(jù)
Thread consumer = new Thread(() -> {
try {
String consumerData = "World";
String producedData = exchanger.exchange(consumerData);
System.out.println("消費(fèi)者線程交換后得到的數(shù)據(jù): " + producedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "消費(fèi)者線程");
// 啟動(dòng)線程
producer.start();
consumer.start();
}
}
代碼中我們創(chuàng)建了一個(gè) Exchanger
對(duì)象,并且定義了兩個(gè)線程:一個(gè)生產(chǎn)者線程和一個(gè)消費(fèi)者線程。
生產(chǎn)者線程持有一個(gè)字符串 "Hello"
,而消費(fèi)者線程持有一個(gè)字符串 "World"
。兩個(gè)線程都通過調(diào)用 exchanger.exchange()
方法來等待交換數(shù)據(jù)。
當(dāng)兩個(gè)線程都到達(dá)交換點(diǎn)時(shí)(即都調(diào)用了 exchange()
方法),Exchanger
會(huì)確保它們安全地交換數(shù)據(jù)。交換完成后,每個(gè)線程都會(huì)得到對(duì)方原本持有的數(shù)據(jù),并打印出來。
實(shí)現(xiàn)原理
分別從數(shù)據(jù)結(jié)構(gòu)和 exchange 方法來實(shí)現(xiàn)流程來學(xué)習(xí)實(shí)現(xiàn)原理。
核心數(shù)據(jù)結(jié)構(gòu)
Participant 線程本地存儲(chǔ)
public class Exchanger<V> {
// 每個(gè)線程持有一個(gè)Node
private final Participant participant;
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
}
圖片
關(guān)鍵作用:
- 每個(gè)線程通過
Participant
持有獨(dú)立的Node
對(duì)象 - 避免多線程競(jìng)爭(zhēng)同一存儲(chǔ)位置
- 底層使用
ThreadLocal
實(shí)現(xiàn)線程隔離
Node 交換節(jié)點(diǎn)設(shè)計(jì)
@sun.misc.Contended // 防止偽共享
static final class Node {
int index; // Arena下標(biāo)
int bound; // 最近記錄的前導(dǎo)邊界
int collides; // CAS失敗計(jì)數(shù)
int hash; // 偽隨機(jī)自旋
Object item; // 攜帶的數(shù)據(jù)
volatile Object match; // 交換的數(shù)據(jù)
volatile Thread parked; // 掛起的線程
}
內(nèi)存布局優(yōu)化:
- 使用
@Contended
注解填充緩存行(64 字節(jié)) - 確保不同線程訪問的字段不在同一緩存行
- 示例內(nèi)存布局:
| 64字節(jié)緩存行 | Node.item | ...填充... |
| 64字節(jié)緩存行 | Node.match | ...填充... |
每個(gè)線程的 Node 有一個(gè) match 屬性用于存儲(chǔ)待交換的數(shù)據(jù)。
exchange 方法執(zhí)行流程
主流程源碼(精簡(jiǎn)版)
public V exchange(V x) throws InterruptedException {
Object v;
Node[] a;
Node q = participant.get();
// Arena模式(
if ((a = arena) != null ||
(q = slotExchange(q, x, false, 0L)) == null)
return (V)v;
// ...省略超時(shí)處理
}
private final Object slotExchange(Node q, Object x, boolean timed, long nanos) {
// 核心交換邏輯(
for (;;) {
if (slot != null) { // 存在等待節(jié)點(diǎn)
Node node = (Node)slot;
if (U.compareAndSwapObject(this, SLOT, node, null)) {
Object v = node.item;
node.match = x; // 數(shù)據(jù)交換(
Thread t = node.parked;
if (t != null)
U.unpark(t); // 喚醒對(duì)方線程
return v;
}
} elseif (U.compareAndSwapObject(this, SLOT, null, q)) {
// 掛起當(dāng)前線程
return timed ? awaitNanos(q, nanos) : await(q);
}
}
}
關(guān)鍵步驟解釋:
- CAS 設(shè)置槽位:
U.compareAndSwapObject(this, SLOT, null, q)
- 數(shù)據(jù)交換:直接修改對(duì)方節(jié)點(diǎn)的
match
字段 - 喚醒機(jī)制:通過
Unsafe.unpark()
解除線程阻塞