通過線程池方式改造Stream.parallel()并行流
大家好,我是哪吒。
上一篇簡單聊一聊公平鎖和非公平鎖,parallel并行流,提到了一個IntStream.rangeClosed并行流問題,很多小伙伴,對這個比較陌生,想用線程池的方式改造一下。
一、IntStream.rangeClosed并行流
@Data
public class LockTest1 {
public static void main(String[] args) {
IntStream.rangeClosed(1, 100000).parallel().forEach(i -> new LockTest1().increase());
System.out.println(time);
}
private static int time = 0;
private static Object lock = new Object();
public void increase() {
synchronized (lock) {
time++;
}
}
}
二、線程池方式改造
不會那些新特性,還是原始的香啊,寫起代碼,得心應(yīng)手。
1、創(chuàng)建線程池
這時候,有些小伙伴,又陷入了選擇恐懼癥。用哪個線程池比較好呢?
簡單回顧一下:
- 單線程池newSingleThreadExecutor(),只有一個核心線程的線程池,保證任務(wù)按FIFO順序一個個執(zhí)行;
- 固定線程數(shù)線程池newFixedThreadPool(10),固定數(shù)量的可復(fù)用的線程數(shù),來執(zhí)行任務(wù)。當線程數(shù)達到最大核心線程數(shù),則加入隊列等待有空閑線程時再執(zhí)行;
- 可緩存線程池newCachedThreadPool(),創(chuàng)建的都是非核心線程,而且最大線程數(shù)為Interge的最大值,空閑線程存活時間是1分鐘。如果有大量耗時的任務(wù),則不適該創(chuàng)建方式,它只適用于生命周期短的任務(wù);
- 固定線程數(shù)newScheduledThreadPool(10),支持定時和周期性任務(wù)newScheduledThreadPool(10),顧名思義,在固定線程數(shù)的前提下,添加了定時任務(wù)。
最常用的還是固定線程數(shù)線程池newFixedThreadPool(10)。
@Data
public class LockTest2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(200);
for (int i = 0; i < 100000; i++) {
Thread0926 thread = new Thread0926();
executorService.execute(thread);
}
System.out.println(time);
}
private static int time = 0;
private static Object lock = new Object();
public void increase() {
synchronized (lock) {
time++;
}
}
}
2、線程類
public class Thread0926 implements Runnable{
@Override
public void run() {
LockTest2 lockTest = new LockTest2();
lockTest.increase();
}
}
3、信心滿滿,走起來
我草,這不對啊,不應(yīng)該是100000嘛?又把老子整不會了~
三、再次解決并發(fā)時i++原子性問題
上一篇測試過,使用synchronized代碼塊是可以解決i++線程安全問題的,這次怎么不好使了?
上面的代碼中,synchronized (lock)鎖住了time++,lock是靜態(tài)變量,所以屬于類級別的鎖。但是新建的線程是一個新的類,超出了鎖的范圍,所以失效。
那么,在當前類中,開啟線程,是不是就可以了呢?試一下
public class LockTest4 {
private static int time = 0;
private static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 200; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 500; j++) {
synchronized (lock) {
time++;
}
}
});
thread.start();
}
Thread.sleep(3000);
System.out.println(time);
}
}
當然,在synchronized代碼塊中,使用synchronized (LockTest4.class)也是可以的,效果是一樣的。
四、并行流與多線程
并行流的本質(zhì)的是并行,多線程的本質(zhì)是并發(fā)。
并行指當多核CPU中的一個CPU執(zhí)行一個線程時,其它CPU能夠同時執(zhí)行另一個線程,兩個線程之間不會搶占CPU資源,可以同時運行。
并發(fā)指在一段時間內(nèi)CPU處理多個線程,這些線程會搶占CPU資源,CPU資源根據(jù)時間片周期在多個線程之間來回切換,多個線程在一段時間內(nèi)同時運行,而在同一時刻不是同時運行的。
1、并行和并發(fā)的區(qū)別?
- 并行指多個線程在一段時間的每個時刻都同時運行,并發(fā)指多個線程在一段時間內(nèi)同時運行(不是同一時刻,一段時間內(nèi)交叉執(zhí)行)。
- 并行的多個線程不會搶占系統(tǒng)資源,并發(fā)的多個線程會搶占系統(tǒng)資源。
- 并行是多CPU的產(chǎn)物,單核CPU中只有并發(fā),沒有并行。
2、并行和并發(fā)的使用場景
(1)IO密集場景
場景應(yīng)用程序開發(fā),提供http接口、數(shù)據(jù)庫查詢、微服務(wù)調(diào)用都是IO請求,IO請求時幾乎不消耗cpu,這是為了提供cup使用率,建議使用多線程并發(fā),線程數(shù)可以遠大于cpu核數(shù)。
(2)cup密集場景
對應(yīng)大量的加減乘除運算、md5、hash等運算操作,需要持續(xù)使用cpu,需要讓多核cpu并行運算,適合使用forkjoin并行計算。
技術(shù)場景多線程不足,使用多線程技術(shù),也能提高性能,但是線程設(shè)置過大會浪費cpu線程切換的時間,如果線程任務(wù)分配不均勻,會導(dǎo)致有的cpu忙碌有的cpu空閑。