自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java并發(fā)編程,看這篇就夠了!

開發(fā) 后端
本文從應(yīng)用層面總結(jié)了,JVM基本的內(nèi)存模型以及線程對(duì)共享內(nèi)存操作的原子方式,并著重介紹了線程池、FutrueTask、CountDownLatch、CycliBarrier以及Semaphore這幾種在Java并發(fā)編程中經(jīng)常使用的JUC工具類。

[[397754]]

本文轉(zhuǎn)載自微信公眾號(hào)「無敵碼農(nóng)」,作者無敵碼農(nóng)。轉(zhuǎn)載本文請(qǐng)聯(lián)系無敵碼農(nóng)公眾號(hào)。

大家好!我是"無敵碼農(nóng)"。今天的文章將給大家分享Java并發(fā)編程相關(guān)的知識(shí)點(diǎn),雖然類似的文章已有很多,但本文將以更貼近實(shí)際使用場(chǎng)景的方式進(jìn)行闡述。具體將對(duì)Java常見的并發(fā)編程方式和手段進(jìn)行總結(jié),以便可以從使用角度更好地感知Java并發(fā)編程帶來的效果,從而為后續(xù)更深入的理解Java并發(fā)機(jī)制進(jìn)行鋪墊。

Java多線程概述

在Java中使用多線程是提高程序并發(fā)響應(yīng)能力的重要手段,但同時(shí)它也是一把雙刃劍;如果使用不當(dāng)也很容易導(dǎo)致程序出錯(cuò),并且還很難直觀地找到問題。這是因?yàn)椋?)、線程運(yùn)行本身是由操作系統(tǒng)調(diào)度,具有一定的隨機(jī)性;2)、Java共享內(nèi)存模型在多線程環(huán)境下很容易產(chǎn)生線程安全問題;3)、不合理的封裝依賴,極容易導(dǎo)致發(fā)布對(duì)象的不經(jīng)意逸出。

所以,要用好多線程這把劍,就需要對(duì)Java內(nèi)存模型、線程安全問題有較深的認(rèn)識(shí)。但由于Java豐富的生態(tài),在實(shí)際研發(fā)工作中,需要我們自己進(jìn)行并發(fā)處理的場(chǎng)景大都被各類框架或組件給屏蔽了。這也是造成很多Java開發(fā)人員對(duì)并發(fā)編程意識(shí)淡薄的主要原因。

首先從Java內(nèi)存模型的角度理解下使用多線程編程最核心的問題,具體如下圖所示:

如上圖所示,在Java內(nèi)存模型中,對(duì)于用戶程序來說用得最頻繁的就是堆內(nèi)存和棧內(nèi)存,其中堆內(nèi)存主要存放對(duì)象及數(shù)組,例如由new()產(chǎn)生的實(shí)例。而棧內(nèi)存則主要是存儲(chǔ)運(yùn)行方法時(shí)所需的局部變量、操作數(shù)及方法出口等信息。

其中堆內(nèi)存是線程共享的,一個(gè)類被實(shí)例化后生成的對(duì)象、及對(duì)象中定義的成員變量可以被多個(gè)線程共享訪問,這種共享主要體現(xiàn)在多個(gè)線程同時(shí)執(zhí)行、同一個(gè)對(duì)象實(shí)例的某個(gè)方法時(shí),會(huì)將該方法中操作的對(duì)象成員變量分別以多個(gè)副本的方式拷貝到方法棧中進(jìn)行操作,而不是直接修改堆內(nèi)存中對(duì)象的成員變量值;線程操作完成后,會(huì)再次將修改后的變量值同步至堆內(nèi)存中的主內(nèi)存地址,并實(shí)現(xiàn)對(duì)其他線程的可見。

這個(gè)過程雖然看似行云流水,但在JVM中卻至少需要6個(gè)原子步驟才能完成,具體如下圖所示:

如上圖所示,在不考慮對(duì)共享變量進(jìn)行加鎖的情況下,堆內(nèi)存中一個(gè)對(duì)象的成員變量被線程修改大概需要以下6個(gè)步驟:

1、read(讀取):從堆內(nèi)存中的讀取要操作的變量;

2、load(載入):將讀取的變量拷貝到線程棧內(nèi)存;

3、use(使用):將棧內(nèi)存中的變量值傳遞給執(zhí)行引擎;

4、assign(賦值):將從執(zhí)行引擎得到的結(jié)果賦值給棧內(nèi)存中變量;

