自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Concurrent In Java

開發(fā) 后端
本篇文章一系列只是對JUC各個部分做了說明和介紹,希望對大家有所啟發(fā)。

這一系列只是對JUC各個部分做了說明和介紹,沒人深入原理!

concurrent并發(fā)包,讓你易于編寫并發(fā)程序。并發(fā)下我們經(jīng)常需要使用的基礎(chǔ)設(shè)施和解決的問題有ThreadPool、Lock、管道、集合點、線程之間等待和喚醒、線程間數(shù)據(jù)傳輸、共享資源訪問控制、并發(fā)線程之間的相互等待,等待。

concurrent提供的工具能夠解決絕大部分的場景,還能提高程序吞吐量。

現(xiàn)代的服務(wù)器多采用多核CPU,從而不同線程之間有可能真正地在同時運行而不是cpu時間切片。在處理大計算量的程序上要盡可能利用CPU多核特性,提高系統(tǒng)吞吐量。

并發(fā)編程主要面臨三個問題:

1.如何讓多個線程同時為同一個任務(wù)工作(并發(fā)編程設(shè)計)

2.多個線程之間對共享資源的爭用。

3.多個線程之間如何相互合作、傳遞數(shù)據(jù)。

1. concurrent包提供的集合

concurrent包直接提供了標準集合的一些實現(xiàn),在下面做簡單介紹。在大部分情況下可以使用它們提供高并發(fā)環(huán)境下對集合訪問的吞吐量。

1.1 ConcurrentHashMap

Map的一個并發(fā)實現(xiàn)。在多線程環(huán)境下,它具有很高的吞吐量和具備可靠的數(shù)據(jù)一致性。它支持并發(fā)讀和一定程度的并發(fā)修改(默認16個并發(fā),可以通過構(gòu)造函數(shù)修改)。

HashMap的實現(xiàn)是非線程安全的,高并發(fā)下會get方法常會死鎖,有的時候會表現(xiàn)為CPU居高不下。

  1. public V get(Object key) {  
  2. if (key == null)  
  3. return getForNullKey();  
  4. int hash = hash(key.hashCode());  
  5. for (Entry e = table[indexFor(hash, table.length)];  
  6. e != null;  
  7. e = e.next) {  
  8. Object k;  
  9. if (e.hash == hash && ((k = e.key) == key || key.equals(k)))  
  10. return e.value;  
  11. }  
  12. return null;  
  13. }  
  14.  

在get操作里面for循環(huán)取對象的操作,由于高并發(fā)同時讀寫,for循環(huán)的結(jié)果變得不可預(yù)知,所以有可能一直循環(huán)。

所以高并發(fā)環(huán)境下盡量不要直接使用HashMap,對系統(tǒng)造成的影響很難排除。

和Collections.synchronizedMap(new HashMap(...))相比,外ConcurrentHashMap在高并發(fā)的環(huán)境下有著更優(yōu)秀的吞吐量。因為ConcurrentHashMap可以支持寫并發(fā),基本原理是內(nèi)部分段,分段的數(shù)量決定著并發(fā)程度。通過concurrencyLevel參數(shù)可以設(shè)置。如果你能預(yù)期并發(fā)數(shù)量那么設(shè)置該參數(shù)可以獲取更優(yōu)吞吐量。

另外為ConcurrentHashMap還實現(xiàn)了:

V putIfAbsent(K key, V value);

boolean remove(Object key, Object value);

boolean replace(K key, V oldValue, V newValue);

V replace(K key, V value);

這四個一致性的操作方法。

1.2 BlockingQueue

BlockingQueue定義了一個接口,繼承了Queue接口。Queue是一種數(shù)據(jù)結(jié)構(gòu),意思是它的項以先入先出(FIFO)順序存儲。

BlockingQueue為我們提供了一些多線程阻塞語義的方法,新增和重定義了一些方法插入:

 

 

BlockingQueue是線程安全的,非常適合多個生產(chǎn)者和多個消費者線程之間傳遞數(shù)據(jù)。

形象地理解,BlockingQueue好比有很多格子的傳輸帶系統(tǒng),不過當你(生產(chǎn)者)調(diào)用put方法的時候,如果有空閑的格子那么放入物體后立刻返回,如果沒有空閑格子那么一直處于等待狀態(tài)。add方法意味著如果沒有空閑格子系統(tǒng)就會報警,然后如果處理該報警則按照你的意愿。offer方法優(yōu)先于add方法,它通過返回true 或 flase來告訴你是否放入成功。offer超時方法,如果不空閑的情況下,嘗試等待一段時間。

