精通Java并發(fā)鎖機制:24種鎖技巧+業(yè)務(wù)鎖匹配方案
在 Java 并發(fā)編程中,鎖是確保線程安全、協(xié)調(diào)多線程訪問共享資源的關(guān)鍵機制。從基本的 synchronized 同步關(guān)鍵字到高級的 ReentrantLock、讀寫鎖 ReadWriteLock、無鎖設(shè)計如 AtomicInteger,再到復(fù)雜的同步輔助工具如 CountDownLatch、 CyclicBarrier 和 Semaphore,每種鎖都針對特定的并發(fā)場景設(shè)計,以解決多線程環(huán)境下的同步問題。 StampedLock 提供了樂觀讀鎖和悲觀寫鎖的選項,而 ConcurrentHashMap 和 ConcurrentLinkedQueue 等并發(fā)集合則通過內(nèi)部機制優(yōu)化了并發(fā)訪問。了解不同鎖的特點和適用場景,對于構(gòu)建高效、穩(wěn)定的并發(fā)應(yīng)用程序至關(guān)重要。
1、鎖選擇維度
選擇適合的鎖通常依賴于特定的應(yīng)用場景和并發(fā)需求。以下是一個表格,概述了不同鎖類型的關(guān)鍵特性和選擇它們的考量維度:
鎖類型 | 適用場景 | 鎖模式 | 性能特點 | 公平性 | 鎖的粗細 | 條件支持 | 阻塞策略 | 用途舉例 |
| 簡單的同步需求,無需復(fù)雜控制 | 獨占式 | 適中,偏向鎖、輕量級鎖優(yōu)化 | 無公平策略 | 粗粒度鎖 | 不支持 | 阻塞等待 | 單例模式、簡單的計數(shù)器 |
| 需要靈活的鎖控制,如可中斷、超時、嘗試鎖定等 | 獨占式 | 高,支持多種鎖定方式 | 可配置公平性 | 細粒度鎖 | 支持 | 可中斷、超時、嘗試 | 同步代碼塊或方法、復(fù)雜同步控制 |
| 讀多寫少的場景 | 共享-獨占式 | 高,提高讀操作并發(fā)性 | 不支持公平性 | 細粒度鎖 | 不支持 | 阻塞等待 | 緩存系統(tǒng)、文件系統(tǒng) |
| 讀多寫多,需要樂觀讀和悲觀寫的場景 | 樂觀讀-悲觀寫 | 高,提供讀寫鎖的擴展 | 可配置公平性 | 細粒度鎖 | 支持 | 可中斷、超時、嘗試 | 高性能計數(shù)器、數(shù)據(jù)緩存 |
| 需要等待一組操作完成的場景 | 無 | 低,一次性 | 不支持公平性 | 粗粒度鎖 | 不支持 | 阻塞等待 | 任務(wù)協(xié)調(diào)、初始化操作 |
| 需要控制資源訪問數(shù)量的場景 | 信號量 | 高,控制并發(fā)數(shù)量 | 不支持公平性 | 細粒度鎖 | 支持 | 阻塞等待 | 限流、資源池管理 |
| 需要周期性執(zhí)行一組操作的場景 | 無 | 低,重用性 | 支持公平性 | 粗粒度鎖 | 支持 | 阻塞等待 | 并行計算、批處理 |
2、鎖詳細分析
2.7. CyclicBarrier
CyclicBarrier 是 Java 中用于線程間同步的一種工具,它允許一組線程互相等待,直到所有線程都到達一個公共屏障點。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要在某個點同步。
- CyclicBarrier 實例:是 CyclicBarrier 類的實例,用于協(xié)調(diào)一組線程在屏障點同步。
- 屏障:表示線程需要到達的同步點,所有線程必須到達這個點才能繼續(xù)執(zhí)行。
- 共享資源或任務(wù):表示線程需要訪問的共享資源或執(zhí)行的任務(wù),它們在屏障點同步后可以安全地執(zhí)行。
- 等待區(qū):表示等待其他線程到達屏障點的線程集合。
- 計數(shù)器: CyclicBarrier 內(nèi)部維護一個計數(shù)器,用于跟蹤尚未到達屏障點的線程數(shù)量。
- 屏障動作(Runnable) :可選的,當(dāng)所有線程到達屏障點時,可以執(zhí)行一個特定的動作或任務(wù)。
綜合說明:
- 作用: CyclicBarrier 是一種同步幫助工具,允許一組線程相互等待,直到所有線程都到達某個公共屏障點。
- 背景:在需要多個線程協(xié)作完成任務(wù)時, CyclicBarrier 提供了一種機制,使得所有線程可以在屏障點同步,然后繼續(xù)執(zhí)行。
- 優(yōu)點:
可重復(fù)使用:與 CountDownLatch 不同, CyclicBarrier 可以重復(fù)使用,適用于周期性的任務(wù)同步。
支持屏障動作:可以設(shè)置一個在所有線程到達屏障點后執(zhí)行的回調(diào)。
- 缺點:
可能導(dǎo)致死鎖:如果一個或多個線程未到達屏障點,其他線程將一直等待。
復(fù)雜性:需要合理設(shè)計以避免線程永久等待。
場景:適用于需要周期性同步多個線程的場景。
業(yè)務(wù)舉例:在多階段數(shù)據(jù)處理流程中,每個階段需要所有數(shù)據(jù)都準(zhǔn)備好后才能開始處理。使用 CyclicBarrier可以確保所有數(shù)據(jù)加載線程在每個階段開始前都已準(zhǔn)備好。
使用方式:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Race {
private final CyclicBarrier barrier;
public Race(int numberOfRunners) {
barrier = new CyclicBarrier(numberOfRunners, () -> {
System.out.println("比賽開始!");
// 這里可以放置所有參與者到達屏障后要執(zhí)行的操作
});
}
public void run() {
System.out.println("等待其他參賽者...");
try {
barrier.await(); // 等待其他線程
System.out.println("開始跑步!");
// 跑步時間
Thread.sleep((long) (Math.random() * 10000));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
final int numberOfRunners = 5;
Race race = new Race(numberOfRunners);
// 創(chuàng)建參賽者線程
for (int i = 0; i < numberOfRunners; i++) {
final int runnerNumber = i + 1;
new Thread(() -> {
System.out.println("參賽者 " + runnerNumber + " 已準(zhǔn)備就緒");
race.run();
}).start();
}
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說明: 在大數(shù)據(jù)處理系統(tǒng)中,經(jīng)常需要對大量數(shù)據(jù)進行多階段處理,例如,數(shù)據(jù)清洗、轉(zhuǎn)換、聚合和加載。這些處理階段通常需要按順序執(zhí)行,且每個階段開始前必須確保所有數(shù)據(jù)都已準(zhǔn)備好。
為什么需要 CyclicBarrier 技術(shù): 在多階段數(shù)據(jù)處理的場景中,不同的處理任務(wù)可能由不同的線程執(zhí)行,而這些線程的執(zhí)行時間可能不同。 CyclicBarrier 允許每個階段的處理在開始前等待所有相關(guān)線程完成上一階段的任務(wù),確保數(shù)據(jù)的一致性和完整性。
沒有 CyclicBarrier 技術(shù)會帶來什么后果:
沒有使用 CyclicBarrier 或其他同步協(xié)調(diào)機制可能會導(dǎo)致以下問題:
- 數(shù)據(jù)不一致:如果后續(xù)階段的處理在前一階段的數(shù)據(jù)未完全準(zhǔn)備好時開始,可能會導(dǎo)致處理結(jié)果不準(zhǔn)確。
- 資源浪費:在等待數(shù)據(jù)準(zhǔn)備的過程中,系統(tǒng)資源可能被無效占用,導(dǎo)致資源利用效率低下。
- 錯誤和異常:由于階段間的依賴關(guān)系沒有得到妥善處理,可能會引發(fā)程序錯誤或運行時異常。
代碼實現(xiàn):
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DataProcessingPipeline {
private final ExecutorService executor = Executors.newFixedThreadPool(4);
private final CyclicBarrier barrier;
private final int numberOfPhases;
private final int numberOfTasks;
public DataProcessingPipeline(int numberOfTasks, int numberOfPhases) {
this.numberOfTasks = numberOfTasks;
this.numberOfPhases = numberOfPhases;
this.barrier = new CyclicBarrier(numberOfTasks, () -> {
System.out.println("一個階段完成,準(zhǔn)備進入下一階段");
});
}
public void processData() throws Exception {
for (int phase = 1; phase <= numberOfPhases; phase++) {
System.out.println("階段 " + phase + " 開始");
for (int task = 0; task < numberOfTasks; task++) {
final int currentTask = task;
executor.submit(() -> {
try {
// 數(shù)據(jù)處理任務(wù)
System.out.println("任務(wù) " + currentTask + " 在階段 " + phase + " 執(zhí)行");
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
barrier.await(); // 等待所有任務(wù)完成
}
executor.shutdown();
}
public static void main(String[] args) {
DataProcessingPipeline pipeline = new DataProcessingPipeline(4, 3);
try {
pipeline.processData();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.8. Atomic Variables
原子變量是 Java 中 java.util.concurrent.atomic 包提供的一些類,它們利用底層硬件的原子性指令來保證操作的原子性,無需使用鎖。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要對共享資源進行原子操作。
- Atomic Variables:表示原子變量的集合,包括 AtomicInteger、 AtomicLong、 AtomicReference 等。
- AtomicInteger、AtomicLong、AtomicReference:分別表示整型、長整型和引用類型的原子變量。
- 硬件支持的原子指令:底層硬件提供的原子性指令,如 compare-and-swap (CAS)、load-linked、store-conditional 等。
- 共享資源:表示被多個線程共享的數(shù)據(jù),如計數(shù)器、累加器等。
- 內(nèi)存:表示 Java 程序使用的內(nèi)存空間,包括堆和棧等。
- 變量狀態(tài):表示原子變量在內(nèi)存中的當(dāng)前狀態(tài)。
綜合說明:
- 作用:原子變量類(如 AtomicInteger, AtomicLong, AtomicReference 等)提供了一種機制,使得對變量的某些操作(如自增、自減、讀取和寫入)是原子性的,無需使用傳統(tǒng)的鎖。
- 背景:在多線程環(huán)境中,對共享變量的并發(fā)訪問需要同步措施以防止數(shù)據(jù)競爭。原子變量利用底層硬件的原子指令來保證操作的原子性,從而簡化了線程同步。
- 優(yōu)點:
無鎖設(shè)計:避免使用傳統(tǒng)鎖,減少了線程切換的開銷。
性能優(yōu)化:對于高競爭的簡單變量訪問,原子變量通常比鎖有更好的性能。
- 缺點:
功能限制:僅適用于簡單的操作,復(fù)雜的操作無法通過原子變量實現(xiàn)。
可組合性問題:復(fù)雜的原子操作需要仔細設(shè)計,否則可能引入競態(tài)條件。
場景:適用于對簡單變量進行原子操作的場景,如計數(shù)器、累加器等。
業(yè)務(wù)舉例:在電商平臺的庫存管理中, AtomicInteger 可以用來原子地更新商品的庫存數(shù)量,確保在高并發(fā)環(huán)境下庫存數(shù)據(jù)的一致性。
使用方式:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class Counter {
// 使用 AtomicInteger 來確保計數(shù)器的線程安全
private final AtomicInteger count = new AtomicInteger(0);
// 提供一個方法來增加計數(shù)器的值
public void increment() {
// 原子地增加計數(shù)器的值
count.incrementAndGet();
}
// 提供一個方法來獲取當(dāng)前計數(shù)器的值
public int getCount() {
// 原子地獲取計數(shù)器的值
return count.get();
}
}
public class DataStore {
// 使用 AtomicLong 來統(tǒng)計數(shù)據(jù)總量
private final AtomicLong dataCount = new AtomicLong(0);
public void addData(long size) {
// 原子地將數(shù)據(jù)大小累加到總量
dataCount.addAndGet(size);
}
public long getDataCount() {
// 原子地獲取當(dāng)前數(shù)據(jù)總量
return dataCount.get();
}
}
// 測試類
public class AtomicVariablesDemo {
public static void main(String[] args) {
Counter counter = new Counter();
DataStore dataStore = new DataStore();
// 多線程環(huán)境中對計數(shù)器和數(shù)據(jù)總量的更新
for (int i = 0; i < 10; i++) {
new Thread(() -> {
counter.increment();
dataStore.addData(100); // 假設(shè)每次操作增加100單位數(shù)據(jù)
}).start();
}
// 等待所有線程完成
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 輸出計數(shù)器的值和數(shù)據(jù)總量
System.out.println("Counter value: " + counter.getCount());
System.out.println("Data store size: " + dataStore.getDataCount());
}
}
業(yè)務(wù)代碼案例:
場景描述:社交網(wǎng)絡(luò)的實時消息計數(shù)
業(yè)務(wù)說明: 社交網(wǎng)絡(luò)平臺需要顯示每個用戶的實時消息通知數(shù)。每當(dāng)用戶收到新消息時,消息計數(shù)需要增加;用戶閱讀消息時,計數(shù)可能會減少或被重置。此計數(shù)需要對所有用戶可見,且在高并發(fā)環(huán)境下保持準(zhǔn)確。
為什么需要 AtomicVariables 技術(shù): 在社交網(wǎng)絡(luò)中,多個用戶可能同時發(fā)送消息給同一個接收者,或者一個用戶可能同時在多個設(shè)備上接收消息。這導(dǎo)致對消息計數(shù)的讀取和更新操作非常頻繁。使用 AtomicInteger 可以確保消息計數(shù)更新的原子性,并且在多線程環(huán)境下保持數(shù)據(jù)的一致性。
沒有 AtomicVariables 技術(shù)會帶來什么后果:
沒有使用 AtomicVariables 或其他并發(fā)控制機制可能會導(dǎo)致以下問題:
- 數(shù)據(jù)不一致:消息計數(shù)可能會出錯,導(dǎo)致用戶看到不正確的消息數(shù)量。
- 用戶體驗下降:如果消息通知不準(zhǔn)確,用戶可能會錯過重要通知,或者對應(yīng)用的可靠性產(chǎn)生懷疑。
- 系統(tǒng)復(fù)雜度增加:在沒有有效同步機制的情況下,維護數(shù)據(jù)一致性將變得復(fù)雜且容易出錯。
代碼實現(xiàn):
import java.util.concurrent.atomic.AtomicInteger;
public class MessageNotificationCounter {
private final AtomicInteger messageCount = new AtomicInteger(0);
// 接收新消息時調(diào)用此方法
public void receiveMessage() {
// 原子地增加消息計數(shù)
messageCount.incrementAndGet();
System.out.println("New message received. Total messages: " + messageCount.get());
}
// 用戶閱讀消息時調(diào)用此方法
public void messagesRead() {
// 原子地減少消息計數(shù)
messageCount.decrementAndGet();
System.out.println("Messages read. Remaining messages: " + messageCount.get());
}
// 獲取當(dāng)前消息計數(shù)
public int getMessageCount() {
return messageCount.get();
}
}
// 測試類
public class AtomicVariablesDemo {
public static void main(String[] args) {
MessageNotificationCounter counter = new MessageNotificationCounter();
// 多個用戶同時發(fā)送消息
Thread sender1 = new Thread(() -> {
counter.receiveMessage();
});
Thread sender2 = new Thread(() -> {
counter.receiveMessage();
});
// 用戶閱讀消息
Thread reader = new Thread(() -> {
counter.messagesRead();
});
sender1.start();
sender2.start();
reader.start();
sender1.join();
sender2.join();
reader.join();
System.out.println("Final message count: " + counter.getMessageCount());
}
}
2.9. ConcurrentHashMap
ConcurrentHashMap 是 Java 中一個線程安全的哈希表,它通過分段鎖(Segmentation)和 CAS 操作來支持高并發(fā)的讀寫操作。
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要對 ConcurrentHashMap 進行讀寫操作。
- ConcurrentHashMap 實例:是 ConcurrentHashMap 類的實例,用于存儲鍵值對并提供線程安全的訪問。
- Segment 數(shù)組: ConcurrentHashMap 將哈希表分為多個段(Segment),每個段維護一部分哈希桶,通過分段鎖減少鎖的競爭。
- Hash 桶:存儲哈希桶數(shù)組,每個桶可以包含一個或多個鍵值對。
- 鏈表或紅黑樹:在哈希桶中,鍵值對最初以鏈表形式存儲,當(dāng)鏈表長度超過閾值時,鏈表可能會被轉(zhuǎn)換為紅黑樹以提高搜索效率。
- 共享資源:表示存儲在 ConcurrentHashMap 中的鍵值對數(shù)據(jù)。
- 讀操作:線程可以并發(fā)地讀取 ConcurrentHashMap 中的數(shù)據(jù),在讀多寫少的場景下,讀操作不會阻塞其他讀操作。
- 寫操作:線程對 ConcurrentHashMap 的寫入操作,寫操作需要獲取相應(yīng)段的鎖。
- 鎖:每個段擁有自己的鎖,寫操作需要獲取鎖,而讀操作通常不需要。
升級設(shè)計說明:
Java 1.7 ConcurrentHashMap 鎖機制
在 Java 1.7 中, ConcurrentHashMap 使用分段鎖機制,其中每個段相當(dāng)于一個小的哈希表,擁有自己的鎖。
Java 1.8 ConcurrentHashMap 鎖機制
在 Java 1.8 中, ConcurrentHashMap 摒棄了分段鎖機制,采用了 CAS 和 synchronized 來確保線程安全。
綜合說明:
- 作用: ConcurrentHashMap 是 Java 中提供的一個線程安全的哈希表,它通過分段鎖的概念來允許并發(fā)的讀寫操作,從而提高并發(fā)訪問的性能。
- 背景:傳統(tǒng)的 HashMap 在多線程環(huán)境下需要外部同步,而 ConcurrentHashMap 通過鎖分離技術(shù)減少了鎖的競爭,提供了更好的并發(fā)性能。
- 優(yōu)點:
- 高并發(fā):通過細分鎖到段,允許多個線程同時操作不同段的數(shù)據(jù)。
- 動態(tài)擴容:內(nèi)部采用動態(tài)數(shù)組和鏈表結(jié)構(gòu),提高了空間和時間效率。
- 缺點:
復(fù)雜度高:實現(xiàn)復(fù)雜,需要維護多個鎖和復(fù)雜的數(shù)據(jù)結(jié)構(gòu)。
性能調(diào)優(yōu):在極端高并發(fā)場景下,可能需要調(diào)整默認的并發(fā)級別。
場景:適用于需要高并發(fā)訪問的緩存或數(shù)據(jù)存儲。
業(yè)務(wù)舉例:在大數(shù)據(jù)處理系統(tǒng)中, ConcurrentHashMap 可以用來存儲實時計算結(jié)果,支持大量并發(fā)的讀寫操作而不會導(dǎo)致性能瓶頸。
使用方式:
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public class ConcurrentHashMapDemo {
// 創(chuàng)建一個 ConcurrentHashMap 實例
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 將一個鍵值對插入到 Map 中
public void put(String key, Integer value) {
// put 方法是線程安全的
map.put(key, value);
}
// 從 Map 中獲取與指定鍵關(guān)聯(lián)的值
public Integer get(String key) {
// get 方法是線程安全的
return map.get(key);
}
// 計算 Map 中的元素數(shù)量
public int size() {
// size 方法是線程安全的
return map.size();
}
// 演示刪除操作
public void remove(String key) {
// remove 方法是線程安全的
map.remove(key);
}
// 演示如何批量添加數(shù)據(jù)
public void addAll(Map<String, Integer> newData) {
// putAll 方法是線程安全的
map.putAll(newData);
}
public static void main(String[] args) {
ConcurrentHashMapDemo demo = new ConcurrentHashMapDemo();
// 批量添加數(shù)據(jù)
demo.addAll(Map.of("key1", 1, "key2", 2, "key3", 3));
// 單獨添加一條數(shù)據(jù)
demo.put("key4", 4);
// 獲取并打印一條數(shù)據(jù)
System.out.println("Value for 'key1': " + demo.get("key1"));
// 獲取 Map 的大小
System.out.println("Map size: " + demo.size());
// 刪除一條數(shù)據(jù)
demo.remove("key2");
// 再次獲取 Map 的大小
System.out.println("Map size after removal: " + demo.size());
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說明: 在分布式緩存系統(tǒng)中,經(jīng)常需要存儲和檢索用戶會話信息、應(yīng)用配置、熱點數(shù)據(jù)等。這些數(shù)據(jù)需要被多個應(yīng)用實例共享,并且要求在高并發(fā)環(huán)境下依然保持高性能。緩存數(shù)據(jù)通常有過期時間,需要定期清理。
為什么需要 ConcurrentHashMap 技術(shù): ConcurrentHashMap 提供了一種高效的方式來處理并發(fā)的讀取和更新操作,并且它的分段鎖機制允許多個線程同時對不同段進行操作,從而提高并發(fā)處理能力。此外, ConcurrentHashMap 在 Java 8 中引入的紅黑樹結(jié)構(gòu)使得即使在高并發(fā)更新導(dǎo)致哈希沖突時,也能保持高效的性能。
沒有 ConcurrentHashMap 技術(shù)會帶來什么后果:
沒有使用 ConcurrentHashMap 可能會導(dǎo)致以下問題:
- 性能瓶頸:在高并發(fā)環(huán)境下,如果使用 HashMap 加 synchronized,可能導(dǎo)致嚴(yán)重的性能瓶頸,因為所有線程必須等待一個鎖。
- 數(shù)據(jù)不一致:在沒有適當(dāng)同步的情況下,多個線程同時更新數(shù)據(jù)可能導(dǎo)致緩存數(shù)據(jù)不一致。
- 擴展性差:隨著系統(tǒng)負載的增加,基于 HashMap 的緩存解決方案可能難以擴展,因為鎖競爭和線程安全問題。
代碼實現(xiàn):
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class DistributedCache<K, V> {
private final ConcurrentHashMap<K, V> cacheMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<K, Long> expirationMap = new ConcurrentHashMap<>();
public void put(K key, V value, long ttl) {
cacheMap.put(key, value);
expirationMap.put(key, System.currentTimeMillis() + ttl);
scheduleEviction(key, ttl);
}
public V get(K key) {
Long expirationTime = expirationMap.get(key);
if (expirationTime == null || expirationTime < System.currentTimeMillis()) {
cacheMap.remove(key);
expirationMap.remove(key);
return null;
}
return cacheMap.get(key);
}
private void scheduleEviction(final K key, final long ttl) {
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(ttl);
cacheMap.computeIfPresent(key, (k, v) -> null);
expirationMap.remove(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
public static void main(String[] args) {
DistributedCache<String, String> cache = new DistributedCache<>();
cache.put("userSession", "sessionData", 5000); // 緩存設(shè)置5秒過期
// 多個線程并發(fā)訪問緩存
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
String result = cache.get("userSession");
System.out.println("Thread " + finalI + " retrieved: " + result);
}).start();
}
}
}
2.10.ConcurrentSkipListMap
ConcurrentSkipListMap 是 Java 中實現(xiàn)的一個高性能并發(fā)的有序映射,它使用跳表(Skip List)作為其底層數(shù)據(jù)結(jié)構(gòu)。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要對 ConcurrentSkipListMap 進行讀寫操作。
- ConcurrentSkipListMap 實例:是 ConcurrentSkipListMap 類的實例,用于存儲鍵值對并提供線程安全的訪問。
- Skip List 層級結(jié)構(gòu):跳表由多層索引構(gòu)成,每一層都是一個有序的鏈表。
- 索引層:跳表中的索引層,用于加速搜索操作。
- 數(shù)據(jù)層:跳表中的底層數(shù)據(jù)結(jié)構(gòu),存儲實際的鍵值對。
- Node 節(jié)點:跳表中的節(jié)點,包含鍵值對和指向其他節(jié)點的鏈接。
- 共享資源:表示存儲在 ConcurrentSkipListMap 中的鍵值對數(shù)據(jù)。
- 讀操作:線程可以并發(fā)地讀取 ConcurrentSkipListMap 中的數(shù)據(jù)。
- 寫操作:線程可以并發(fā)地修改 ConcurrentSkipListMap 中的數(shù)據(jù)。
- CAS 操作:在更新節(jié)點鏈接或修改數(shù)據(jù)時,使用 CAS 操作來保證線程安全。
- 自旋鎖/同步塊:在某些情況下,如果 CAS 操作失敗,可能會使用自旋鎖或同步塊來確保操作的原子性。
操作流程:
- 讀操作:
線程通過索引層快速定位到數(shù)據(jù)層的節(jié)點。
線程使用 volatile 讀取節(jié)點的值,確保內(nèi)存可見性。
- 寫操作:
線程在更新或添加節(jié)點時,首先嘗試使用 CAS 操作。
如果 CAS 操作失敗,線程可能會使用自旋鎖或同步塊來確保原子性。
綜合說明:
作用: ConcurrentSkipListMap 是一種線程安全的有序映射,它通過使用跳表(Skip List)數(shù)據(jù)結(jié)構(gòu)來支持高效的并發(fā)訪問和排序操作。 背景:在需要高效并發(fā)訪問和保持元素有序的場景中,傳統(tǒng)的 TreeMap 由于其加鎖策略在高并發(fā)環(huán)境下性能受限, ConcurrentSkipListMap 提供了一種替代方案。 優(yōu)點:
- 高性能并發(fā)訪問:通過跳表結(jié)構(gòu)和細粒度鎖定,實現(xiàn)了高效的并發(fā)讀取和更新。
- 有序性:保持元素的有序性,支持范圍查詢等操作。
- 動態(tài)調(diào)整:可以根據(jù)訪問模式動態(tài)調(diào)整結(jié)構(gòu),優(yōu)化性能。 缺點:
- 內(nèi)存占用:相比無序的 ConcurrentHashMap,由于維護了有序性,內(nèi)存占用可能更高。
- 復(fù)雜性:實現(xiàn)相對復(fù)雜,涉及多級索引和節(jié)點的管理。 場景:適用于需要有序數(shù)據(jù)且高并發(fā)訪問的場景,如實時數(shù)據(jù)索引、范圍查詢等。 業(yè)務(wù)舉例:在一個金融市場分析系統(tǒng)中,需要維護一個實時更新的價格索引, ConcurrentSkipListMap 可以用來存儲和快速檢索各種金融工具的當(dāng)前價格。
使用方式:
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
public class ConcurrentSkipListMapDemo {
// 創(chuàng)建一個 ConcurrentSkipListMap 實例
private final ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
// 將一個鍵值對插入到 Map 中
public void put(Integer key, String value) {
// put 方法是線程安全的
map.put(key, value);
}
// 從 Map 中獲取與指定鍵關(guān)聯(lián)的值
public String get(Integer key) {
// get 方法是線程安全的
return map.get(key);
}
// 獲取 Map 的鍵集合
public java.util.NavigableSet<Integer> keySet() {
// keySet 方法返回 Map 的鍵集合視圖
return map.keySet();
}
// 獲取 Map 的值集合
public java.util.Collection<String> values() {
// values 方法返回 Map 的值集合視圖
return map.values();
}
// 獲取 Map 的大小
public int size() {
// size 方法是線程安全的
return map.size();
}
// 演示刪除操作
public void remove(Integer key) {
// remove 方法是線程安全的
map.remove(key);
}
public static void main(String[] args) {
ConcurrentSkipListMapDemo demo = new ConcurrentSkipListMapDemo();
// 插入一些數(shù)據(jù)
demo.put(1, "One");
demo.put(2, "Two");
demo.put(3, "Three");
// 獲取并打印一條數(shù)據(jù)
System.out.println("Value for key 2: " + demo.get(2));
// 獲取 Map 的大小
System.out.println("Map size: " + demo.size());
// 獲取并打印所有鍵
System.out.println("All keys: " + demo.keySet());
// 刪除一條數(shù)據(jù)
demo.remove(2);
// 再次獲取 Map 的大小
System.out.println("Map size after removal: " + demo.size());
// 獲取并打印所有值
System.out.println("All values: " + demo.values());
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說明: 實時股票交易系統(tǒng)需要維護一個動態(tài)變化的股票價格索引,該索引需要根據(jù)實時的市場數(shù)據(jù)進行更新,并且允許多個交易線程并發(fā)地讀取和更新股票價格。此外,系統(tǒng)還需要定期根據(jù)價格波動進行調(diào)整,如計算價格的平均值、執(zhí)行價格范圍查詢等。
為什么需要 ConcurrentSkipListMap 技術(shù): ConcurrentSkipListMap 是一個線程安全的有序映射,它允許高效的范圍查詢和有序訪問,這對于股票價格索引來說至關(guān)重要。由于股票價格會頻繁更新,且需要快速響應(yīng)市場變化,使用 ConcurrentSkipListMap 可以提供高效的插入、刪除和查找操作,同時保持數(shù)據(jù)的有序性。
沒有 ConcurrentSkipListMap 技術(shù)會帶來什么后果:
沒有使用 ConcurrentSkipListMap 或其他適合有序并發(fā)操作的數(shù)據(jù)結(jié)構(gòu)可能會導(dǎo)致以下問題:
- 性能瓶頸:如果使用 HashMap 或 ConcurrentHashMap,雖然可以實現(xiàn)并發(fā)更新,但無法高效執(zhí)行有序操作和范圍查詢,可能導(dǎo)致查詢性能不佳。
- 數(shù)據(jù)不一致:在高并發(fā)更新的情況下,如果沒有適當(dāng)?shù)耐綑C制,可能會導(dǎo)致價格信息的不一致。
- 復(fù)雜性增加:如果使用 synchronized 列表或數(shù)組來維護價格索引,可能需要手動管理復(fù)雜的同步和排序邏輯,增加系統(tǒng)復(fù)雜性和出錯的風(fēng)險。
代碼實現(xiàn):
import java.util.concurrent.ConcurrentSkipListMap;
public class StockPriceIndex {
private final ConcurrentSkipListMap<String, Double> priceIndex = new ConcurrentSkipListMap<>();
public void updatePrice(String stockSymbol, Double newPrice) {
// 更新股票價格
priceIndex.put(stockSymbol, newPrice);
}
public Double getPrice(String stockSymbol) {
// 獲取股票價格
return priceIndex.get(stockSymbol);
}
public void removeStock(String stockSymbol) {
// 移除股票信息
priceIndex.remove(stockSymbol);
}
public ConcurrentSkipListMap<String, Double> headMap(String toKey) {
// 獲取指定范圍內(nèi)的股票價格索引
return priceIndex.headMap(toKey);
}
public static void main(String[] args) {
StockPriceIndex index = new StockPriceIndex();
index.updatePrice("AAPL", 150.00);
index.updatePrice("GOOGL", 2750.50);
index.updatePrice("MSFT", 250.00);
System.out.println("Price of AAPL: " + index.getPrice("AAPL"));
System.out.println("Price of GOOGL: " + index.getPrice("GOOGL"));
// 獲取所有小于 "MSFT" 的股票價格索引
ConcurrentSkipListMap<String, Double> subMap = index.headMap("MSFT");
subMap.forEach((k, v) -> System.out.println("Stock: " + k + ", Price: " + v));
}
}
2.11. ConcurrentLinkedQueue
ConcurrentLinkedQueue 是 Java 中一個線程安全的無鎖隊列,它使用 CAS (Compare-And-Swap) 操作來保證線程安全。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要對 ConcurrentLinkedQueue 進行入隊或出隊操作。
- ConcurrentLinkedQueue 實例:是 ConcurrentLinkedQueue 類的實例,用于存儲隊列中的元素并提供線程安全的訪問。
- Node 節(jié)點結(jié)構(gòu): ConcurrentLinkedQueue 使用內(nèi)部的 Node 類來存儲隊列中的每個元素。每個節(jié)點包含隊列中的一個元素和指向下一個節(jié)點的鏈接。
- 虛擬頭節(jié)點:隊列使用一個虛擬頭節(jié)點來簡化出隊操作。虛擬頭節(jié)點不存儲實際的隊列元素。
- 虛擬尾節(jié)點:隊列使用一個虛擬尾節(jié)點來簡化入隊操作。虛擬尾節(jié)點指向隊列中的最后一個節(jié)點。
- 隊列元素:表示存儲在隊列中的實際數(shù)據(jù)。
- 入隊操作:線程將新元素添加到隊列尾部的過程,通過 CAS 更新虛擬尾節(jié)點的鏈接。
- 出隊操作:線程從隊列頭部移除元素的過程,通過 CAS 更新虛擬頭節(jié)點的鏈接。
- CAS 操作: ConcurrentLinkedQueue 使用 CAS 操作來更新節(jié)點之間的鏈接,從而實現(xiàn)無鎖的線程安全隊列。
- 自旋等待:在 CAS 操作失敗時,線程可能會自旋等待直到操作成功。
操作流程:
- 入隊操作:線程通過 CAS 操作將新節(jié)點插入到隊列尾部,并更新尾節(jié)點指針。
- 出隊操作:線程通過 CAS 操作移除隊列頭部的節(jié)點,并更新頭節(jié)點指針。
- CAS 操作:在入隊和出隊過程中,線程使用 CAS 來保證節(jié)點鏈接的原子性更新。
綜合說明:
作用: ConcurrentLinkedQueue 是一種基于鏈接節(jié)點的無界線程安全隊列,支持高并發(fā)的入隊和出隊操作。 背景:在多線程環(huán)境中,需要一種高效的隊列來處理任務(wù)或消息傳遞, ConcurrentLinkedQueue 提供了一種無鎖的解決方案。 優(yōu)點:
- 無鎖設(shè)計:利用 CAS 操作實現(xiàn)無鎖的線程安全隊列,提高了并發(fā)性能。
- 簡單高效:提供了簡單的入隊和出隊操作,適合作為任務(wù)隊列或消息傳遞隊列。
- 無界隊列:理論上隊列大小無界,適用于處理大量任務(wù)。 缺點:
- 可能的內(nèi)存消耗:由于是無界隊列,在極端情況下可能會消耗大量內(nèi)存。
- 性能限制:在某些高競爭場景下,CAS 操作可能導(dǎo)致性能瓶頸。 場景:適用于作為任務(wù)隊列或消息傳遞隊列,支持高并發(fā)的入隊和出隊操作。 業(yè)務(wù)舉例:在一個分布式計算系統(tǒng)中, ConcurrentLinkedQueue 可以用于收集各個計算節(jié)點的輸出結(jié)果,然后由一個或多個消費者線程進行處理。
這兩個并發(fā)集合類在 Java 中提供了強大的工具,以支持復(fù)雜的并發(fā)數(shù)據(jù)處理需求,它們各自適用于不同的應(yīng)用場景,可以根據(jù)具體需求選擇合適的并發(fā)集合。kedQueue
使用方式:
import java.util.concurrent.ConcurrentLinkedQueue;
public class ConcurrentLinkedQueueDemo {
// 創(chuàng)建一個 ConcurrentLinkedQueue 實例
private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 向隊列中添加元素
public void add(int number) {
// add 方法是線程安全的
queue.add(number);
System.out.println("Added " + number);
}
// 從隊列中獲取并移除元素
public Integer poll() {
// poll 方法是線程安全的,返回并移除隊列頭部的元素
Integer result = queue.poll();
if (result != null) {
System.out.println("Polled " + result);
} else {
System.out.println("Queue is empty");
}
return result;
}
// 查看隊列頭部的元素但不移除
public Integer peek() {
// peek 方法是線程安全的,返回隊列頭部的元素但不移除
Integer result = queue.peek();
if (result != null) {
System.out.println("Peeked " + result);
} else {
System.out.println("Queue is empty");
}
return result;
}
// 獲取隊列的大小
public int size() {
// size 方法估算隊列的大小
int size = queue.size();
System.out.println("Queue size: " + size);
return size;
}
public static void main(String[] args) {
ConcurrentLinkedQueueDemo demo = new ConcurrentLinkedQueueDemo();
// 啟動生產(chǎn)者線程
Thread producerThread = new Thread(() -> {
demo.add(1);
demo.add(2);
demo.add(3);
});
// 啟動消費者線程
Thread consumerThread = new Thread(() -> {
demo.poll();
demo.poll();
demo.poll();
demo.poll(); // 這次調(diào)用應(yīng)該會返回 null,因為隊列已空
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
// 在所有線程完成后獲取隊列大小
demo.size();
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說明: 大規(guī)模日志處理系統(tǒng)需要從多個源實時收集、存儲并分析日志數(shù)據(jù)。這些日志數(shù)據(jù)通常由分布在不同服務(wù)器上的應(yīng)用程序生成,并且需要被快速地處理以避免數(shù)據(jù)丟失或延遲問題。
為什么需要 ConcurrentLinkedQueue 技術(shù): 在日志處理場景中,日志數(shù)據(jù)的產(chǎn)生速度往往非常快,且來源眾多,因此需要一個高效且線程安全的隊列來緩存這些日志數(shù)據(jù)。 ConcurrentLinkedQueue 提供了高吞吐量和低延遲的并發(fā)訪問,無需使用鎖,使得它特別適合用作日志數(shù)據(jù)的緩沖區(qū)。此外,由于 ConcurrentLinkedQueue 是無界的,因此不會阻塞生產(chǎn)者線程,即使在高負載情況下也能保持高性能。
沒有 ConcurrentLinkedQueue 技術(shù)會帶來什么后果: 沒有使用 ConcurrentLinkedQueue 或其他高效的并發(fā)隊列可能會導(dǎo)致以下問題:
- 數(shù)據(jù)丟失:如果使用有界隊列且沒有適當(dāng)?shù)纳a(chǎn)者速率控制,可能會因為隊列滿導(dǎo)致日志數(shù)據(jù)丟失。
- 性能瓶頸:如果使用鎖或其他同步機制來保護共享隊列,可能會導(dǎo)致性能瓶頸,尤其是在高并發(fā)場景下。
- 系統(tǒng)不穩(wěn)定:在高負載情況下,如果隊列處理速度跟不上數(shù)據(jù)產(chǎn)生速度,可能會導(dǎo)致系統(tǒng)崩潰或重啟。
代碼實現(xiàn):
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class LogProcessor {
private final ConcurrentLinkedQueue<String> logQueue = new ConcurrentLinkedQueue<>();
private final ExecutorService processorPool = Executors.newFixedThreadPool(10);
public void log(String message) {
// 生產(chǎn)者線程調(diào)用此方法來添加日志到隊列
logQueue.add(message);
}
public void startLogProcessing() {
// 消費者線程池,用于處理隊列中的日志
processorPool.submit(() -> {
while (true) {
try {
// 消費者線程調(diào)用此方法來處理隊列中的日志
String logEntry = logQueue.poll();
if (logEntry != null) {
processLog(logEntry);
} else {
TimeUnit.MILLISECONDS.sleep(100); // 避免 CPU 過載
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
private void processLog(String logEntry) {
// 實際處理日志的邏輯
System.out.println("Processing log: " + logEntry);
}
public static void main(String[] args) {
LogProcessor logProcessor = new LogProcessor();
logProcessor.startLogProcessing();
// 多個生產(chǎn)者線程生成日志
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
logProcessor.log("Log entry " + finalI);
}).start();
}
}
}
2.12. BlockingQueue
BlockingQueue 是 Java 中用于線程間通信的隊列,支持阻塞操作,當(dāng)隊列為空時,獲取元素的操作會阻塞;當(dāng)隊列滿時,插入元素的操作會阻塞。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要向隊列中添加或移除元素。
- BlockingQueue 實例:是 BlockingQueue 接口的具體實現(xiàn),如 ArrayBlockingQueue、 LinkedBlockingQueue 等,用于線程間通信。
- 內(nèi)部數(shù)據(jù)結(jié)構(gòu):表示 BlockingQueue 內(nèi)部用于存儲元素的數(shù)據(jù)結(jié)構(gòu),如數(shù)組、鏈表等。
- 隊列容量:表示 BlockingQueue 的最大容量,如果隊列有界,則插入操作在隊列滿時會阻塞。
- 等待區(qū)(元素) :表示當(dāng)隊列為空時,等待獲取元素的線程集合。
- 等待區(qū)(空間) :表示當(dāng)隊列滿時,等待空間釋放的線程集合。
- 元素添加操作:表示向 BlockingQueue 中添加元素的操作,如果隊列滿,則操作會阻塞。
- 元素移除操作:表示從 BlockingQueue 中移除元素的操作,如果隊列為空,則操作會阻塞。
綜合說明:
- 作用: BlockingQueue 是一個線程安全的隊列,支持阻塞操作,當(dāng)隊列為空時,獲取元素的操作會阻塞;當(dāng)隊列滿時,插入元素的操作會阻塞。
- 背景:在生產(chǎn)者-消費者模型中,需要一種機制來協(xié)調(diào)生產(chǎn)者和消費者之間的操作, BlockingQueue 提供了這種協(xié)調(diào)。
- 優(yōu)點:
- 線程協(xié)調(diào):自然地實現(xiàn)了生產(chǎn)者-消費者之間的線程協(xié)調(diào)。
- 阻塞操作:提供了阻塞獲取和阻塞插入的方法,簡化了并發(fā)編程。
- 缺點:
可能的死鎖:不當(dāng)使用可能導(dǎo)致死鎖,例如一個線程永久阻塞等待一個不會到來的元素。
性能考慮:在高并發(fā)環(huán)境下,隊列的容量和鎖策略需要仔細調(diào)優(yōu)。
場景:適用于生產(chǎn)者-消費者場景,如任務(wù)分配、資源池管理等。
業(yè)務(wù)舉例:在消息處理系統(tǒng)中, BlockingQueue 可以用于緩存待處理的消息,生產(chǎn)者線程生成消息并放入隊列,消費者線程從隊列中取出并處理消息,確保了消息的順序性和系統(tǒng)的響應(yīng)性。
使用方式:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueDemo {
// 創(chuàng)建一個 LinkedBlockingQueue 實例,容量限制為10
private final BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);
// 向 BlockingQueue 中添加元素
public void produce(Integer element) throws InterruptedException {
// put 方法在隊列滿時阻塞,直到隊列中有空間
blockingQueue.put(element);
System.out.println("Produced: " + element);
}
// 從 BlockingQueue 中獲取元素
public Integer consume() throws InterruptedException {
// take 方法在隊列空時阻塞,直到隊列中有元素
Integer element = blockingQueue.take();
System.out.println("Consumed: " + element);
return element;
}
// 獲取 BlockingQueue 的大小
public int size() {
// size 方法返回隊列當(dāng)前的元素數(shù)量
return blockingQueue.size();
}
public static void main(String[] args) throws InterruptedException {
BlockingQueueDemo demo = new BlockingQueueDemo();
// 創(chuàng)建生產(chǎn)者線程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 15; i++) {
demo.produce(i);
Thread.sleep(100); // 生產(chǎn)延時
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 創(chuàng)建消費者線程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 15; i++) {
int element = demo.consume();
Thread.sleep(150); // 消費延時
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
// 打印最終隊列的大小
System.out.println("Final queue size: " + demo.size());
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說明: 消息隊列系統(tǒng)在微服務(wù)架構(gòu)中用于異步處理任務(wù),例如發(fā)送郵件、短信通知等。這些服務(wù)通常由獨立的服務(wù)實例處理,以提高系統(tǒng)的響應(yīng)性和可擴展性。消息隊列需要能夠處理高并發(fā)的消息生產(chǎn)和消費,確保消息的可靠傳遞。
為什么需要 BlockingQueue 技術(shù): BlockingQueue 提供了一種有效的機制來處理生產(chǎn)者-消費者場景,特別是在面對高并發(fā)和需要線程安全時。它能夠使生產(chǎn)者在隊列滿時阻塞,消費者在隊列空時阻塞,從而平衡生產(chǎn)和消費的速度,確保系統(tǒng)的穩(wěn)定性和消息的不丟失。
沒有 BlockingQueue 技術(shù)會帶來什么后果:
沒有使用 BlockingQueue 或其他并發(fā)隊列可能會導(dǎo)致以下問題:
- 消息丟失:在高并發(fā)情況下,如果沒有適當(dāng)?shù)臋C制來控制消息的產(chǎn)生和消費,可能會導(dǎo)致消息丟失。
- 系統(tǒng)過載:如果沒有流控機制,生產(chǎn)者可能會過快地生成消息,導(dǎo)致系統(tǒng)資源耗盡,甚至崩潰。
- 數(shù)據(jù)不一致:在多線程環(huán)境下,如果不正確地管理消息的訪問,可能會導(dǎo)致數(shù)據(jù)處理的不一致性。
代碼實現(xiàn):
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MessageQueueSystem {
private final BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
public void produceMessage(String content) throws InterruptedException {
// 將消息添加到隊列中,如果隊列滿了,生產(chǎn)者線程將被阻塞
messageQueue.put(new Message(content));
System.out.println("Message produced: " + content);
}
public Message consumeMessage() throws InterruptedException {
// 從隊列中取出消息,如果隊列空了,消費者線程將被阻塞
Message message = messageQueue.take();
System.out.println("Message consumed: " + message.getContent());
return message;
}
public static void main(String[] args) throws InterruptedException {
MessageQueueSystem messageQueueSystem = new MessageQueueSystem();
// 創(chuàng)建生產(chǎn)者線程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
messageQueueSystem.produceMessage("Message " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 創(chuàng)建消費者線程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
messageQueueSystem.consumeMessage();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
class Message {
private final String content;
public Message(String content) {
this.content = content;
}
public String getContent() {
return content;
}
}
2.13. Condition
Condition 是 Java 中 java.util.concurrent.locks 包提供的一個接口,它用于實現(xiàn)等待/通知機制。 Condition 通常與 Lock 接口配合使用,允許一個或多個線程在某些條件滿足之前掛起,并在條件滿足時被喚醒。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要在某些條件滿足之前掛起。
- Lock 實例:是 Lock 接口的具體實現(xiàn),如 ReentrantLock,用于控制對共享資源的訪問。
- Condition 實例:是 Condition 接口的具體實現(xiàn),與 Lock 實例配合使用,用于線程間的等待/通知機制。
- 等待隊列(線程) :當(dāng)線程調(diào)用 Condition 的 await() 方法時,如果條件不滿足,線程會被放入等待隊列。
- 共享資源:表示被多個線程共享的數(shù)據(jù),需要通過 Lock 和 Condition 來保護以確保線程安全。
- 條件檢查:表示線程在嘗試獲取資源之前需要檢查的條件。
- 喚醒信號:當(dāng)條件滿足時,其他線程會發(fā)送喚醒信號給等待隊列中的線程。
- 鎖狀態(tài):表示鎖的當(dāng)前狀態(tài),如是否被鎖定,以及鎖定的線程等。
操作流程:
- 鎖定:線程通過 Lock 實例獲取鎖。
- 條件檢查:線程檢查條件是否滿足。
- 等待:如果條件不滿足,線程調(diào)用 Condition 的 await() 方法,釋放鎖并進入等待隊列。
- 喚醒:當(dāng)條件滿足時,其他線程調(diào)用 Condition 的 signal() 或 signalAll() 方法,發(fā)送喚醒信號給等待隊列中的線程。
- 重新競爭鎖:被喚醒的線程重新競爭鎖。
- 再次檢查條件:線程在重新獲得鎖后,再次檢查條件是否滿足,如果滿足則繼續(xù)執(zhí)行。
綜合說明:
- 作用: Condition 是與 Lock 接口配合使用的同步輔助工具,它允許一個或多個線程等待,直到被其他線程喚醒。
- 背景:在復(fù)雜的同步場景中,需要更細粒度的控制線程的等待和喚醒, Condition 提供了這種能力。
- 優(yōu)點:
- 細粒度控制:提供了比 Object.wait()/ Object.notify() 更靈活的線程間協(xié)調(diào)機制。
- 多條件支持:一個鎖可以關(guān)聯(lián)多個條件,每個條件可以獨立喚醒等待的線程。
- 缺點:
使用復(fù)雜:需要與 Lock 一起使用,增加了編程復(fù)雜度。
錯誤使用可能導(dǎo)致死鎖或線程饑餓。
場景:適用于需要線程間復(fù)雜協(xié)調(diào)的場景,如任務(wù)調(diào)度、資源分配等。
業(yè)務(wù)舉例:在酒店預(yù)訂系統(tǒng)中, Condition 可以用于實現(xiàn)房間狀態(tài)的等待和通知機制。當(dāng)房間變?yōu)榭臻e時,等待的顧客可以被通知并進行預(yù)訂。
使用方式:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
private final Object[] buffer;
private int putPtr, takePtr, count;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int size) {
buffer = new Object[size];
}
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) { // 等待直到緩沖區(qū)非滿
notFull.await();
}
buffer[putPtr] = x;
putPtr = (putPtr + 1) % buffer.length;
count++;
notEmpty.signal(); // 通知可能等待的消費者
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) { // 等待直到緩沖區(qū)非空
notEmpty.await();
}
Object x = buffer[takePtr];
takePtr = (takePtr + 1) % buffer.length;
count--;
notFull.signal(); // 通知可能等待的生產(chǎn)者
return x;
} finally {
lock.unlock();
}
}
}
public class ProducerConsumerDemo {
private final BoundedBuffer buffer;
public ProducerConsumerDemo(int size) {
buffer = new BoundedBuffer(size);
}
public void produce(String item) {
buffer.put(item);
}
public String consume() {
return (String) buffer.take();
}
public static void main(String[] args) throws InterruptedException {
final int SIZE = 10;
final ProducerConsumerDemo demo = new ProducerConsumerDemo(SIZE);
// 生產(chǎn)者線程
Thread producerThread = new Thread(() -> {
for (int i = 0; i < 20; i++) {
demo.produce("Item " + i);
try {
Thread.sleep(100); // 生產(chǎn)延時
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 消費者線程
Thread consumerThread = new Thread(() -> {
for (int i = 0; i < 20; i++) {
String item = demo.consume();
System.out.println("Consumed: " + item);
try {
Thread.sleep(150); // 消費延時
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說明: 任務(wù)調(diào)度系統(tǒng)負責(zé)管理和執(zhí)行定時任務(wù)。這些任務(wù)可能包括數(shù)據(jù)備份、報告生成、系統(tǒng)維護等。系統(tǒng)需要能夠按預(yù)定時間觸發(fā)任務(wù),并確保任務(wù)在執(zhí)行時不會相互干擾。
為什么需要 Condition 技術(shù): 在任務(wù)調(diào)度系統(tǒng)中,任務(wù)的觸發(fā)通常依賴于時間,而任務(wù)的執(zhí)行可能需要等待特定條件滿足。 Condition 配合 Lock 使用,可以在沒有任務(wù)可執(zhí)行時讓調(diào)度器線程等待,直到有任務(wù)準(zhǔn)備好執(zhí)行。這種機制允許系統(tǒng)在沒有任務(wù)執(zhí)行需求時保持空閑,從而節(jié)省資源。
沒有 Condition 技術(shù)會帶來什么后果:
沒有使用 Condition 或其他等待/通知機制可能會導(dǎo)致以下問題:
- 資源浪費:如果調(diào)度器不斷輪詢檢查新任務(wù),可能會浪費大量 CPU 資源。
- 響應(yīng)性差:在新任務(wù)到來時,如果沒有有效的機制來喚醒調(diào)度器,可能會導(dǎo)致任務(wù)執(zhí)行延遲。
- 代碼復(fù)雜度:沒有 Condition,可能需要使用更復(fù)雜的多線程同步機制,增加了代碼的復(fù)雜性和出錯的風(fēng)險。
代碼實現(xiàn):
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;
public class TaskScheduler {
private final ReentrantLock lock = new ReentrantLock();
private final Condition taskAvailable = lock.newCondition();
private final Queue<Runnable> tasks = new LinkedList<>();
public void schedule(Runnable task, long delay) {
lock.lock();
try {
tasks.add(() -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
task.run();
});
taskAvailable.signal(); // 通知調(diào)度器有新任務(wù)
} finally {
lock.unlock();
}
}
public void startScheduling() {
new Thread(this::runScheduler).start();
}
private void runScheduler() {
lock.lock();
try {
while (true) {
while (tasks.isEmpty()) { // 如果沒有任務(wù),等待
taskAvailable.await();
}
Runnable task = tasks.poll();
task.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
TaskScheduler scheduler = new TaskScheduler();
scheduler.schedule(() -> System.out.println("Task 1 executed at " + new Date()), 2000);
scheduler.schedule(() -> System.out.println("Task 2 executed at " + new Date()), 4000);
scheduler.startScheduling();
}
}