5、store(存儲(chǔ)):將變更后的棧內(nèi)存中的變量值傳遞到主內(nèi)存;

6、write(寫入):變更主內(nèi)存中的變量值,此時(shí)新值對(duì)所有線程可見;

由此可見,每個(gè)線程都可以按這幾個(gè)步驟并行操作同一個(gè)共享變量??上攵绻麤]有任何同步措施,那么在多線程環(huán)境下,該共享變量的值將變得飄忽不定,很難得到最終正確的結(jié)果。而這就是所謂的線程安全問題,也是我們?cè)谑褂枚嗑€程編程時(shí),最需要關(guān)注的問題!

線程池的使用

在實(shí)際場(chǎng)景中,多線程的使用并不是單打獨(dú)斗,線程作為寶貴的系統(tǒng)資源,其創(chuàng)建和銷毀都需要耗費(fèi)一定的系統(tǒng)資源;而無限制的創(chuàng)建線程資源,也會(huì)導(dǎo)致系統(tǒng)資源的耗盡。所以,為了重復(fù)使用線程資源、限制線程的創(chuàng)建行為,一般都會(huì)通過線程池來實(shí)現(xiàn)。以Java Web服務(wù)中使用最廣的Tomcat服務(wù)器舉例,為了并行處理網(wǎng)絡(luò)請(qǐng)求就使用了線程池,源碼示例如下:

  1. public boolean processSocket(SocketWrapperBase<S> socketWrapper, 
  2.         SocketEvent event, boolean dispatch) { 
  3.     try { 
  4.         if (socketWrapper == null) { 
  5.             return false
  6.         } 
  7.         SocketProcessorBase<S> sc = null
  8.         if (processorCache != null) { 
  9.             sc = processorCache.pop(); 
  10.         } 
  11.         if (sc == null) { 
  12.             sc = createSocketProcessor(socketWrapper, event); 
  13.         } else { 
  14.             sc.reset(socketWrapper, event); 
  15.         } 
  16.         //這里通過線程池對(duì)線程執(zhí)行進(jìn)行管理 
  17.         Executor executor = getExecutor(); 
  18.         if (dispatch && executor != null) { 
  19.             executor.execute(sc); 
  20.         } else { 
  21.             sc.run(); 
  22.         } 
  23.     } catch (RejectedExecutionException ree) { 
  24.         getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); 
  25.         return false
  26.     } catch (Throwable t) { 
  27.         ExceptionUtils.handleThrowable(t); 
  28.         // This means we got an OOM or similar creating a thread, or that 
  29.         // the pool and its queue are full 
  30.         getLog().error(sm.getString("endpoint.process.fail"), t); 
  31.         return false
  32.     } 
  33.     return true

上述代碼為Tomcat源碼使用線程池并發(fā)處理網(wǎng)絡(luò)請(qǐng)求的示例,這里以Tomcat為例,主要是因?yàn)榛赟pring Boot、Spring MVC開發(fā)的Web服務(wù)大都運(yùn)行在Tomcat容器,而對(duì)于線程、線程池使用的復(fù)雜度都被屏蔽在中間件和框架中了,所以很多同學(xué)雖然寫了不少Java代碼,但在業(yè)務(wù)研發(fā)中額外使用線程的場(chǎng)景可能并不多,舉這個(gè)例子的目的就是為了提升下并發(fā)編程的意識(shí)!

在Java中使用線程池的主要方式是Executor框架,該框架作為JUC并發(fā)包的一部分,為Java程序提供了一個(gè)靈活的線程池實(shí)現(xiàn)。其邏輯層次如下圖所示:

如圖所示,使用Executor框架,既可以通過直接自定義配置、擴(kuò)展ThreadPoolExecutor來創(chuàng)建一個(gè)線程池,也可以通過Executors類直接調(diào)用“newSingleThreadExecutor()、newFixedThreadPool()、newCachedThreadPool()”這三個(gè)方法來創(chuàng)建具有一定功能特征的線程池。

除此之外,也可以通過自定義配置、擴(kuò)展ScheduledThreadPoolExecutor來創(chuàng)建一個(gè)具有周期性、定時(shí)功能的線程池,例如線程10s后運(yùn)行、線程每分鐘運(yùn)行一次等。同樣,與ThreadPoolExecutor一樣,如果不想自定義配置,也可以通過Executors類直接調(diào)用“newScheduledThreadPool()、newSingleThreadScheduledExecutor()”這兩個(gè)方法來分別創(chuàng)建具備自動(dòng)線程規(guī)模擴(kuò)展能力和線程池中只允許有單個(gè)線程的特定線程池。

