Concurrent In Java
這一系列只是對JUC各個部分做了說明和介紹,沒人深入原理!
concurrent并發(fā)包,讓你易于編寫并發(fā)程序。并發(fā)下我們經(jīng)常需要使用的基礎(chǔ)設(shè)施和解決的問題有ThreadPool、Lock、管道、集合點、線程之間等待和喚醒、線程間數(shù)據(jù)傳輸、共享資源訪問控制、并發(fā)線程之間的相互等待,等待。
concurrent提供的工具能夠解決絕大部分的場景,還能提高程序吞吐量。
現(xiàn)代的服務(wù)器多采用多核CPU,從而不同線程之間有可能真正地在同時運行而不是cpu時間切片。在處理大計算量的程序上要盡可能利用CPU多核特性,提高系統(tǒng)吞吐量。
并發(fā)編程主要面臨三個問題:
1.如何讓多個線程同時為同一個任務(wù)工作(并發(fā)編程設(shè)計)
2.多個線程之間對共享資源的爭用。
3.多個線程之間如何相互合作、傳遞數(shù)據(jù)。
1. concurrent包提供的集合
concurrent包直接提供了標準集合的一些實現(xiàn),在下面做簡單介紹。在大部分情況下可以使用它們提供高并發(fā)環(huán)境下對集合訪問的吞吐量。
1.1 ConcurrentHashMap
Map的一個并發(fā)實現(xiàn)。在多線程環(huán)境下,它具有很高的吞吐量和具備可靠的數(shù)據(jù)一致性。它支持并發(fā)讀和一定程度的并發(fā)修改(默認16個并發(fā),可以通過構(gòu)造函數(shù)修改)。
HashMap的實現(xiàn)是非線程安全的,高并發(fā)下會get方法常會死鎖,有的時候會表現(xiàn)為CPU居高不下。
- public V get(Object key) {
- if (key == null)
- return getForNullKey();
- int hash = hash(key.hashCode());
- for (Entry e = table[indexFor(hash, table.length)];
- e != null;
- e = e.next) {
- Object k;
- if (e.hash == hash && ((k = e.key) == key || key.equals(k)))
- return e.value;
- }
- return null;
- }
在get操作里面for循環(huán)取對象的操作,由于高并發(fā)同時讀寫,for循環(huán)的結(jié)果變得不可預(yù)知,所以有可能一直循環(huán)。
所以高并發(fā)環(huán)境下盡量不要直接使用HashMap,對系統(tǒng)造成的影響很難排除。
和Collections.synchronizedMap(new HashMap(...))相比,外ConcurrentHashMap在高并發(fā)的環(huán)境下有著更優(yōu)秀的吞吐量。因為ConcurrentHashMap可以支持寫并發(fā),基本原理是內(nèi)部分段,分段的數(shù)量決定著并發(fā)程度。通過concurrencyLevel參數(shù)可以設(shè)置。如果你能預(yù)期并發(fā)數(shù)量那么設(shè)置該參數(shù)可以獲取更優(yōu)吞吐量。
另外為ConcurrentHashMap還實現(xiàn)了:
V putIfAbsent(K key, V value);
boolean remove(Object key, Object value);
boolean replace(K key, V oldValue, V newValue);
V replace(K key, V value);
這四個一致性的操作方法。
1.2 BlockingQueue
BlockingQueue定義了一個接口,繼承了Queue接口。Queue是一種數(shù)據(jù)結(jié)構(gòu),意思是它的項以先入先出(FIFO)順序存儲。
BlockingQueue為我們提供了一些多線程阻塞語義的方法,新增和重定義了一些方法插入:
BlockingQueue是線程安全的,非常適合多個生產(chǎn)者和多個消費者線程之間傳遞數(shù)據(jù)。
形象地理解,BlockingQueue好比有很多格子的傳輸帶系統(tǒng),不過當你(生產(chǎn)者)調(diào)用put方法的時候,如果有空閑的格子那么放入物體后立刻返回,如果沒有空閑格子那么一直處于等待狀態(tài)。add方法意味著如果沒有空閑格子系統(tǒng)就會報警,然后如果處理該報警則按照你的意愿。offer方法優(yōu)先于add方法,它通過返回true 或 flase來告訴你是否放入成功。offer超時方法,如果不空閑的情況下,嘗試等待一段時間。
BlockingQueue有很多實現(xiàn)ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue
補充Dueue是個雙向隊列,可以當做堆棧來使用。
BlockingQueue在ThreadPool中,作為任務(wù)隊列來使用,用來保存沒有立刻執(zhí)行的工作任務(wù)對象。
1.3 SynchronousQueue
SychronousQueue是BlockingQueue的一個實現(xiàn),它看起來是一個隊列,但是其實沒有容量,是特定條件下的一個精簡實現(xiàn)。
做個比喻,SychronousQueue對象就像一個接力棒,現(xiàn)在有兩個運動員交棒者和接棒者(線程)要做交接。在交接點,交棒者沒有交出之前是不能松開的(一種等待狀態(tài)),接棒者在接到棒之前是必須等待。換一句話說不管誰先到交接點,必須處于等待狀態(tài)。
在生產(chǎn)者和消費者模型中。如果生產(chǎn)者向SychronousQueue進行put操作,直到有另外的消費者線程進行take操作時才能返回。對消費者也是一樣,take操作會被阻塞,直到生產(chǎn)者put。
在這種生產(chǎn)者-消費者模型下,生產(chǎn)者和消費者是進行手對手傳遞產(chǎn)品,在消費者消費一個產(chǎn)品之前,生產(chǎn)者必須處于等待狀態(tài)。它給我們提供了在線程之間交換單一元素的極輕量級方法,并且具有阻塞語義。
提示:上面舉例中有寫局限性。其實生產(chǎn)者和消費者進程是可以任意數(shù)量的。M:N。生產(chǎn)線程之間會對SychronousQueue進行爭用,消費者也是一樣。
對SychronousQueue類似于其他語境中“會合通道”或 “連接”點問題。它非常適合于傳遞性設(shè)計,在這種設(shè)計中,在一個線程中運行的對象要將某些信息、事件或任務(wù)傳遞給在另一個線程中運行的對象,它就必須與該對象同步。
1.4Exchanger
是SychronousQueue的雙向?qū)崿F(xiàn)。用來伙伴線程間交互對象。Exchanger 可能在比如遺傳算法和管道設(shè)計中很有用。
形象地說,就是兩個人在預(yù)定的地方交互物品,任何一方?jīng)]到之前都處于等待狀態(tài)。
1.5 CopyOnWriteArrayList 和 CopyOnWriteArraySet
它們分別是List接口和Set接口的實現(xiàn)。正如類名所描述的那樣,當數(shù)據(jù)結(jié)構(gòu)發(fā)生變化的時候,會復(fù)制自身的內(nèi)容,來保證一致性。大家都知道復(fù)制全部副本是非常昂貴的操作,看來這是一個非常不好的實現(xiàn)。事實上沒有最好和最差的方案,只有最合適的方案。一般情況下,處理多線程同步問題,我們傾向使用同步的 ArrayList,但同步也有其成本。
那么在什么情況下使用CopyOnWriteArrayList 或者CopyOnWriteArraySet呢?
數(shù)據(jù)量小。
對數(shù)據(jù)結(jié)構(gòu)的修改是偶然發(fā)生的,相對于讀操作。
舉例來說,如果我們實現(xiàn)觀察者模式的話,作為監(jiān)聽器集合是非常合適的。
1.6 TimeUnit
雖然是個時間單位,但是它也是concurrent包里面的。也許你以前的代碼里面經(jīng)常出現(xiàn)1*60*1000來表示一分鐘,代碼可讀性很差。現(xiàn)在你可以通過TimeUnit來編寫可讀性更好的代碼,concurrent的api里面涉及到時間的地方都會使用該對象。
我之所以先進并發(fā)框架常用的集合,是因為線程池的實現(xiàn)特性都利用了BlockingQueue的一些特性。
#p#
2. ThreadPool
雖然線程和進程相比是輕量級許多,但是線程的創(chuàng)建成本還是不可忽律,所以就有了線程池化的設(shè)計。線程池的創(chuàng)建、管理、回收、任務(wù)隊列管理、任務(wù)分配等細節(jié)問題依然負責,沒有必要重復(fù)發(fā)明輪子,concurrent包已經(jīng)為我們準備了一些優(yōu)秀線程池的實現(xiàn)。
2.1 認識ExecutorService 接口
ExecutorService 接口,它能提供的功能就是用來在將來某一個時刻異步地執(zhí)行一系列任務(wù)。雖然簡單一句話,但是包含了很多需求點。它的實現(xiàn)至少包含了線程池和任務(wù)隊列兩個方面,其實還包括了任務(wù)失敗處理策略等。