BlockingQueue有很多實現(xiàn)ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue

補充Dueue是個雙向隊列,可以當做堆棧來使用。

BlockingQueue在ThreadPool中,作為任務(wù)隊列來使用,用來保存沒有立刻執(zhí)行的工作任務(wù)對象。

1.3 SynchronousQueue

SychronousQueue是BlockingQueue的一個實現(xiàn),它看起來是一個隊列,但是其實沒有容量,是特定條件下的一個精簡實現(xiàn)。

做個比喻,SychronousQueue對象就像一個接力棒,現(xiàn)在有兩個運動員交棒者和接棒者(線程)要做交接。在交接點,交棒者沒有交出之前是不能松開的(一種等待狀態(tài)),接棒者在接到棒之前是必須等待。換一句話說不管誰先到交接點,必須處于等待狀態(tài)。

在生產(chǎn)者和消費者模型中。如果生產(chǎn)者向SychronousQueue進行put操作,直到有另外的消費者線程進行take操作時才能返回。對消費者也是一樣,take操作會被阻塞,直到生產(chǎn)者put。

在這種生產(chǎn)者-消費者模型下,生產(chǎn)者和消費者是進行手對手傳遞產(chǎn)品,在消費者消費一個產(chǎn)品之前,生產(chǎn)者必須處于等待狀態(tài)。它給我們提供了在線程之間交換單一元素的極輕量級方法,并且具有阻塞語義。

提示:上面舉例中有寫局限性。其實生產(chǎn)者和消費者進程是可以任意數(shù)量的。M:N。生產(chǎn)線程之間會對SychronousQueue進行爭用,消費者也是一樣。

對SychronousQueue類似于其他語境中“會合通道”或 “連接”點問題。它非常適合于傳遞性設(shè)計,在這種設(shè)計中,在一個線程中運行的對象要將某些信息、事件或任務(wù)傳遞給在另一個線程中運行的對象,它就必須與該對象同步。

1.4Exchanger

是SychronousQueue的雙向?qū)崿F(xiàn)。用來伙伴線程間交互對象。Exchanger 可能在比如遺傳算法和管道設(shè)計中很有用。

形象地說,就是兩個人在預(yù)定的地方交互物品,任何一方?jīng)]到之前都處于等待狀態(tài)。

1.5 CopyOnWriteArrayList 和 CopyOnWriteArraySet

它們分別是List接口和Set接口的實現(xiàn)。正如類名所描述的那樣,當數(shù)據(jù)結(jié)構(gòu)發(fā)生變化的時候,會復(fù)制自身的內(nèi)容,來保證一致性。大家都知道復(fù)制全部副本是非常昂貴的操作,看來這是一個非常不好的實現(xiàn)。事實上沒有最好和最差的方案,只有最合適的方案。一般情況下,處理多線程同步問題,我們傾向使用同步的 ArrayList,但同步也有其成本。

那么在什么情況下使用CopyOnWriteArrayList 或者CopyOnWriteArraySet呢?

數(shù)據(jù)量小。

對數(shù)據(jù)結(jié)構(gòu)的修改是偶然發(fā)生的,相對于讀操作。

舉例來說,如果我們實現(xiàn)觀察者模式的話,作為監(jiān)聽器集合是非常合適的。

1.6 TimeUnit

雖然是個時間單位,但是它也是concurrent包里面的。也許你以前的代碼里面經(jīng)常出現(xiàn)1*60*1000來表示一分鐘,代碼可讀性很差。現(xiàn)在你可以通過TimeUnit來編寫可讀性更好的代碼,concurrent的api里面涉及到時間的地方都會使用該對象。

我之所以先進并發(fā)框架常用的集合,是因為線程池的實現(xiàn)特性都利用了BlockingQueue的一些特性。

#p#

2. ThreadPool

雖然線程和進程相比是輕量級許多,但是線程的創(chuàng)建成本還是不可忽律,所以就有了線程池化的設(shè)計。線程池的創(chuàng)建、管理、回收、任務(wù)隊列管理、任務(wù)分配等細節(jié)問題依然負責,沒有必要重復(fù)發(fā)明輪子,concurrent包已經(jīng)為我們準備了一些優(yōu)秀線程池的實現(xiàn)。

