Java并發(fā)編程,看這篇就夠了!
本文轉(zhuǎn)載自微信公眾號(hào)「無敵碼農(nóng)」,作者無敵碼農(nóng)。轉(zhuǎn)載本文請(qǐng)聯(lián)系無敵碼農(nóng)公眾號(hào)。
大家好!我是"無敵碼農(nóng)"。今天的文章將給大家分享Java并發(fā)編程相關(guān)的知識(shí)點(diǎn),雖然類似的文章已有很多,但本文將以更貼近實(shí)際使用場(chǎng)景的方式進(jìn)行闡述。具體將對(duì)Java常見的并發(fā)編程方式和手段進(jìn)行總結(jié),以便可以從使用角度更好地感知Java并發(fā)編程帶來的效果,從而為后續(xù)更深入的理解Java并發(fā)機(jī)制進(jìn)行鋪墊。
Java多線程概述
在Java中使用多線程是提高程序并發(fā)響應(yīng)能力的重要手段,但同時(shí)它也是一把雙刃劍;如果使用不當(dāng)也很容易導(dǎo)致程序出錯(cuò),并且還很難直觀地找到問題。這是因?yàn)椋?)、線程運(yùn)行本身是由操作系統(tǒng)調(diào)度,具有一定的隨機(jī)性;2)、Java共享內(nèi)存模型在多線程環(huán)境下很容易產(chǎn)生線程安全問題;3)、不合理的封裝依賴,極容易導(dǎo)致發(fā)布對(duì)象的不經(jīng)意逸出。
所以,要用好多線程這把劍,就需要對(duì)Java內(nèi)存模型、線程安全問題有較深的認(rèn)識(shí)。但由于Java豐富的生態(tài),在實(shí)際研發(fā)工作中,需要我們自己進(jìn)行并發(fā)處理的場(chǎng)景大都被各類框架或組件給屏蔽了。這也是造成很多Java開發(fā)人員對(duì)并發(fā)編程意識(shí)淡薄的主要原因。
首先從Java內(nèi)存模型的角度理解下使用多線程編程最核心的問題,具體如下圖所示:
如上圖所示,在Java內(nèi)存模型中,對(duì)于用戶程序來說用得最頻繁的就是堆內(nèi)存和棧內(nèi)存,其中堆內(nèi)存主要存放對(duì)象及數(shù)組,例如由new()產(chǎn)生的實(shí)例。而棧內(nèi)存則主要是存儲(chǔ)運(yùn)行方法時(shí)所需的局部變量、操作數(shù)及方法出口等信息。
其中堆內(nèi)存是線程共享的,一個(gè)類被實(shí)例化后生成的對(duì)象、及對(duì)象中定義的成員變量可以被多個(gè)線程共享訪問,這種共享主要體現(xiàn)在多個(gè)線程同時(shí)執(zhí)行、同一個(gè)對(duì)象實(shí)例的某個(gè)方法時(shí),會(huì)將該方法中操作的對(duì)象成員變量分別以多個(gè)副本的方式拷貝到方法棧中進(jìn)行操作,而不是直接修改堆內(nèi)存中對(duì)象的成員變量值;線程操作完成后,會(huì)再次將修改后的變量值同步至堆內(nèi)存中的主內(nèi)存地址,并實(shí)現(xiàn)對(duì)其他線程的可見。
這個(gè)過程雖然看似行云流水,但在JVM中卻至少需要6個(gè)原子步驟才能完成,具體如下圖所示:
如上圖所示,在不考慮對(duì)共享變量進(jìn)行加鎖的情況下,堆內(nèi)存中一個(gè)對(duì)象的成員變量被線程修改大概需要以下6個(gè)步驟:
1、read(讀取):從堆內(nèi)存中的讀取要操作的變量;
2、load(載入):將讀取的變量拷貝到線程棧內(nèi)存;
3、use(使用):將棧內(nèi)存中的變量值傳遞給執(zhí)行引擎;
4、assign(賦值):將從執(zhí)行引擎得到的結(jié)果賦值給棧內(nèi)存中變量;
5、store(存儲(chǔ)):將變更后的棧內(nèi)存中的變量值傳遞到主內(nèi)存;
6、write(寫入):變更主內(nèi)存中的變量值,此時(shí)新值對(duì)所有線程可見;
由此可見,每個(gè)線程都可以按這幾個(gè)步驟并行操作同一個(gè)共享變量??上攵绻麤]有任何同步措施,那么在多線程環(huán)境下,該共享變量的值將變得飄忽不定,很難得到最終正確的結(jié)果。而這就是所謂的線程安全問題,也是我們?cè)谑褂枚嗑€程編程時(shí),最需要關(guān)注的問題!
線程池的使用
在實(shí)際場(chǎng)景中,多線程的使用并不是單打獨(dú)斗,線程作為寶貴的系統(tǒng)資源,其創(chuàng)建和銷毀都需要耗費(fèi)一定的系統(tǒng)資源;而無限制的創(chuàng)建線程資源,也會(huì)導(dǎo)致系統(tǒng)資源的耗盡。所以,為了重復(fù)使用線程資源、限制線程的創(chuàng)建行為,一般都會(huì)通過線程池來實(shí)現(xiàn)。以Java Web服務(wù)中使用最廣的Tomcat服務(wù)器舉例,為了并行處理網(wǎng)絡(luò)請(qǐng)求就使用了線程池,源碼示例如下:
- public boolean processSocket(SocketWrapperBase<S> socketWrapper,
- SocketEvent event, boolean dispatch) {
- try {
- if (socketWrapper == null) {
- return false;
- }
- SocketProcessorBase<S> sc = null;
- if (processorCache != null) {
- sc = processorCache.pop();
- }
- if (sc == null) {
- sc = createSocketProcessor(socketWrapper, event);
- } else {
- sc.reset(socketWrapper, event);
- }
- //這里通過線程池對(duì)線程執(zhí)行進(jìn)行管理
- Executor executor = getExecutor();
- if (dispatch && executor != null) {
- executor.execute(sc);
- } else {
- sc.run();
- }
- } catch (RejectedExecutionException ree) {
- getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
- return false;
- } catch (Throwable t) {
- ExceptionUtils.handleThrowable(t);
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- getLog().error(sm.getString("endpoint.process.fail"), t);
- return false;
- }
- return true;
- }
上述代碼為Tomcat源碼使用線程池并發(fā)處理網(wǎng)絡(luò)請(qǐng)求的示例,這里以Tomcat為例,主要是因?yàn)榛赟pring Boot、Spring MVC開發(fā)的Web服務(wù)大都運(yùn)行在Tomcat容器,而對(duì)于線程、線程池使用的復(fù)雜度都被屏蔽在中間件和框架中了,所以很多同學(xué)雖然寫了不少Java代碼,但在業(yè)務(wù)研發(fā)中額外使用線程的場(chǎng)景可能并不多,舉這個(gè)例子的目的就是為了提升下并發(fā)編程的意識(shí)!
在Java中使用線程池的主要方式是Executor框架,該框架作為JUC并發(fā)包的一部分,為Java程序提供了一個(gè)靈活的線程池實(shí)現(xiàn)。其邏輯層次如下圖所示:
如圖所示,使用Executor框架,既可以通過直接自定義配置、擴(kuò)展ThreadPoolExecutor來創(chuàng)建一個(gè)線程池,也可以通過Executors類直接調(diào)用“newSingleThreadExecutor()、newFixedThreadPool()、newCachedThreadPool()”這三個(gè)方法來創(chuàng)建具有一定功能特征的線程池。
除此之外,也可以通過自定義配置、擴(kuò)展ScheduledThreadPoolExecutor來創(chuàng)建一個(gè)具有周期性、定時(shí)功能的線程池,例如線程10s后運(yùn)行、線程每分鐘運(yùn)行一次等。同樣,與ThreadPoolExecutor一樣,如果不想自定義配置,也可以通過Executors類直接調(diào)用“newScheduledThreadPool()、newSingleThreadScheduledExecutor()”這兩個(gè)方法來分別創(chuàng)建具備自動(dòng)線程規(guī)模擴(kuò)展能力和線程池中只允許有單個(gè)線程的特定線程池。
而ForkJoinPool是jdk1.8以后新增的一種線程池實(shí)現(xiàn)類型,類似于Fork-Join框架所支持的功能。這是一種可以將一個(gè)大任務(wù)拆分成多個(gè)任務(wù)隊(duì)列,并具體分配給不同線程處理的機(jī)制,而關(guān)鍵的特性在于,通過竊取算法,某個(gè)線程在執(zhí)行完本隊(duì)列任務(wù)后,可以竊取其他隊(duì)列的任務(wù)進(jìn)行執(zhí)行,從而最大限度提高線程的利用效率。
在實(shí)際應(yīng)用中,雖然可以通過Executors方便的創(chuàng)建單個(gè)線程、固定線程或具備自動(dòng)收縮能力的線程池,但一般還是建議直接通過ThreadPoolExecutor或ScheduledThreadPoolExecutor自定義配置,這主要是因?yàn)镋xecutors默認(rèn)創(chuàng)建的線程池,很多采用的是無界隊(duì)列,例如LinkedBlockingQueue,這樣線程就可以被無限制的添加都線程池的任務(wù)執(zhí)行隊(duì)列,如果請(qǐng)求量過大容易造成OOM。
接下來以一個(gè)實(shí)際的例子來演示通過ThreadPoolExecutor如何自定義配置一個(gè)業(yè)務(wù)線程池,具體如下:
1)、配置一個(gè)線程池類
- public final class SingleBlockPoolExecutor {
- /**
- * 自定義配置線程池(線程池核心線程數(shù)、最大線程數(shù)、存活時(shí)間設(shè)置、采用的隊(duì)列類型、線程工廠類、線程池拒絕處理類)
- */
- private final ThreadPoolExecutor pool = new ThreadPoolExecutor(30, 100, 5, TimeUnit.MINUTES,
- new ArrayBlockingQueue<Runnable>(100), new BlockThreadFactory(), new BlockRejectedExecutionHandler());
- public ThreadPoolExecutor getPool() {
- return pool;
- }
- private SingleBlockPoolExecutor() {
- }
- /**
- * 定義線程工廠
- */
- public static class BlockThreadFactory implements ThreadFactory {
- private AtomicInteger count = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- String threadName = SingleBlockPoolExecutor.class.getSimpleName() + "-" + count.addAndGet(1);
- t.setName(threadName);
- return t;
- }
- }
- /**
- * 定義線程池拒絕機(jī)制處理類
- */
- public static class BlockRejectedExecutionHandler implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- try {
- //被拒線程再次返回阻塞隊(duì)列進(jìn)行等待處理
- executor.getQueue().put(r);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- /**
- * 在靜態(tài)內(nèi)部類中持有單例類的實(shí)例,并且可直接被初始化
- */
- private static class Holder {
- private static SingleBlockPoolExecutor instance = new SingleBlockPoolExecutor();
- }
- /**
- * 調(diào)用getInstance方法,事實(shí)上是獲得Holder的instance靜態(tài)屬性
- *
- * @return
- */
- public static SingleBlockPoolExecutor getInstance() {
- return Holder.instance;
- }
- /**
- * 線程池銷毀方法
- */
- public void destroy() {
- if (pool != null) {
- //線程池銷毀
- pool.shutdownNow();
- }
- }
- }
如上述代碼所示,通過單例模式配置了一個(gè)線程池。在對(duì)ThreadPoolExecutor的配置中,需要設(shè)置“核心線程數(shù)、最大線程數(shù)、存活時(shí)間設(shè)置、采用的隊(duì)列類型、線程工廠類、線程池拒絕處理類”,這幾個(gè)核心參數(shù)。
2)、定義系統(tǒng)全局線程池管理類
- public class AsyncManager {
- /**
- * 任務(wù)處理公共線程池
- */
- public static final ExecutorService service = SingleBlockPoolExecutor.getInstance().getPool();
- }
在應(yīng)用中,除了框架定義的線程池外,如果自定義線程池,為了方便統(tǒng)一管理和使用,可以建立一個(gè)全局管理類,如上所示,該類通過靜態(tài)變量的方式初始化了前面我們所定義的線程池。
3)、業(yè)務(wù)中使用
- @Service
- @Slf4j
- public class OrderServiceImpl implements OrderService {
- @Override
- public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) {
- //1、同步處理核心業(yè)務(wù)邏輯
- log.info("同步處理業(yè)務(wù)邏輯");
- //2、通過線程池提交,異步處理非核心邏輯,例如日志埋點(diǎn)
- AsyncManager.service.execute(() -> {
- System.out.println("線程->" + Thread.currentThread().getName() + ",正在執(zhí)行異步日志處理任務(wù)");
- });
- return CreateOrderBO.builder().result(true).build();
- }
- }
如上代碼所示,業(yè)務(wù)中需要通過線程池異步處理時(shí),可以通過線程池管理類獲取對(duì)應(yīng)的線程池,并向其提交執(zhí)行線程任務(wù)。
FutureTask實(shí)現(xiàn)異步結(jié)果返回
在使用Thread或Runnable實(shí)現(xiàn)的線程處理中,一般是不能返回線程處理結(jié)果的。但如果希望在調(diào)用線程異步處理完成后,能夠獲得線程異步處理的結(jié)果,那么就可以通過FutureTask框架實(shí)現(xiàn)。示例代碼如下:
- @Service
- @Slf4j
- public class OrderServiceImpl implements OrderService {
- @Override
- public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) {
- //Future異步處理返回執(zhí)行結(jié)果
- //定義接收線程執(zhí)行結(jié)果的FutureTask對(duì)象
- List<Future<Integer>> results = Collections.synchronizedList(new ArrayList<>());
- //實(shí)現(xiàn)Callable接口定義線程執(zhí)行邏輯
- results.add(AsyncManager.service.submit(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- int a = 1, b = 2;
- System.out.println("Callable接口執(zhí)行中");
- return a + b;
- }
- }));
- //輸出線程返回結(jié)果
- for (Future<Integer> future : results) {
- try {
- //這里獲取結(jié)果,等待時(shí)間設(shè)置200毫秒
- System.out.println("a+b=" + future.get(200, TimeUnit.MILLISECONDS));
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- //判斷線程是否執(zhí)行完畢,完畢則獲取執(zhí)行結(jié)果
- return CreateOrderBO.builder().result(true).build();
- }
- }
如上述代碼,如果希望線程返回執(zhí)行結(jié)果,那么可以通過實(shí)現(xiàn)Callable接口定義線程類,并通過FutureTask接收線程處理結(jié)果。不過在實(shí)際使用時(shí),需要注意線程暫時(shí)未執(zhí)行完成情況下的業(yè)務(wù)處理邏輯。
CountDownLatch實(shí)現(xiàn)線程并行同步
在并發(fā)編程中,一個(gè)復(fù)雜的業(yè)務(wù)邏輯可以通過多個(gè)線程并發(fā)執(zhí)行來提高速度;但如果需要同步等待這些線程執(zhí)行完后才能進(jìn)行后續(xù)的邏輯,那么就可以通過CountDownLatch來實(shí)現(xiàn)對(duì)多個(gè)線程執(zhí)行的同步匯聚。其邏輯示意圖如下:
從原理上看CountDownLatch實(shí)際上是在其內(nèi)部創(chuàng)建并維護(hù)了一個(gè)volatile類型的整數(shù)計(jì)數(shù)器,當(dāng)調(diào)用countDown()方法時(shí),會(huì)嘗試將整數(shù)計(jì)數(shù)器-1,當(dāng)調(diào)用wait()方法時(shí),當(dāng)前線程就會(huì)判斷整數(shù)計(jì)數(shù)器是否為0,如果為0,則繼續(xù)往下執(zhí)行,如果不為0,則使當(dāng)前線程進(jìn)入阻塞狀態(tài),直到某個(gè)線程將計(jì)數(shù)器設(shè)置為0,才會(huì)喚醒在await()方法中等待的線程繼續(xù)執(zhí)行。
常見的代碼使用示例如下:
1)、創(chuàng)建執(zhí)行具體業(yè)務(wù)邏輯的線程處理類
- public class DataDealTask implements Runnable {
- private List<Integer> list;
- private CountDownLatch latch;
- public DataDealTask(List<Integer> list, CountDownLatch latch) {
- this.list = list;
- this.latch = latch;
- }
- @Override
- public void run() {
- try {
- System.out.println("線程->" + Thread.currentThread().getName() + ",處理" + list.size());
- } finally {
- //處理完計(jì)數(shù)器遞減
- latch.countDown();
- }
- }
- }
該線程處理類,在實(shí)例化時(shí)接收除了待處理數(shù)據(jù)參數(shù)外,還會(huì)接收CountDownLatch對(duì)象,在執(zhí)行完線程邏輯,注意,無論成功或失敗,都需要調(diào)用countDown()方法。
2)、具體的使用方法
- @Service
- @Slf4j
- public class OrderServiceImpl implements OrderService {
- @Override
- public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) {
- //CountDownLatch的使用示例
- //模擬待處理數(shù)據(jù)生成
- Integer[] array = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 101, 102};
- List<Integer> list = new ArrayList<>();
- Arrays.asList(array).stream().map(o -> list.add(o)).collect(Collectors.toList());
- //對(duì)數(shù)據(jù)進(jìn)行分組處理(5條記錄為1組)
- Map<String, List<?>> entityMap = this.groupListByAvg(list, 6);
- //根據(jù)數(shù)據(jù)分組數(shù)量,確定同步計(jì)數(shù)器的值
- CountDownLatch latch = new CountDownLatch(entityMap.size());
- Iterator<Entry<String, List<?>>> it = entityMap.entrySet().iterator();
- try {
- //將分組數(shù)據(jù)分批提交給不同線程處理
- while (it.hasNext()) {
- DataDealTask dataDealTask = new DataDealTask((List<Integer>) it.next().getValue(), latch);
- AsyncManager.service.submit(dataDealTask);
- }
- //等待分批處理線程處理完成
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return CreateOrderBO.builder().result(true).build();
- }
- }
如上所示代碼,在業(yè)務(wù)邏輯中如果處理數(shù)據(jù)量多,則可以通過分組的方式并行處理,而等待所有線程處理完成后,再同步返回調(diào)用方。這種場(chǎng)景就可以通過CountDownLatch來實(shí)現(xiàn)同步!
CycliBarrier柵欄實(shí)現(xiàn)線程階段性同步
CountDownLatch的功能主要是實(shí)現(xiàn)線程的一次性同步。而在實(shí)際的業(yè)務(wù)場(chǎng)景中也可能存在這樣的情況,執(zhí)行一個(gè)階段性的任務(wù),例如”階段1->階段2->階段3->階段4->階段5"。那么在并發(fā)處理這個(gè)階段性任務(wù)時(shí),就要在每個(gè)階段設(shè)置柵欄,只有當(dāng)所有線程執(zhí)行到某個(gè)階段點(diǎn)之后,才能繼續(xù)推進(jìn)下一個(gè)階段任務(wù)的執(zhí)行,其邏輯如圖所示:
針對(duì)上述場(chǎng)景,就可以通過CycliBarrier來實(shí)現(xiàn)。而從實(shí)現(xiàn)上看,CyclicBarrier使用了基于ReentrantLock的互斥鎖實(shí)現(xiàn);在CyclicBarrier的內(nèi)部有一個(gè)計(jì)數(shù)器 count,當(dāng)count不為0時(shí),每個(gè)線程在到達(dá)同步點(diǎn)會(huì)先調(diào)用await方法將自己阻塞,并將計(jì)數(shù)器會(huì)減1,直到計(jì)數(shù)器減為0的時(shí)候,所有因調(diào)用await方法而被阻塞的線程就會(huì)被喚醒繼續(xù)執(zhí)行。并進(jìn)入下一輪阻塞,此時(shí)在new CyclicBarrier(parties) 時(shí)設(shè)置的parties值,會(huì)被賦值給 count 從而實(shí)現(xiàn)復(fù)用。
例如,計(jì)算某個(gè)部門的員工工資,要求在所有員工工資都計(jì)算完之后才能進(jìn)行下一步整合操作。其代碼示例如下:
- @Slf4j
- @Service
- public class SalaryStatisticServiceImpl implements SalaryStatisticService {
- /**
- * 模擬部門員工存儲(chǔ)數(shù)據(jù)
- */
- public static Map<String, List<EmployeeSalaryInfo>> employeeMap = Collections.synchronizedMap(new HashMap<>());
- static {
- EmployeeSalaryInfo employeeA = new EmployeeSalaryInfo();
- employeeA.setEmployeeNo("100");
- employeeA.setBaseSalaryAmount(10000);
- employeeA.setSubsidyAmount(3000);
- EmployeeSalaryInfo employeeB = new EmployeeSalaryInfo();
- employeeB.setEmployeeNo("101");
- employeeB.setBaseSalaryAmount(30000);
- employeeB.setSubsidyAmount(3000);
- List<EmployeeSalaryInfo> list = new ArrayList<>();
- list.add(employeeA);
- list.add(employeeB);
- employeeMap.put("10", list);
- }
- @Override
- public StatisticReportBO statisticReport(StatisticReportDTO statisticReportDTO) {
- //查詢部門下所有員工信息(模擬)
- List<EmployeeSalaryInfo> employeeSalaryInfos = employeeMap.get(statisticReportDTO.getDepartmentNo());
- if (employeeSalaryInfos == null) {
- log.info("部門員工信息不存在");
- return StatisticReportBO.builder().build();
- }
- //定義統(tǒng)計(jì)總工資的安全變量
- AtomicInteger totalSalary = new AtomicInteger();
- //開啟柵欄(在各線程觸發(fā)之后觸發(fā))
- CyclicBarrier cyclicBarrier = new CyclicBarrier(employeeSalaryInfos.size(), new Runnable() {
- //執(zhí)行順序-B1(隨機(jī))
- //該線程不會(huì)阻塞主線程
- @Override
- public void run() {
- log.info("匯總已分別計(jì)算出的兩個(gè)員工的工資->" + totalSalary.get() + ",執(zhí)行順序->B");
- }
- });
- //執(zhí)行順序-A
- for (EmployeeSalaryInfo e : employeeSalaryInfos) {
- AsyncManager.service.submit(new Callable<Integer>() {
- @Override
- public Integer call() {
- int totalAmount = e.getSubsidyAmount() + e.getBaseSalaryAmount();
- log.info("計(jì)算出員工{}", e.getEmployeeNo() + "的工資->" + totalAmount + ",執(zhí)行順序->A");
- //匯總總工資
- totalSalary.addAndGet(totalAmount);
- try {
- //等待其他線程同步
- cyclicBarrier.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- return totalAmount;
- }
- });
- }
- //執(zhí)行順序-A/B(之前或之后隨機(jī),totalSalary值不能保證一定會(huì)得到,所以CyclicBarrier更適合無返回的可重復(fù)并行計(jì)算)
- //封裝響應(yīng)參數(shù)
- StatisticReportBO statisticReportBO = StatisticReportBO.builder().employeeCount(employeeSalaryInfos.size())
- .departmentNo(statisticReportDTO.getDepartmentNo())
- .salaryTotalAmount(totalSalary.get()).build();
- log.info("封裝接口響應(yīng)參數(shù),執(zhí)行順序->A/B");
- return statisticReportBO;
- }
- @Data
- public static class EmployeeSalaryInfo {
- /**
- * 員工編號(hào)
- */
- private String employeeNo;
- /**
- * 基本工資
- */
- private Integer baseSalaryAmount;
- /**
- * 補(bǔ)助金額
- */
- private Integer subsidyAmount;
- }
- }
上述代碼的執(zhí)行結(jié)果如下:
- [kPoolExecutor-1] c.w.c.s.impl.SalaryStatisticServiceImpl : 計(jì)算出員工100的工資->13000,執(zhí)行順序-
- [kPoolExecutor-2] c.w.c.s.impl.SalaryStatisticServiceImpl : 計(jì)算出員工101的工資->33000,執(zhí)行順序-
- [kPoolExecutor-2] c.w.c.s.impl.SalaryStatisticServiceImpl : 匯總已分別計(jì)算出的兩個(gè)員工的工資->46000,
- [nio-8080-exec-2] c.w.c.s.impl.SalaryStatisticServiceImpl : 封裝接口響應(yīng)參數(shù),執(zhí)行順序->A/B
從上述結(jié)果可以看出,受CycliBarrier控制的線程會(huì)等待其他線程執(zhí)行完成后同步向后執(zhí)行,并且CycliBarrier并不會(huì)阻塞主線程,所以最后響應(yīng)參數(shù)封裝代碼可能在CycliBarrier匯總線程之前執(zhí)行,也可能在其之后執(zhí)行,使用時(shí)需要注意!
Semaphore(信號(hào)量)限制訪問資源的線程數(shù)
Semaphore可以實(shí)現(xiàn)對(duì)某個(gè)共享資源訪問線程數(shù)的限制,實(shí)現(xiàn)限流功能。以停車場(chǎng)線程為例,代碼如下:
- @Service
- @Slf4j
- public class ParkServiceImpl implements ParkService {
- /**
- * 模擬停車場(chǎng)的車位數(shù)
- */
- private static Semaphore semaphore = new Semaphore(2);
- @Override
- public AccessParkBO accessPark(AccessParkDTO accessParkDTO) {
- AsyncManager.service.execute(() -> {
- if (semaphore.availablePermits() == 0) {
- log.info(Thread.currentThread().getName() + ",車牌號(hào)->" + accessParkDTO.getCarNo() + ",車位不足請(qǐng)耐心等待");
- } else {
- try {
- //獲取令牌嘗試進(jìn)入停車場(chǎng)
- semaphore.acquire();
- log.info(Thread.currentThread().getName() + ",車牌號(hào)->" + accessParkDTO.getCarNo() + ",成功進(jìn)入停車場(chǎng)");
- //模擬車輛在停車場(chǎng)停留的時(shí)間(30秒)
- Thread.sleep(30000);
- //釋放令牌,騰出停車場(chǎng)車位
- semaphore.release();
- log.info(Thread.currentThread().getName() + ",車牌號(hào)->" + accessParkDTO.getCarNo() + ",駛出停車場(chǎng)");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- //封裝返回信息
- return AccessParkBO.builder().carNo(accessParkDTO.getCarNo())
- .currentPositionCount(semaphore.availablePermits())
- .isPermitAccess(semaphore.availablePermits() > 0 ? true : false).build();
- }
- }
上述代碼模擬停車場(chǎng)有2車位,并且每輛車進(jìn)入車場(chǎng)后會(huì)停留30秒,然后并行模擬3次停車請(qǐng)求,具體執(zhí)行效果如下:
- [kPoolExecutor-1] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-1,車牌號(hào)->10,成功進(jìn)入停車場(chǎng) 順序->A
- [kPoolExecutor-2] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-2,車牌號(hào)->20,成功進(jìn)入停車場(chǎng) 順序->A
- [kPoolExecutor-3] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-3,車牌號(hào)->30,車位不足請(qǐng)耐心等待00,執(zhí)行順序->B
- [kPoolExecutor-1] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-1,車牌號(hào)->10,駛出停車場(chǎng)
- [kPoolExecutor-2] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-2,車牌號(hào)->20,駛出停車場(chǎng)
- [kPoolExecutor-4] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-4,車牌號(hào)->30,成功進(jìn)入停車場(chǎng)
可以看到由于通過Semaphore限制了可允許進(jìn)入的線程數(shù)是2個(gè),所以第三次請(qǐng)求會(huì)被拒絕,直到前兩次請(qǐng)求通過.release()方法釋放證書后第4次請(qǐng)求才會(huì)被允許進(jìn)入!
后記
本文從應(yīng)用層面總結(jié)了,JVM基本的內(nèi)存模型以及線程對(duì)共享內(nèi)存操作的原子方式,并著重介紹了線程池、FutrueTask、CountDownLatch、CycliBarrier以及Semaphore這幾種在Java并發(fā)編程中經(jīng)常使用的JUC工具類。