自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

獲取雙異步返回值時(shí),如何保證主線程不阻塞?

開(kāi)發(fā) 后端
ForkJoinPool在于可以充分利用多核CPU的優(yōu)勢(shì),把一個(gè)任務(wù)拆分成多個(gè)小任務(wù),把多個(gè)小任務(wù)放到多個(gè)CPU上并行執(zhí)行,當(dāng)多個(gè)小任務(wù)執(zhí)行完畢后,再將其執(zhí)行結(jié)果合并起來(lái)。

一、前情提要

在上一篇文章中,使用雙異步后,如何保證數(shù)據(jù)一致性?,通過(guò)Future獲取異步返回值,輪詢(xún)判斷Future狀態(tài),如果執(zhí)行完畢或已取消,則通過(guò)get()獲取返回值,get()是阻塞的方法,因此會(huì)阻塞當(dāng)前線程,如果通過(guò)new Runnable()執(zhí)行g(shù)et()方法,那么還是需要返回AsyncResult,然后再通過(guò)主線程去get()獲取異步線程返回結(jié)果。

寫(xiě)法很繁瑣,還會(huì)阻塞主線程。

下面是FutureTask異步執(zhí)行流程圖:

二、JDK8的CompletableFuture

1、ForkJoinPool

Java8中引入了CompletableFuture,它實(shí)現(xiàn)了對(duì)Future的全面升級(jí),可以通過(guò)回調(diào)的方式,獲取異步線程返回值。

CompletableFuture的異步執(zhí)行通過(guò)ForkJoinPool實(shí)現(xiàn), 它使用守護(hù)線程去執(zhí)行任務(wù)。

ForkJoinPool在于可以充分利用多核CPU的優(yōu)勢(shì),把一個(gè)任務(wù)拆分成多個(gè)小任務(wù),把多個(gè)小任務(wù)放到多個(gè)CPU上并行執(zhí)行,當(dāng)多個(gè)小任務(wù)執(zhí)行完畢后,再將其執(zhí)行結(jié)果合并起來(lái)。

Future的異步執(zhí)行是通過(guò)ThreadPoolExecutor實(shí)現(xiàn)的。

2、從ForkJoinPool和ThreadPoolExecutor探索CompletableFuture和Future的區(qū)別

  • ForkJoinPool中的每個(gè)線程都會(huì)有一個(gè)隊(duì)列,而ThreadPoolExecutor只有一個(gè)隊(duì)列,并根據(jù)queue類(lèi)型不同,細(xì)分出各種線程池;
  • ForkJoinPool在使用過(guò)程中,會(huì)創(chuàng)建大量的子任務(wù),會(huì)進(jìn)行大量的gc,但是ThreadPoolExecutor不需要,因?yàn)門(mén)hreadPoolExecutor是任務(wù)分配平均的;
  • ThreadPoolExecutor中每個(gè)異步線程之間是相互獨(dú)立的,當(dāng)執(zhí)行速度快的線程執(zhí)行完畢后,它就會(huì)一直處于空閑的狀態(tài),等待其它線程執(zhí)行完畢;
  • ForkJoinPool中每個(gè)異步線程之間并不是絕對(duì)獨(dú)立的,在ForkJoinPool線程池中會(huì)維護(hù)一個(gè)隊(duì)列來(lái)存放需要執(zhí)行的任務(wù),當(dāng)線程自身任務(wù)執(zhí)行完畢后,它會(huì)從其它線程中獲取未執(zhí)行的任務(wù)并幫助它執(zhí)行,直至所有線程執(zhí)行完畢。

因此,在多線程任務(wù)分配不均時(shí),F(xiàn)orkJoinPool的執(zhí)行效率更高。但是,如果任務(wù)分配均勻,ThreadPoolExecutor的執(zhí)行效率更高,因?yàn)镕orkJoinPool會(huì)創(chuàng)建大量子任務(wù),并對(duì)其進(jìn)行大量的GC,比較耗時(shí)。