2.1 認識ExecutorService 接口

ExecutorService 接口,它能提供的功能就是用來在將來某一個時刻異步地執(zhí)行一系列任務(wù)。雖然簡單一句話,但是包含了很多需求點。它的實現(xiàn)至少包含了線程池和任務(wù)隊列兩個方面,其實還包括了任務(wù)失敗處理策略等。

 

 

經(jīng)常使用submit方法,用來提交任務(wù)對象。

簡單的例子:

ExecutorService es = Executors.newCachedThreadPool();

es.submit(new Runnable(){

@Override

public void run() {

System.out.println("do some thing");

}

});

es.shutdown();

上面的例子只是完成了提交了一個任務(wù),異步地去執(zhí)行它。但是有些使用場景更為復(fù)雜,比如等待獲得異步任務(wù)的返回結(jié)果,或者最多等上固定的時間。

submit 方法返回一個對象,F(xiàn)uture??雌饋碛悬c別扭,代表將來的對象。其實看一下Future的方法就明白了。

 

其實Future對象代表了一個異步任務(wù)的結(jié)果,可以用來取消任務(wù)、查詢?nèi)蝿?wù)狀態(tài),還有通過get方法獲得異步任務(wù)返回的結(jié)果。當調(diào)用get方法的時候,當前線程被阻塞直到任務(wù)被處理完成或者出現(xiàn)異常。

 

我們可以通過保存Future對象來跟蹤查詢異步任務(wù)的執(zhí)行情況。

顯然Runnable接口中定義的 public void run();方法并不能返回結(jié)果對象,所以concurrent包提供了Callable接口,它可以被用來返回結(jié)果對象。

2.2 ThreadPoolExecutor

ThreadPoolExecutor實現(xiàn)了ExecutorService 接口,也是我們最主要使用的實現(xiàn)類。

首先非常有必要看一些類的最完整的構(gòu)造函數(shù)

 

  1. ThreadPoolExecutor(int corePoolSize,  
  2.  
  3.    int maximumPoolSize,  
  4.  
  5. long keepAliveTime,  
  6.  
  7. TimeUnit unit,  
  8.  
  9. BlockingQueue workQueue,  
  10.  
  11. ThreadFactory threadFactory,  
  12.  
  13. RejectedExecutionHandler handler)  
  14.  

 

ThreadPoolExecutor對象中有個poolSize變量表示當前線程池中正在運行的線程數(shù)量。

注意:這個有關(guān)非常重要的關(guān)系,常常被誤解。poolSize變量和corePoolSize、maximumPoolSize以及workQueue的關(guān)系。

首先線程池被創(chuàng)建初期,還沒有執(zhí)行任何任務(wù)的時候,poolSize等于0;

每次向線程池提交任務(wù)的時候,線程池處理過程如下:

1. 如果poolSize少于 corePoolSize,則首選添加新的線程,而不進行排隊。

2. 如果poolSize等于或多于 corePoolSize,則首選將請求加入隊列workQueue,而不添加新的線程。

3. 如果第二步執(zhí)行失敗(隊已滿),則創(chuàng)建新的線程執(zhí)行任務(wù),但是如果果poolSize已經(jīng)達到maximumPoolSize,那么就拒絕該任務(wù)。如果處理被拒絕的任務(wù)就取決于RejectedExecutionHandler handler的設(shè)置了,默認情況下會拋出異常。

系統(tǒng)存在四種任務(wù)拒絕策略:

在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。

在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調(diào)用運行該任務(wù)的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務(wù)的提交速度。

在 ThreadPoolExecutor.DiscardPolicy 中,不能執(zhí)行的任務(wù)將被刪除。

在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執(zhí)行程序尚未關(guān)閉,則位于工作隊列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過程)。

keepAliveTime活動線程如果空閑一段時間是否可以回收,通常只作用于超出corePoolSize的線程。corePoolSize的線程創(chuàng)建了就不會被回收。但是到j(luò)ava 6 之后增加了public void allowCoreThreadTimeOut(boolean value)方法,允許core進程也可以根據(jù)keepAliveTime來回收,默認為false。