而ForkJoinPool是jdk1.8以后新增的一種線程池實(shí)現(xiàn)類型,類似于Fork-Join框架所支持的功能。這是一種可以將一個(gè)大任務(wù)拆分成多個(gè)任務(wù)隊(duì)列,并具體分配給不同線程處理的機(jī)制,而關(guān)鍵的特性在于,通過竊取算法,某個(gè)線程在執(zhí)行完本隊(duì)列任務(wù)后,可以竊取其他隊(duì)列的任務(wù)進(jìn)行執(zhí)行,從而最大限度提高線程的利用效率。

在實(shí)際應(yīng)用中,雖然可以通過Executors方便的創(chuàng)建單個(gè)線程、固定線程或具備自動(dòng)收縮能力的線程池,但一般還是建議直接通過ThreadPoolExecutor或ScheduledThreadPoolExecutor自定義配置,這主要是因?yàn)镋xecutors默認(rèn)創(chuàng)建的線程池,很多采用的是無界隊(duì)列,例如LinkedBlockingQueue,這樣線程就可以被無限制的添加都線程池的任務(wù)執(zhí)行隊(duì)列,如果請(qǐng)求量過大容易造成OOM。

接下來以一個(gè)實(shí)際的例子來演示通過ThreadPoolExecutor如何自定義配置一個(gè)業(yè)務(wù)線程池,具體如下:

1)、配置一個(gè)線程池類

  1. public final class SingleBlockPoolExecutor { 
  2.  
  3.     /** 
  4.      * 自定義配置線程池(線程池核心線程數(shù)、最大線程數(shù)、存活時(shí)間設(shè)置、采用的隊(duì)列類型、線程工廠類、線程池拒絕處理類) 
  5.      */ 
  6.     private final ThreadPoolExecutor pool = new ThreadPoolExecutor(30, 100, 5, TimeUnit.MINUTES, 
  7.             new ArrayBlockingQueue<Runnable>(100), new BlockThreadFactory(), new BlockRejectedExecutionHandler()); 
  8.  
  9.     public ThreadPoolExecutor getPool() { 
  10.         return pool; 
  11.     } 
  12.  
  13.     private SingleBlockPoolExecutor() { 
  14.     } 
  15.  
  16.     /** 
  17.      * 定義線程工廠 
  18.      */ 
  19.     public static class BlockThreadFactory implements ThreadFactory { 
  20.  
  21.         private AtomicInteger count = new AtomicInteger(0); 
  22.  
  23.         @Override 
  24.         public Thread newThread(Runnable r) { 
  25.             Thread t = new Thread(r); 
  26.             String threadName = SingleBlockPoolExecutor.class.getSimpleName() + "-" + count.addAndGet(1); 
  27.             t.setName(threadName); 
  28.             return t; 
  29.         } 
  30.     } 
  31.  
  32.     /** 
  33.      * 定義線程池拒絕機(jī)制處理類 
  34.      */ 
  35.     public static class BlockRejectedExecutionHandler implements RejectedExecutionHandler { 
  36.  
  37.         @Override 
  38.         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
  39.             try { 
  40.                 //被拒線程再次返回阻塞隊(duì)列進(jìn)行等待處理 
  41.                 executor.getQueue().put(r); 
  42.             } catch (InterruptedException e) { 
  43.                 Thread.currentThread().interrupt(); 
  44.             } 
  45.         } 
  46.     } 
  47.  
  48.     /** 
  49.      * 在靜態(tài)內(nèi)部類中持有單例類的實(shí)例,并且可直接被初始化 
  50.      */ 
  51.     private static class Holder { 
  52.  
  53.         private static SingleBlockPoolExecutor instance = new SingleBlockPoolExecutor(); 
  54.     } 
  55.  
  56.     /** 
  57.      * 調(diào)用getInstance方法,事實(shí)上是獲得Holder的instance靜態(tài)屬性 
  58.      * 
  59.      * @return 
  60.      */ 
  61.     public static SingleBlockPoolExecutor getInstance() { 
  62.         return Holder.instance; 
  63.     } 
  64.  
  65.     /** 
  66.      * 線程池銷毀方法 
  67.      */ 
  68.     public void destroy() { 
  69.         if (pool != null) { 
  70.             //線程池銷毀 
  71.             pool.shutdownNow(); 
  72.         } 
  73.     } 

