自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

精通Java并發(fā)鎖機制:24種鎖技巧+業(yè)務(wù)鎖匹配方案

開發(fā) 前端
為什么需要?CyclicBarrier?技術(shù): 在多階段數(shù)據(jù)處理的場景中,不同的處理任務(wù)可能由不同的線程執(zhí)行,而這些線程的執(zhí)行時間可能不同。?CyclicBarrier?允許每個階段的處理在開始前等待所有相關(guān)線程完成上一階段的任務(wù),確保數(shù)據(jù)的一致性和完整性。

在 Java 并發(fā)編程中,鎖是確保線程安全、協(xié)調(diào)多線程訪問共享資源的關(guān)鍵機制。從基本的 synchronized 同步關(guān)鍵字到高級的 ReentrantLock、讀寫鎖 ReadWriteLock、無鎖設(shè)計如 AtomicInteger,再到復(fù)雜的同步輔助工具如 CountDownLatch、 CyclicBarrier 和 Semaphore,每種鎖都針對特定的并發(fā)場景設(shè)計,以解決多線程環(huán)境下的同步問題。 StampedLock 提供了樂觀讀鎖和悲觀寫鎖的選項,而 ConcurrentHashMap 和 ConcurrentLinkedQueue 等并發(fā)集合則通過內(nèi)部機制優(yōu)化了并發(fā)訪問。了解不同鎖的特點和適用場景,對于構(gòu)建高效、穩(wěn)定的并發(fā)應(yīng)用程序至關(guān)重要。

1、鎖選擇維度

選擇適合的鎖通常依賴于特定的應(yīng)用場景和并發(fā)需求。以下是一個表格,概述了不同鎖類型的關(guān)鍵特性和選擇它們的考量維度:

鎖類型

適用場景

鎖模式

性能特點

公平性

鎖的粗細

條件支持

阻塞策略

用途舉例

synchronized

簡單的同步需求,無需復(fù)雜控制

獨占式

適中,偏向鎖、輕量級鎖優(yōu)化

無公平策略

粗粒度鎖

不支持

阻塞等待

單例模式、簡單的計數(shù)器

ReentrantLock

需要靈活的鎖控制,如可中斷、超時、嘗試鎖定等

獨占式

高,支持多種鎖定方式

可配置公平性

細粒度鎖

支持

可中斷、超時、嘗試

同步代碼塊或方法、復(fù)雜同步控制

ReadWriteLock

讀多寫少的場景

共享-獨占式

高,提高讀操作并發(fā)性

不支持公平性

細粒度鎖

不支持

阻塞等待

緩存系統(tǒng)、文件系統(tǒng)

StampedLock

讀多寫多,需要樂觀讀和悲觀寫的場景

樂觀讀-悲觀寫

高,提供讀寫鎖的擴展

可配置公平性

細粒度鎖

支持

可中斷、超時、嘗試

高性能計數(shù)器、數(shù)據(jù)緩存

CountDownLatch

需要等待一組操作完成的場景


低,一次性

不支持公平性

粗粒度鎖

不支持

阻塞等待

任務(wù)協(xié)調(diào)、初始化操作

Semaphore

需要控制資源訪問數(shù)量的場景

信號量

高,控制并發(fā)數(shù)量

不支持公平性

細粒度鎖

支持

阻塞等待

限流、資源池管理

CyclicBarrier

需要周期性執(zhí)行一組操作的場景


低,重用性

支持公平性

粗粒度鎖

支持

阻塞等待

并行計算、批處理

2、鎖詳細分析

2.7. CyclicBarrier

CyclicBarrier 是 Java 中用于線程間同步的一種工具,它允許一組線程互相等待,直到所有線程都到達一個公共屏障點。 

圖片圖片

圖解說明:
  • Java 線程:表示運行中的線程,它們可能需要在某個點同步。
  • CyclicBarrier 實例:是 CyclicBarrier 類的實例,用于協(xié)調(diào)一組線程在屏障點同步。
  • 屏障:表示線程需要到達的同步點,所有線程必須到達這個點才能繼續(xù)執(zhí)行。
  • 共享資源或任務(wù):表示線程需要訪問的共享資源或執(zhí)行的任務(wù),它們在屏障點同步后可以安全地執(zhí)行。
  • 等待區(qū):表示等待其他線程到達屏障點的線程集合。
  • 計數(shù)器: CyclicBarrier 內(nèi)部維護一個計數(shù)器,用于跟蹤尚未到達屏障點的線程數(shù)量。
  • 屏障動作(Runnable) :可選的,當(dāng)所有線程到達屏障點時,可以執(zhí)行一個特定的動作或任務(wù)。
綜合說明:
  • 作用: CyclicBarrier 是一種同步幫助工具,允許一組線程相互等待,直到所有線程都到達某個公共屏障點。
  • 背景:在需要多個線程協(xié)作完成任務(wù)時, CyclicBarrier 提供了一種機制,使得所有線程可以在屏障點同步,然后繼續(xù)執(zhí)行。
  • 優(yōu)點:

可重復(fù)使用:與 CountDownLatch 不同, CyclicBarrier 可以重復(fù)使用,適用于周期性的任務(wù)同步。

支持屏障動作:可以設(shè)置一個在所有線程到達屏障點后執(zhí)行的回調(diào)。

  • 缺點:
  • 可能導(dǎo)致死鎖:如果一個或多個線程未到達屏障點,其他線程將一直等待。

  • 復(fù)雜性:需要合理設(shè)計以避免線程永久等待。

  • 場景:適用于需要周期性同步多個線程的場景。

  • 業(yè)務(wù)舉例:在多階段數(shù)據(jù)處理流程中,每個階段需要所有數(shù)據(jù)都準(zhǔn)備好后才能開始處理。使用 CyclicBarrier可以確保所有數(shù)據(jù)加載線程在每個階段開始前都已準(zhǔn)備好。