決定線程池特性的還有workQueue的實現(xiàn)類,有三種類SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue,分別對應(yīng)同步隊列、無界隊列、有界隊列。

(摘自JavaDoc)

類SynchronousQueue,直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務(wù)的線程,則試圖把任務(wù)加入隊列將失敗,因此會構(gòu)造一個新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)(設(shè)置maximumPoolSizes 為Integer.MAX_VALUE)。當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。

LinkedBlockingQueue,無界隊列。使用無界隊列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時新任務(wù)在隊列中等待。這樣,創(chuàng)建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務(wù)完全獨立于其他任務(wù),即任務(wù)執(zhí)行互不影響時,適合于使用無界隊列;例如,在 Web 頁服務(wù)器中。這種排隊可用于處理瞬態(tài)突發(fā)請求,當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。

ArrayBlockingQueue,有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會降低吞吐量。

綜上:構(gòu)造參數(shù)的設(shè)置是互相制約和影響的。只有當你重復(fù)了解其相互關(guān)系的時候、或有特殊需求的時候,才可以自己構(gòu)造ThreadPoolExecutor對象,否則可以使用Executores是個工廠類。

提示使用線程池是注意處理shutdown,確保你系統(tǒng)關(guān)閉的時候主動關(guān)閉shutdown。

2.3 ScheduledExecutorService

擴展了ExecutorService接口,提供時間排程的功能。

 

 

schedule方法被用來延遲指定時間來執(zhí)行某個指定任務(wù)。如果你需要周期性重復(fù)執(zhí)行定時任務(wù)可以使用scheduleAtFixedRate或者scheduleWithFixedDelay方法,它們不同的是前者以固定頻率執(zhí)行,后者以相對固定頻率執(zhí)行。

(感謝wenbois2000 提出原先的錯誤,我在這里重新描述!對于原先的錯誤,實在不好意思啊,再次感謝!)不管任務(wù)執(zhí)行耗時是否大于間隔時間,scheduleAtFixedRate和scheduleWithFixedDelay都不會導(dǎo)致同一個任務(wù)并發(fā)地被執(zhí)行。唯一不同的是scheduleWithFixedDelay是當前一個任務(wù)結(jié)束的時刻,開始結(jié)算間隔時間,如0秒開始執(zhí)行第一次任務(wù),任務(wù)耗時5秒,任務(wù)間隔時間3秒,那么第二次任務(wù)執(zhí)行的時間是在第8秒開始。

ScheduledExecutorService的實現(xiàn)類,是ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor對象包含的線程數(shù)量是沒有可伸縮性的,只會有固定數(shù)量的線程。不過你可以通過其構(gòu)造函數(shù)來設(shè)定線程的優(yōu)先級,來降低定時任務(wù)線程的系統(tǒng)占用。

特別提示:通過ScheduledExecutorService執(zhí)行的周期任務(wù),如果任務(wù)執(zhí)行過程中拋出了異常,那么過ScheduledExecutorService就會停止執(zhí)行任務(wù),且也不會再周期地執(zhí)行該任務(wù)了。所以你如果想保住任務(wù)都一直被周期執(zhí)行,那么catch一切可能的異常。

2.4 Executors

Executores是個工廠類,用來生成ThreadPoolExecutor對象,它提供了一些常用的線程池配置方案,滿足我們大部分場景。

1. newCachedThreadPool

 

  1. public static ExecutorService newCachedThreadPool() {  
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
  3. 60L, TimeUnit.SECONDS,  
  4. new SynchronousQueue());  
  5. }  
  6.  

 

分析下出這個線程池配置的工作模式,當沒有空閑進程時就新建線程執(zhí)行,當有空閑線程時就使用空閑線程執(zhí)行。當線程空閑大60秒時,系統(tǒng)自動回收線程。

該線程池非常適合執(zhí)行短小異步任務(wù)時吞吐量非常高,會重復(fù)利用CPU的能力。但是如果任務(wù)處理IO邊界任務(wù),那么會消耗大量線程切換,降低系統(tǒng)吞吐量。所以執(zhí)行短小的計算任務(wù)非常高效,且當沒有任務(wù)時不會消耗系統(tǒng)資源。