三、通過(guò)CompletableFuture優(yōu)化 “通過(guò)Future獲取異步返回值”

1、通過(guò)Future獲取異步返回值關(guān)鍵代碼

(1)將異步方法的返回值改為Future<Integer>,將返回值放到new AsyncResult<>();中

@Async("async-executor")
public void readXls(String filePath, String filename) {
    try {
     // 此代碼為簡(jiǎn)化關(guān)鍵性代碼
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
            futureList.add(sumFuture);
        }
    }catch (Exception e){
        logger.error("readXlsCacheAsync---插入數(shù)據(jù)異常:",e);
    }
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
    try {
        // 此代碼為簡(jiǎn)化關(guān)鍵性代碼
        return new AsyncResult<>(sum);
    }catch (Exception e){
        return new AsyncResult<>(0);
    }
}

(2)通過(guò)Future<Integer>.get()獲取返回值

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) {
    int[] futureSumArr = new int[futureList.size()];
    for (int i = 0;i<futureList.size();i++) {
        try {
            Future<Integer> future = futureList.get(i);
            while (true) {
                if (future.isDone() && !future.isCancelled()) {
                    Integer futureSum = future.get();
                    logger.info("獲取Future返回值成功"+"----Future:" + future
                            + ",Result:" + futureSum);
                    futureSumArr[i] += futureSum;
                    break;
                } else {
                    logger.info("Future正在執(zhí)行---獲取Future返回值中---等待3秒");
                    Thread.sleep(3000);
                }
            }
        } catch (Exception e) {
            logger.error("獲取Future返回值異常: ", e);
        }
    }
    
    boolean insertFlag = getInsertSum(futureSumArr, excelRow);
    logger.info("獲取所有異步線程Future的返回值成功,Excel插入結(jié)果="+insertFlag);
    return insertFlag;
}

2、通過(guò)CompletableFuture獲取異步返回值關(guān)鍵代碼

(1)將異步方法的返回值改為 int

@Async("async-executor")
public void readXls(String filePath, String filename) {
 List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();
    for (int time = 0; time < times; time++) {
     // 此代碼為簡(jiǎn)化關(guān)鍵性代碼
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
         @Override
         public Integer get() {
             return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
         }
     }).thenApply((result) -> {// 回調(diào)方法
         return thenApplyTest2(result);// supplyAsync返回值 * 1
     }).thenApply((result) -> {
         return thenApplyTest5(result);// thenApply返回值 * 1
     }).exceptionally((e) -> { // 如果執(zhí)行異常:
         logger.error("CompletableFuture.supplyAsync----異常:", e);
         return null;
     });
 
     completableFutureList.add(completableFuture);
    }
}
@Async("async-executor")
public int readXlsCacheAsync() {
    try {
        // 此代碼為簡(jiǎn)化關(guān)鍵性代碼
        return sum;
    }catch (Exception e){
        return -1;
    }
}

(2)通過(guò)completableFuture.get()獲取返回值

public static boolean getCompletableFutureResult(List<CompletableFuture<Integer>> list, int excelRow){
    logger.info("通過(guò)completableFuture.get()獲取每個(gè)異步線程的插入結(jié)果----開(kāi)始");

    int sum = 0;
    for (int i = 0; i < list.size(); i++) {
        Integer result = list.get(i).get();
        sum += result;
    }

    boolean insertFlag = excelRow == sum;
    logger.info("全部執(zhí)行完畢,excelRow={},入庫(kù)={}, 數(shù)據(jù)是否一致={}",excelRow,sum,insertFlag);
    return insertFlag;
}

3、效率對(duì)比

(1)測(cè)試環(huán)境

  • 12個(gè)邏輯處理器的電腦;
  • Excel中包含10萬(wàn)條數(shù)據(jù);
  • Future的自定義線程池,核心線程數(shù)為24;
  • ForkJoinPool的核心線程數(shù)為24;