如上述代碼所示,通過單例模式配置了一個(gè)線程池。在對(duì)ThreadPoolExecutor的配置中,需要設(shè)置“核心線程數(shù)、最大線程數(shù)、存活時(shí)間設(shè)置、采用的隊(duì)列類型、線程工廠類、線程池拒絕處理類”,這幾個(gè)核心參數(shù)。

2)、定義系統(tǒng)全局線程池管理類

  1. public class AsyncManager { 
  2.  
  3.     /** 
  4.      * 任務(wù)處理公共線程池 
  5.      */ 
  6.     public static final ExecutorService service = SingleBlockPoolExecutor.getInstance().getPool(); 
  7.  

在應(yīng)用中,除了框架定義的線程池外,如果自定義線程池,為了方便統(tǒng)一管理和使用,可以建立一個(gè)全局管理類,如上所示,該類通過靜態(tài)變量的方式初始化了前面我們所定義的線程池。

3)、業(yè)務(wù)中使用

  1. @Service 
  2. @Slf4j 
  3. public class OrderServiceImpl implements OrderService { 
  4.  
  5.     @Override 
  6.     public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) { 
  7.         //1、同步處理核心業(yè)務(wù)邏輯 
  8.         log.info("同步處理業(yè)務(wù)邏輯"); 
  9.         //2、通過線程池提交,異步處理非核心邏輯,例如日志埋點(diǎn) 
  10.         AsyncManager.service.execute(() -> { 
  11.             System.out.println("線程->" + Thread.currentThread().getName() + ",正在執(zhí)行異步日志處理任務(wù)"); 
  12.         }); 
  13.         return CreateOrderBO.builder().result(true).build(); 
  14.     } 

如上代碼所示,業(yè)務(wù)中需要通過線程池異步處理時(shí),可以通過線程池管理類獲取對(duì)應(yīng)的線程池,并向其提交執(zhí)行線程任務(wù)。

FutureTask實(shí)現(xiàn)異步結(jié)果返回

在使用Thread或Runnable實(shí)現(xiàn)的線程處理中,一般是不能返回線程處理結(jié)果的。但如果希望在調(diào)用線程異步處理完成后,能夠獲得線程異步處理的結(jié)果,那么就可以通過FutureTask框架實(shí)現(xiàn)。示例代碼如下:

  1. @Service 
  2. @Slf4j 
  3. public class OrderServiceImpl implements OrderService { 
  4.  
  5.     @Override 
  6.     public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) { 
  7.         //Future異步處理返回執(zhí)行結(jié)果 
  8.         //定義接收線程執(zhí)行結(jié)果的FutureTask對(duì)象 
  9.         List<Future<Integer>> results = Collections.synchronizedList(new ArrayList<>()); 
  10.         //實(shí)現(xiàn)Callable接口定義線程執(zhí)行邏輯 
  11.         results.add(AsyncManager.service.submit(new Callable<Integer>() { 
  12.             @Override 
  13.             public Integer call() throws Exception { 
  14.                 int a = 1, b = 2; 
  15.                 System.out.println("Callable接口執(zhí)行中"); 
  16.                 return a + b; 
  17.             } 
  18.         })); 
  19.         //輸出線程返回結(jié)果 
  20.         for (Future<Integer> future : results) { 
  21.             try { 
  22.                 //這里獲取結(jié)果,等待時(shí)間設(shè)置200毫秒 
  23.                 System.out.println("a+b=" + future.get(200, TimeUnit.MILLISECONDS)); 
  24.             } catch (InterruptedException e) { 
  25.                 e.printStackTrace(); 
  26.             } catch (ExecutionException e) { 
  27.                 e.printStackTrace(); 
  28.             } catch (TimeoutException e) { 
  29.                 e.printStackTrace(); 
  30.             } 
  31.         } 
  32.         //判斷線程是否執(zhí)行完畢,完畢則獲取執(zhí)行結(jié)果 
  33.         return CreateOrderBO.builder().result(true).build(); 
  34.     } 

如上述代碼,如果希望線程返回執(zhí)行結(jié)果,那么可以通過實(shí)現(xiàn)Callable接口定義線程類,并通過FutureTask接收線程處理結(jié)果。不過在實(shí)際使用時(shí),需要注意線程暫時(shí)未執(zhí)行完成情況下的業(yè)務(wù)處理邏輯。

CountDownLatch實(shí)現(xiàn)線程并行同步