注意:線程池中沒有變量表示線程是否空閑。那么程序是如何控制的呢?不得不贊嘆concurrent實現(xiàn)的非常精巧。當創(chuàng)建出來的線程完成原來的任務(wù)后,會調(diào)用BlockingQueue的Poll方法,對于SynchronousQueue實現(xiàn)而言會阻塞調(diào)用線程,直到另外的線程offer調(diào)用。

然而ThreadPool在分配任務(wù)的時候總是先去嘗試調(diào)用offer方法,所以就會觸發(fā)空閑線程再次調(diào)用。

精妙的是ThreadPoolExecutor的處理邏輯一樣,但是用BlockingQueue實現(xiàn)變了就產(chǎn)生不同的行為。

2. newFixedThreadPool

 

  1. public static ExecutorService newFixedThreadPool(int nThreads) {  
  2.  
  3. return new ThreadPoolExecutor(nThreads, nThreads,  
  4.  
  5. 0L, TimeUnit.MILLISECONDS,  
  6.  
  7. new LinkedBlockingQueue());  
  8.  
  9. }  
  10.  

 

創(chuàng)建固定線程數(shù)量的線程池,采用無界隊列,當有更多任務(wù)的時候?qū)⒈环湃牍ぷ麝犃兄信抨?。如果線程池不經(jīng)常執(zhí)行任務(wù)時,你可以調(diào)用allowCoreThreadTimeOut(boolean value)的方法讓系統(tǒng)自動回收core的進程,以節(jié)約系統(tǒng)資源。

3. newSingleThreadExecutor

 

  1. public static ExecutorService newSingleThreadExecutor() {  
  2.  
  3. return new FinalizableDelegatedExecutorService  
  4.  
  5. (new ThreadPoolExecutor(11,  
  6.  
  7. 0L, TimeUnit.MILLISECONDS,  
  8.  
  9. new LinkedBlockingQueue()));  
  10.  
  11. }  
  12.  

只有一個工作線程的線程池。和newFixedThreadPool(1)相比,不同之處有兩點:

1. 不可以重新配置newSingleThreadExecutor創(chuàng)建出來的線程池。

2. 當創(chuàng)建出來的線程池對象被GC回收時,會自動調(diào)用shutdown方法。

4.newScheduledThreadPool

 

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
  2.        return new ScheduledThreadPoolExecutor(corePoolSize);  
  3.    }  

 

生成一個可以執(zhí)行時間調(diào)度的線程池。其實內(nèi)部使用無界工作隊列,線程數(shù)量最多能達到corePoolSize。

2.5 ExecutorCompletionService

這是個巧妙的設(shè)計,內(nèi)部維護了一已經(jīng)完成了任務(wù)結(jié)果隊列,通過take方法可以同步地等待一個個結(jié)果對象。

詳情見http://www.oschina.net/uploads/doc/javase-6-doc-api-zh_CN/java/util/concurrent/ExecutorCompletionService.html

#p#

3. java.util.concurrent.locks

java 早期內(nèi)置synchronized關(guān)鍵字解決多線程對共享資源訪問的一些問題,和其還配套了Object的notify 和 wait方法,用來控制線程之間的同步。

concurrent軟件包提供了更為高級和抽象的Lock工具,能解決更多的問題。

Lock是控制多個線程對共享資源進行訪問的工具。通常Lock限定多線程對同一個共享資源訪問的限制,一次只允許一個線程獲得Lock,即獲得對共享資源的訪問權(quán)限,線程間是互斥的。但是也有一些鎖如果ReadWriteLock是允許部分線程同時訪問共享資源的。

幾個術(shù)語:

爭用:當多個Thread在同一時間內(nèi)(相對概念)想要占有同一個Lock對象。那么JVM會調(diào)度解決爭用。

獲取順序:當多個線程爭用同一個Lock對象,那么JVM就要決定哪個線程將會獲得鎖權(quán)限。存在兩種模式:公平和不公平。 默認都是不公平模式,包括synchronized關(guān)鍵字,jvm決定順序的時候也是采用不公平策略。因為公平策略需要系統(tǒng)記錄大量輔助信息來判斷分配順序,而不公平策略由JVM決定一直快速高效的算法來分配Lock。所以不公平策略的系統(tǒng)吞吐量會比較高(花費更少的空間和計算在分配上),如果沒有特殊需要則默認采用不公平策略。