(2)統(tǒng)計(jì)四種情況下10萬(wàn)數(shù)據(jù)入庫(kù)時(shí)間

  • 不獲取異步返回值
  • 通過(guò)Future獲取異步返回值
  • 通過(guò)CompletableFuture獲取異步返回值,默認(rèn)ForkJoinPool線程池的核心線程數(shù)為本機(jī)邏輯處理器數(shù)量,測(cè)試電腦為12;
  • 通過(guò)CompletableFuture獲取異步返回值,修改ForkJoinPool線程池的核心線程數(shù)為24。

備注:因?yàn)镃ompletableFuture不阻塞主線程,主線程執(zhí)行時(shí)間只有2秒,表格中統(tǒng)計(jì)的是異步線程全部執(zhí)行完成的時(shí)間。

(3)設(shè)置核心線程數(shù)

將核心線程數(shù)CorePoolSize設(shè)置成CPU的處理器數(shù)量,是不是效率最高的?

// 獲取CPU的處理器數(shù)量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;// 測(cè)試電腦是24

因?yàn)樵诮涌诒徽{(diào)用后,開(kāi)啟異步線程,執(zhí)行入庫(kù)任務(wù),因?yàn)闇y(cè)試機(jī)最多同時(shí)開(kāi)啟24線程處理任務(wù),故將10萬(wàn)條數(shù)據(jù)拆分成等量的24份,也就是10萬(wàn)/24 = 4166,那么我設(shè)置成4200,是不是效率最佳呢?

測(cè)試的過(guò)程中發(fā)現(xiàn),好像真的是這樣的。

自定義ForkJoinPool線程池
@Autowired
@Qualifier("asyncTaskExecutor")
private Executor asyncTaskExecutor;

@Override
public void readXls(String filePath, String filename) {
  List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();
    for (int time = 0; time < times; time++) {
  CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
         @Override
         public Integer get() {
             try {
                 return readExcelDbJdk8Service.readXlsCacheAsync(sheet, row, start, finalEnd, insertBuilder);
             } catch (Exception e) {
                 logger.error("CompletableFuture----readXlsCacheAsync---異常:", e);
                 return -1;
             }
         };
     },asyncTaskExecutor);
 
     completableFutureList.add(completableFuture);
 }

 // 不會(huì)阻塞主線程
    CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {
        try {
            int insertSum = getCompletableFutureResult(completableFutureList, excelRow);
        } catch (Exception ex) {
            return;
        }
    });
}
自定義線程池
/**
 * 自定義異步線程池
 */
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    //設(shè)置線程名稱(chēng)
    executor.setThreadNamePrefix("asyncTask-Executor");
    //設(shè)置最大線程數(shù)
    executor.setMaxPoolSize(200);
    //設(shè)置核心線程數(shù)
    executor.setCorePoolSize(24);
    //設(shè)置線程空閑時(shí)間,默認(rèn)60
    executor.setKeepAliveSeconds(200);
    //設(shè)置隊(duì)列容量
    executor.setQueueCapacity(50);
    /**
     * 當(dāng)線程池的任務(wù)緩存隊(duì)列已滿(mǎn)并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略
     * 通常有以下四種策略:
     * ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
     * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
     * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
     * ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前的任務(wù),自動(dòng)重復(fù)調(diào)用 execute() 方法,直到成功
     */
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}

(4)統(tǒng)計(jì)分析

效率對(duì)比:

③通過(guò)CompletableFuture獲取異步返回值(12線程) <  ②通過(guò)Future獲取異步返回值 <  ④通過(guò)CompletableFuture獲取異步返回值(24線程) <  ①不獲取異步返回值

不獲取異步返回值時(shí)性能最優(yōu),這不廢話(huà)嘛~