在并發(fā)編程中,一個(gè)復(fù)雜的業(yè)務(wù)邏輯可以通過多個(gè)線程并發(fā)執(zhí)行來提高速度;但如果需要同步等待這些線程執(zhí)行完后才能進(jìn)行后續(xù)的邏輯,那么就可以通過CountDownLatch來實(shí)現(xiàn)對(duì)多個(gè)線程執(zhí)行的同步匯聚。其邏輯示意圖如下:

從原理上看CountDownLatch實(shí)際上是在其內(nèi)部創(chuàng)建并維護(hù)了一個(gè)volatile類型的整數(shù)計(jì)數(shù)器,當(dāng)調(diào)用countDown()方法時(shí),會(huì)嘗試將整數(shù)計(jì)數(shù)器-1,當(dāng)調(diào)用wait()方法時(shí),當(dāng)前線程就會(huì)判斷整數(shù)計(jì)數(shù)器是否為0,如果為0,則繼續(xù)往下執(zhí)行,如果不為0,則使當(dāng)前線程進(jìn)入阻塞狀態(tài),直到某個(gè)線程將計(jì)數(shù)器設(shè)置為0,才會(huì)喚醒在await()方法中等待的線程繼續(xù)執(zhí)行。

常見的代碼使用示例如下:

1)、創(chuàng)建執(zhí)行具體業(yè)務(wù)邏輯的線程處理類

  1. public class DataDealTask implements Runnable { 
  2.  
  3.     private List<Integer> list; 
  4.     private CountDownLatch latch; 
  5.  
  6.     public DataDealTask(List<Integer> list, CountDownLatch latch) { 
  7.         this.list = list; 
  8.         this.latch = latch; 
  9.     } 
  10.  
  11.     @Override 
  12.     public void run() { 
  13.         try { 
  14.             System.out.println("線程->" + Thread.currentThread().getName() + ",處理" + list.size()); 
  15.         } finally { 
  16.             //處理完計(jì)數(shù)器遞減 
  17.             latch.countDown(); 
  18.         } 
  19.     } 

該線程處理類,在實(shí)例化時(shí)接收除了待處理數(shù)據(jù)參數(shù)外,還會(huì)接收CountDownLatch對(duì)象,在執(zhí)行完線程邏輯,注意,無論成功或失敗,都需要調(diào)用countDown()方法。

2)、具體的使用方法

  1. @Service 
  2. @Slf4j 
  3. public class OrderServiceImpl implements OrderService { 
  4.  
  5.     @Override 
  6.     public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) { 
  7.         //CountDownLatch的使用示例 
  8.         //模擬待處理數(shù)據(jù)生成 
  9.         Integer[] array = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 101, 102}; 
  10.         List<Integer> list = new ArrayList<>(); 
  11.         Arrays.asList(array).stream().map(o -> list.add(o)).collect(Collectors.toList()); 
  12.         //對(duì)數(shù)據(jù)進(jìn)行分組處理(5條記錄為1組) 
  13.         Map<String, List<?>> entityMap = this.groupListByAvg(list, 6); 
  14.         //根據(jù)數(shù)據(jù)分組數(shù)量,確定同步計(jì)數(shù)器的值 
  15.         CountDownLatch latch = new CountDownLatch(entityMap.size()); 
  16.         Iterator<Entry<String, List<?>>> it = entityMap.entrySet().iterator(); 
  17.         try { 
  18.             //將分組數(shù)據(jù)分批提交給不同線程處理 
  19.             while (it.hasNext()) { 
  20.                 DataDealTask dataDealTask = new DataDealTask((List<Integer>) it.next().getValue(), latch); 
  21.                 AsyncManager.service.submit(dataDealTask); 
  22.             } 
  23.             //等待分批處理線程處理完成 
  24.             latch.await(); 
  25.         } catch (InterruptedException e) { 
  26.             e.printStackTrace(); 
  27.         } 
  28.         return CreateOrderBO.builder().result(true).build(); 
  29.     } 

如上所示代碼,在業(yè)務(wù)邏輯中如果處理數(shù)據(jù)量多,則可以通過分組的方式并行處理,而等待所有線程處理完成后,再同步返回調(diào)用方。這種場(chǎng)景就可以通過CountDownLatch來實(shí)現(xiàn)同步!

CycliBarrier柵欄實(shí)現(xiàn)線程階段性同步