重入:當前線程獲取指定的鎖對象權(quán)限后,還可以再次獲取該鎖。Lock內(nèi)部會有一個計數(shù)器來表明當前線程獲取了該鎖的數(shù)量。如果一個線程獲取了一個鎖兩次,那么線程必須釋放鎖兩次,才能被看作完全釋放了該鎖,所以編程的時候一定要注意使用重入。synchronized關(guān)鍵字也是支持重入語義的。

3.1 Lock & ReentrantLock

ReentrantLock實現(xiàn)了Lock接口,一個可重入(reentrant)的互斥鎖 Lock,它具有與使用 synchronized 方法和語句所訪問的隱式監(jiān)視器鎖相同的一些基本行為和語義,但功能更強大。

摘自JavaDoc的一段獲取規(guī)則 “當鎖沒有被另一個線程所擁有時,調(diào)用 lock 的線程將成功獲取該鎖并返回。如果當前線程已經(jīng)擁有該鎖,此方法將立即返回。ReentrantLock 將由最近成功獲得鎖,并且還沒有釋放該鎖的線程所擁有?!?/P>

經(jīng)典使用方法。

 

  1. public void m() {   
  2.     lock.lock();  // block until condition holds  
  3.     try {  
  4.       // ... method body  
  5.     } finally {  
  6.       lock.unlock()  
  7.     }  
  8.   }  
  9.  

 

ReentrantLock除了實現(xiàn)了Lock規(guī)定的方法外,還實現(xiàn)了tryLock、isLocked 等方法,幫助你實現(xiàn)更多的場景。

Condition

和Object的wait和notify方法類似。ReentrantLock對象附加了Conditon對象,用來完成掛起和喚醒操作,使用lock.newCondition() 方法生成。

一個來自JKD的例子:

 

  1. class BoundedBuffer {  
  2.   final Lock lock = new ReentrantLock();  
  3.   final Condition notFull  = lock.newCondition();   
  4.   final Condition notEmpty = lock.newCondition();   
  5.  
  6.   final Object[] items = new Object[100];  
  7.   int putptr, takeptr, count;  
  8.  
  9.   public void put(Object x) throws InterruptedException {  
  10.     lock.lock();  
  11.     try {  
  12.       while (count == items.length)   
  13.         notFull.await();  
  14.       items[putptr] = x;   
  15.       if (++putptr == items.length) putptr = 0;  
  16.       ++count;  
  17.       notEmpty.signal();  
  18.     } finally {  
  19.       lock.unlock();  
  20.     }  
  21.   }  
  22.  
  23.   public Object take() throws InterruptedException {  
  24.     lock.lock();  
  25.     try {  
  26.       while (count == 0)   
  27.         notEmpty.await();  
  28.       Object x = items[takeptr];   
  29.       if (++takeptr == items.length) takeptr = 0;  
  30.       --count;  
  31.       notFull.signal();  
  32.       return x;  
  33.     } finally {  
  34.       lock.unlock();  
  35.     }  
  36.   }   
  37. }  

 

利用Conditon對象可以讓所有對同一個鎖對象進行爭用的Thread之間進行同步。

Lock VS synchronized

除非你有明確的需求或者并發(fā)遇到瓶頸的時候再決定使用ReentrantLock。synchronized在大部分時候還是可以工作的很好,jvm會自動處理和回收鎖。

ReentrantLock提供了更多的選擇和狀態(tài)信息。和

3.2 ReadWriteLock & ReentrantReadWriteLock

列舉一個場景對象X,擁有方法a、b、c。a和b方法不改表X的內(nèi)部狀態(tài),c改變內(nèi)部狀態(tài)。在多線程環(huán)境下,我們要求只讀和寫(變更狀態(tài))是不能同時進行的,而只讀操作是可以同時并發(fā)的,且實際運行過程中讀操作數(shù)量遠遠大于寫操作的數(shù)量。

如果用synchronized關(guān)鍵字的話,兩個只讀方法a、b也會互斥,并發(fā)性能收到限制。