經(jīng)常使用submit方法,用來提交任務(wù)對象。
簡單的例子:
ExecutorService es = Executors.newCachedThreadPool();
es.submit(new Runnable(){
@Override
public void run() {
System.out.println("do some thing");
}
});
es.shutdown();
上面的例子只是完成了提交了一個任務(wù),異步地去執(zhí)行它。但是有些使用場景更為復(fù)雜,比如等待獲得異步任務(wù)的返回結(jié)果,或者最多等上固定的時間。
submit 方法返回一個對象,F(xiàn)uture??雌饋碛悬c別扭,代表將來的對象。其實看一下Future的方法就明白了。

其實Future對象代表了一個異步任務(wù)的結(jié)果,可以用來取消任務(wù)、查詢?nèi)蝿?wù)狀態(tài),還有通過get方法獲得異步任務(wù)返回的結(jié)果。當調(diào)用get方法的時候,當前線程被阻塞直到任務(wù)被處理完成或者出現(xiàn)異常。
我們可以通過保存Future對象來跟蹤查詢異步任務(wù)的執(zhí)行情況。
顯然Runnable接口中定義的 public void run();方法并不能返回結(jié)果對象,所以concurrent包提供了Callable接口,它可以被用來返回結(jié)果對象。
2.2 ThreadPoolExecutor
ThreadPoolExecutor實現(xiàn)了ExecutorService 接口,也是我們最主要使用的實現(xiàn)類。
首先非常有必要看一些類的最完整的構(gòu)造函數(shù)
- ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue
workQueue, - ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
ThreadPoolExecutor對象中有個poolSize變量表示當前線程池中正在運行的線程數(shù)量。
注意:這個有關(guān)非常重要的關(guān)系,常常被誤解。poolSize變量和corePoolSize、maximumPoolSize以及workQueue的關(guān)系。
首先線程池被創(chuàng)建初期,還沒有執(zhí)行任何任務(wù)的時候,poolSize等于0;
每次向線程池提交任務(wù)的時候,線程池處理過程如下:
1. 如果poolSize少于 corePoolSize,則首選添加新的線程,而不進行排隊。
2. 如果poolSize等于或多于 corePoolSize,則首選將請求加入隊列workQueue,而不添加新的線程。
3. 如果第二步執(zhí)行失敗(隊已滿),則創(chuàng)建新的線程執(zhí)行任務(wù),但是如果果poolSize已經(jīng)達到maximumPoolSize,那么就拒絕該任務(wù)。如果處理被拒絕的任務(wù)就取決于RejectedExecutionHandler handler的設(shè)置了,默認情況下會拋出異常。
系統(tǒng)存在四種任務(wù)拒絕策略:
在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。
在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調(diào)用運行該任務(wù)的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務(wù)的提交速度。
在 ThreadPoolExecutor.DiscardPolicy 中,不能執(zhí)行的任務(wù)將被刪除。
在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執(zhí)行程序尚未關(guān)閉,則位于工作隊列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過程)。
keepAliveTime活動線程如果空閑一段時間是否可以回收,通常只作用于超出corePoolSize的線程。corePoolSize的線程創(chuàng)建了就不會被回收。但是到j(luò)ava 6 之后增加了public void allowCoreThreadTimeOut(boolean value)方法,允許core進程也可以根據(jù)keepAliveTime來回收,默認為false。
決定線程池特性的還有workQueue的實現(xiàn)類,有三種類SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue,分別對應(yīng)同步隊列、無界隊列、有界隊列。
(摘自JavaDoc)
類SynchronousQueue,直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務(wù)的線程,則試圖把任務(wù)加入隊列將失敗,因此會構(gòu)造一個新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)(設(shè)置maximumPoolSizes 為Integer.MAX_VALUE)。當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。
LinkedBlockingQueue,無界隊列。使用無界隊列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時新任務(wù)在隊列中等待。這樣,創(chuàng)建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務(wù)完全獨立于其他任務(wù),即任務(wù)執(zhí)行互不影響時,適合于使用無界隊列;例如,在 Web 頁服務(wù)器中。這種排隊可用于處理瞬態(tài)突發(fā)請求,當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性。
ArrayBlockingQueue,有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會降低吞吐量。
綜上:構(gòu)造參數(shù)的設(shè)置是互相制約和影響的。只有當你重復(fù)了解其相互關(guān)系的時候、或有特殊需求的時候,才可以自己構(gòu)造ThreadPoolExecutor對象,否則可以使用Executores是個工廠類。
提示使用線程池是注意處理shutdown,確保你系統(tǒng)關(guān)閉的時候主動關(guān)閉shutdown。
2.3 ScheduledExecutorService
擴展了ExecutorService接口,提供時間排程的功能。
schedule方法被用來延遲指定時間來執(zhí)行某個指定任務(wù)。如果你需要周期性重復(fù)執(zhí)行定時任務(wù)可以使用scheduleAtFixedRate或者scheduleWithFixedDelay方法,它們不同的是前者以固定頻率執(zhí)行,后者以相對固定頻率執(zhí)行。
(感謝wenbois2000 提出原先的錯誤,我在這里重新描述!對于原先的錯誤,實在不好意思啊,再次感謝!)不管任務(wù)執(zhí)行耗時是否大于間隔時間,scheduleAtFixedRate和scheduleWithFixedDelay都不會導(dǎo)致同一個任務(wù)并發(fā)地被執(zhí)行。唯一不同的是scheduleWithFixedDelay是當前一個任務(wù)結(jié)束的時刻,開始結(jié)算間隔時間,如0秒開始執(zhí)行第一次任務(wù),任務(wù)耗時5秒,任務(wù)間隔時間3秒,那么第二次任務(wù)執(zhí)行的時間是在第8秒開始。
ScheduledExecutorService的實現(xiàn)類,是ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor對象包含的線程數(shù)量是沒有可伸縮性的,只會有固定數(shù)量的線程。不過你可以通過其構(gòu)造函數(shù)來設(shè)定線程的優(yōu)先級,來降低定時任務(wù)線程的系統(tǒng)占用。
特別提示:通過ScheduledExecutorService執(zhí)行的周期任務(wù),如果任務(wù)執(zhí)行過程中拋出了異常,那么過ScheduledExecutorService就會停止執(zhí)行任務(wù),且也不會再周期地執(zhí)行該任務(wù)了。所以你如果想保住任務(wù)都一直被周期執(zhí)行,那么catch一切可能的異常。
2.4 Executors
Executores是個工廠類,用來生成ThreadPoolExecutor對象,它提供了一些常用的線程池配置方案,滿足我們大部分場景。
1. newCachedThreadPool
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue());
- }
分析下出這個線程池配置的工作模式,當沒有空閑進程時就新建線程執(zhí)行,當有空閑線程時就使用空閑線程執(zhí)行。當線程空閑大60秒時,系統(tǒng)自動回收線程。
該線程池非常適合執(zhí)行短小異步任務(wù)時吞吐量非常高,會重復(fù)利用CPU的能力。但是如果任務(wù)處理IO邊界任務(wù),那么會消耗大量線程切換,降低系統(tǒng)吞吐量。所以執(zhí)行短小的計算任務(wù)非常高效,且當沒有任務(wù)時不會消耗系統(tǒng)資源。
注意:線程池中沒有變量表示線程是否空閑。那么程序是如何控制的呢?不得不贊嘆concurrent實現(xiàn)的非常精巧。當創(chuàng)建出來的線程完成原來的任務(wù)后,會調(diào)用BlockingQueue的Poll方法,對于SynchronousQueue實現(xiàn)而言會阻塞調(diào)用線程,直到另外的線程offer調(diào)用。
然而ThreadPool在分配任務(wù)的時候總是先去嘗試調(diào)用offer方法,所以就會觸發(fā)空閑線程再次調(diào)用。
精妙的是ThreadPoolExecutor的處理邏輯一樣,但是用BlockingQueue實現(xiàn)變了就產(chǎn)生不同的行為。
2. newFixedThreadPool
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue());
- }
創(chuàng)建固定線程數(shù)量的線程池,采用無界隊列,當有更多任務(wù)的時候?qū)⒈环湃牍ぷ麝犃兄信抨?。如果線程池不經(jīng)常執(zhí)行任務(wù)時,你可以調(diào)用allowCoreThreadTimeOut(boolean value)的方法讓系統(tǒng)自動回收core的進程,以節(jié)約系統(tǒng)資源。
3. newSingleThreadExecutor
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue()));
- }
只有一個工作線程的線程池。和newFixedThreadPool(1)相比,不同之處有兩點:
1. 不可以重新配置newSingleThreadExecutor創(chuàng)建出來的線程池。
2. 當創(chuàng)建出來的線程池對象被GC回收時,會自動調(diào)用shutdown方法。
4.newScheduledThreadPool
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
- return new ScheduledThreadPoolExecutor(corePoolSize);
- }
生成一個可以執(zhí)行時間調(diào)度的線程池。其實內(nèi)部使用無界工作隊列,線程數(shù)量最多能達到corePoolSize。
2.5 ExecutorCompletionService
這是個巧妙的設(shè)計,內(nèi)部維護了一已經(jīng)完成了任務(wù)結(jié)果隊列,通過take方法可以同步地等待一個個結(jié)果對象。
詳情見http://www.oschina.net/uploads/doc/javase-6-doc-api-zh_CN/java/util/concurrent/ExecutorCompletionService.html
#p#
3. java.util.concurrent.locks
java 早期內(nèi)置synchronized關(guān)鍵字解決多線程對共享資源訪問的一些問題,和其還配套了Object的notify 和 wait方法,用來控制線程之間的同步。
concurrent軟件包提供了更為高級和抽象的Lock工具,能解決更多的問題。
Lock是控制多個線程對共享資源進行訪問的工具。通常Lock限定多線程對同一個共享資源訪問的限制,一次只允許一個線程獲得Lock,即獲得對共享資源的訪問權(quán)限,線程間是互斥的。但是也有一些鎖如果ReadWriteLock是允許部分線程同時訪問共享資源的。
幾個術(shù)語:
爭用:當多個Thread在同一時間內(nèi)(相對概念)想要占有同一個Lock對象。那么JVM會調(diào)度解決爭用。
獲取順序:當多個線程爭用同一個Lock對象,那么JVM就要決定哪個線程將會獲得鎖權(quán)限。存在兩種模式:公平和不公平。 默認都是不公平模式,包括synchronized關(guān)鍵字,jvm決定順序的時候也是采用不公平策略。因為公平策略需要系統(tǒng)記錄大量輔助信息來判斷分配順序,而不公平策略由JVM決定一直快速高效的算法來分配Lock。所以不公平策略的系統(tǒng)吞吐量會比較高(花費更少的空間和計算在分配上),如果沒有特殊需要則默認采用不公平策略。
重入:當前線程獲取指定的鎖對象權(quán)限后,還可以再次獲取該鎖。Lock內(nèi)部會有一個計數(shù)器來表明當前線程獲取了該鎖的數(shù)量。如果一個線程獲取了一個鎖兩次,那么線程必須釋放鎖兩次,才能被看作完全釋放了該鎖,所以編程的時候一定要注意使用重入。synchronized關(guān)鍵字也是支持重入語義的。
3.1 Lock & ReentrantLock
ReentrantLock實現(xiàn)了Lock接口,一個可重入(reentrant)的互斥鎖 Lock,它具有與使用 synchronized 方法和語句所訪問的隱式監(jiān)視器鎖相同的一些基本行為和語義,但功能更強大。
摘自JavaDoc的一段獲取規(guī)則 “當鎖沒有被另一個線程所擁有時,調(diào)用 lock 的線程將成功獲取該鎖并返回。如果當前線程已經(jīng)擁有該鎖,此方法將立即返回。ReentrantLock 將由最近成功獲得鎖,并且還沒有釋放該鎖的線程所擁有?!?/P>
經(jīng)典使用方法。
- public void m() {
- lock.lock(); // block until condition holds
- try {
- // ... method body
- } finally {
- lock.unlock()
- }
- }
ReentrantLock除了實現(xiàn)了Lock規(guī)定的方法外,還實現(xiàn)了tryLock、isLocked 等方法,幫助你實現(xiàn)更多的場景。
Condition
和Object的wait和notify方法類似。ReentrantLock對象附加了Conditon對象,用來完成掛起和喚醒操作,使用lock.newCondition() 方法生成。
一個來自JKD的例子:
- class BoundedBuffer {
- final Lock lock = new ReentrantLock();
- final Condition notFull = lock.newCondition();
- final Condition notEmpty = lock.newCondition();
- final Object[] items = new Object[100];
- int putptr, takeptr, count;
- public void put(Object x) throws InterruptedException {
- lock.lock();
- try {
- while (count == items.length)
- notFull.await();
- items[putptr] = x;
- if (++putptr == items.length) putptr = 0;
- ++count;
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
- }
- public Object take() throws InterruptedException {
- lock.lock();
- try {
- while (count == 0)
- notEmpty.await();
- Object x = items[takeptr];
- if (++takeptr == items.length) takeptr = 0;
- --count;
- notFull.signal();
- return x;
- } finally {
- lock.unlock();
- }
- }
- }
利用Conditon對象可以讓所有對同一個鎖對象進行爭用的Thread之間進行同步。
Lock VS synchronized
除非你有明確的需求或者并發(fā)遇到瓶頸的時候再決定使用ReentrantLock。synchronized在大部分時候還是可以工作的很好,jvm會自動處理和回收鎖。
ReentrantLock提供了更多的選擇和狀態(tài)信息。和
3.2 ReadWriteLock & ReentrantReadWriteLock
列舉一個場景對象X,擁有方法a、b、c。a和b方法不改表X的內(nèi)部狀態(tài),c改變內(nèi)部狀態(tài)。在多線程環(huán)境下,我們要求只讀和寫(變更狀態(tài))是不能同時進行的,而只讀操作是可以同時并發(fā)的,且實際運行過程中讀操作數(shù)量遠遠大于寫操作的數(shù)量。
如果用synchronized關(guān)鍵字的話,兩個只讀方法a、b也會互斥,并發(fā)性能收到限制。
那么這個情況下ReadWriteLock就非常有用,使用也非常簡單。
- class RWDictionary {
- private final Map
m = new TreeMap(); - private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- private final Lock r = rwl.readLock();
- private final Lock w = rwl.writeLock();
- public Data get(String key) {
- r.lock();
- try { return m.get(key); }
- finally { r.unlock(); }
- }
- public String[] allKeys() {
- r.lock();
- try { return m.keySet().toArray(); }
- finally { r.unlock(); }
- }
- public Data put(String key, Data value) {
- w.lock();
- try { return m.put(key, value); }
- finally { w.unlock(); }
- }
- public void clear() {
- w.lock();
- try { m.clear(); }
- finally { w.unlock(); }
- }
- }
要記得write鎖是獨占的,它一樣可以使用ReentrantLock的Condition功能。
使用任何的鎖都要通過try catch 或者 finally 來處理異常,避免忘記unlock。
#p#
4. 同步輔助類
你提交了一些任務(wù),但你想等它們都完成了再做另外一些事情;你提交了一些任務(wù),但是不想讓它們立刻執(zhí)行,等你喊123開始的時候,它們才開始執(zhí)行;等等這些場景,線程之間需要相互配合,或者等待某一個條件成熟執(zhí)行。這些場景想你就需要用到同步輔助類。
4.1 CountDownLatch
CountDownLatch 內(nèi)部有個計數(shù)器,通過構(gòu)造函數(shù)來指定。這個類就好比是倒計時的電子牌,當?shù)褂嫊r為0的時候就可以一起做一些事情。
摘自JavaDoc的方法介紹
摘自JavaDoc的例子
- class Driver { // ...
- void main() throws InterruptedException {
- CountDownLatch startSignal = new CountDownLatch(1);
- CountDownLatch doneSignal = new CountDownLatch(N);
- for (int i = 0; i < N; ++i) // create and start threads
- new Thread(new Worker(startSignal, doneSignal)).start();
- doSomethingElse(); // don't let run yet
- startSignal.countDown(); // let all threads proceed
- doSomethingElse();
- doneSignal.await(); // wait for all to finish
- }
- }
- class Worker implements Runnable {
- private final CountDownLatch startSignal;
- private final CountDownLatch doneSignal;
- Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
- this.startSignal = startSignal;
- this.doneSignal = doneSignal;
- }
- public void run() {
- try {
- startSignal.await();
- doWork();
- doneSignal.countDown();
- } catch (InterruptedException ex) {} // return;
- }
- void doWork() { ... }
- }
當CountDownLatch(1)的時候,它就好比是個信號槍了。
4.2 CyclicBarrier
- new CyclicBarrier(N,
- new Runnable() {
- public void run() {
- mergeRows(...);
- }
- });
這個同步輔助類,它讓多個線程可以在多個屏障點進行等待,所以叫cyclic,而且有個附加選擇你可以在線程到達屏障點后執(zhí)行一個任務(wù)(在釋放其他線程之前)
為了幫助你理解,假設(shè)一個場景。
有一個任務(wù),A、B、C分別從三個倉庫(甲乙丙)搬運不同3個不同的零件到客戶X的公司,然后再一起組裝機器,完成后一起坐車去公司總部。
這個任務(wù)需要ABC三個線程同時進行,但是由于從倉庫到客戶X那邊距離不等、交通狀態(tài)未知的情況下,所花費的時間也不等。同時由于三個人負責的零件不同,所以安裝機器的時候花費時間也不一樣。這個場景中有兩個需要線程間等待的地方。CyclicBarrier就可以閃亮登場了。
- public class Main3 {
- public static void main(String[] args) {
- CyclicBarrier barrier = new CyclicBarrier(3,new Runnable() {
- @Override
- public void run() {
- System.out.println("到達公共屏障點");
- }
- });
- ExecutorService es = Executors.newCachedThreadPool();
- es.submit(new Worker("A", 5000, 8000, barrier));
- es.submit(new Worker("B", 2000, 16000, barrier));
- es.submit(new Worker("C", 9000, 2000, barrier));
- es.shutdown();
- }
- static class Worker implements Runnable {
- String name;
- int t1;// 搬運零件所需要的時間
- int t2;// 參與組裝工作需要的時間
- CyclicBarrier barrier;
- public Worker(String name, int t1, int t2, CyclicBarrier barrier) {
- super();
- this.name = name;
- this.t1 = t1;
- this.t2 = t2;
- this.barrier = barrier;
- }
- @Override
- public void run() {
- try {
- print(name + " 開始搬運零件");
- Thread.sleep(t1);// 模擬搬運時間
- print(name + " 到達目的地");
- int a = barrier.await(); // 等待其他人
- if(a==0){
- //說明是最后一個到的可以執(zhí)行特殊操作
- }
- print(name + " 開始組裝機器");
- Thread.sleep(t2);// 模擬組裝時間.
- print(name + " 完成組裝機器");
- barrier.await(); // 等待其他人組裝完畢
- print(name + " 一起回總公司");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
- static void print(String x) {
- System.out.println( sdf.format(new Date()) + ": "+x);
- }
- }
4.3 Semaphore
一個經(jīng)典的信號量計數(shù)器。一般被用來控制對共享資源同時訪問線程數(shù)量的控制。
特殊情況下信號量設(shè)置為1,那么就類似互斥鎖的功能。
此類的構(gòu)造方法可選地接受一個公平 參數(shù)。當設(shè)置為 false 時,此類不對線程獲取鎖的順序做任何保證。和之前提到的爭用獲取順序一樣,在非公平模式下,系統(tǒng)將獲得更好的吞吐量,jvm也會保證在非公平模式下讓所有線程得到訪問機會。
【編輯推薦】