CountDownLatch的功能主要是實(shí)現(xiàn)線程的一次性同步。而在實(shí)際的業(yè)務(wù)場(chǎng)景中也可能存在這樣的情況,執(zhí)行一個(gè)階段性的任務(wù),例如”階段1->階段2->階段3->階段4->階段5"。那么在并發(fā)處理這個(gè)階段性任務(wù)時(shí),就要在每個(gè)階段設(shè)置柵欄,只有當(dāng)所有線程執(zhí)行到某個(gè)階段點(diǎn)之后,才能繼續(xù)推進(jìn)下一個(gè)階段任務(wù)的執(zhí)行,其邏輯如圖所示:

針對(duì)上述場(chǎng)景,就可以通過CycliBarrier來實(shí)現(xiàn)。而從實(shí)現(xiàn)上看,CyclicBarrier使用了基于ReentrantLock的互斥鎖實(shí)現(xiàn);在CyclicBarrier的內(nèi)部有一個(gè)計(jì)數(shù)器 count,當(dāng)count不為0時(shí),每個(gè)線程在到達(dá)同步點(diǎn)會(huì)先調(diào)用await方法將自己阻塞,并將計(jì)數(shù)器會(huì)減1,直到計(jì)數(shù)器減為0的時(shí)候,所有因調(diào)用await方法而被阻塞的線程就會(huì)被喚醒繼續(xù)執(zhí)行。并進(jìn)入下一輪阻塞,此時(shí)在new CyclicBarrier(parties) 時(shí)設(shè)置的parties值,會(huì)被賦值給 count 從而實(shí)現(xiàn)復(fù)用。

