The Way You Break Java Code Is Beautiful!
This article is reposted from the WeChat official account 「小明菜市场」, written by 小明菜市场. To repost this article, please contact the 小明菜市场 official account.
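Preface
Previously, processing a collection in parallel meant splitting it into several parts by hand, creating a thread for each part, and merging the results at the right moment. Java 8 added a feature that switches on parallel processing with a single call.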
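Parallel streams
Getting to know and enabling parallel streams
What is a parallel stream? It is a stream that splits its content into several chunks and processes each chunk on a different thread. For example, suppose every element of a List needs a calculation applied to it. The code looks like this: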
List<Apple> appleList = new ArrayList<>(); // pretend the data was loaded from the database
for (Apple apple : appleList) {
    apple.setPrice(5.0 * apple.getWeight() / 1000);
}
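Here the time complexity is O(list.size()), so the running time grows as the list grows. A parallel stream can shorten it by spreading the work over several threads. The code looks like this: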
appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));
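Calling parallelStream() marks the stream as parallel, so the pipeline runs in parallel. Internally, parallel streams run on the default ForkJoinPool (the common pool). Its default parallelism is the number of available processors minus one, and the thread that starts the stream also takes part in the work, so by default roughly as many threads as there are cores are busy.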
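As a quick check of the pool behind parallel streams, the sketch below prints the processor count and the common pool's parallelism. The class name CommonPoolInfo and its helper methods are made up for illustration, and the numbers printed depend on the machine it runs on.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    // Number of processors the JVM reports as available.
    static int cores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common pool that parallel streams use by default.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + cores());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

The parallelism can be overridden at JVM startup with the system property java.util.concurrent.ForkJoinPool.common.parallelism, which is worth knowing when parallel streams share a machine with other work.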
Testing parallel streams
The ordinary (sequential) code looks like this:
public static void main(String[] args) throws InterruptedException {
    List<Apple> appleList = initAppleList();
    Date begin = new Date();
    for (Apple apple : appleList) {
        apple.setPrice(5.0 * apple.getWeight() / 1000);
        Thread.sleep(1000); // simulate one second of work per apple
    }
    Date end = new Date();
    log.info("Number of apples: {}, elapsed: {}s", appleList.size(), (end.getTime() - begin.getTime()) / 1000);
}
The output reports an elapsed time of 4s.
The parallel code looks like this:
List<Apple> appleList = initAppleList();
Date begin = new Date();
appleList.parallelStream().forEach(apple -> {
    apple.setPrice(5.0 * apple.getWeight() / 1000);
    try {
        Thread.sleep(1000); // simulate one second of work per apple
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
Date end = new Date();
log.info("Number of apples: {}, elapsed: {}s", appleList.size(), (end.getTime() - begin.getTime()) / 1000);
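The output reports an elapsed time of 1s, which is 3s less than the sequential version, because the one-second sleeps now overlap on different threads instead of running one after another.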
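The two snippets above rely on an Apple class, an initAppleList() method and a logger that the article does not show. For readers who want to rerun the comparison, here is a self-contained sketch under stated assumptions: the Apple class, the four made-up weights, and the names AppleTimingDemo, priceSlowly, timeSequential and timeParallel are all hypothetical stand-ins, and the timings will vary with the machine.

```java
import java.util.ArrayList;
import java.util.List;

class AppleTimingDemo {
    // Hypothetical stand-in for the article's Apple class (weight assumed to be in grams).
    static class Apple {
        private final double weight;
        private double price;

        Apple(double weight) { this.weight = weight; }
        double getWeight() { return weight; }
        double getPrice() { return price; }
        void setPrice(double price) { this.price = price; }
    }

    // Hypothetical stand-in for initAppleList(): four apples with made-up weights.
    static List<Apple> initAppleList() {
        List<Apple> apples = new ArrayList<>();
        for (double weight : new double[] {150, 200, 250, 300}) {
            apples.add(new Apple(weight));
        }
        return apples;
    }

    // Sets the price, then sleeps to simulate slow work.
    static void priceSlowly(Apple apple, long sleepMillis) {
        apple.setPrice(5.0 * apple.getWeight() / 1000);
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        }
    }

    // Prices every apple on the calling thread and returns the elapsed milliseconds.
    static long timeSequential(List<Apple> apples, long sleepMillis) {
        long start = System.nanoTime();
        for (Apple apple : apples) {
            priceSlowly(apple, sleepMillis);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Prices every apple through a parallel stream and returns the elapsed milliseconds.
    static long timeParallel(List<Apple> apples, long sleepMillis) {
        long start = System.nanoTime();
        apples.parallelStream().forEach(apple -> priceSlowly(apple, sleepMillis));
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("Sequential: " + timeSequential(initAppleList(), 1000) + " ms");
        System.out.println("Parallel:   " + timeParallel(initAppleList(), 1000) + " ms");
    }
}
```

The speed-up here comes from overlapping sleeps, so it depends on how many threads the common pool can supply. On a machine with fewer threads than apples, the parallel run takes longer than one second.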
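How a stream is split affects its speed
Keep the following points in mind for parallel streams:
- When iterate is used to produce the first n numbers, it is always slower than a plain loop, whether it runs in parallel or not. It has two pain points: it produces boxed Long objects that must be unboxed before they can be added, and each value depends on the previous one, so the sequence is hard to split into independent chunks.
- LongStream.rangeClosed() has neither of iterate's two pain points. It produces primitive values, so no boxing or unboxing is needed, and it can directly split the numbers 1 - n into four parts such as 1 - n/4, n/4 - 2n/4, ..., 3n/4 - n. As a result, rangeClosed() run in parallel can be faster than external iteration with a for loop.
The code looks like this: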
package lambdasinaction.chap7;

import java.util.stream.*;

public class ParallelStreams {

    // Plain for loop: sums 1..n with external iteration.
    public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 1; i <= n; i++) {
            result += i;
        }
        return result;
    }

    // Stream.iterate, sequential: boxed Long values, one after another.
    public static long sequentialSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
    }

    // Stream.iterate, parallel: still boxed and hard to split.
    public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
    }

    // LongStream.rangeClosed, sequential: primitive long values.
    public static long rangedSum(long n) {
        return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
    }

    // LongStream.rangeClosed, parallel: primitive values and an easily split range.
    public static long parallelRangedSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
    }
}
package lambdasinaction.chap7;

import java.util.concurrent.*;
import java.util.function.*;

public class ParallelStreamsHarness {

    public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();

    public static void main(String[] args) {
        System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");
        System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
        System.out.println("Parallel Sum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs");
        System.out.println("Range Sum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
        System.out.println("Parallel range Sum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs");
    }

    // Runs f ten times on the input and returns the fastest run in milliseconds.
    public static <T, R> long measurePerf(Function<T, R> f, T input) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            R result = f.apply(input);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + result);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }
}
Shared variables can corrupt the data
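// Sequential: only one thread updates the accumulator, so the result is correct.
public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
}

// Parallel: several threads call add() on the same accumulator at once.
// total += value is not atomic, so updates can be lost and the result is usually wrong.
public static long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;
}

public static class Accumulator {
    private long total = 0;

    public void add(long value) {
        total += value; // unsynchronized read-modify-write on shared state
    }
}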
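The parallel version above goes wrong because several threads update the same total field with no synchronization. A minimal sketch of two common ways around that follows: let the stream do the reduction itself with sum(), or, when a shared accumulator really is needed, use java.util.concurrent.atomic.LongAdder. The class name SafeParallelSum and its method names are made up for illustration and are not part of the original listing.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.LongStream;

class SafeParallelSum {
    // Preferred: no shared mutable state; the stream combines the partial sums itself.
    static long reducedSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    // If a shared accumulator is unavoidable, use one that is safe under concurrent updates.
    static long adderSum(long n) {
        LongAdder adder = new LongAdder();
        LongStream.rangeClosed(1, n).parallel().forEach(adder::add);
        return adder.sum(); // read only after the terminal operation has finished
    }

    public static void main(String[] args) {
        long n = 10_000_000L;
        System.out.println("Expected:   " + n * (n + 1) / 2);
        System.out.println("reducedSum: " + reducedSum(n));
        System.out.println("adderSum:   " + adderSum(n));
    }
}
```

The reduction form is usually the better choice: it keeps the pipeline free of side effects, which is what the stream API expects of parallel pipelines.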
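Notes on parallel streams
- Prefer the primitive streams LongStream / IntStream / DoubleStream over Stream when processing numbers, to avoid the overhead of frequent boxing and unboxing
- Consider the total computational cost of the stream pipeline. Let N be the number of elements to process and Q the approximate time to process one element. N * Q is then the total time, and the larger Q is, the more likely a parallel stream is to pay off
- For small amounts of data, parallel streams are not recommended
- For stream data that splits easily into chunks, parallel streams are recommended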
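One way to see whether a source splits easily is to look at its Spliterator. The sketch below is only an illustration, and the class name SplittabilityCheck and its methods are hypothetical. It checks whether a source reports the SIZED characteristic, meaning it knows its exact element count up front, and splits a range once to show the two resulting chunks.

```java
import java.util.Spliterator;
import java.util.stream.LongStream;
import java.util.stream.Stream;

class SplittabilityCheck {
    // A range knows how many elements it holds, so it can be cut into balanced chunks.
    static boolean rangeIsSized(long n) {
        return LongStream.rangeClosed(1, n).spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    // Stream.iterate builds each value from the previous one and has no known size.
    static boolean iterateIsSized() {
        return Stream.iterate(1L, i -> i + 1).spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    // Splits the range 1..n once and returns {prefix size, remaining size}.
    static long[] splitOnce(long n) {
        Spliterator.OfLong rest = LongStream.rangeClosed(1, n).spliterator();
        Spliterator.OfLong prefix = rest.trySplit(); // null if the source cannot be split
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] {prefixSize, rest.estimateSize()};
    }

    public static void main(String[] args) {
        System.out.println("rangeClosed is SIZED: " + rangeIsSized(1000));
        System.out.println("iterate is SIZED:     " + iterateIsSized());
        long[] sizes = splitOnce(1000);
        System.out.println("Split sizes: " + sizes[0] + " + " + sizes[1]);
    }
}
```

Array-backed sources such as ArrayList behave like the range here, while sources that must be walked element by element are much harder for the framework to divide evenly.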