核心線程數(shù)相同的情況下,CompletableFuture的入庫(kù)效率要優(yōu)于Future的入庫(kù)效率,10萬(wàn)條數(shù)據(jù)大概要快4秒鐘,這還是相當(dāng)驚人的,優(yōu)化的價(jià)值就在于此。

四、通過(guò)CompletableFuture.allOf解決阻塞主線程問(wèn)題

1、語(yǔ)法

CompletableFuture.allOf(CompletableFuture的可變數(shù)組).whenComplete((r,e) -> {})。

2、代碼實(shí)例

getCompletableFutureResult方法在 “3.2.2 通過(guò)completableFuture.get()獲取返回值”。

// 不會(huì)阻塞主線程
CompletableFuture.allOf(completableFutureList.toArray(new   CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {
    logger.info("全部執(zhí)行完畢,解決主線程阻塞問(wèn)題~");
    try {
        int insertSum = getCompletableFutureResult(completableFutureList, excelRow);
    } catch (Exception ex) {
        logger.error("全部執(zhí)行完畢,解決主線程阻塞問(wèn)題,異常:", ex);
        return;
    }
});

// 會(huì)阻塞主線程
//getCompletableFutureResult(completableFutureList, excelRow);

logger.info("CompletableFuture----會(huì)阻塞主線程嗎?");

五、CompletableFuture中花俏的語(yǔ)法糖

1、runAsync

runAsync 方法不支持返回值。

可以通過(guò)runAsync執(zhí)行沒(méi)有返回值的異步方法。

不會(huì)阻塞主線程。

// 分批異步讀取Excel內(nèi)容并入庫(kù)
int finalEnd = end;
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis();

2、supplyAsync

supplyAsync也可以異步處理任務(wù),傳入的對(duì)象實(shí)現(xiàn)了Supplier接口。將Supplier作為參數(shù)并返回CompletableFuture結(jié)果值,這意味著它不接受任何輸入?yún)?shù),而是將result作為輸出返回。

會(huì)阻塞主線程。

supplyAsync()方法關(guān)鍵代碼:

int finalEnd = end;
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
});
@Override
public int readXlsCacheAsyncMybatis() {
    // 不為人知的操作
    // 返回異步方法執(zhí)行結(jié)果即可
 return 100;
}

六、順序執(zhí)行異步任務(wù)

1、thenRun

thenRun()不接受參數(shù),也沒(méi)有返回值,與runAsync()配套使用,恰到好處。

// JDK8的CompletableFuture
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis())
.thenRun(() -> logger.info("CompletableFuture----.thenRun()方法測(cè)試"));

2、thenAccept

thenAccept()接受參數(shù),沒(méi)有返回值。

supplyAsync + thenAccept

  • 異步線程順序執(zhí)行
  • supplyAsync的異步返回值,可以作為thenAccept的參數(shù)使用
  • 不會(huì)阻塞主線程
CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
}).thenAccept(x -> logger.info(".thenAccept()方法測(cè)試:" + x));

但是,此時(shí)無(wú)法通過(guò)completableFuture.get()獲取supplyAsync的返回值了。

3、thenApply

thenApply在thenAccept的基礎(chǔ)上,可以再次通過(guò)completableFuture.get()獲取返回值。

supplyAsync + thenApply,典型的鏈?zhǔn)骄幊獭?/p>

  • 異步線程內(nèi)方法順序執(zhí)行。
  • supplyAsync 的返回值,作為第 1 個(gè)thenApply的參數(shù),進(jìn)行業(yè)務(wù)處理。
  • 第 1 個(gè)thenApply的返回值,作為第 2 個(gè)thenApply的參數(shù),進(jìn)行業(yè)務(wù)處理。
  • 最后,通過(guò)future.get()方法獲取最終的返回值。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
 @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
}).thenApply((result) -> {
    return thenApplyTest2(result);// supplyAsync返回值 * 2
}).thenApply((result) -> {
    return thenApplyTest5(result);// thenApply返回值 * 5
});

