自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java 并發(fā)流程工具的實戰(zhàn)探索

開發(fā) 前端
本文將踏上Java并發(fā)流程工具的實戰(zhàn)探索之旅。我們不僅會深入剖析這些工具的核心原理,更會通過實際代碼示例,詳細(xì)展示它們在不同應(yīng)用場景中的具體應(yīng)用。

在當(dāng)今數(shù)字化時代,軟件系統(tǒng)面臨著日益增長的高并發(fā)需求。無論是大型電商平臺在促銷活動時處理海量的訂單請求,還是在線游戲服務(wù)器同時承載眾多玩家的交互操作,高效的并發(fā)處理能力都成為了系統(tǒng)性能和穩(wěn)定性的關(guān)鍵因素。

Java作為一門廣泛應(yīng)用于企業(yè)級開發(fā)的編程語言,提供了豐富且強(qiáng)大的并發(fā)流程工具。這些工具猶如精巧的齒輪,相互配合,使開發(fā)者能夠在多線程環(huán)境下有條不紊地控制程序的執(zhí)行流程,確保各個線程之間協(xié)調(diào)運作,高效完成復(fù)雜的任務(wù)。

本文將踏上Java并發(fā)流程工具的實戰(zhàn)探索之旅。我們不僅會深入剖析這些工具的核心原理,更會通過實際代碼示例,詳細(xì)展示它們在不同應(yīng)用場景中的具體應(yīng)用。從簡單的線程同步控制,到復(fù)雜的多階段任務(wù)協(xié)調(diào),一步步揭開Java并發(fā)流程工具的神秘面紗,幫助讀者掌握在實際項目中運用這些工具優(yōu)化程序性能、提升系統(tǒng)可靠性的技巧。

一、CountDownLatch

1. 詳解CountDownLatch工作流程

筆者一般稱CountDownLatch為倒計時門閂,它主要用于需要某些條件下才能喚醒的需求場景,例如我們線程1必須等到線程2做完某些事,那么就可以設(shè)置一個CountDownLatch并將數(shù)值設(shè)置為1,一旦線程2完成業(yè)務(wù)邏輯后,將數(shù)值修改為0,此時線程1就會被喚醒:

2. 模擬等待工作完成

通過上述的描述可能有點抽象,我們直接通過幾個例子演示一下,我們現(xiàn)在有這樣一個需求,希望等待5個線程完成之后,打印輸出一句工作完成:

對應(yīng)的代碼示例如下,可以看到我們創(chuàng)建了數(shù)值為5的CountDownLatch ,一旦線程池里的線程完成工作后就調(diào)用countDown進(jìn)行扣減,一旦數(shù)值變?yōu)?,主線程await就會放行,執(zhí)行后續(xù)輸出:

int workerSize = 5;
        CountDownLatch workCount = new CountDownLatch(workerSize);
        ExecutorService threadPool = Executors.newFixedThreadPool(workerSize);

        for (int i = 0; i < workerSize; i++) {
            final int workerNum = i;
            //5個工人輸出完成工作后,扣減倒計時門閂數(shù)
            threadPool.submit(() -> {
                log.info("worker[{}]完成手頭的工作", workerNum);
                workCount.countDown();
            });
        }

        try {
            //阻塞當(dāng)前線程(主線程)往后走,只有倒計時門閂變?yōu)?之后才能繼續(xù)后續(xù)邏輯
            log.info("等待worker工作完成");
            workCount.await();
        } catch (InterruptedException e) {
            log.info("倒計時門閂阻塞失敗,失敗原因[{}]", e.getMessage(), e);
        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        log.info("所有工人都完成手頭的工作了");

對應(yīng)的我們也給出輸出結(jié)果,可以看到主線程在線程池線程完成后才輸出:

3. 模擬運動員賽跑

實際上CountDownLatch可以讓多個線程進(jìn)行等待,我們不妨用線程模擬一下所有運動員就緒后,等待槍響后起跑的場景:

代碼如下,每當(dāng)運動員即線程池的線程準(zhǔn)備就緒,則調(diào)用await等待槍響,一旦所有運動員就緒之后,主線程調(diào)用countDown模擬槍響,然后運動員起跑:

public static void main(String[] args) {
        log.info("百米跑比賽開始");

        int playerNum = 3;
        CountDownLatch gun = new CountDownLatch(1);
        ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
        
        for (int i = 0; i < playerNum; i++) {
            final int playNo = i;
            
            threadPool.submit(() -> {
                log.info("[{}]號運動員已就緒", playNo);
                try {
                    gun.await();
                } catch (InterruptedException e) {
                    log.info("[{}]號運動員線程阻塞失敗,失敗原因[{}]", playNo, e.getMessage(), e);
                }
                log.info("[{}]號運動員已經(jīng)到達(dá)重點", playNo);
            });
        }

        //按下槍 所有運動員起跑
        gun.countDown();

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        log.info("百米賽跑已結(jié)束");
    }

對應(yīng)的我們也給出相應(yīng)的輸出結(jié)果:

4. 從源碼角度分析CountDownLatch工作流程

我們以等待所有工人完成工作的例子進(jìn)行解析,實際上在CountDownLatch是通過state和一個抽象隊列即aqs完成多線程之間的流程調(diào)度,主線程調(diào)用await方法等待其他worker線程,如果其它worker線程沒有完成工作,那么CountDownLatch就會將其存入抽象隊列中。

一旦其他線程將state設(shè)置為0時,await對應(yīng)的線程就會從抽象隊列中釋放并喚醒:

對應(yīng)我們給出countDown的實現(xiàn),可以看到該方法底層就是將aqs隊列中的state進(jìn)行扣減:

public void countDown() {
        sync.releaseShared(1);
    }

//releaseShared內(nèi)部核心邏輯就是將state扣減1
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                //扣減state并通過cas修改賦值
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

而countDown本質(zhì)上就是查看這個state,如果state被扣減為0,則調(diào)用aqs底層doReleaseShared方法將隊列中等待線程喚醒:

public void countDown() {
        sync.releaseShared(1);
    }


public final boolean releaseShared(int arg) {
  //查看是否扣減為0
        if (tryReleaseShared(arg)) {
        //如果是0則將當(dāng)前等待線程喚醒
            doReleaseShared();
            return true;
        }
        return false;
    }

上文講解countDown涉及一些關(guān)于AQS的實用理解和設(shè)計,關(guān)于更多AQS的知識點,感興趣的讀者可以閱讀一下筆者的這篇文章:《AQS 源碼解析:原理與實踐

二、Semaphore

1. 詳解Semaphore

信號量多用于限流的場景,例如我們希望單位時間內(nèi)只能有一個線程工作,我們就可以使用信號量,只有拿到線程的信號量才能工作,工作完成后釋放信號量,其余線程才能爭搶這個信號量并進(jìn)行進(jìn)一步的操作。 對應(yīng)我們給出下面這段代碼,可以看到生命信號量數(shù)值為6,每當(dāng)線程拿到3個信號量之后就會執(zhí)行業(yè)務(wù)操作,完成后調(diào)用release釋放3個令牌,讓其他線程繼續(xù)爭搶:

//設(shè)置可復(fù)用的信號量,令牌數(shù)為3
        Semaphore semaphore = new Semaphore(6, true);
        //創(chuàng)建5個線程
        int workSize = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(workSize);


        for (int i = 0; i < workSize; i++) {
            executorService.submit(() -> {
                try {
                    //拿3個令牌
                    semaphore.acquire(3);

                    log.info("進(jìn)行業(yè)務(wù)邏輯處理.......");
                    ThreadUtil.sleep(1000);

                    //釋放3個令牌
                    semaphore.release(3);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executorService.shutdown();
        while (!executorService.isTerminated()) {

        }

對應(yīng)輸出結(jié)果如下,可以看到每個線程拿到令牌后都會休眠1秒,從輸出結(jié)果來看每秒只有兩個線程才工作,符合我們的限流需求:

2. 詳解Semaphore工作原理

Semaphore底層也是用到的aqs隊列,線程進(jìn)行資源獲取時也是通過查看state是否足夠,在明確足夠的情況下進(jìn)行state扣減,然后進(jìn)行工作。如果線程發(fā)現(xiàn)state數(shù)量不夠,那么就會被Semaphore存入aqs底層的抽象隊列中,直到state數(shù)量足夠后被喚醒:

對此我們給出Semaphore底層的acquire的邏輯可以看到,它會讀取state數(shù)值然后進(jìn)行扣減,如果剩余數(shù)量大于0則說明令牌獲取成功線程可以執(zhí)行后續(xù)邏輯,反之說明當(dāng)前令牌數(shù)不夠,外部邏輯會將該線程掛到等待隊列中,等待令牌足夠后將其喚醒:

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                //讀取可用的state    
                int available = getState();
                //計算剩余的state
                int remaining = available - acquires;
                //如果小于0說明令牌數(shù)不足直接返回出去,讓外部將線程掛起,反之通過cas修改剩余數(shù),返回大于0的結(jié)果讓持有令牌的線程執(zhí)行后續(xù)邏輯
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

3. Semaphore使用注意事項

  • 獲取和釋放的時候都可以指定數(shù)量,但是要保持一致。
  • 公平性設(shè)置為true會更加合理
  • 并不必須由獲取許可證的線程釋放許可證??梢允茿獲取,B釋放。

三、Condition

1. 詳解Condition

Condition即條件對象,不是很常用或者直接用到的對象,常用于線程等待喚醒操作,例如A線程需要等待某個條件的時候,我們可以通過condition.await()方法,A線程就會進(jìn)入阻塞狀態(tài)。

線程B執(zhí)行condition.signal()方法,則JVM就會從被阻塞線程中找到等待該condition的線程。線程A收到可執(zhí)行信號的時候,他的線程狀態(tài)就會變成Runnable可執(zhí)行狀態(tài)。

對此我們給出代碼示例,可以看到我們從ReentrantLock 中拿到一個Condition 對象,讓創(chuàng)建的線程進(jìn)入等待狀態(tài),隨后讓主線程調(diào)用condition 的signal將其喚醒:

private ReentrantLock lock = new ReentrantLock();
    //條件對象,操控線程的等待和通知
    private Condition condition = lock.newCondition();

    public void waitCondition() throws InterruptedException {
        lock.lock();
        try {
            log.info("等待達(dá)到條件后通知");
            condition.await();
            log.info("收到通知,開始執(zhí)行業(yè)務(wù)邏輯");
        } finally {
            lock.unlock();
            log.info("執(zhí)行完成,釋放鎖");
        }
    }


    public void notifyCondition() throws InterruptedException {
        lock.lock();
        try {
            log.info("達(dá)到條件發(fā)起通知");
            condition.signal();
            log.info("發(fā)起通知結(jié)束");
        } finally {
            lock.unlock();
            log.info("發(fā)起通知執(zhí)行完成,釋放鎖");
        }
    }


    public static void main(String[] args) throws InterruptedException {
        Main obj = new Main();

        new Thread(() -> {
            try {
                obj.waitCondition();
                //讓出CPU時間片,交給主線程發(fā)起通知
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                log.error("等待條件通知設(shè)置失敗,失敗原因 [{}]", e.getMessage(), e);
            }
        }).start();

        //休眠3s喚醒等待線程
        Thread.sleep(3000);
        obj.notifyCondition();
    }

對應(yīng)的我們也給出輸出結(jié)果:

2. 基于條件對象完成生產(chǎn)者、消費者模式

我們假設(shè)用一個隊列存放一波生產(chǎn)者生產(chǎn)的資源,當(dāng)資源滿了通知消費者消費。當(dāng)消費者消費空了,通知生產(chǎn)者生產(chǎn)。

所以這時候使用condition控制流程最合適(這也是阻塞的隊列內(nèi)部的實現(xiàn)),所以我們要定義兩個信號,分別為:

  • 當(dāng)資源被耗盡,我們就使用資源未滿條件(notFull): 調(diào)用signal通知生產(chǎn)者消費,消費者調(diào)用await進(jìn)入等待。
  • 當(dāng)資源被填滿,使用資源為空條件(notEmpty):將生產(chǎn)者用await方法掛起,消費者用signal喚醒消費告知非空。

很明顯生產(chǎn)者和消費者本質(zhì)上就是基于這兩個標(biāo)識分別標(biāo)志自己的等待時機(jī)和通知時機(jī),以生產(chǎn)者為例,即每生產(chǎn)一個資源后就可以調(diào)用notEmpty通知消費者消費,當(dāng)生產(chǎn)者速度過快,則用await等待未滿notFull條件阻塞:

首先我們給出生產(chǎn)者和消費者條件和資源隊列聲明,基于上述條件我們給出一個經(jīng)典的生產(chǎn)者和消費者模式的示例,我們首先給出生產(chǎn)者代碼,可以看到資源滿的時候調(diào)用notFull.await();將自己掛起等待未滿,生產(chǎn)資源后調(diào)用 notEmpty.signal();通知消費者消費。

對應(yīng)消費者示例代碼也是一樣,當(dāng)資源消費完全,調(diào)用notEmpty.await();等待不空,一旦消費定量資源調(diào)用notFull.signal();通知生產(chǎn)者生產(chǎn)。

最終代碼示例如下:

@Slf4j
public class ProducerMode {

    //鎖
    private static ReentrantLock lock = new ReentrantLock();
    // 資源未滿
    private Condition notFull = lock.newCondition();
    //資源為空
    private Condition notEmpty = lock.newCondition();

    private Queue<Integer> queue = new PriorityQueue<>(10);
    private int queueMaxSize = 10;

    /**
     * 生產(chǎn)者
     */
    private class Producer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (queueMaxSize == queue.size()) {
                        log.info("當(dāng)前隊列已滿,通知消費者消費");
                        //等待不滿條件觸發(fā)
                        notFull.await();

                    }

                    queue.offer(1);
                    log.info("生產(chǎn)者補(bǔ)貨,當(dāng)前隊列有 【{}】", queue.size());
                    //通知消費者隊列不空,可以消費
                    notEmpty.signal();
                } catch (Exception e) {
                    log.error("生產(chǎn)者報錯,失敗原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }



    }

    /**
     * 消費者
     */
    private class Consumer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (0 == queue.size()) {
                        log.info("當(dāng)前隊列已空,通知生產(chǎn)者補(bǔ)貨");
                        //等待不空條件達(dá)到
                        notEmpty.await();

                    }

                    queue.poll();
                    //通知消費者不滿了
                    notFull.signal();
                    log.info("消費者完成消費,當(dāng)前隊列還剩余 【{}】個元素", queue.size());
                } catch (Exception e) {
                    log.error("生產(chǎn)者報錯,失敗原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }
    }


    public static void main(String[] args) {
        ProducerMode mode = new ProducerMode();
        Producer producer = mode.new Producer();
        ProducerMode.Consumer consumer = mode.new Consumer();
        producer.start();
        consumer.start();
    }
}

對應(yīng)的我們給出輸出結(jié)果:

四、CyclicBarrier

1. CyclicBarrier 原理和使用示例

CyclicBarrier 也就是循環(huán)柵欄對象,不是很常用,它主要用于等待線程數(shù)就緒后執(zhí)行公共邏輯的業(yè)務(wù)場景。 例如我們希望每湊齊5個線程后執(zhí)行后續(xù)邏輯,我們就可以說明CyclicBarrier 數(shù)值為5,然后每個線程到期后調(diào)用await等待其他線程就緒。

一旦到齊5個,CyclicBarrier 就會通知這些線程開始工作,對應(yīng)的代碼如下所示:

public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                System.out.println("線程 " + Thread.currentThread().getName() + " 開始執(zhí)行任務(wù)");
                try {
                    // 模擬執(zhí)行任務(wù)
                    Thread.sleep(1000);
                    System.out.println("線程 " + Thread.currentThread().getName() + " 到達(dá)屏障");
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }

                System.out.println("所有線程都到達(dá)屏障,一起繼續(xù)執(zhí)行");
            }).start();
        }
    }

對應(yīng)的我們給出相應(yīng)輸出示例:

2. CyclicBarrier 與CountDownLatch區(qū)別(重點)

  • CountDownLatch用戶事件即主要是業(yè)務(wù)流程上的控制并不是針對線程,CyclicBarrier 循環(huán)柵欄作用于線程,如上代碼必須等待線程到齊后觸發(fā)。
  • 循環(huán)柵欄可重復(fù)使用,CountDownLatch則不能。

五、小結(jié)

通過本次對Java并發(fā)流程工具的實戰(zhàn)探索,我們對Java并發(fā)編程領(lǐng)域有了更為深入且全面的認(rèn)知。 從CountDownLatch到CyclicBarrier,再到Semaphore和Exchanger等工具,每一個都在多線程協(xié)作場景中有著獨特的用途。

  • CountDownLatch如同倒計時器,能讓一組線程等待某個特定事件完成后再繼續(xù)執(zhí)行;
  • CyclicBarrier則像聚會的召集者,使多個線程在特定點上匯聚,然后一起繼續(xù)前行;
  • Semaphore猶如資源的守護(hù)者,精確控制著對有限資源的訪問;
  • Exchanger為兩個線程之間的數(shù)據(jù)交換提供了安全高效的通道。

在實際的代碼實踐中,我們看到這些工具如何巧妙地解決多線程協(xié)作中復(fù)雜的同步和通信問題,極大地提高了程序的并發(fā)處理能力和性能。不僅如此,我們還學(xué)會了根據(jù)不同的業(yè)務(wù)場景,如任務(wù)并行化、資源管理、數(shù)據(jù)交換等,選擇最合適的并發(fā)流程工具,以實現(xiàn)最優(yōu)的解決方案。

然而,Java并發(fā)編程是一個廣闊且復(fù)雜的領(lǐng)域,這些工具在帶來便利的同時,也要求我們對線程安全、資源競爭等問題保持高度警惕。在使用過程中,必須深入理解其原理和潛在風(fēng)險,確保代碼的正確性和穩(wěn)定性。

希望本次的探索能為你在Java并發(fā)編程的道路上點亮一盞明燈,在未來面對各種并發(fā)挑戰(zhàn)時,你能夠熟練運用這些工具,編寫出高效、可靠且易于維護(hù)的多線程程序,為構(gòu)建更強(qiáng)大、更具競爭力的軟件系統(tǒng)奠定堅實的基礎(chǔ)。

責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2023-05-15 08:12:38

2023-10-18 09:27:58

Java編程

2019-11-12 09:32:35

高并發(fā)流量協(xié)議

2022-08-04 20:41:42

高并發(fā)流量SQL

2025-03-24 09:57:19

2024-01-31 08:50:41

Guava并發(fā)工具

2023-12-04 13:48:00

編 程Atomic

2021-10-08 08:55:23

FacebookBGP工具

2024-09-29 11:07:46

2024-07-02 11:32:38

2019-06-26 07:11:35

Java流程監(jiān)控開發(fā)

2023-08-25 09:36:43

Java編程

2024-04-07 00:04:00

Go語言Map

2023-04-09 16:34:49

JavaSemaphore開發(fā)

2020-11-30 16:01:03

Semaphore

2020-12-03 11:15:21

CyclicBarri

2018-05-28 14:37:05

數(shù)據(jù)庫NoSQL高并發(fā)

2024-04-10 08:16:20

多線程編程Java并發(fā)編程

2021-02-03 06:15:26

工具postManHttp

2025-03-21 09:01:34

Swift任務(wù)取消機(jī)制協(xié)作式取消
點贊
收藏

51CTO技術(shù)棧公眾號