自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

擁抱Java 8并行流吧,速度飛起!

開發(fā) 后端
Java8 為我們提供了并行流,可以一鍵開啟并行模式。是不是很酷呢?讓我們來看看吧。

[[334438]]

前言

在 Java7 之前,如果想要并行處理一個集合,我們需要以下幾步:

  1.  手動分成幾部分 
  2.  為每部分創(chuàng)建線程
  3.  在適當?shù)臅r候合并。并且還需要關注多個線程之間共享變量的修改問題。

而 Java8 為我們提供了并行流,可以一鍵開啟并行模式。是不是很酷呢?讓我們來看看吧

并行流

認識和開啟并行流

什么是并行流: 并行流就是將一個流的內(nèi)容分成多個數(shù)據(jù)塊,并用不同的線程分別處理每個不同數(shù)據(jù)塊的流。例如有這么一個需求:

有一個 List 集合,而 list 中每個 apple 對象只有重量,我們也知道 apple 的單價是 5元/kg,現(xiàn)在需要計算出每個 apple 的單價,傳統(tǒng)的方式是這樣: 

  1. List<Apple> appleList = new ArrayList<>(); // 假裝數(shù)據(jù)是從庫里查出來的  
  2. for (Apple apple : appleList) {  
  3.     apple.setPrice(5.0 * apple.getWeight() / 1000);  

我們通過迭代器遍歷 list 中的 apple 對象,完成了每個 apple 價格的計算。而這個算法的時間復雜度是 O(list.size()) 隨著 list 大小的增加,耗時也會跟著線性增加。并行流可以大大縮短這個時間。

并行流處理該集合的方法如下: 

  1. appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000)); 

和普通流的區(qū)別是這里調(diào)用的 parallelStream() 方法。當然也可以通過 stream.parallel() 將普通流轉換成并行流。推薦看下:Java 8 創(chuàng)建 Stream 的 10 種方式,更多可以關注Java技術棧公眾號回復java獲取系列教程。

并行流也能通過 sequential() 方法轉換為順序流,但要注意:流的并行和順序轉換不會對流本身做任何實際的變化,僅僅是打了個標記而已。并且在一條流水線上對流進行多次并行 / 順序的轉換,生效的是最后一次的方法調(diào)用

并行流如此方便,它的線程從那里來呢?有多少個?怎么配置呢?

并行流內(nèi)部使用了默認的 ForkJoinPool 線程池。默認的線程數(shù)量就是處理器的核心數(shù),而配置系統(tǒng)核心屬性:java.util.concurrent.ForkJoinPool.common.parallelism 可以改變線程池大小。不過該值是全局變量。

改變他會影響所有并行流。目前還無法為每個流配置專屬的線程數(shù)。一般來說采用處理器核心數(shù)是不錯的選擇

測試并行流的性能