logger.info("readXlsCacheAsyncMybatis插入數(shù)據(jù) * 2 * 5 = " + completableFuture.get());

七、CompletableFuture合并任務(wù)

  • thenCombine,多個(gè)異步任務(wù)并行處理,有返回值,最后合并結(jié)果返回新的CompletableFuture對(duì)象。
  • thenAcceptBoth,多個(gè)異步任務(wù)并行處理,無(wú)返回值。
  • acceptEither,多個(gè)異步任務(wù)并行處理,無(wú)返回值。
  • applyToEither,,多個(gè)異步任務(wù)并行處理,有返回值。

CompletableFuture合并任務(wù)的代碼實(shí)例,這里就不多贅述了,一些語(yǔ)法糖而已,大家切記陷入低水平勤奮的怪圈。

八、CompletableFuture VS Future總結(jié)

本文中以下幾個(gè)方面對(duì)比了CompletableFuture和Future的差異:

  • ForkJoinPool和ThreadPoolExecutor的實(shí)現(xiàn)原理,探索了CompletableFuture和Future的差異。
  • 通過(guò)代碼實(shí)例的形式簡(jiǎn)單介紹了CompletableFuture中花俏的語(yǔ)法糖。
  • 通過(guò)CompletableFuture優(yōu)化了 “通過(guò)Future獲取異步返回值”。
  • 通過(guò)CompletableFuture.allOf解決阻塞主線程問(wèn)題。

Future提供了異步執(zhí)行的能力,但Future.get()會(huì)通過(guò)輪詢(xún)的方式獲取異步返回值,get()方法還會(huì)阻塞主線程。

輪詢(xún)的方式非常消耗CPU資源,阻塞的方式顯然與我們的異步初衷背道而馳。

JDK8提供的CompletableFuture實(shí)現(xiàn)了Future接口,添加了很多Future不具備的功能,比如鏈?zhǔn)骄幊?、異常處理回調(diào)函數(shù)、獲取異步結(jié)果不阻塞不輪詢(xún)、合并異步任務(wù)等。

獲取異步線程結(jié)果后,我們可以通過(guò)添加事務(wù)的方式,實(shí)現(xiàn)Excel入庫(kù)操作的數(shù)據(jù)一致性。

異步多線程情況下如何實(shí)現(xiàn)事務(wù)?

有的小伙伴可能會(huì)說(shuō):

這還不簡(jiǎn)單?添加@Transactional注解,如果發(fā)生異?;蛉霂?kù)數(shù)據(jù)量不符,直接回滾就可以了~

那么,真的是這樣嗎?我們下期見(jiàn)~

責(zé)任編輯:姜華 來(lái)源: 哪吒編程
相關(guān)推薦

2010-03-15 18:34:08

Java多線程

2022-07-06 07:08:58

CPythonPython返回值

2010-03-17 17:37:17

Java線程返回值

2020-10-26 09:19:11

線程池消息

2023-08-07 14:52:33

WindowsExplorer進(jìn)程

2021-08-13 11:31:23

HTTP

2024-01-22 08:52:00

AQS雙異步數(shù)據(jù)一致性

2009-12-25 17:21:13

ADO返回值

2010-07-09 13:20:37

HART協(xié)議

2009-12-07 11:11:41

WCF返回值

2010-07-21 10:32:05

Perl函數(shù)返回值

2024-06-17 00:02:00

線程安全HashMapJDK 1.7

2023-01-26 02:07:51

HashSet線程安全

2024-02-26 08:10:00

Redis數(shù)據(jù)數(shù)據(jù)庫(kù)

2009-11-17 16:16:59

PHP遞歸函數(shù)

2009-09-07 03:07:11

C# Main方法

2024-11-11 07:05:00

Redis哨兵模式主從復(fù)制

2024-08-30 08:23:06

2022-02-23 13:31:26

RVO編譯器優(yōu)化

2010-03-02 16:50:34

WCF返回值
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)