使用方式:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Race {
    private final CyclicBarrier barrier;

    public Race(int numberOfRunners) {
        barrier = new CyclicBarrier(numberOfRunners, () -> {
            System.out.println("比賽開始!");
            // 這里可以放置所有參與者到達屏障后要執(zhí)行的操作
        });
    }

    public void run() {
        System.out.println("等待其他參賽者...");
        try {
            barrier.await(); // 等待其他線程
            System.out.println("開始跑步!");
            // 跑步時間
            Thread.sleep((long) (Math.random() * 10000));
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        final int numberOfRunners = 5;
        Race race = new Race(numberOfRunners);

        // 創(chuàng)建參賽者線程
        for (int i = 0; i < numberOfRunners; i++) {
            final int runnerNumber = i + 1;
            new Thread(() -> {
                System.out.println("參賽者 " + runnerNumber + " 已準(zhǔn)備就緒");
                race.run();
            }).start();
        }
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說明: 在大數(shù)據(jù)處理系統(tǒng)中,經(jīng)常需要對大量數(shù)據(jù)進行多階段處理,例如,數(shù)據(jù)清洗、轉(zhuǎn)換、聚合和加載。這些處理階段通常需要按順序執(zhí)行,且每個階段開始前必須確保所有數(shù)據(jù)都已準(zhǔn)備好。

為什么需要 CyclicBarrier 技術(shù): 在多階段數(shù)據(jù)處理的場景中,不同的處理任務(wù)可能由不同的線程執(zhí)行,而這些線程的執(zhí)行時間可能不同。 CyclicBarrier 允許每個階段的處理在開始前等待所有相關(guān)線程完成上一階段的任務(wù),確保數(shù)據(jù)的一致性和完整性。

沒有 CyclicBarrier 技術(shù)會帶來什么后果:

沒有使用 CyclicBarrier 或其他同步協(xié)調(diào)機制可能會導(dǎo)致以下問題:

  1. 數(shù)據(jù)不一致:如果后續(xù)階段的處理在前一階段的數(shù)據(jù)未完全準(zhǔn)備好時開始,可能會導(dǎo)致處理結(jié)果不準(zhǔn)確。
  2. 資源浪費:在等待數(shù)據(jù)準(zhǔn)備的過程中,系統(tǒng)資源可能被無效占用,導(dǎo)致資源利用效率低下。
  3. 錯誤和異常:由于階段間的依賴關(guān)系沒有得到妥善處理,可能會引發(fā)程序錯誤或運行時異常。

代碼實現(xiàn):

import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DataProcessingPipeline {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    private final CyclicBarrier barrier;
    private final int numberOfPhases;
    private final int numberOfTasks;

    public DataProcessingPipeline(int numberOfTasks, int numberOfPhases) {
        this.numberOfTasks = numberOfTasks;
        this.numberOfPhases = numberOfPhases;
        this.barrier = new CyclicBarrier(numberOfTasks, () -> {
            System.out.println("一個階段完成,準(zhǔn)備進入下一階段");
        });
    }

    public void processData() throws Exception {
        for (int phase = 1; phase <= numberOfPhases; phase++) {
            System.out.println("階段 " + phase + " 開始");
            for (int task = 0; task < numberOfTasks; task++) {
                final int currentTask = task;
                executor.submit(() -> {
                    try {
                        // 數(shù)據(jù)處理任務(wù)
                        System.out.println("任務(wù) " + currentTask + " 在階段 " + phase + " 執(zhí)行");
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            barrier.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            barrier.await(); // 等待所有任務(wù)完成
        }
        executor.shutdown();
    }

    public static void main(String[] args) {
        DataProcessingPipeline pipeline = new DataProcessingPipeline(4, 3);
        try {
            pipeline.processData();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.8. Atomic Variables

原子變量是 Java 中 java.util.concurrent.atomic 包提供的一些類,它們利用底層硬件的原子性指令來保證操作的原子性,無需使用鎖。 

圖片圖片

圖解說明:
  • Java 線程:表示運行中的線程,它們可能需要對共享資源進行原子操作。
  • Atomic Variables:表示原子變量的集合,包括 AtomicInteger、 AtomicLong、 AtomicReference 等。
  • AtomicInteger、AtomicLong、AtomicReference:分別表示整型、長整型和引用類型的原子變量。
  • 硬件支持的原子指令:底層硬件提供的原子性指令,如 compare-and-swap (CAS)、load-linked、store-conditional 等。
  • 共享資源:表示被多個線程共享的數(shù)據(jù),如計數(shù)器、累加器等。
  • 內(nèi)存:表示 Java 程序使用的內(nèi)存空間,包括堆和棧等。
  • 變量狀態(tài):表示原子變量在內(nèi)存中的當(dāng)前狀態(tài)。
綜合說明:
  • 作用:原子變量類(如 AtomicInteger, AtomicLong, AtomicReference 等)提供了一種機制,使得對變量的某些操作(如自增、自減、讀取和寫入)是原子性的,無需使用傳統(tǒng)的鎖。
  • 背景:在多線程環(huán)境中,對共享變量的并發(fā)訪問需要同步措施以防止數(shù)據(jù)競爭。原子變量利用底層硬件的原子指令來保證操作的原子性,從而簡化了線程同步。
  • 優(yōu)點:

無鎖設(shè)計:避免使用傳統(tǒng)鎖,減少了線程切換的開銷。

性能優(yōu)化:對于高競爭的簡單變量訪問,原子變量通常比鎖有更好的性能。

  • 缺點:
  • 功能限制:僅適用于簡單的操作,復(fù)雜的操作無法通過原子變量實現(xiàn)。

  • 可組合性問題:復(fù)雜的原子操作需要仔細設(shè)計,否則可能引入競態(tài)條件。

  • 場景:適用于對簡單變量進行原子操作的場景,如計數(shù)器、累加器等。

  • 業(yè)務(wù)舉例:在電商平臺的庫存管理中, AtomicInteger 可以用來原子地更新商品的庫存數(shù)量,確保在高并發(fā)環(huán)境下庫存數(shù)據(jù)的一致性。

使用方式:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class Counter {
    // 使用 AtomicInteger 來確保計數(shù)器的線程安全
    private final AtomicInteger count = new AtomicInteger(0);

    // 提供一個方法來增加計數(shù)器的值
    public void increment() {
        // 原子地增加計數(shù)器的值
        count.incrementAndGet();
    }

    // 提供一個方法來獲取當(dāng)前計數(shù)器的值
    public int getCount() {
        // 原子地獲取計數(shù)器的值
        return count.get();
    }
}

public class DataStore {
    // 使用 AtomicLong 來統(tǒng)計數(shù)據(jù)總量
    private final AtomicLong dataCount = new AtomicLong(0);

    public void addData(long size) {
        // 原子地將數(shù)據(jù)大小累加到總量
        dataCount.addAndGet(size);
    }

    public long getDataCount() {
        // 原子地獲取當(dāng)前數(shù)據(jù)總量
        return dataCount.get();
    }
}

// 測試類
public class AtomicVariablesDemo {
    public static void main(String[] args) {
        Counter counter = new Counter();
        DataStore dataStore = new DataStore();

        // 多線程環(huán)境中對計數(shù)器和數(shù)據(jù)總量的更新
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                counter.increment();
                dataStore.addData(100);  // 假設(shè)每次操作增加100單位數(shù)據(jù)
            }).start();
        }

        // 等待所有線程完成
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }

        // 輸出計數(shù)器的值和數(shù)據(jù)總量
        System.out.println("Counter value: " + counter.getCount());
        System.out.println("Data store size: " + dataStore.getDataCount());
    }
}

業(yè)務(wù)代碼案例:

場景描述:社交網(wǎng)絡(luò)的實時消息計數(shù)

業(yè)務(wù)說明: 社交網(wǎng)絡(luò)平臺需要顯示每個用戶的實時消息通知數(shù)。每當(dāng)用戶收到新消息時,消息計數(shù)需要增加;用戶閱讀消息時,計數(shù)可能會減少或被重置。此計數(shù)需要對所有用戶可見,且在高并發(fā)環(huán)境下保持準(zhǔn)確。

為什么需要 AtomicVariables 技術(shù): 在社交網(wǎng)絡(luò)中,多個用戶可能同時發(fā)送消息給同一個接收者,或者一個用戶可能同時在多個設(shè)備上接收消息。這導(dǎo)致對消息計數(shù)的讀取和更新操作非常頻繁。使用 AtomicInteger 可以確保消息計數(shù)更新的原子性,并且在多線程環(huán)境下保持數(shù)據(jù)的一致性。

沒有 AtomicVariables 技術(shù)會帶來什么后果:

沒有使用 AtomicVariables 或其他并發(fā)控制機制可能會導(dǎo)致以下問題:

  1. 數(shù)據(jù)不一致:消息計數(shù)可能會出錯,導(dǎo)致用戶看到不正確的消息數(shù)量。
  2. 用戶體驗下降:如果消息通知不準(zhǔn)確,用戶可能會錯過重要通知,或者對應(yīng)用的可靠性產(chǎn)生懷疑。
  3. 系統(tǒng)復(fù)雜度增加:在沒有有效同步機制的情況下,維護數(shù)據(jù)一致性將變得復(fù)雜且容易出錯。

代碼實現(xiàn):

import java.util.concurrent.atomic.AtomicInteger;

public class MessageNotificationCounter {
    private final AtomicInteger messageCount = new AtomicInteger(0);

    // 接收新消息時調(diào)用此方法
    public void receiveMessage() {
        // 原子地增加消息計數(shù)
        messageCount.incrementAndGet();
        System.out.println("New message received. Total messages: " + messageCount.get());
    }

    // 用戶閱讀消息時調(diào)用此方法
    public void messagesRead() {
        // 原子地減少消息計數(shù)
        messageCount.decrementAndGet();
        System.out.println("Messages read. Remaining messages: " + messageCount.get());
    }

    // 獲取當(dāng)前消息計數(shù)
    public int getMessageCount() {
        return messageCount.get();
    }
}

// 測試類
public class AtomicVariablesDemo {
    public static void main(String[] args) {
        MessageNotificationCounter counter = new MessageNotificationCounter();

        // 多個用戶同時發(fā)送消息
        Thread sender1 = new Thread(() -> {
            counter.receiveMessage();
        });
        Thread sender2 = new Thread(() -> {
            counter.receiveMessage();
        });

        // 用戶閱讀消息
        Thread reader = new Thread(() -> {
            counter.messagesRead();
        });

        sender1.start();
        sender2.start();
        reader.start();

        sender1.join();
        sender2.join();
        reader.join();

        System.out.println("Final message count: " + counter.getMessageCount());
    }
}

2.9. ConcurrentHashMap

ConcurrentHashMap 是 Java 中一個線程安全的哈希表,它通過分段鎖(Segmentation)和 CAS 操作來支持高并發(fā)的讀寫操作。 

圖解說明:
  • Java 線程:表示運行中的線程,它們可能需要對 ConcurrentHashMap 進行讀寫操作。
  • ConcurrentHashMap 實例:是 ConcurrentHashMap 類的實例,用于存儲鍵值對并提供線程安全的訪問。
  • Segment 數(shù)組: ConcurrentHashMap 將哈希表分為多個段(Segment),每個段維護一部分哈希桶,通過分段鎖減少鎖的競爭。
  • Hash 桶:存儲哈希桶數(shù)組,每個桶可以包含一個或多個鍵值對。
  • 鏈表或紅黑樹:在哈希桶中,鍵值對最初以鏈表形式存儲,當(dāng)鏈表長度超過閾值時,鏈表可能會被轉(zhuǎn)換為紅黑樹以提高搜索效率。
  • 共享資源:表示存儲在 ConcurrentHashMap 中的鍵值對數(shù)據(jù)。
  • 讀操作:線程可以并發(fā)地讀取 ConcurrentHashMap 中的數(shù)據(jù),在讀多寫少的場景下,讀操作不會阻塞其他讀操作。
  • 寫操作:線程對 ConcurrentHashMap 的寫入操作,寫操作需要獲取相應(yīng)段的鎖。
  • 鎖:每個段擁有自己的鎖,寫操作需要獲取鎖,而讀操作通常不需要。
升級設(shè)計說明:
Java 1.7 ConcurrentHashMap 鎖機制

在 Java 1.7 中, ConcurrentHashMap 使用分段鎖機制,其中每個段相當(dāng)于一個小的哈希表,擁有自己的鎖。 

Java 1.8 ConcurrentHashMap 鎖機制

在 Java 1.8 中, ConcurrentHashMap 摒棄了分段鎖機制,采用了 CAS 和 synchronized 來確保線程安全。

綜合說明:
  • 作用: ConcurrentHashMap 是 Java 中提供的一個線程安全的哈希表,它通過分段鎖的概念來允許并發(fā)的讀寫操作,從而提高并發(fā)訪問的性能。
  • 背景:傳統(tǒng)的 HashMap 在多線程環(huán)境下需要外部同步,而 ConcurrentHashMap 通過鎖分離技術(shù)減少了鎖的競爭,提供了更好的并發(fā)性能。
  • 優(yōu)點:
  • 高并發(fā):通過細分鎖到段,允許多個線程同時操作不同段的數(shù)據(jù)。
  • 動態(tài)擴容:內(nèi)部采用動態(tài)數(shù)組和鏈表結(jié)構(gòu),提高了空間和時間效率。
  • 缺點:
  • 復(fù)雜度高:實現(xiàn)復(fù)雜,需要維護多個鎖和復(fù)雜的數(shù)據(jù)結(jié)構(gòu)。

  • 性能調(diào)優(yōu):在極端高并發(fā)場景下,可能需要調(diào)整默認的并發(fā)級別。

  • 場景:適用于需要高并發(fā)訪問的緩存或數(shù)據(jù)存儲。

  • 業(yè)務(wù)舉例:在大數(shù)據(jù)處理系統(tǒng)中, ConcurrentHashMap 可以用來存儲實時計算結(jié)果,支持大量并發(fā)的讀寫操作而不會導(dǎo)致性能瓶頸。

使用方式:
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

public class ConcurrentHashMapDemo {
    // 創(chuàng)建一個 ConcurrentHashMap 實例
    private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    // 將一個鍵值對插入到 Map 中
    public void put(String key, Integer value) {
        // put 方法是線程安全的
        map.put(key, value);
    }

    // 從 Map 中獲取與指定鍵關(guān)聯(lián)的值
    public Integer get(String key) {
        // get 方法是線程安全的
        return map.get(key);
    }

    // 計算 Map 中的元素數(shù)量
    public int size() {
        // size 方法是線程安全的
        return map.size();
    }

    // 演示刪除操作
    public void remove(String key) {
        // remove 方法是線程安全的
        map.remove(key);
    }

    // 演示如何批量添加數(shù)據(jù)
    public void addAll(Map<String, Integer> newData) {
        // putAll 方法是線程安全的
        map.putAll(newData);
    }

    public static void main(String[] args) {
        ConcurrentHashMapDemo demo = new ConcurrentHashMapDemo();

        // 批量添加數(shù)據(jù)
        demo.addAll(Map.of("key1", 1, "key2", 2, "key3", 3));

        // 單獨添加一條數(shù)據(jù)
        demo.put("key4", 4);

        // 獲取并打印一條數(shù)據(jù)
        System.out.println("Value for 'key1': " + demo.get("key1"));

        // 獲取 Map 的大小
        System.out.println("Map size: " + demo.size());

        // 刪除一條數(shù)據(jù)
        demo.remove("key2");

        // 再次獲取 Map 的大小
        System.out.println("Map size after removal: " + demo.size());
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說明: 在分布式緩存系統(tǒng)中,經(jīng)常需要存儲和檢索用戶會話信息、應(yīng)用配置、熱點數(shù)據(jù)等。這些數(shù)據(jù)需要被多個應(yīng)用實例共享,并且要求在高并發(fā)環(huán)境下依然保持高性能。緩存數(shù)據(jù)通常有過期時間,需要定期清理。

為什么需要 ConcurrentHashMap 技術(shù): ConcurrentHashMap 提供了一種高效的方式來處理并發(fā)的讀取和更新操作,并且它的分段鎖機制允許多個線程同時對不同段進行操作,從而提高并發(fā)處理能力。此外, ConcurrentHashMap 在 Java 8 中引入的紅黑樹結(jié)構(gòu)使得即使在高并發(fā)更新導(dǎo)致哈希沖突時,也能保持高效的性能。

沒有 ConcurrentHashMap 技術(shù)會帶來什么后果:

沒有使用 ConcurrentHashMap 可能會導(dǎo)致以下問題:

  1. 性能瓶頸:在高并發(fā)環(huán)境下,如果使用 HashMap 加 synchronized,可能導(dǎo)致嚴(yán)重的性能瓶頸,因為所有線程必須等待一個鎖。
  2. 數(shù)據(jù)不一致:在沒有適當(dāng)同步的情況下,多個線程同時更新數(shù)據(jù)可能導(dǎo)致緩存數(shù)據(jù)不一致。
  3. 擴展性差:隨著系統(tǒng)負載的增加,基于 HashMap 的緩存解決方案可能難以擴展,因為鎖競爭和線程安全問題。

代碼實現(xiàn):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class DistributedCache<K, V> {
    private final ConcurrentHashMap<K, V> cacheMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<K, Long> expirationMap = new ConcurrentHashMap<>();

    public void put(K key, V value, long ttl) {
        cacheMap.put(key, value);
        expirationMap.put(key, System.currentTimeMillis() + ttl);
        scheduleEviction(key, ttl);
    }

    public V get(K key) {
        Long expirationTime = expirationMap.get(key);
        if (expirationTime == null || expirationTime < System.currentTimeMillis()) {
            cacheMap.remove(key);
            expirationMap.remove(key);
            return null;
        }
        return cacheMap.get(key);
    }

    private void scheduleEviction(final K key, final long ttl) {
        new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(ttl);
                cacheMap.computeIfPresent(key, (k, v) -> null);
                expirationMap.remove(key);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }

    public static void main(String[] args) {
        DistributedCache<String, String> cache = new DistributedCache<>();
        cache.put("userSession", "sessionData", 5000); // 緩存設(shè)置5秒過期

        // 多個線程并發(fā)訪問緩存
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            new Thread(() -> {
                String result = cache.get("userSession");
                System.out.println("Thread " + finalI + " retrieved: " + result);
            }).start();
        }
    }
}

2.10.ConcurrentSkipListMap

ConcurrentSkipListMap 是 Java 中實現(xiàn)的一個高性能并發(fā)的有序映射,它使用跳表(Skip List)作為其底層數(shù)據(jù)結(jié)構(gòu)。 

圖片圖片

圖解說明:
  • Java 線程:表示運行中的線程,它們可能需要對 ConcurrentSkipListMap 進行讀寫操作。
  • ConcurrentSkipListMap 實例:是 ConcurrentSkipListMap 類的實例,用于存儲鍵值對并提供線程安全的訪問。
  • Skip List 層級結(jié)構(gòu):跳表由多層索引構(gòu)成,每一層都是一個有序的鏈表。
  • 索引層:跳表中的索引層,用于加速搜索操作。
  • 數(shù)據(jù)層:跳表中的底層數(shù)據(jù)結(jié)構(gòu),存儲實際的鍵值對。
  • Node 節(jié)點:跳表中的節(jié)點,包含鍵值對和指向其他節(jié)點的鏈接。
  • 共享資源:表示存儲在 ConcurrentSkipListMap 中的鍵值對數(shù)據(jù)。
  • 讀操作:線程可以并發(fā)地讀取 ConcurrentSkipListMap 中的數(shù)據(jù)。
  • 寫操作:線程可以并發(fā)地修改 ConcurrentSkipListMap 中的數(shù)據(jù)。
  • CAS 操作:在更新節(jié)點鏈接或修改數(shù)據(jù)時,使用 CAS 操作來保證線程安全。
  • 自旋鎖/同步塊:在某些情況下,如果 CAS 操作失敗,可能會使用自旋鎖或同步塊來確保操作的原子性。

操作流程:

  1. 讀操作:

線程通過索引層快速定位到數(shù)據(jù)層的節(jié)點。

線程使用 volatile 讀取節(jié)點的值,確保內(nèi)存可見性。

  1. 寫操作:
  • 線程在更新或添加節(jié)點時,首先嘗試使用 CAS 操作。

  • 如果 CAS 操作失敗,線程可能會使用自旋鎖或同步塊來確保原子性。

綜合說明:

作用: ConcurrentSkipListMap 是一種線程安全的有序映射,它通過使用跳表(Skip List)數(shù)據(jù)結(jié)構(gòu)來支持高效的并發(fā)訪問和排序操作。 背景:在需要高效并發(fā)訪問和保持元素有序的場景中,傳統(tǒng)的 TreeMap 由于其加鎖策略在高并發(fā)環(huán)境下性能受限, ConcurrentSkipListMap 提供了一種替代方案。 優(yōu)點:

  • 高性能并發(fā)訪問:通過跳表結(jié)構(gòu)和細粒度鎖定,實現(xiàn)了高效的并發(fā)讀取和更新。
  • 有序性:保持元素的有序性,支持范圍查詢等操作。
  • 動態(tài)調(diào)整:可以根據(jù)訪問模式動態(tài)調(diào)整結(jié)構(gòu),優(yōu)化性能。 缺點:
  • 內(nèi)存占用:相比無序的 ConcurrentHashMap,由于維護了有序性,內(nèi)存占用可能更高。
  • 復(fù)雜性:實現(xiàn)相對復(fù)雜,涉及多級索引和節(jié)點的管理。 場景:適用于需要有序數(shù)據(jù)且高并發(fā)訪問的場景,如實時數(shù)據(jù)索引、范圍查詢等。 業(yè)務(wù)舉例:在一個金融市場分析系統(tǒng)中,需要維護一個實時更新的價格索引, ConcurrentSkipListMap 可以用來存儲和快速檢索各種金融工具的當(dāng)前價格。
使用方式:
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentSkipListMapDemo {
    // 創(chuàng)建一個 ConcurrentSkipListMap 實例
    private final ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();

    // 將一個鍵值對插入到 Map 中
    public void put(Integer key, String value) {
        // put 方法是線程安全的
        map.put(key, value);
    }

    // 從 Map 中獲取與指定鍵關(guān)聯(lián)的值
    public String get(Integer key) {
        // get 方法是線程安全的
        return map.get(key);
    }

    // 獲取 Map 的鍵集合
    public java.util.NavigableSet<Integer> keySet() {
        // keySet 方法返回 Map 的鍵集合視圖
        return map.keySet();
    }

    // 獲取 Map 的值集合
    public java.util.Collection<String> values() {
        // values 方法返回 Map 的值集合視圖
        return map.values();
    }

    // 獲取 Map 的大小
    public int size() {
        // size 方法是線程安全的
        return map.size();
    }

    // 演示刪除操作
    public void remove(Integer key) {
        // remove 方法是線程安全的
        map.remove(key);
    }

    public static void main(String[] args) {
        ConcurrentSkipListMapDemo demo = new ConcurrentSkipListMapDemo();

        // 插入一些數(shù)據(jù)
        demo.put(1, "One");
        demo.put(2, "Two");
        demo.put(3, "Three");

        // 獲取并打印一條數(shù)據(jù)
        System.out.println("Value for key 2: " + demo.get(2));

        // 獲取 Map 的大小
        System.out.println("Map size: " + demo.size());

        // 獲取并打印所有鍵
        System.out.println("All keys: " + demo.keySet());

        // 刪除一條數(shù)據(jù)
        demo.remove(2);

        // 再次獲取 Map 的大小
        System.out.println("Map size after removal: " + demo.size());

        // 獲取并打印所有值
        System.out.println("All values: " + demo.values());
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說明: 實時股票交易系統(tǒng)需要維護一個動態(tài)變化的股票價格索引,該索引需要根據(jù)實時的市場數(shù)據(jù)進行更新,并且允許多個交易線程并發(fā)地讀取和更新股票價格。此外,系統(tǒng)還需要定期根據(jù)價格波動進行調(diào)整,如計算價格的平均值、執(zhí)行價格范圍查詢等。

為什么需要 ConcurrentSkipListMap 技術(shù): ConcurrentSkipListMap 是一個線程安全的有序映射,它允許高效的范圍查詢和有序訪問,這對于股票價格索引來說至關(guān)重要。由于股票價格會頻繁更新,且需要快速響應(yīng)市場變化,使用 ConcurrentSkipListMap 可以提供高效的插入、刪除和查找操作,同時保持數(shù)據(jù)的有序性。

沒有 ConcurrentSkipListMap 技術(shù)會帶來什么后果:

沒有使用 ConcurrentSkipListMap 或其他適合有序并發(fā)操作的數(shù)據(jù)結(jié)構(gòu)可能會導(dǎo)致以下問題:

  1. 性能瓶頸:如果使用 HashMap 或 ConcurrentHashMap,雖然可以實現(xiàn)并發(fā)更新,但無法高效執(zhí)行有序操作和范圍查詢,可能導(dǎo)致查詢性能不佳。
  2. 數(shù)據(jù)不一致:在高并發(fā)更新的情況下,如果沒有適當(dāng)?shù)耐綑C制,可能會導(dǎo)致價格信息的不一致。
  3. 復(fù)雜性增加:如果使用 synchronized 列表或數(shù)組來維護價格索引,可能需要手動管理復(fù)雜的同步和排序邏輯,增加系統(tǒng)復(fù)雜性和出錯的風(fēng)險。

代碼實現(xiàn):

import java.util.concurrent.ConcurrentSkipListMap;

public class StockPriceIndex {
    private final ConcurrentSkipListMap<String, Double> priceIndex = new ConcurrentSkipListMap<>();

    public void updatePrice(String stockSymbol, Double newPrice) {
        // 更新股票價格
        priceIndex.put(stockSymbol, newPrice);
    }

    public Double getPrice(String stockSymbol) {
        // 獲取股票價格
        return priceIndex.get(stockSymbol);
    }

    public void removeStock(String stockSymbol) {
        // 移除股票信息
        priceIndex.remove(stockSymbol);
    }

    public ConcurrentSkipListMap<String, Double> headMap(String toKey) {
        // 獲取指定范圍內(nèi)的股票價格索引
        return priceIndex.headMap(toKey);
    }

    public static void main(String[] args) {
        StockPriceIndex index = new StockPriceIndex();
        index.updatePrice("AAPL", 150.00);
        index.updatePrice("GOOGL", 2750.50);
        index.updatePrice("MSFT", 250.00);

        System.out.println("Price of AAPL: " + index.getPrice("AAPL"));
        System.out.println("Price of GOOGL: " + index.getPrice("GOOGL"));

        // 獲取所有小于 "MSFT" 的股票價格索引
        ConcurrentSkipListMap<String, Double> subMap = index.headMap("MSFT");
        subMap.forEach((k, v) -> System.out.println("Stock: " + k + ", Price: " + v));
    }
}

2.11. ConcurrentLinkedQueue

ConcurrentLinkedQueue 是 Java 中一個線程安全的無鎖隊列,它使用 CAS (Compare-And-Swap) 操作來保證線程安全。 

圖片圖片

圖解說明:
  • Java 線程:表示運行中的線程,它們可能需要對 ConcurrentLinkedQueue 進行入隊或出隊操作。
  • ConcurrentLinkedQueue 實例:是 ConcurrentLinkedQueue 類的實例,用于存儲隊列中的元素并提供線程安全的訪問。
  • Node 節(jié)點結(jié)構(gòu): ConcurrentLinkedQueue 使用內(nèi)部的 Node 類來存儲隊列中的每個元素。每個節(jié)點包含隊列中的一個元素和指向下一個節(jié)點的鏈接。
  • 虛擬頭節(jié)點:隊列使用一個虛擬頭節(jié)點來簡化出隊操作。虛擬頭節(jié)點不存儲實際的隊列元素。
  • 虛擬尾節(jié)點:隊列使用一個虛擬尾節(jié)點來簡化入隊操作。虛擬尾節(jié)點指向隊列中的最后一個節(jié)點。
  • 隊列元素:表示存儲在隊列中的實際數(shù)據(jù)。
  • 入隊操作:線程將新元素添加到隊列尾部的過程,通過 CAS 更新虛擬尾節(jié)點的鏈接。
  • 出隊操作:線程從隊列頭部移除元素的過程,通過 CAS 更新虛擬頭節(jié)點的鏈接。
  • CAS 操作: ConcurrentLinkedQueue 使用 CAS 操作來更新節(jié)點之間的鏈接,從而實現(xiàn)無鎖的線程安全隊列。
  • 自旋等待:在 CAS 操作失敗時,線程可能會自旋等待直到操作成功。

操作流程:

  1. 入隊操作:線程通過 CAS 操作將新節(jié)點插入到隊列尾部,并更新尾節(jié)點指針。
  2. 出隊操作:線程通過 CAS 操作移除隊列頭部的節(jié)點,并更新頭節(jié)點指針。
  3. CAS 操作:在入隊和出隊過程中,線程使用 CAS 來保證節(jié)點鏈接的原子性更新。
綜合說明:

作用: ConcurrentLinkedQueue 是一種基于鏈接節(jié)點的無界線程安全隊列,支持高并發(fā)的入隊和出隊操作。 背景:在多線程環(huán)境中,需要一種高效的隊列來處理任務(wù)或消息傳遞, ConcurrentLinkedQueue 提供了一種無鎖的解決方案。 優(yōu)點:

  • 無鎖設(shè)計:利用 CAS 操作實現(xiàn)無鎖的線程安全隊列,提高了并發(fā)性能。
  • 簡單高效:提供了簡單的入隊和出隊操作,適合作為任務(wù)隊列或消息傳遞隊列。
  • 無界隊列:理論上隊列大小無界,適用于處理大量任務(wù)。 缺點:
  • 可能的內(nèi)存消耗:由于是無界隊列,在極端情況下可能會消耗大量內(nèi)存。
  • 性能限制:在某些高競爭場景下,CAS 操作可能導(dǎo)致性能瓶頸。 場景:適用于作為任務(wù)隊列或消息傳遞隊列,支持高并發(fā)的入隊和出隊操作。 業(yè)務(wù)舉例:在一個分布式計算系統(tǒng)中, ConcurrentLinkedQueue 可以用于收集各個計算節(jié)點的輸出結(jié)果,然后由一個或多個消費者線程進行處理。

這兩個并發(fā)集合類在 Java 中提供了強大的工具,以支持復(fù)雜的并發(fā)數(shù)據(jù)處理需求,它們各自適用于不同的應(yīng)用場景,可以根據(jù)具體需求選擇合適的并發(fā)集合。kedQueue

使用方式:
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueDemo {
    // 創(chuàng)建一個 ConcurrentLinkedQueue 實例
    private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

    // 向隊列中添加元素
    public void add(int number) {
        // add 方法是線程安全的
        queue.add(number);
        System.out.println("Added " + number);
    }

    // 從隊列中獲取并移除元素
    public Integer poll() {
        // poll 方法是線程安全的,返回并移除隊列頭部的元素
        Integer result = queue.poll();
        if (result != null) {
            System.out.println("Polled " + result);
        } else {
            System.out.println("Queue is empty");
        }
        return result;
    }

    // 查看隊列頭部的元素但不移除
    public Integer peek() {
        // peek 方法是線程安全的,返回隊列頭部的元素但不移除
        Integer result = queue.peek();
        if (result != null) {
            System.out.println("Peeked " + result);
        } else {
            System.out.println("Queue is empty");
        }
        return result;
    }

    // 獲取隊列的大小
    public int size() {
        // size 方法估算隊列的大小
        int size = queue.size();
        System.out.println("Queue size: " + size);
        return size;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueueDemo demo = new ConcurrentLinkedQueueDemo();

        // 啟動生產(chǎn)者線程
        Thread producerThread = new Thread(() -> {
            demo.add(1);
            demo.add(2);
            demo.add(3);
        });

        // 啟動消費者線程
        Thread consumerThread = new Thread(() -> {
            demo.poll();
            demo.poll();
            demo.poll();
            demo.poll(); // 這次調(diào)用應(yīng)該會返回 null,因為隊列已空
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();

        // 在所有線程完成后獲取隊列大小
        demo.size();
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說明: 大規(guī)模日志處理系統(tǒng)需要從多個源實時收集、存儲并分析日志數(shù)據(jù)。這些日志數(shù)據(jù)通常由分布在不同服務(wù)器上的應(yīng)用程序生成,并且需要被快速地處理以避免數(shù)據(jù)丟失或延遲問題。

為什么需要 ConcurrentLinkedQueue 技術(shù): 在日志處理場景中,日志數(shù)據(jù)的產(chǎn)生速度往往非常快,且來源眾多,因此需要一個高效且線程安全的隊列來緩存這些日志數(shù)據(jù)。 ConcurrentLinkedQueue 提供了高吞吐量和低延遲的并發(fā)訪問,無需使用鎖,使得它特別適合用作日志數(shù)據(jù)的緩沖區(qū)。此外,由于 ConcurrentLinkedQueue 是無界的,因此不會阻塞生產(chǎn)者線程,即使在高負載情況下也能保持高性能。

沒有 ConcurrentLinkedQueue 技術(shù)會帶來什么后果: 沒有使用 ConcurrentLinkedQueue 或其他高效的并發(fā)隊列可能會導(dǎo)致以下問題:

  1. 數(shù)據(jù)丟失:如果使用有界隊列且沒有適當(dāng)?shù)纳a(chǎn)者速率控制,可能會因為隊列滿導(dǎo)致日志數(shù)據(jù)丟失。
  2. 性能瓶頸:如果使用鎖或其他同步機制來保護共享隊列,可能會導(dǎo)致性能瓶頸,尤其是在高并發(fā)場景下。
  3. 系統(tǒng)不穩(wěn)定:在高負載情況下,如果隊列處理速度跟不上數(shù)據(jù)產(chǎn)生速度,可能會導(dǎo)致系統(tǒng)崩潰或重啟。

代碼實現(xiàn):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LogProcessor {
    private final ConcurrentLinkedQueue<String> logQueue = new ConcurrentLinkedQueue<>();
    private final ExecutorService processorPool = Executors.newFixedThreadPool(10);

    public void log(String message) {
        // 生產(chǎn)者線程調(diào)用此方法來添加日志到隊列
        logQueue.add(message);
    }

    public void startLogProcessing() {
        // 消費者線程池,用于處理隊列中的日志
        processorPool.submit(() -> {
            while (true) {
                try {
                    // 消費者線程調(diào)用此方法來處理隊列中的日志
                    String logEntry = logQueue.poll();
                    if (logEntry != null) {
                        processLog(logEntry);
                    } else {
                        TimeUnit.MILLISECONDS.sleep(100); // 避免 CPU 過載
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    private void processLog(String logEntry) {
        // 實際處理日志的邏輯
        System.out.println("Processing log: " + logEntry);
    }

    public static void main(String[] args) {
        LogProcessor logProcessor = new LogProcessor();
        logProcessor.startLogProcessing();

        // 多個生產(chǎn)者線程生成日志
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            new Thread(() -> {
                logProcessor.log("Log entry " + finalI);
            }).start();
        }
    }
}

2.12. BlockingQueue

BlockingQueue 是 Java 中用于線程間通信的隊列,支持阻塞操作,當(dāng)隊列為空時,獲取元素的操作會阻塞;當(dāng)隊列滿時,插入元素的操作會阻塞。 

圖片圖片

圖解說明:
  • Java 線程:表示運行中的線程,它們可能需要向隊列中添加或移除元素。
  • BlockingQueue 實例:是 BlockingQueue 接口的具體實現(xiàn),如 ArrayBlockingQueue、 LinkedBlockingQueue 等,用于線程間通信。
  • 內(nèi)部數(shù)據(jù)結(jié)構(gòu):表示 BlockingQueue 內(nèi)部用于存儲元素的數(shù)據(jù)結(jié)構(gòu),如數(shù)組、鏈表等。
  • 隊列容量:表示 BlockingQueue 的最大容量,如果隊列有界,則插入操作在隊列滿時會阻塞。
  • 等待區(qū)(元素) :表示當(dāng)隊列為空時,等待獲取元素的線程集合。
  • 等待區(qū)(空間) :表示當(dāng)隊列滿時,等待空間釋放的線程集合。
  • 元素添加操作:表示向 BlockingQueue 中添加元素的操作,如果隊列滿,則操作會阻塞。
  • 元素移除操作:表示從 BlockingQueue 中移除元素的操作,如果隊列為空,則操作會阻塞。
綜合說明:
  • 作用: BlockingQueue 是一個線程安全的隊列,支持阻塞操作,當(dāng)隊列為空時,獲取元素的操作會阻塞;當(dāng)隊列滿時,插入元素的操作會阻塞。
  • 背景:在生產(chǎn)者-消費者模型中,需要一種機制來協(xié)調(diào)生產(chǎn)者和消費者之間的操作, BlockingQueue 提供了這種協(xié)調(diào)。
  • 優(yōu)點:
  • 線程協(xié)調(diào):自然地實現(xiàn)了生產(chǎn)者-消費者之間的線程協(xié)調(diào)。
  • 阻塞操作:提供了阻塞獲取和阻塞插入的方法,簡化了并發(fā)編程。
  • 缺點:
  • 可能的死鎖:不當(dāng)使用可能導(dǎo)致死鎖,例如一個線程永久阻塞等待一個不會到來的元素。

  • 性能考慮:在高并發(fā)環(huán)境下,隊列的容量和鎖策略需要仔細調(diào)優(yōu)。

  • 場景:適用于生產(chǎn)者-消費者場景,如任務(wù)分配、資源池管理等。

  • 業(yè)務(wù)舉例:在消息處理系統(tǒng)中, BlockingQueue 可以用于緩存待處理的消息,生產(chǎn)者線程生成消息并放入隊列,消費者線程從隊列中取出并處理消息,確保了消息的順序性和系統(tǒng)的響應(yīng)性。

使用方式:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {
    // 創(chuàng)建一個 LinkedBlockingQueue 實例,容量限制為10
    private final BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);

    // 向 BlockingQueue 中添加元素
    public void produce(Integer element) throws InterruptedException {
        // put 方法在隊列滿時阻塞,直到隊列中有空間
        blockingQueue.put(element);
        System.out.println("Produced: " + element);
    }

    // 從 BlockingQueue 中獲取元素
    public Integer consume() throws InterruptedException {
        // take 方法在隊列空時阻塞,直到隊列中有元素
        Integer element = blockingQueue.take();
        System.out.println("Consumed: " + element);
        return element;
    }

    // 獲取 BlockingQueue 的大小
    public int size() {
        // size 方法返回隊列當(dāng)前的元素數(shù)量
        return blockingQueue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueueDemo demo = new BlockingQueueDemo();

        // 創(chuàng)建生產(chǎn)者線程
        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 15; i++) {
                    demo.produce(i);
                    Thread.sleep(100); // 生產(chǎn)延時
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 創(chuàng)建消費者線程
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 15; i++) {
                    int element = demo.consume();
                    Thread.sleep(150); // 消費延時
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();

        // 打印最終隊列的大小
        System.out.println("Final queue size: " + demo.size());
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說明: 消息隊列系統(tǒng)在微服務(wù)架構(gòu)中用于異步處理任務(wù),例如發(fā)送郵件、短信通知等。這些服務(wù)通常由獨立的服務(wù)實例處理,以提高系統(tǒng)的響應(yīng)性和可擴展性。消息隊列需要能夠處理高并發(fā)的消息生產(chǎn)和消費,確保消息的可靠傳遞。

為什么需要 BlockingQueue 技術(shù): BlockingQueue 提供了一種有效的機制來處理生產(chǎn)者-消費者場景,特別是在面對高并發(fā)和需要線程安全時。它能夠使生產(chǎn)者在隊列滿時阻塞,消費者在隊列空時阻塞,從而平衡生產(chǎn)和消費的速度,確保系統(tǒng)的穩(wěn)定性和消息的不丟失。

沒有 BlockingQueue 技術(shù)會帶來什么后果:

沒有使用 BlockingQueue 或其他并發(fā)隊列可能會導(dǎo)致以下問題:

  1. 消息丟失:在高并發(fā)情況下,如果沒有適當(dāng)?shù)臋C制來控制消息的產(chǎn)生和消費,可能會導(dǎo)致消息丟失。
  2. 系統(tǒng)過載:如果沒有流控機制,生產(chǎn)者可能會過快地生成消息,導(dǎo)致系統(tǒng)資源耗盡,甚至崩潰。
  3. 數(shù)據(jù)不一致:在多線程環(huán)境下,如果不正確地管理消息的訪問,可能會導(dǎo)致數(shù)據(jù)處理的不一致性。

代碼實現(xiàn):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MessageQueueSystem {
    private final BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();

    public void produceMessage(String content) throws InterruptedException {
        // 將消息添加到隊列中,如果隊列滿了,生產(chǎn)者線程將被阻塞
        messageQueue.put(new Message(content));
        System.out.println("Message produced: " + content);
    }

    public Message consumeMessage() throws InterruptedException {
        // 從隊列中取出消息,如果隊列空了,消費者線程將被阻塞
        Message message = messageQueue.take();
        System.out.println("Message consumed: " + message.getContent());
        return message;
    }

    public static void main(String[] args) throws InterruptedException {
        MessageQueueSystem messageQueueSystem = new MessageQueueSystem();

        // 創(chuàng)建生產(chǎn)者線程
        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    messageQueueSystem.produceMessage("Message " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 創(chuàng)建消費者線程
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    messageQueueSystem.consumeMessage();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();
    }
}

class Message {
    private final String content;

    public Message(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }
}

2.13. Condition

Condition 是 Java 中 java.util.concurrent.locks 包提供的一個接口,它用于實現(xiàn)等待/通知機制。 Condition 通常與 Lock 接口配合使用,允許一個或多個線程在某些條件滿足之前掛起,并在條件滿足時被喚醒。 

圖片圖片

圖解說明:
  • Java 線程:表示運行中的線程,它們可能需要在某些條件滿足之前掛起。
  • Lock 實例:是 Lock 接口的具體實現(xiàn),如 ReentrantLock,用于控制對共享資源的訪問。
  • Condition 實例:是 Condition 接口的具體實現(xiàn),與 Lock 實例配合使用,用于線程間的等待/通知機制。
  • 等待隊列(線程) :當(dāng)線程調(diào)用 Condition 的 await() 方法時,如果條件不滿足,線程會被放入等待隊列。
  • 共享資源:表示被多個線程共享的數(shù)據(jù),需要通過 Lock 和 Condition 來保護以確保線程安全。
  • 條件檢查:表示線程在嘗試獲取資源之前需要檢查的條件。
  • 喚醒信號:當(dāng)條件滿足時,其他線程會發(fā)送喚醒信號給等待隊列中的線程。
  • 鎖狀態(tài):表示鎖的當(dāng)前狀態(tài),如是否被鎖定,以及鎖定的線程等。
操作流程:
  1. 鎖定:線程通過 Lock 實例獲取鎖。
  2. 條件檢查:線程檢查條件是否滿足。
  3. 等待:如果條件不滿足,線程調(diào)用 Condition 的 await() 方法,釋放鎖并進入等待隊列。
  4. 喚醒:當(dāng)條件滿足時,其他線程調(diào)用 Condition 的 signal() 或 signalAll() 方法,發(fā)送喚醒信號給等待隊列中的線程。
  5. 重新競爭鎖:被喚醒的線程重新競爭鎖。
  6. 再次檢查條件:線程在重新獲得鎖后,再次檢查條件是否滿足,如果滿足則繼續(xù)執(zhí)行。
綜合說明:
  • 作用: Condition 是與 Lock 接口配合使用的同步輔助工具,它允許一個或多個線程等待,直到被其他線程喚醒。
  • 背景:在復(fù)雜的同步場景中,需要更細粒度的控制線程的等待和喚醒, Condition 提供了這種能力。
  • 優(yōu)點:
  • 細粒度控制:提供了比 Object.wait()/ Object.notify() 更靈活的線程間協(xié)調(diào)機制。
  • 多條件支持:一個鎖可以關(guān)聯(lián)多個條件,每個條件可以獨立喚醒等待的線程。
  • 缺點:
  • 使用復(fù)雜:需要與 Lock 一起使用,增加了編程復(fù)雜度。

  • 錯誤使用可能導(dǎo)致死鎖或線程饑餓。

  • 場景:適用于需要線程間復(fù)雜協(xié)調(diào)的場景,如任務(wù)調(diào)度、資源分配等。

  • 業(yè)務(wù)舉例:在酒店預(yù)訂系統(tǒng)中, Condition 可以用于實現(xiàn)房間狀態(tài)的等待和通知機制。當(dāng)房間變?yōu)榭臻e時,等待的顧客可以被通知并進行預(yù)訂。

使用方式:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    private final Object[] buffer;
    private int putPtr, takePtr, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int size) {
        buffer = new Object[size];
    }

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) { // 等待直到緩沖區(qū)非滿
                notFull.await();
            }
            buffer[putPtr] = x;
            putPtr = (putPtr + 1) % buffer.length;
            count++;
            notEmpty.signal(); // 通知可能等待的消費者
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) { // 等待直到緩沖區(qū)非空
                notEmpty.await();
            }
            Object x = buffer[takePtr];
            takePtr = (takePtr + 1) % buffer.length;
            count--;
            notFull.signal(); // 通知可能等待的生產(chǎn)者
            return x;
        } finally {
            lock.unlock();
        }
    }
}

public class ProducerConsumerDemo {
    private final BoundedBuffer buffer;

    public ProducerConsumerDemo(int size) {
        buffer = new BoundedBuffer(size);
    }

    public void produce(String item) {
        buffer.put(item);
    }

    public String consume() {
        return (String) buffer.take();
    }

    public static void main(String[] args) throws InterruptedException {
        final int SIZE = 10;
        final ProducerConsumerDemo demo = new ProducerConsumerDemo(SIZE);

        // 生產(chǎn)者線程
        Thread producerThread = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                demo.produce("Item " + i);
                try {
                    Thread.sleep(100); // 生產(chǎn)延時
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 消費者線程
        Thread consumerThread = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                String item = demo.consume();
                System.out.println("Consumed: " + item);
                try {
                    Thread.sleep(150); // 消費延時
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說明: 任務(wù)調(diào)度系統(tǒng)負責(zé)管理和執(zhí)行定時任務(wù)。這些任務(wù)可能包括數(shù)據(jù)備份、報告生成、系統(tǒng)維護等。系統(tǒng)需要能夠按預(yù)定時間觸發(fā)任務(wù),并確保任務(wù)在執(zhí)行時不會相互干擾。

為什么需要 Condition 技術(shù): 在任務(wù)調(diào)度系統(tǒng)中,任務(wù)的觸發(fā)通常依賴于時間,而任務(wù)的執(zhí)行可能需要等待特定條件滿足。 Condition 配合 Lock 使用,可以在沒有任務(wù)可執(zhí)行時讓調(diào)度器線程等待,直到有任務(wù)準(zhǔn)備好執(zhí)行。這種機制允許系統(tǒng)在沒有任務(wù)執(zhí)行需求時保持空閑,從而節(jié)省資源。

沒有 Condition 技術(shù)會帶來什么后果:

沒有使用 Condition 或其他等待/通知機制可能會導(dǎo)致以下問題:

  1. 資源浪費:如果調(diào)度器不斷輪詢檢查新任務(wù),可能會浪費大量 CPU 資源。
  2. 響應(yīng)性差:在新任務(wù)到來時,如果沒有有效的機制來喚醒調(diào)度器,可能會導(dǎo)致任務(wù)執(zhí)行延遲。
  3. 代碼復(fù)雜度:沒有 Condition,可能需要使用更復(fù)雜的多線程同步機制,增加了代碼的復(fù)雜性和出錯的風(fēng)險。

代碼實現(xiàn):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;

public class TaskScheduler {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition taskAvailable = lock.newCondition();
    private final Queue<Runnable> tasks = new LinkedList<>();

    public void schedule(Runnable task, long delay) {
        lock.lock();
        try {
            tasks.add(() -> {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                task.run();
            });
            taskAvailable.signal(); // 通知調(diào)度器有新任務(wù)
        } finally {
            lock.unlock();
        }
    }

    public void startScheduling() {
        new Thread(this::runScheduler).start();
    }

    private void runScheduler() {
        lock.lock();
        try {
            while (true) {
                while (tasks.isEmpty()) { // 如果沒有任務(wù),等待
                    taskAvailable.await();
                }
                Runnable task = tasks.poll();
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TaskScheduler scheduler = new TaskScheduler();
        scheduler.schedule(() -> System.out.println("Task 1 executed at " + new Date()), 2000);
        scheduler.schedule(() -> System.out.println("Task 2 executed at " + new Date()), 4000);
        scheduler.startScheduling();
    }
}

責(zé)任編輯:武曉燕 來源: Solomon肖哥彈架構(gòu)
相關(guān)推薦

2020-07-06 08:03:32

Java悲觀鎖樂觀鎖

2024-03-18 12:21:28

Java輕量級鎖重量級鎖

2019-11-28 16:00:06

重入鎖讀寫鎖樂觀鎖

2023-10-08 09:34:11

Java編程

2019-01-04 11:18:35

獨享鎖共享鎖非公平鎖

2024-01-29 01:08:01

悲觀鎖遞歸鎖讀寫鎖

2019-10-17 08:51:00

Java悲觀鎖Monitor

2023-07-05 08:18:54

Atomic類樂觀鎖悲觀鎖

2010-07-26 15:17:46

SQL Server鎖

2019-04-12 15:14:44

Python線程

2010-04-16 15:12:12

ORACLE鎖機制

2020-04-24 15:44:50

MySQL數(shù)據(jù)庫鎖機制

2024-10-10 09:40:29

2021-03-31 10:05:26

偏向鎖輕量級鎖

2021-01-15 05:12:14

Java并發(fā)樂觀鎖

2018-07-31 10:10:06

MySQLInnoDB死鎖

2022-03-24 13:36:18

Java悲觀鎖樂觀鎖

2024-12-16 00:52:26

MySQL數(shù)據(jù)庫并發(fā)

2024-11-29 07:38:12

MySQL數(shù)據(jù)庫

2024-02-29 09:44:36

Java工具
點贊
收藏

51CTO技術(shù)棧公眾號