為了更容易的測試性能,我們在每次計算完蘋果價格后,讓線程睡 1s,表示在這期間執(zhí)行了其他 IO 相關的操作,并輸出程序執(zhí)行耗時,順序執(zhí)行的耗時: 

  1. public static void main(String[] args) throws InterruptedException {  
  2.     List<Apple> appleList = initAppleList();  
  3.     Date begin = new Date();  
  4.     for (Apple apple : appleList) {  
  5.         apple.setPrice(5.0 * apple.getWeight() / 1000);  
  6.         Thread.sleep(1000);  
  7.     }  
  8.     Date end = new Date();  
  9.     log.info("蘋果數(shù)量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);  

并行版本 

  1. List<Apple> appleList = initAppleList();  
  2. Date begin = new Date();  
  3. appleList.parallelStream()  
  4. .forEach(apple ->  
  5.          {  
  6.              apple.setPrice(5.0 * apple.getWeight() / 1000);  
  7.              try {  
  8.                  Thread.sleep(1000);  
  9.              } catch (InterruptedException e) {  
  10.                  e.printStackTrace();  
  11.              }  
  12.          }  
  13.         );  
  14. Date end = new Date();  
  15. log.info("蘋果數(shù)量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000); 

耗時情況

跟我們的預測一致,我的電腦是 四核I5 處理器,開啟并行后四個處理器每人執(zhí)行一個線程,最后 1s 完成了任務!

并行流可以隨便用嗎?

可拆分性影響流的速度

通過上面的測試,有的人會輕易得到一個結論:并行流很快,我們可以完全放棄 foreach/fori/iter 外部迭代,使用 Stream 提供的內(nèi)部迭代來實現(xiàn)了。

事實真的是這樣嗎?并行流真的如此完美嗎?答案當然是否定的。大家可以復制下面的代碼,在自己的電腦上測試。測試完后可以發(fā)現(xiàn),并行流并不總是最快的處理方式。

  1.  對于 iterate 方法來處理的前 n 個數(shù)字來說,不管并行與否,它總是慢于循環(huán)的,非并行版本可以理解為流化操作沒有循環(huán)更偏向底層導致的慢??刹⑿邪姹臼菫槭裁绰兀窟@里有兩個需要注意的點:

      2.  iterate 生成的是裝箱的對象,必須拆箱成數(shù)字才能求和

      3.  我們很難把 iterate 分成多個獨立的塊來并行執(zhí)行

          這個問題很有意思,我們必須意識到某些流操作比其他操作更容易并行化。對于 iterate 來說,每次應用這個函數(shù)都要依賴于前一次應用的結果。因此在這種情況下,我們不僅不能有效的將流劃分成小塊處理。反而還因為并行化再次增加了開支。

      4.  而對于 LongStream.rangeClosed() 方法來說,就不存在 iterate 的第兩個痛點了。它生成的是基本類型的值,不用拆裝箱操作,另外它可以直接將要生成的數(shù)字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分。因此并行狀態(tài)下的 rangeClosed() 是快于 for 循環(huán)外部迭代的 

  1. package lambdasinaction.chap7;  
  2. import java.util.stream.*;  
  3. public class ParallelStreams {  
  4.     public static long iterativeSum(long n) {  
  5.         long result = 0 
  6.         for (long i = 0; i <= n; i++) {  
  7.             result += i;  
  8.         }  
  9.         return result;  
  10.     }  
  11.     public static long sequentialSum(long n) {  
  12.         return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();  
  13.     }  
  14.     public static long parallelSum(long n) {  
  15.         return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();  
  16.     }  
  17.     public static long rangedSum(long n) {  
  18.         return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();  
  19.     }  
  20.     public static long parallelRangedSum(long n) {  
  21.         return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();  
  22.     }  
  23.  
  24. package lambdasinaction.chap7;  
  25. import java.util.concurrent.*;  
  26. import java.util.function.*;  
  27. public class ParallelStreamsHarness {  
  28.     public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();  
  29.     public static void main(String[] args) {  
  30.         System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");  
  31.         System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");  
  32.         System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );  
  33.         System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");  
  34.         System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );  
  35.     }  
  36.     public static <T, R> long measurePerf(Function<T, R> f, T input) {  
  37.         long fastest = Long.MAX_VALUE;  
  38.         for (int i = 0; i < 10; i++) {  
  39.             long start = System.nanoTime();  
  40.             R result = f.apply(input);  
  41.             long duration = (System.nanoTime() - start) / 1_000_000;  
  42.             System.out.println("Result: " + result);  
  43.             if (duration < fastestfastest = duration 
  44.         }  
  45.         return fastest;  
  46.     }  

共享變量修改的問題

并行流雖然輕易的實現(xiàn)了多線程,但是仍未解決多線程中共享變量的修改問題。下面代碼中存在共享變量 total,分別使用順序流和并行流計算前n個自然數(shù)的和 

  1. public static long sideEffectSum(long n) {  
  2.     Accumulator accumulator = new Accumulator();  
  3.     LongStream.rangeClosed(1, n).forEach(accumulator::add);  
  4.     return accumulator.total;  
  5.  
  6. public static long sideEffectParallelSum(long n) {  
  7.     Accumulator accumulator = new Accumulator();  
  8.     LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);  
  9.     return accumulator.total;  
  10.  
  11. public static class Accumulator {  
  12.     private long total = 0 
  13.     public void add(long value) {  
  14.         total += value;  
  15.     }  

順序執(zhí)行每次輸出的結果都是:50000005000000,而并行執(zhí)行的結果卻五花八門了。這是因為每次訪問 totle 都會存在數(shù)據(jù)競爭,關于數(shù)據(jù)競爭的原因,大家可以看看關于 volatile 的博客。因此當代碼中存在修改共享變量的操作時,是不建議使用并行流的。

并行流的使用注意

在并行流的使用上有下面幾點需要注意:

  •  盡量使用 LongStream / IntStream / DoubleStream 等原始數(shù)據(jù)流代替 Stream 來處理數(shù)字,以避免頻繁拆裝箱帶來的額外開銷
  •  要考慮流的操作流水線的總計算成本,假設 N 是要操作的任務總數(shù),Q 是每次操作的時間。N * Q 就是操作的總時間,Q 值越大就意味著使用并行流帶來收益的可能性越大

例如:前端傳來幾種類型的資源,需要存儲到數(shù)據(jù)庫。每種資源對應不同的表。我們可以視作類型數(shù)為 N,存儲數(shù)據(jù)庫的網(wǎng)絡耗時 + 插入操作耗時為 Q。一般情況下網(wǎng)絡耗時都是比較大的。因此該操作就比較適合并行處理。當然當類型數(shù)目大于核心數(shù)時,該操作的性能提升就會打一定的折扣了。更好的優(yōu)化方法在日后的博客會為大家奉上

  •  對于較少的數(shù)據(jù)量,不建議使用并行流
  •  容易拆分成塊的流數(shù)據(jù),建議使用并行流

以下是一些常見的集合框架對應流的可拆分性能表:

可拆分性
ArrayList 極佳
LinkedList
IntStream.range 極佳
Stream.iterate
HashSet
TreeSet

碼字不易,如果你覺得讀完以后有收獲,不妨點個推薦讓更多的人看到吧! 

 

責任編輯:龐桂玉 來源: Java技術棧
相關推薦

2014-06-05 08:47:52

Spark 1.0Mapreduce

2019-09-02 08:58:27

Python編譯器編程語言

2021-08-09 19:01:36

并行場景程序

2018-03-05 10:27:47

電腦卡頓舊電腦

2013-09-26 16:25:47

微軟甲骨文Windows Azu

2023-11-07 12:00:41

數(shù)據(jù)并行Java 8數(shù)據(jù)

2024-04-19 08:28:57

JavaAPI場景

2011-03-07 14:15:33

standby數(shù)據(jù)庫

2018-05-15 11:05:36

Wifi速度數(shù)字

2023-11-10 18:03:04

業(yè)務場景SQL

2010-10-22 14:43:09

移動開發(fā)

2020-05-05 22:48:18

工業(yè)物聯(lián)網(wǎng)IIOT物聯(lián)網(wǎng)

2023-05-12 07:40:01

Java8API工具

2014-07-15 13:57:53

Java8

2009-04-23 08:59:37

Windows 7微軟操作系統(tǒng)

2017-06-14 16:15:13

擁抱開發(fā)者技術落地微軟

2019-06-27 10:32:57

Java開發(fā)代碼

2017-04-25 09:38:19

戴爾

2014-04-16 07:48:56

Java 8Permgen

2023-10-12 08:29:06

線程池Java
點贊
收藏

51CTO技術棧公眾號