例如,計(jì)算某個(gè)部門的員工工資,要求在所有員工工資都計(jì)算完之后才能進(jìn)行下一步整合操作。其代碼示例如下:

  1. @Slf4j 
  2. @Service 
  3. public class SalaryStatisticServiceImpl implements SalaryStatisticService { 
  4.  
  5.     /** 
  6.      * 模擬部門員工存儲(chǔ)數(shù)據(jù) 
  7.      */ 
  8.     public static Map<String, List<EmployeeSalaryInfo>> employeeMap = Collections.synchronizedMap(new HashMap<>()); 
  9.  
  10.     static { 
  11.         EmployeeSalaryInfo employeeA = new EmployeeSalaryInfo(); 
  12.         employeeA.setEmployeeNo("100"); 
  13.         employeeA.setBaseSalaryAmount(10000); 
  14.         employeeA.setSubsidyAmount(3000); 
  15.         EmployeeSalaryInfo employeeB = new EmployeeSalaryInfo(); 
  16.         employeeB.setEmployeeNo("101"); 
  17.         employeeB.setBaseSalaryAmount(30000); 
  18.         employeeB.setSubsidyAmount(3000); 
  19.         List<EmployeeSalaryInfo> list = new ArrayList<>(); 
  20.         list.add(employeeA); 
  21.         list.add(employeeB); 
  22.         employeeMap.put("10", list); 
  23.     } 
  24.  
  25.     @Override 
  26.  
  27.     public StatisticReportBO statisticReport(StatisticReportDTO statisticReportDTO) { 
  28.         //查詢部門下所有員工信息(模擬) 
  29.         List<EmployeeSalaryInfo> employeeSalaryInfos = employeeMap.get(statisticReportDTO.getDepartmentNo()); 
  30.         if (employeeSalaryInfos == null) { 
  31.             log.info("部門員工信息不存在"); 
  32.             return StatisticReportBO.builder().build(); 
  33.         } 
  34.         //定義統(tǒng)計(jì)總工資的安全變量 
  35.         AtomicInteger totalSalary = new AtomicInteger(); 
  36.         //開啟柵欄(在各線程觸發(fā)之后觸發(fā)) 
  37.         CyclicBarrier cyclicBarrier = new CyclicBarrier(employeeSalaryInfos.size(), new Runnable() { 
  38.             //執(zhí)行順序-B1(隨機(jī)) 
  39.             //該線程不會(huì)阻塞主線程 
  40.             @Override 
  41.             public void run() { 
  42.                 log.info("匯總已分別計(jì)算出的兩個(gè)員工的工資->" + totalSalary.get() + ",執(zhí)行順序->B"); 
  43.             } 
  44.         }); 
  45.         //執(zhí)行順序-A 
  46.         for (EmployeeSalaryInfo e : employeeSalaryInfos) { 
  47.             AsyncManager.service.submit(new Callable<Integer>() { 
  48.                 @Override 
  49.                 public Integer call() { 
  50.                     int totalAmount = e.getSubsidyAmount() + e.getBaseSalaryAmount(); 
  51.                     log.info("計(jì)算出員工{}", e.getEmployeeNo() + "的工資->" + totalAmount + ",執(zhí)行順序->A"); 
  52.                     //匯總總工資 
  53.                     totalSalary.addAndGet(totalAmount); 
  54.                     try { 
  55.                         //等待其他線程同步 
  56.                         cyclicBarrier.await(); 
  57.                     } catch (InterruptedException e) { 
  58.                         e.printStackTrace(); 
  59.                     } catch (BrokenBarrierException e) { 
  60.                         e.printStackTrace(); 
  61.                     } 
  62.                     return totalAmount; 
  63.                 } 
  64.             }); 
  65.  
  66.         } 
  67.         //執(zhí)行順序-A/B(之前或之后隨機(jī),totalSalary值不能保證一定會(huì)得到,所以CyclicBarrier更適合無返回的可重復(fù)并行計(jì)算) 
  68.         //封裝響應(yīng)參數(shù) 
  69.         StatisticReportBO statisticReportBO = StatisticReportBO.builder().employeeCount(employeeSalaryInfos.size()) 
  70.                 .departmentNo(statisticReportDTO.getDepartmentNo()) 
  71.                 .salaryTotalAmount(totalSalary.get()).build(); 
  72.         log.info("封裝接口響應(yīng)參數(shù),執(zhí)行順序->A/B"); 
  73.         return statisticReportBO; 
  74.     } 
  75.  
  76.     @Data 
  77.     public static class EmployeeSalaryInfo { 
  78.  
  79.         /** 
  80.          * 員工編號(hào) 
  81.          */ 
  82.         private String employeeNo; 
  83.         /** 
  84.          * 基本工資 
  85.          */ 
  86.         private Integer baseSalaryAmount; 
  87.         /** 
  88.          * 補(bǔ)助金額 
  89.          */ 
  90.         private Integer subsidyAmount; 
  91.     } 

上述代碼的執(zhí)行結(jié)果如下:

  1. [kPoolExecutor-1] c.w.c.s.impl.SalaryStatisticServiceImpl  : 計(jì)算出員工100的工資->13000,執(zhí)行順序- 
  2. [kPoolExecutor-2] c.w.c.s.impl.SalaryStatisticServiceImpl  : 計(jì)算出員工101的工資->33000,執(zhí)行順序- 
  3. [kPoolExecutor-2] c.w.c.s.impl.SalaryStatisticServiceImpl  : 匯總已分別計(jì)算出的兩個(gè)員工的工資->46000, 
  4. [nio-8080-exec-2] c.w.c.s.impl.SalaryStatisticServiceImpl  : 封裝接口響應(yīng)參數(shù),執(zhí)行順序->A/B       

從上述結(jié)果可以看出,受CycliBarrier控制的線程會(huì)等待其他線程執(zhí)行完成后同步向后執(zhí)行,并且CycliBarrier并不會(huì)阻塞主線程,所以最后響應(yīng)參數(shù)封裝代碼可能在CycliBarrier匯總線程之前執(zhí)行,也可能在其之后執(zhí)行,使用時(shí)需要注意!

Semaphore(信號(hào)量)限制訪問資源的線程數(shù)

Semaphore可以實(shí)現(xiàn)對(duì)某個(gè)共享資源訪問線程數(shù)的限制,實(shí)現(xiàn)限流功能。以停車場(chǎng)線程為例,代碼如下:

  1. @Service 
  2. @Slf4j 
  3. public class ParkServiceImpl implements ParkService { 
  4.  
  5.     /** 
  6.      * 模擬停車場(chǎng)的車位數(shù) 
  7.      */ 
  8.     private static Semaphore semaphore = new Semaphore(2); 
  9.  
  10.     @Override 
  11.     public AccessParkBO accessPark(AccessParkDTO accessParkDTO) { 
  12.         AsyncManager.service.execute(() -> { 
  13.             if (semaphore.availablePermits() == 0) { 
  14.                 log.info(Thread.currentThread().getName() + ",車牌號(hào)->" + accessParkDTO.getCarNo() + ",車位不足請(qǐng)耐心等待"); 
  15.             } else { 
  16.                 try { 
  17.                     //獲取令牌嘗試進(jìn)入停車場(chǎng) 
  18.                     semaphore.acquire(); 
  19.                     log.info(Thread.currentThread().getName() + ",車牌號(hào)->" + accessParkDTO.getCarNo() + ",成功進(jìn)入停車場(chǎng)"); 
  20.                     //模擬車輛在停車場(chǎng)停留的時(shí)間(30秒) 
  21.                     Thread.sleep(30000); 
  22.                     //釋放令牌,騰出停車場(chǎng)車位 
  23.                     semaphore.release(); 
  24.                     log.info(Thread.currentThread().getName() + ",車牌號(hào)->" + accessParkDTO.getCarNo() + ",駛出停車場(chǎng)"); 
  25.                 } catch (InterruptedException e) { 
  26.                     e.printStackTrace(); 
  27.                 } 
  28.             } 
  29.  
  30.         }); 
  31.         //封裝返回信息 
  32.         return AccessParkBO.builder().carNo(accessParkDTO.getCarNo()) 
  33.                 .currentPositionCount(semaphore.availablePermits()) 
  34.                 .isPermitAccess(semaphore.availablePermits() > 0 ? true : false).build(); 
  35.     } 
  36. }     

上述代碼模擬停車場(chǎng)有2車位,并且每輛車進(jìn)入車場(chǎng)后會(huì)停留30秒,然后并行模擬3次停車請(qǐng)求,具體執(zhí)行效果如下:

  1. [kPoolExecutor-1] c.w.c.service.impl.ParkServiceImpl       : SingleBlockPoolExecutor-1,車牌號(hào)->10,成功進(jìn)入停車場(chǎng)  順序->A 
  2. [kPoolExecutor-2] c.w.c.service.impl.ParkServiceImpl       : SingleBlockPoolExecutor-2,車牌號(hào)->20,成功進(jìn)入停車場(chǎng)  順序->A 
  3. [kPoolExecutor-3] c.w.c.service.impl.ParkServiceImpl       : SingleBlockPoolExecutor-3,車牌號(hào)->30,車位不足請(qǐng)耐心等待00,執(zhí)行順序->B 
  4. [kPoolExecutor-1] c.w.c.service.impl.ParkServiceImpl       : SingleBlockPoolExecutor-1,車牌號(hào)->10,駛出停車場(chǎng)     
  5. [kPoolExecutor-2] c.w.c.service.impl.ParkServiceImpl       : SingleBlockPoolExecutor-2,車牌號(hào)->20,駛出停車場(chǎng)     
  6. [kPoolExecutor-4] c.w.c.service.impl.ParkServiceImpl       : SingleBlockPoolExecutor-4,車牌號(hào)->30,成功進(jìn)入停車場(chǎng)   

可以看到由于通過Semaphore限制了可允許進(jìn)入的線程數(shù)是2個(gè),所以第三次請(qǐng)求會(huì)被拒絕,直到前兩次請(qǐng)求通過.release()方法釋放證書后第4次請(qǐng)求才會(huì)被允許進(jìn)入!

后記

本文從應(yīng)用層面總結(jié)了,JVM基本的內(nèi)存模型以及線程對(duì)共享內(nèi)存操作的原子方式,并著重介紹了線程池、FutrueTask、CountDownLatch、CycliBarrier以及Semaphore這幾種在Java并發(fā)編程中經(jīng)常使用的JUC工具類。

 

責(zé)任編輯:武曉燕 來源: 無敵碼農(nóng)
相關(guān)推薦

2021-09-30 07:59:06

zookeeper一致性算法CAP

2019-08-16 09:41:56

UDP協(xié)議TCP

2022-03-29 08:23:56

項(xiàng)目數(shù)據(jù)SIEM

2024-08-27 11:00:56

單例池緩存bean

2017-03-30 22:41:55

虛擬化操作系統(tǒng)軟件

2021-09-10 13:06:45

HDFS底層Hadoop

2023-09-25 08:32:03

Redis數(shù)據(jù)結(jié)構(gòu)

2023-10-04 00:32:01

數(shù)據(jù)結(jié)構(gòu)Redis

2023-11-07 07:46:02

GatewayKubernetes

2021-07-28 13:29:57

大數(shù)據(jù)PandasCSV

2025-02-03 07:00:00

Java接口工具

2021-04-11 08:30:40

VRAR虛擬現(xiàn)實(shí)技術(shù)

2018-09-26 11:02:46

微服務(wù)架構(gòu)組件

2021-10-21 06:52:17

ZooKeeper分布式配置

2022-08-18 20:45:30

HTTP協(xié)議數(shù)據(jù)

2023-12-07 09:07:58

2021-11-10 07:47:48

Traefik邊緣網(wǎng)關(guān)

2023-11-22 07:54:33

Xargs命令Linux

2021-12-13 10:43:45

HashMapJava集合容器

2023-11-03 08:53:15

StrconvGolang
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)