那么這個情況下ReadWriteLock就非常有用,使用也非常簡單。

 

  1. class RWDictionary {  
  2.    private final Map m = new TreeMap();  
  3.    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();  
  4.    private final Lock r = rwl.readLock();  
  5.    private final Lock w = rwl.writeLock();  
  6.  
  7.    public Data get(String key) {  
  8.        r.lock();  
  9.        try { return m.get(key); }  
  10.        finally { r.unlock(); }  
  11.    }  
  12.    public String[] allKeys() {  
  13.        r.lock();  
  14.        try { return m.keySet().toArray(); }  
  15.        finally { r.unlock(); }  
  16.    }  
  17.    public Data put(String key, Data value) {  
  18.        w.lock();  
  19.        try { return m.put(key, value); }  
  20.        finally { w.unlock(); }  
  21.    }  
  22.    public void clear() {  
  23.        w.lock();  
  24.        try { m.clear(); }  
  25.        finally { w.unlock(); }  
  26.    }  
  27. }  
  28.  

 

要記得write鎖是獨占的,它一樣可以使用ReentrantLock的Condition功能。

使用任何的鎖都要通過try catch 或者 finally 來處理異常,避免忘記unlock。

#p#

4. 同步輔助類

你提交了一些任務(wù),但你想等它們都完成了再做另外一些事情;你提交了一些任務(wù),但是不想讓它們立刻執(zhí)行,等你喊123開始的時候,它們才開始執(zhí)行;等等這些場景,線程之間需要相互配合,或者等待某一個條件成熟執(zhí)行。這些場景想你就需要用到同步輔助類。

4.1 CountDownLatch

CountDownLatch 內(nèi)部有個計數(shù)器,通過構(gòu)造函數(shù)來指定。這個類就好比是倒計時的電子牌,當?shù)褂嫊r為0的時候就可以一起做一些事情。

摘自JavaDoc的方法介紹

 

 

摘自JavaDoc的例子

 

  1. class Driver { // ...  
  2.   void main() throws InterruptedException {  
  3.     CountDownLatch startSignal = new CountDownLatch(1);  
  4.     CountDownLatch doneSignal = new CountDownLatch(N);  
  5.  
  6.     for (int i = 0; i < N; ++i) // create and start threads  
  7.       new Thread(new Worker(startSignal, doneSignal)).start();  
  8.  
  9.     doSomethingElse();            // don't let run yet  
  10.     startSignal.countDown();      // let all threads proceed  
  11.     doSomethingElse();  
  12.     doneSignal.await();           // wait for all to finish  
  13.   }  
  14. }  
  15.  
  16. class Worker implements Runnable {  
  17.   private final CountDownLatch startSignal;  
  18.   private final CountDownLatch doneSignal;  
  19.   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {  
  20.      this.startSignal = startSignal;  
  21.      this.doneSignal = doneSignal;  
  22.   }  
  23.   public void run() {  
  24.      try {  
  25.        startSignal.await();  
  26.        doWork();  
  27.        doneSignal.countDown();  
  28. catch (InterruptedException ex) {} // return;  
  29.   }  
  30.  
  31.   void doWork() { ... }  
  32. }  
  33.  

 

當CountDownLatch(1)的時候,它就好比是個信號槍了。

4.2 CyclicBarrier

  1. new CyclicBarrier(N,   
  2.             new Runnable() {  
  3.               public void run() {   
  4.                mergeRows(...);   
  5.     }  
  6. }); 

這個同步輔助類,它讓多個線程可以在多個屏障點進行等待,所以叫cyclic,而且有個附加選擇你可以在線程到達屏障點后執(zhí)行一個任務(wù)(在釋放其他線程之前)

 

 

為了幫助你理解,假設(shè)一個場景。

有一個任務(wù),A、B、C分別從三個倉庫(甲乙丙)搬運不同3個不同的零件到客戶X的公司,然后再一起組裝機器,完成后一起坐車去公司總部。

這個任務(wù)需要ABC三個線程同時進行,但是由于從倉庫到客戶X那邊距離不等、交通狀態(tài)未知的情況下,所花費的時間也不等。同時由于三個人負責的零件不同,所以安裝機器的時候花費時間也不一樣。這個場景中有兩個需要線程間等待的地方。CyclicBarrier就可以閃亮登場了。

 

  1. public class Main3 {  
  2.  
  3. public static void main(String[] args) {  
  4.  
  5. CyclicBarrier barrier = new CyclicBarrier(3,new Runnable() {  
  6.  
  7. @Override 
  8.  
  9. public void run() {  
  10.  
  11. System.out.println("到達公共屏障點");  
  12.  
  13. }  
  14.  
  15. });  
  16.  
  17. ExecutorService es = Executors.newCachedThreadPool();  
  18.  
  19. es.submit(new Worker("A"50008000, barrier));  
  20.  
  21. es.submit(new Worker("B"200016000, barrier));  
  22.  
  23. es.submit(new Worker("C"90002000, barrier));  
  24.  
  25. es.shutdown();  
  26.  
  27. }  
  28.  
  29. static class Worker implements Runnable {  
  30.  
  31. String name;  
  32.  
  33. int t1;// 搬運零件所需要的時間  
  34.  
  35. int t2;// 參與組裝工作需要的時間  
  36.  
  37. CyclicBarrier barrier;  
  38.  
  39. public Worker(String name, int t1, int t2, CyclicBarrier barrier) {  
  40.  
  41. super();  
  42.  
  43. this.name = name;  
  44.  
  45. this.t1 = t1;  
  46.  
  47. this.t2 = t2;  
  48.  
  49. this.barrier = barrier;  
  50.  
  51. }  
  52.  
  53. @Override 
  54.  
  55. public void run() {  
  56.  
  57. try {  
  58.  
  59. print(name + " 開始搬運零件");  
  60.  
  61. Thread.sleep(t1);// 模擬搬運時間  
  62.  
  63. print(name + " 到達目的地");  
  64.  
  65. int a = barrier.await(); // 等待其他人  
  66.  
  67. if(a==0){  
  68.  
  69. //說明是最后一個到的可以執(zhí)行特殊操作  
  70.  
  71. }  
  72.  
  73. print(name + " 開始組裝機器");  
  74.  
  75. Thread.sleep(t2);// 模擬組裝時間.  
  76.  
  77. print(name + " 完成組裝機器");  
  78.  
  79. barrier.await(); // 等待其他人組裝完畢  
  80.  
  81. print(name + " 一起回總公司");  
  82.  
  83. catch (Exception e) {  
  84.  
  85. e.printStackTrace();  
  86.  
  87. }  
  88.  
  89. }  
  90.  
  91. }  
  92.  
  93. static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
  94.  
  95. static void print(String x) {  
  96.  
  97. System.out.println( sdf.format(new Date()) + ": "+x);  
  98.  
  99. }  
  100.  
  101. }  

 

4.3 Semaphore

一個經(jīng)典的信號量計數(shù)器。一般被用來控制對共享資源同時訪問線程數(shù)量的控制。

特殊情況下信號量設(shè)置為1,那么就類似互斥鎖的功能。

此類的構(gòu)造方法可選地接受一個公平 參數(shù)。當設(shè)置為 false 時,此類不對線程獲取鎖的順序做任何保證。和之前提到的爭用獲取順序一樣,在非公平模式下,系統(tǒng)將獲得更好的吞吐量,jvm也會保證在非公平模式下讓所有線程得到訪問機會。

【編輯推薦】

  1. java.util.concurrent 您不知道的 5 件事
  2. util.concurrent移植到C#
責任編輯:金賀 來源: JavaEye博客
相關(guān)推薦

2013-02-26 09:23:16

JavaJava類接口

2021-11-17 07:44:29

React 前端 組件

2010-07-12 10:03:50

ibmdwjava

2009-08-14 14:50:41

util.concur

2021-01-13 11:29:43

Python多線程異步

2024-01-17 12:44:23

Python并發(fā)編程

2024-12-24 08:03:56

2020-11-06 13:25:38

React Concu

2013-03-22 09:56:29

大數(shù)據(jù)應(yīng)用框架MapReduce

2024-10-31 09:30:05

線程池工具Java

2022-06-15 07:32:35

Lock線程Java

2025-03-28 04:00:00

互聯(lián)網(wǎng)Java讀操作

2009-07-09 10:28:19

線程池JDK5

2022-12-15 19:27:33

多線程代碼性能

2021-06-07 14:04:13

并發(fā)編程Future

2013-04-03 11:07:46

JavaJava線程

2009-06-18 08:51:03

Spring3.0 M

2020-07-08 12:05:55

Java線程池策略

2021-01-20 08:36:15

工具AtomicRefer JDK

2025-04-03 07:41:55

API阻塞隊列數(shù)據(jù)
點贊
收藏

51CTO技術(shù)棧公眾號