我工作三年了,該懂并發(fā)了
本文的組織形式如下,主要會(huì)介紹到同步容器類,操作系統(tǒng)的并發(fā)工具,Java 開發(fā)工具包(只是簡單介紹一下,后面會(huì)有源碼分析)。同步工具類有哪些。
下面我們就來介紹一下 Java 并發(fā)中都涉及哪些模塊,這些并發(fā)模塊都是 Java 并發(fā)類庫所提供的。
同步容器類
同步容器主要包括兩類,一種是本來就是線程安全實(shí)現(xiàn)的容器,這類容器有 Vector、Hashtable、Stack,這類容器的方法上都加了 synchronized 鎖,是線程安全的實(shí)現(xiàn)。
“Vector、Hashtable、Stack 這些容器我們現(xiàn)在幾乎都不在使用,因?yàn)檫@些容器在多線程環(huán)境下的效率不高。
還有一類是由 Collections.synchronizedxxx 實(shí)現(xiàn)的非線程安全的容器,使用 Collections.synchronized 會(huì)把它們封裝起來編程線程安全的容器,舉出兩個(gè)例子
- Collections.synchronizedList
- Collections.synchronizedMap
我們可以通過 Collections 源碼可以看出這些線程安全的實(shí)現(xiàn)
要不為啥要稱 Collections 為集合工具類呢?Collections 會(huì)把這些容器類的狀態(tài)封裝起來,并對(duì)每個(gè)同步方法進(jìn)行同步,使得每次只有一個(gè)線程能夠訪問容器的狀態(tài)。
其中每個(gè) synchronized xxx都是相當(dāng)于創(chuàng)建了一個(gè)靜態(tài)內(nèi)部類。
雖然同步容器類都是線程安全的,但是在某些情況下需要額外的客戶端加鎖來保證一些復(fù)合操作的安全性,復(fù)合操作就是有兩個(gè)及以上的方法組成的操作,比如最典型的就是 若沒有則添加,用偽代碼表示則是
- if(a == null){
- a = get();
- }
比如可以用來判斷 Map 中是否有某個(gè) key,如果沒有則添加進(jìn) Map 中。這些復(fù)合操作在沒有客戶端加鎖的情況下是線程安全的,但是當(dāng)多個(gè)線程并發(fā)修改容器時(shí),可能會(huì)表現(xiàn)出意料之外的行為。例如下面這段代碼
- public class TestVector implements Runnable{
- static Vector vector = new Vector();
- static void addVector(){
- for(int i = 0;i < 10000;i++){
- vector.add(i);
- }
- }
- static Object getVector(){
- int index = vector.size() - 1;
- return vector.get(index);
- }
- static void removeVector(){
- int index = vector.size() - 1;
- vector.remove(index);
- }
- @Override
- public void run() {
- getVector();
- }
- public static void main(String[] args) {
- TestVector testVector = new TestVector();
- testVector.addVector();
- Thread t1 = new Thread(() -> {
- for(int i = 0;i < vector.size();i++){
- getVector();
- }
- });
- Thread t2 = new Thread(() -> {
- for(int i = 0;i < vector.size();i++){
- removeVector();
- }
- });
- t1.start();
- t2.start();
- }
- }
這些方法看似沒有問題,因?yàn)?Vector 能夠保證線程安全性,無論多少個(gè)線程訪問 Vector 也不會(huì)造成 Vector 的內(nèi)部產(chǎn)生破壞,但是從整個(gè)系統(tǒng)來說,是存在線程安全性的,事實(shí)上你運(yùn)行一下,也會(huì)發(fā)現(xiàn)報(bào)錯(cuò)。
會(huì)出現(xiàn)
如果線程 A 在包含這么多元素的基礎(chǔ)上調(diào)用 getVector 方法,會(huì)得到一個(gè)數(shù)值,getVector 只是取得該元素,而并不是從 vector 中移除,removeVector 方法是得到一個(gè)元素進(jìn)行移除,這段代碼的不安全因素就是,因?yàn)榫€程的時(shí)間片是亂序的,而且 getVector 和 removeVector 并不會(huì)保證互斥,所以在 removeVector 方法把某個(gè)值比如 6666 移除后,vector 中就不存在這個(gè) 6666 的元素,此時(shí) getVector 方法取得 6666 ,就會(huì)拋出數(shù)組越界異常。為什么是數(shù)組越界異常呢?可以看一下 vector 的源碼
如果用圖表示的話,則會(huì)是下面這樣。
所以,從系統(tǒng)的層面來看,上面這段代碼也要保證線程安全性才可以,也就是在客戶端加鎖實(shí)現(xiàn),只要我們讓復(fù)合操作使用一把鎖,那么這些操作就和其他單獨(dú)的操作一樣都是原子性的。如下面例子所示
- static Object getVector(){
- synchronized (vector){
- int index = vector.size() - 1;
- return vector.get(index);
- }
- }
- static void removeVector(){
- synchronized (vector) {
- int index = vector.size() - 1;
- vector.remove(index);
- }
- }
也可以通過鎖住 .class 來保證原子性操作,也能達(dá)到同樣的效果。
- static Object getVector(){
- synchronized (TestVector.class){
- int index = vector.size() - 1;
- return vector.get(index);
- }
- }
- static void removeVector(){
- synchronized (TestVector.class) {
- int index = vector.size() - 1;
- vector.remove(index);
- }
- }
在調(diào)用 size 和 get 之間,Vector 的長度可能會(huì)發(fā)生變化,這種變化在對(duì) Vector 進(jìn)行排序時(shí)出現(xiàn),如下所示
- for(int i = 0;i< vector.size();i++){
- doSomething(vector.get(i));
- }
這種迭代的操作正確性取決于運(yùn)氣,即在調(diào)用 size 和 get 之間會(huì)修改 Vector,在單線程環(huán)境中,這種假設(shè)完全成立,但是再有其他線程并發(fā)修改 Vector 時(shí),則可能會(huì)導(dǎo)致麻煩。
我們?nèi)耘f可以通過客戶端加鎖的方式來避免這種情況
- synchronized(vector){
- for(int i = 0;i< vector.size();i++){
- doSomething(vector.get(i));
- }
- }
這種方式為客戶端的可靠性提供了保證,但是犧牲了伸縮性,而且這種在遍歷過程中進(jìn)行加鎖,也不是我們所希望看到的。
fail-fast
針對(duì)上面這種情況,很多集合類都提供了一種 fail-fast 機(jī)制,因?yàn)榇蟛糠旨蟽?nèi)部都是使用 Iterator 進(jìn)行遍歷,在循環(huán)中使用同步鎖的開銷會(huì)很大,而 Iterator 的創(chuàng)建是輕量級(jí)的,所以在集合內(nèi)部如果有并發(fā)修改的操作,集合會(huì)進(jìn)行快速失敗,也就是 fail-fast。當(dāng)他們發(fā)現(xiàn)容器在迭代過程中被修改時(shí),會(huì)拋出 ConcurrentModificationException 異常,這種快速失敗不是一種完備的處理機(jī)制,而只是 善意的捕獲并發(fā)錯(cuò)誤。
如果查看過 ConcurrentModificationException 的注解,你會(huì)發(fā)現(xiàn),ConcurrentModificationException 拋出的原則由兩種,如下
造成這種異常的原因是由于多個(gè)線程在遍歷集合的同時(shí)對(duì)集合類內(nèi)部進(jìn)行了修改,這也就是 fail-fast 機(jī)制。
該注解還聲明了另外一種方式
這個(gè)問題也是很經(jīng)典的一個(gè)問題,我們使用 ArrayList 來舉例子。如下代碼所示
- public static void main(String[] args) {
- List<String> list = new ArrayList<>();
- for (int i = 0 ; i < 10 ; i++ ) {
- list.add(i + "");
- }
- Iterator<String> iterator = list.iterator();
- int i = 0 ;
- while(iterator.hasNext()) {
- if (i == 3) {
- list.remove(3);
- }
- System.out.println(iterator.next());
- i ++;
- }
- }
該段代碼會(huì)發(fā)生異常,因?yàn)樵?ArrayList 內(nèi)部,有兩個(gè)屬性,一個(gè)是 modCount ,一個(gè)是 expectedModCount ,ArrayList 在 remove 等對(duì)集合結(jié)構(gòu)的元素造成數(shù)量上的操作會(huì)有 checkForComodification 的判斷,如下所示,這也是這段代碼的錯(cuò)誤原因。
fail-safe
fail-safe 是 Java 中的一種 安全失敗 機(jī)制,它表示的是在遍歷時(shí)不是直接在原集合上進(jìn)行訪問,而是先復(fù)制原有集合內(nèi)容,在拷貝的集合上進(jìn)行遍歷。由于迭代時(shí)是對(duì)原集合的拷貝進(jìn)行遍歷,所以在遍歷過程中對(duì)原集合所作的修改并不能被迭代器檢測到,所以不會(huì)觸發(fā) ConcurrentModificationException。java.util.concurrent 包下的容器都是安全失敗的,可以在多線程條件下使用,并發(fā)修改。
比如 CopyOnWriteArrayList, 它就是一種 fail-safe 機(jī)制的集合,它就不會(huì)出現(xiàn)異常,例如如下操作
- List<Integer> integers = new CopyOnWriteArrayList<>();
- integers.add(1);
- integers.add(2);
- integers.add(3);
- Iterator<Integer> itr = integers.iterator();
- while (itr.hasNext()) {
- Integer a = itr.next();
- integers.remove(a);
- }
CopyOnWriteArrayList 就是 ArrayList 的一種線程安全的變體,CopyOnWriteArrayList 中的所有可變操作比如 add 和 set 等等都是通過對(duì)數(shù)組進(jìn)行全新復(fù)制來實(shí)現(xiàn)的。
操作系統(tǒng)中的并發(fā)工具
講到并發(fā)容器,就不得不提操作系統(tǒng)級(jí)別實(shí)現(xiàn)了哪些進(jìn)程/線程間的并發(fā)容器,說白了其實(shí)就是數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)。下面我們就來一起看一下操作系統(tǒng)級(jí)別的并發(fā)工具
信號(hào)量
信號(hào)量是 E.W.Dijkstra 在 1965 年提出的一種方法,它使用一個(gè)整形變量來累計(jì)喚醒次數(shù),以供之后使用。在他的觀點(diǎn)中,有一個(gè)新的變量類型稱作 信號(hào)量(semaphore)。一個(gè)信號(hào)量的取值可以是 0 ,或任意正數(shù)。0 表示的是不需要任何喚醒,任意的正數(shù)表示的就是喚醒次數(shù)。
Dijkstra 提出了信號(hào)量有兩個(gè)操作,現(xiàn)在通常使用 down 和 up(分別可以用 sleep 和 wakeup 來表示)。down 這個(gè)指令的操作會(huì)檢查值是否大于 0 。如果大于 0 ,則將其值減 1 ;若該值為 0 ,則進(jìn)程將睡眠,而且此時(shí) down 操作將會(huì)繼續(xù)執(zhí)行。檢查數(shù)值、修改變量值以及可能發(fā)生的睡眠操作均為一個(gè)單一的、不可分割的 原子操作(atomic action) 完成。
互斥量
如果不需要信號(hào)量的計(jì)數(shù)能力時(shí),可以使用信號(hào)量的一個(gè)簡單版本,稱為 mutex(互斥量)?;コ饬康膬?yōu)勢就在于在一些共享資源和一段代碼中保持互斥。由于互斥的實(shí)現(xiàn)既簡單又有效,這使得互斥量在實(shí)現(xiàn)用戶空間線程包時(shí)非常有用。
互斥量是一個(gè)處于兩種狀態(tài)之一的共享變量:解鎖(unlocked) 和 加鎖(locked)。這樣,只需要一個(gè)二進(jìn)制位來表示它,不過一般情況下,通常會(huì)用一個(gè) 整型(integer) 來表示。0 表示解鎖,其他所有的值表示加鎖,比 1 大的值表示加鎖的次數(shù)。
mutex 使用兩個(gè)過程,當(dāng)一個(gè)線程(或者進(jìn)程)需要訪問關(guān)鍵區(qū)域時(shí),會(huì)調(diào)用 mutex_lock 進(jìn)行加鎖。如果互斥鎖當(dāng)前處于解鎖狀態(tài)(表示關(guān)鍵區(qū)域可用),則調(diào)用成功,并且調(diào)用線程可以自由進(jìn)入關(guān)鍵區(qū)域。
另一方面,如果 mutex 互斥量已經(jīng)鎖定的話,調(diào)用線程會(huì)阻塞直到關(guān)鍵區(qū)域內(nèi)的線程執(zhí)行完畢并且調(diào)用了 mutex_unlock 。如果多個(gè)線程在 mutex 互斥量上阻塞,將隨機(jī)選擇一個(gè)線程并允許它獲得鎖。
Futexes
隨著并行的增加,有效的同步(synchronization)和鎖定(locking) 對(duì)于性能來說是非常重要的。如果進(jìn)程等待時(shí)間很短,那么自旋鎖(Spin lock) 是非常有效;但是如果等待時(shí)間比較長,那么這會(huì)浪費(fèi) CPU 周期。如果進(jìn)程很多,那么阻塞此進(jìn)程,并僅當(dāng)鎖被釋放的時(shí)候讓內(nèi)核解除阻塞是更有效的方式。不幸的是,這種方式也會(huì)導(dǎo)致另外的問題:它可以在進(jìn)程競爭頻繁的時(shí)候運(yùn)行良好,但是在競爭不是很激烈的情況下內(nèi)核切換的消耗會(huì)非常大,而且更困難的是,預(yù)測鎖的競爭數(shù)量更不容易。
有一種有趣的解決方案是把兩者的優(yōu)點(diǎn)結(jié)合起來,提出一種新的思想,稱為 futex,或者是快速用戶空間互斥(fast user space mutex),是不是聽起來很有意思?
futex 是 Linux 中的特性實(shí)現(xiàn)了基本的鎖定(很像是互斥鎖)而且避免了陷入內(nèi)核中,因?yàn)閮?nèi)核的切換的開銷非常大,這樣做可以大大提高性能。futex 由兩部分組成:內(nèi)核服務(wù)和用戶庫。內(nèi)核服務(wù)提供了了一個(gè) 等待隊(duì)列(wait queue) 允許多個(gè)進(jìn)程在鎖上排隊(duì)等待。除非內(nèi)核明確的對(duì)他們解除阻塞,否則它們不會(huì)運(yùn)行。
Pthreads 中的互斥量
Pthreads 提供了一些功能用來同步線程。最基本的機(jī)制是使用互斥量變量,可以鎖定和解鎖,用來保護(hù)每個(gè)關(guān)鍵區(qū)域。希望進(jìn)入關(guān)鍵區(qū)域的線程首先要嘗試獲取 mutex。如果 mutex 沒有加鎖,線程能夠馬上進(jìn)入并且互斥量能夠自動(dòng)鎖定,從而阻止其他線程進(jìn)入。如果 mutex 已經(jīng)加鎖,調(diào)用線程會(huì)阻塞,直到 mutex 解鎖。如果多個(gè)線程在相同的互斥量上等待,當(dāng)互斥量解鎖時(shí),只有一個(gè)線程能夠進(jìn)入并且重新加鎖。這些鎖并不是必須的,程序員需要正確使用它們。
下面是與互斥量有關(guān)的函數(shù)調(diào)用
和我們想象中的一樣,mutex 能夠被創(chuàng)建和銷毀,扮演這兩個(gè)角色的分別是Phread_mutex_init 和 Pthread_mutex_destroy。mutex 也可以通過 Pthread_mutex_lock 來進(jìn)行加鎖,如果互斥量已經(jīng)加鎖,則會(huì)阻塞調(diào)用者。還有一個(gè)調(diào)用Pthread_mutex_trylock 用來嘗試對(duì)線程加鎖,當(dāng) mutex 已經(jīng)被加鎖時(shí),會(huì)返回一個(gè)錯(cuò)誤代碼而不是阻塞調(diào)用者。這個(gè)調(diào)用允許線程有效的進(jìn)行忙等。最后,Pthread_mutex_unlock 會(huì)對(duì) mutex 解鎖并且釋放一個(gè)正在等待的線程。
除了互斥量以外,Pthreads 還提供了第二種同步機(jī)制:條件變量(condition variables) 。mutex 可以很好的允許或阻止對(duì)關(guān)鍵區(qū)域的訪問。條件變量允許線程由于未滿足某些條件而阻塞。絕大多數(shù)情況下這兩種方法是一起使用的。下面我們進(jìn)一步來研究線程、互斥量、條件變量之間的關(guān)聯(lián)。
下面再來重新認(rèn)識(shí)一下生產(chǎn)者和消費(fèi)者問題:一個(gè)線程將東西放在一個(gè)緩沖區(qū)內(nèi),由另一個(gè)線程將它們?nèi)〕觥H绻a(chǎn)者發(fā)現(xiàn)緩沖區(qū)沒有空槽可以使用了,生產(chǎn)者線程會(huì)阻塞起來直到有一個(gè)線程可以使用。生產(chǎn)者使用 mutex 來進(jìn)行原子性檢查從而不受其他線程干擾。但是當(dāng)發(fā)現(xiàn)緩沖區(qū)已經(jīng)滿了以后,生產(chǎn)者需要一種方法來阻塞自己并在以后被喚醒。這便是條件變量做的工作。
下面是一些與條件變量有關(guān)的最重要的 pthread 調(diào)用
上表中給出了一些調(diào)用用來創(chuàng)建和銷毀條件變量。條件變量上的主要屬性是Pthread_cond_wait 和 Pthread_cond_signal。前者阻塞調(diào)用線程,直到其他線程發(fā)出信號(hào)為止(使用后者調(diào)用)。阻塞的線程通常需要等待喚醒的信號(hào)以此來釋放資源或者執(zhí)行某些其他活動(dòng)。只有這樣阻塞的線程才能繼續(xù)工作。條件變量允許等待與阻塞原子性的進(jìn)程。Pthread_cond_broadcast 用來喚醒多個(gè)阻塞的、需要等待信號(hào)喚醒的線程。
“需要注意的是,條件變量(不像是信號(hào)量)不會(huì)存在于內(nèi)存中。如果將一個(gè)信號(hào)量傳遞給一個(gè)沒有線程等待的條件變量,那么這個(gè)信號(hào)就會(huì)丟失,這個(gè)需要注意
管程
為了能夠編寫更加準(zhǔn)確無誤的程序,Brinch Hansen 和 Hoare 提出了一個(gè)更高級(jí)的同步原語叫做 管程(monitor)。管程有一個(gè)很重要的特性,即在任何時(shí)候管程中只能有一個(gè)活躍的進(jìn)程,這一特性使管程能夠很方便的實(shí)現(xiàn)互斥操作。管程是編程語言的特性,所以編譯器知道它們的特殊性,因此可以采用與其他過程調(diào)用不同的方法來處理對(duì)管程的調(diào)用。通常情況下,當(dāng)進(jìn)程調(diào)用管程中的程序時(shí),該程序的前幾條指令會(huì)檢查管程中是否有其他活躍的進(jìn)程。如果有的話,調(diào)用進(jìn)程將被掛起,直到另一個(gè)進(jìn)程離開管程才將其喚醒。如果沒有活躍進(jìn)程在使用管程,那么該調(diào)用進(jìn)程才可以進(jìn)入。
進(jìn)入管程中的互斥由編譯器負(fù)責(zé),但是一種通用做法是使用 互斥量(mutex) 和 二進(jìn)制信號(hào)量(binary semaphore)。由于編譯器而不是程序員在操作,因此出錯(cuò)的幾率會(huì)大大降低。在任何時(shí)候,編寫管程的程序員都無需關(guān)心編譯器是如何處理的。他只需要知道將所有的臨界區(qū)轉(zhuǎn)換成為管程過程即可。絕不會(huì)有兩個(gè)進(jìn)程同時(shí)執(zhí)行臨界區(qū)中的代碼。
即使管程提供了一種簡單的方式來實(shí)現(xiàn)互斥,但在我們看來,這還不夠。因?yàn)槲覀冞€需要一種在進(jìn)程無法執(zhí)行被阻塞。在生產(chǎn)者-消費(fèi)者問題中,很容易將針對(duì)緩沖區(qū)滿和緩沖區(qū)空的測試放在管程程序中,但是生產(chǎn)者在發(fā)現(xiàn)緩沖區(qū)滿的時(shí)候該如何阻塞呢?
解決的辦法是引入條件變量(condition variables) 以及相關(guān)的兩個(gè)操作 wait 和 signal。當(dāng)一個(gè)管程程序發(fā)現(xiàn)它不能運(yùn)行時(shí)(例如,生產(chǎn)者發(fā)現(xiàn)緩沖區(qū)已滿),它會(huì)在某個(gè)條件變量(如 full)上執(zhí)行 wait 操作。這個(gè)操作造成調(diào)用進(jìn)程阻塞,并且還將另一個(gè)以前等在管程之外的進(jìn)程調(diào)入管程。在前面的 pthread 中我們已經(jīng)探討過條件變量的實(shí)現(xiàn)細(xì)節(jié)了。另一個(gè)進(jìn)程,比如消費(fèi)者可以通過執(zhí)行 signal 來喚醒阻塞的調(diào)用進(jìn)程。
通過臨界區(qū)自動(dòng)的互斥,管程比信號(hào)量更容易保證并行編程的正確性。但是管程也有缺點(diǎn),我們前面說到過管程是一個(gè)編程語言的概念,編譯器必須要識(shí)別管程并用某種方式對(duì)其互斥作出保證。C、Pascal 以及大多數(shù)其他編程語言都沒有管程,所以不能依靠編譯器來遵守互斥規(guī)則。
與管程和信號(hào)量有關(guān)的另一個(gè)問題是,這些機(jī)制都是設(shè)計(jì)用來解決訪問共享內(nèi)存的一個(gè)或多個(gè) CPU 上的互斥問題的。通過將信號(hào)量放在共享內(nèi)存中并用 TSL 或 XCHG 指令來保護(hù)它們,可以避免競爭。但是如果是在分布式系統(tǒng)中,可能同時(shí)具有多個(gè) CPU 的情況,并且每個(gè) CPU 都有自己的私有內(nèi)存呢,它們通過網(wǎng)絡(luò)相連,那么這些原語將會(huì)失效。因?yàn)樾盘?hào)量太低級(jí)了,而管程在少數(shù)幾種編程語言之外無法使用,所以還需要其他方法。
消息傳遞
上面提到的其他方法就是 消息傳遞(messaage passing)。這種進(jìn)程間通信的方法使用兩個(gè)原語send 和 receive ,它們像信號(hào)量而不像管程,是系統(tǒng)調(diào)用而不是語言級(jí)別。示例如下
- send(destination, &message);
- receive(source, &message);
send 方法用于向一個(gè)給定的目標(biāo)發(fā)送一條消息,receive 從一個(gè)給定的源接收一條消息。如果沒有消息,接受者可能被阻塞,直到接收一條消息或者帶著錯(cuò)誤碼返回。
消息傳遞系統(tǒng)現(xiàn)在面臨著許多信號(hào)量和管程所未涉及的問題和設(shè)計(jì)難點(diǎn),尤其對(duì)那些在網(wǎng)絡(luò)中不同機(jī)器上的通信狀況。例如,消息有可能被網(wǎng)絡(luò)丟失。為了防止消息丟失,發(fā)送方和接收方可以達(dá)成一致:一旦接受到消息后,接收方馬上回送一條特殊的 確認(rèn)(acknowledgement) 消息。如果發(fā)送方在一段時(shí)間間隔內(nèi)未收到確認(rèn),則重發(fā)消息。
現(xiàn)在考慮消息本身被正確接收,而返回給發(fā)送著的確認(rèn)消息丟失的情況。發(fā)送者將重發(fā)消息,這樣接受者將收到兩次相同的消息。
對(duì)于接收者來說,如何區(qū)分新的消息和一條重發(fā)的老消息是非常重要的。通常采用在每條原始消息中嵌入一個(gè)連續(xù)的序號(hào)來解決此問題。如果接受者收到一條消息,它具有與前面某一條消息一樣的序號(hào),就知道這條消息是重復(fù)的,可以忽略。
消息系統(tǒng)還必須處理如何命名進(jìn)程的問題,以便在發(fā)送或接收調(diào)用中清晰的指明進(jìn)程。身份驗(yàn)證(authentication) 也是一個(gè)問題,比如客戶端怎么知道它是在與一個(gè)真正的文件服務(wù)器通信,從發(fā)送方到接收方的信息有可能被中間人所篡改。
屏障
最后一個(gè)同步機(jī)制是準(zhǔn)備用于進(jìn)程組而不是進(jìn)程間的生產(chǎn)者-消費(fèi)者情況的。在某些應(yīng)用中劃分了若干階段,并且規(guī)定,除非所有的進(jìn)程都就緒準(zhǔn)備著手下一個(gè)階段,否則任何進(jìn)程都不能進(jìn)入下一個(gè)階段,可以通過在每個(gè)階段的結(jié)尾安裝一個(gè) 屏障(barrier) 來實(shí)現(xiàn)這種行為。當(dāng)一個(gè)進(jìn)程到達(dá)屏障時(shí),它會(huì)被屏障所攔截,直到所有的屏障都到達(dá)為止。屏障可用于一組進(jìn)程同步,如下圖所示
在上圖中我們可以看到,有四個(gè)進(jìn)程接近屏障,這意味著每個(gè)進(jìn)程都在進(jìn)行運(yùn)算,但是還沒有到達(dá)每個(gè)階段的結(jié)尾。過了一段時(shí)間后,A、B、D 三個(gè)進(jìn)程都到達(dá)了屏障,各自的進(jìn)程被掛起,但此時(shí)還不能進(jìn)入下一個(gè)階段呢,因?yàn)檫M(jìn)程 B 還沒有執(zhí)行完畢。結(jié)果,當(dāng)最后一個(gè) C 到達(dá)屏障后,這個(gè)進(jìn)程組才能夠進(jìn)入下一個(gè)階段。
避免鎖:讀-復(fù)制-更新
最快的鎖是根本沒有鎖。問題在于沒有鎖的情況下,我們是否允許對(duì)共享數(shù)據(jù)結(jié)構(gòu)的并發(fā)讀寫進(jìn)行訪問。答案當(dāng)然是不可以。假設(shè)進(jìn)程 A 正在對(duì)一個(gè)數(shù)字?jǐn)?shù)組進(jìn)行排序,而進(jìn)程 B 正在計(jì)算其平均值,而此時(shí)你進(jìn)行 A 的移動(dòng),會(huì)導(dǎo)致 B 會(huì)多次讀到重復(fù)值,而某些值根本沒有遇到過。
然而,在某些情況下,我們可以允許寫操作來更新數(shù)據(jù)結(jié)構(gòu),即便還有其他的進(jìn)程正在使用。竅門在于確保每個(gè)讀操作要么讀取舊的版本,要么讀取新的版本,例如下面的樹
上面的樹中,讀操作從根部到葉子遍歷整個(gè)樹。加入一個(gè)新節(jié)點(diǎn) X 后,為了實(shí)現(xiàn)這一操作,我們要讓這個(gè)節(jié)點(diǎn)在樹中可見之前使它"恰好正確":我們對(duì)節(jié)點(diǎn) X 中的所有值進(jìn)行初始化,包括它的子節(jié)點(diǎn)指針。然后通過原子寫操作,使 X 稱為 A 的子節(jié)點(diǎn)。所有的讀操作都不會(huì)讀到前后不一致的版本
在上面的圖中,我們接著移除 B 和 D。首先,將 A 的左子節(jié)點(diǎn)指針指向 C 。所有原本在 A 中的讀操作將會(huì)后續(xù)讀到節(jié)點(diǎn) C ,而永遠(yuǎn)不會(huì)讀到 B 和 D。也就是說,它們將只會(huì)讀取到新版數(shù)據(jù)。同樣,所有當(dāng)前在 B 和 D 中的讀操作將繼續(xù)按照原始的數(shù)據(jù)結(jié)構(gòu)指針并且讀取舊版數(shù)據(jù)。所有操作均能正確運(yùn)行,我們不需要鎖住任何東西。而不需要鎖住數(shù)據(jù)就能夠移除 B 和 D 的主要原因就是 讀-復(fù)制-更新(Ready-Copy-Update,RCU),將更新過程中的移除和再分配過程分離開。
Java 并發(fā)工具包JDK 1.5 提供了許多種并發(fā)容器來改進(jìn)同步容器的性能,同步容器將所有對(duì)容器狀態(tài)的訪問都串行化,以實(shí)現(xiàn)他們之間的線程安全性。這種方法的代價(jià)是嚴(yán)重降低了并發(fā)性能,當(dāng)多個(gè)線程爭搶容器鎖的同時(shí),嚴(yán)重降低吞吐量。
下面我們就來一起認(rèn)識(shí)一下 Java 中都用了哪些并發(fā)工具
Java 并發(fā)工具綜述
在 Java 5.0 中新增加了 ConcurrentHashMap 用來替代基于散列的 Map 容器;新增加了 CopyOnWriteArrayList 和 CopyOnWriteArraySet 來分別替代 ArrayList 和 Set 接口實(shí)現(xiàn)類;還新增加了兩種容器類型,分別是 Queue 和 BlockingQueue, Queue 是隊(duì)列的意思,它有一些實(shí)現(xiàn)分別是傳統(tǒng)的先進(jìn)先出隊(duì)列 ConcurrentLinkedQueue以及并發(fā)優(yōu)先級(jí)隊(duì)列 PriorityQueue。Queue 是一個(gè)先入先出的隊(duì)列,它的操作不會(huì)阻塞,如果隊(duì)列為空那么獲取元素的操作會(huì)返回空值。PriorityQueue 擴(kuò)展了 Queue,增加了可阻塞的插入和獲取等操作。如果隊(duì)列為空,那么獲取元素的操作將一直阻塞,直到隊(duì)列中出現(xiàn)一個(gè)可用的元素為止。如果隊(duì)列已滿,那么插入操作則一直阻塞,直到隊(duì)列中有可用的空間為止。
Java 6.0 還引入了 ConcurrentSkipListMap 和 ConcurrentSkipListSet 分別作為同步的 SortedMap 和 SortedSet 的并發(fā)替代品。下面我們就展開探討了,設(shè)計(jì)不到底層源碼,因?yàn)楸酒恼轮饕康木褪菫榱嗣枋鲆幌掠心男〇|西以及用了哪些東西。
ConcurrentHashMap
我們先來看一下 ConcurrentHashMap 在并發(fā)集合中的位置
可以看到,ConcurrentHashMap 繼承了 AbstractMap 接口并實(shí)現(xiàn)了 ConcurrentMap 和 Serializable 接口,AbstractMap 和 ConcurrentMap 都是 Map 的實(shí)現(xiàn)類,只不過 AbstractMap 是抽象實(shí)現(xiàn)。
ConcurrentHashMap 和 Hashtable 的構(gòu)造非常相似,只不過 Hashtable 容器在激烈競爭的場景中會(huì)表現(xiàn)出效率低下的現(xiàn)象,這是因?yàn)樗性L問 Hashtable 的線程都想獲取同一把鎖,如果容器里面有多把鎖,并且每一把鎖都只用來鎖定一段數(shù)據(jù),那么當(dāng)多個(gè)線程訪問不同的數(shù)據(jù)段時(shí),就不存在競爭關(guān)系。這就是 ConcurreentHashMap 采用的 分段鎖 實(shí)現(xiàn)。在這種鎖實(shí)現(xiàn)中,任意數(shù)量的讀取線程可以并發(fā)的訪問 Map,執(zhí)行讀取操作的線程和執(zhí)行寫入的線程可以并發(fā)的訪問 Map,并且在讀取的同時(shí)也可以并發(fā)修改 Map。
ConcurrentHashMap 分段鎖實(shí)現(xiàn)帶來的結(jié)果是,在并發(fā)環(huán)境下可以實(shí)現(xiàn)更高的吞吐量,在單線程環(huán)境下只損失非常小的性能。
你知道 HashMap 是具有 fail-fast 機(jī)制的,也就是說它是一種強(qiáng)一致性的集合,在數(shù)據(jù)不一致的情況下會(huì)拋出 ConcurrentModificationException 異常,而 ConcurrentHashMap 是一種 弱一致性 的集合,在并發(fā)修改其內(nèi)部結(jié)構(gòu)時(shí),它不會(huì)拋出 ConcurrentModificationException 異常,弱一致性能夠容忍并發(fā)修改。
在 HashMap 中,我們一般使用的 size、empty、containsKey 等方法都是標(biāo)準(zhǔn)方法,其返回的結(jié)果是一定的,包含就是包含,不包含就是不包含,可以作為判斷條件;而 ConcurrentHashMap 中的這些方法只是參考方法,它不是一個(gè) 精確值,像是 size、empty 這些方法在并發(fā)場景下用處很小,因?yàn)樗麄兊姆祷刂悼偸窃诓粩嘧兓赃@些操作的需求就被弱化了。
在 ConcurrentHashMap 中沒有實(shí)現(xiàn)對(duì) Map 加鎖從而實(shí)現(xiàn)獨(dú)占訪問。在線程安全的 Map 實(shí)現(xiàn) Hashtable 和 Collections.synchronizedMap 中都實(shí)現(xiàn)了獨(dú)占訪問,因此只能單個(gè)線程修改 Map 。ConcurrentHashMap 與這些 Map 容器相比,具有更多的優(yōu)勢和更少的劣勢,只有當(dāng)需要獨(dú)占訪問的需求時(shí)才會(huì)使用 Hashtable 或者是 Collections.synchronizedMap ,否則其他并發(fā)場景下,應(yīng)該使用 ConcurrentHashMap。
ConcurrentMap
ConcurrentMap 是一個(gè)接口,它繼承了 Map 接口并提供了 Map 接口中四個(gè)新的方法,這四個(gè)方法都是 原子性 方法,進(jìn)一步擴(kuò)展了 Map 的功能。
- public interface ConcurrentMap<K, V> extends Map<K, V> {
- // 僅當(dāng) key 沒有相應(yīng)的映射值時(shí)才插入
- V putIfAbsent(K key, V value);
- // 僅當(dāng) key 被映射到 value 時(shí)才移除
- boolean remove(Object key, Object value);
- // 僅當(dāng) key 被映射到 value 時(shí)才移除
- V replace(K key, V value);
- // 僅當(dāng) key 被映射到 oldValue 時(shí)才替換為 newValue
- boolean replace(K key, V oldValue, V newValue);
- }
ConcurrentNavigableMap
java.util.concurrent.ConcurrentNavigableMap 類是 java.util.NavigableMap 的子類,它支持并發(fā)訪問,并且允許其視圖的并發(fā)訪問。
什么是視圖呢?視圖就是集合中的一段數(shù)據(jù)序列,ConcurrentNavigableMap 中支持使用 headMap、subMap、tailMap 返回的視圖。與其重新解釋一下 NavigableMap 中找到的所有方法,不如看一下 ConcurrentNavigableMap 中添加的方法
- headMap 方法:headMap 方法返回一個(gè)嚴(yán)格小于給定鍵的視圖
- tailMap 方法:tailMap 方法返回包含大于或等于給定鍵的視圖。
- subMap 方法:subMap 方法返回給定兩個(gè)參數(shù)的視圖
ConcurrentNavigableMap 接口包含一些可能有用的其他方法
- descendingKeySet()
- descendingMap()
- navigableKeySet()
更多關(guān)于方法的描述這里就不再贅述了,讀者朋友們可自行查閱 javadoc
ConcurrentSkipListMap
ConcurrentSkipListMap 是線程安全的有序的哈希表,適用于高并發(fā)的場景。
ConcurrentSkipListMap 的底層數(shù)據(jù)結(jié)構(gòu)是基于跳表實(shí)現(xiàn)的。ConcurrentSkipListMap 可以提供 Comparable 內(nèi)部排序或者是 Comparator 外部排序,具體取決于使用哪個(gè)構(gòu)造函數(shù)。
ConcurrentSkipListSet
ConcurrentSkipListSet 是線程安全的有序的集合,適用于高并發(fā)的場景。ConcurrentSkipListSet 底層是通過 ConcurrentNavigableMap 來實(shí)現(xiàn)的,它是一個(gè)有序的線程安全的集合。
ConcurrentSkipListSet有序的,基于元素的自然排序或者通過比較器確定的順序;
ConcurrentSkipListSet是線程安全的;
CopyOnWriteArrayList
CopyOnWriteArrayList 是 ArrayList 的變體,在 CopyOnWriteArrayList 中,所有可變操作比如 add、set 其實(shí)都是重新創(chuàng)建了一個(gè)副本,通過對(duì)數(shù)組進(jìn)行復(fù)制而實(shí)現(xiàn)的。
CopyOnWriteArrayList 其內(nèi)部有一個(gè)指向數(shù)組的引用,數(shù)組是不會(huì)被修改的,每次并發(fā)修改 CopyOnWriteArrayList 都相當(dāng)于重新創(chuàng)建副本,CopyOnWriteArrayList 是一種 fail-safe 機(jī)制的,它不會(huì)拋出 ConcurrentModificationException 異常,并且返回元素與迭代器創(chuàng)建時(shí)的元素相同。
每次并發(fā)寫操作都會(huì)創(chuàng)建新的副本,這個(gè)過程存在一定的開銷,所以,一般在規(guī)模很大時(shí),讀操作要遠(yuǎn)遠(yuǎn)多于寫操作時(shí),為了保證線程安全性,會(huì)使用 CopyOnWriteArrayList。
類似的,CopyOnWriteArraySet 的作用也相當(dāng)于替代了 Set 接口。
BlockingQueue
BlockingQueue 譯為 阻塞隊(duì)列,它是 JDK 1.5 添加的新的工具類,它繼承于 Queue 隊(duì)列,并擴(kuò)展了 Queue 的功能。
BlockingQueue 在檢索元素時(shí)會(huì)等待隊(duì)列變成非空,并在存儲(chǔ)元素時(shí)會(huì)等待隊(duì)列變?yōu)榭捎谩lockingQueue 的方法有四種實(shí)現(xiàn)形式,以不同的方式來處理。
- 第一種是拋出異常
- 特殊值:第二種是根據(jù)情況會(huì)返回 null 或者 false
- 阻塞:第三種是無限期的阻塞當(dāng)前線程直到操作變?yōu)榭捎煤?/li>
- 超時(shí):第四種是給定一個(gè)最大的超時(shí)時(shí)間,超過后才會(huì)放棄
BlockingQueue 不允許添加 null 元素,在其實(shí)現(xiàn)類的方法 add、put 或者 offer 后時(shí)添加 null 會(huì)拋出空指針異常。BlockingQueue 會(huì)有容量限制。在任意時(shí)間內(nèi),它都會(huì)有一個(gè) remainCapacity,超過該值之前,任意 put 元素都會(huì)阻塞。
BlockingQueue 一般用于實(shí)現(xiàn)生產(chǎn)者 - 消費(fèi)者 隊(duì)列,如下圖所示
BlockingQueue 有多種實(shí)現(xiàn),下面我們一起來認(rèn)識(shí)一下這些容器。
其中 LinkedBlockingQueue 和 ArrayBlockingQueue 是 FIFO 先入先出隊(duì)列,二者分別和 LinkedList和 ArrayList 對(duì)應(yīng),比同步 List 具有更好的并發(fā)性能。PriorityBlockingQueue 是一個(gè)優(yōu)先級(jí)排序的阻塞隊(duì)列,如果你希望按照某種順序而不是 FIFO 處理元素時(shí)這個(gè)隊(duì)列將非常有用。正如其他有序的容器一樣,PriorityBlockingQueue 既可以按照自然順序來比較元素,也可以使用 Comparator 比較器進(jìn)行外部元素比較。SynchronousQueue 它維護(hù)的是一組線程而不是一組隊(duì)列,實(shí)際上它不是一個(gè)隊(duì)列,它的每個(gè) insert 操作必須等待其他相關(guān)線程的 remove 方法后才能執(zhí)行,反之亦然。
LinkedBlockingQueue
LinkedBlockingQueue 是一種 BlockingQueue 的實(shí)現(xiàn)。
它是一種基于鏈表的構(gòu)造、先入先出的有界阻塞隊(duì)列。隊(duì)列的 head 也就是頭元素是在隊(duì)列中等待時(shí)間最長的元素;隊(duì)列的 tail也就是隊(duì)尾元素是隊(duì)列中等待時(shí)間最短的元素。新的元素會(huì)被插入到隊(duì)尾中,檢索操作將獲取隊(duì)列中的頭部元素。鏈表隊(duì)列通常比基于數(shù)組的隊(duì)列具有更高的吞吐量,但是在大多數(shù)并發(fā)應(yīng)用程序中,可預(yù)測的性能較差。
ArrayBlockingQueue
ArrayBlockingQueue 是一個(gè)用數(shù)組實(shí)現(xiàn)的有界隊(duì)列,此隊(duì)列順序按照先入先出的原則對(duì)元素進(jìn)行排序。
默認(rèn)情況下不保證線程公平的訪問隊(duì)列,所謂公平訪問隊(duì)列指的是阻塞的線程,可以按照阻塞的先后順序訪問,即先阻塞線程先訪問隊(duì)列。非公平性是對(duì)先等待的線程是非公平的。有可能先阻塞的線程最后才訪問隊(duì)列。
PriorityBlockingQueue
PriorityBlockingQueue 是一個(gè)支持優(yōu)先級(jí)的阻塞隊(duì)列,默認(rèn)情況下的元素采取自然順序生序或者降序,也可以自己定義 Comparator 進(jìn)行外部排序。但需要注意的是不能保證同優(yōu)先級(jí)元素的順序。
DelayQueue
DelayQueue 是一個(gè)支持延時(shí)獲取元素的無阻塞隊(duì)列,其中的元素只能在延遲到期后才能使用,DelayQueue 中的隊(duì)列頭是延遲最長時(shí)間的元素,如果沒有延遲,則沒有 head 頭元素,poll 方法會(huì)返回 null。判斷的依據(jù)就是 getDelay(TimeUnit.NANOSECONDS) 方法返回一個(gè)值小于或者等于 0 就會(huì)發(fā)生過期。
TransferQueue
TransferQueue 繼承于 BlockingQueue,它是一個(gè)接口,一個(gè) BlockingQueue 是一個(gè)生產(chǎn)者可能等待消費(fèi)者接受元素,TransferQueue 則更進(jìn)一步,生產(chǎn)者會(huì)一直阻塞直到所添加到隊(duì)列的元素被某一個(gè)消費(fèi)者所消費(fèi),新添加的transfer 方法用來實(shí)現(xiàn)這種約束。
TransferQueue 有下面這些方法:兩個(gè) tryTransfer 方法,一個(gè)是非阻塞的,另一個(gè)是帶有 timeout 參數(shù)設(shè)置超時(shí)時(shí)間的。還有兩個(gè)輔助方法 hasWaitingConsumer 和 getWaitingConcusmerCount。
LinkedTransferQueue
一個(gè)無界的基于鏈表的 TransferQueue。這個(gè)隊(duì)列對(duì)任何給定的生產(chǎn)者進(jìn)行 FIFO 排序,head 是隊(duì)列中存在時(shí)間最長的元素。tail 是隊(duì)列中存在時(shí)間最短的元素。
BlockingDeque
與 BlockingQueue 相對(duì)的還有 BlockingDeque 和 Deque,它們是 JDK1.6 被提出的,分別對(duì) Queue 和 BlockingQueue 做了擴(kuò)展。
Deque 是一個(gè)雙端隊(duì)列,分別實(shí)現(xiàn)了在隊(duì)列頭和隊(duì)列尾的插入。Deque 的實(shí)現(xiàn)有 ArrayDeque、ConcurrentLinkedDeque,BlockingDeque 的實(shí)現(xiàn)有 LinkedBlockingDeque 。
阻塞模式一般用于生產(chǎn)者 - 消費(fèi)者隊(duì)列,而雙端隊(duì)列適用于工作密取。在工作密取的設(shè)計(jì)中,每個(gè)消費(fèi)者都有各自的雙端隊(duì)列,如果一個(gè)消費(fèi)者完成了自己雙端隊(duì)列的任務(wù),就會(huì)去其他雙端隊(duì)列的末尾進(jìn)行消費(fèi)。密取方式要比傳統(tǒng)的生產(chǎn)者 - 消費(fèi)者隊(duì)列具有更高的可伸縮性,這是因?yàn)槊總€(gè)工作密取的工作者都有自己的雙端隊(duì)列,不存在競爭的情況。
ArrayDeque
ArrayDeque 是 Deque 的可動(dòng)態(tài)調(diào)整大小的數(shù)組實(shí)現(xiàn),其內(nèi)部沒有容量限制,他們會(huì)根據(jù)需要進(jìn)行增長。ArrayDeque 不是線程安全的,如果沒有外部加鎖的情況下,不支持多線程訪問。ArrayDeque 禁止空元素,這個(gè)類作為棧使用時(shí)要比 Stack 快,作為 queue 使用時(shí)要比 LinkedList 快。
除了 remove、removeFirstOccurrence、removeLastOccurrence、contains、interator.remove 外,大部分的 ArrayDeque 都以恒定的開銷運(yùn)行。
“注意:ArrayDeque 是 fail-fast 的,如果創(chuàng)建了迭代器之后,卻使用了迭代器外部的 remove 等修改方法,那么這個(gè)類將會(huì)拋出 ConcurrentModificationException 異常。
ConcurrentLinkedDeque
ConcurrentLinkedDeque 是 JDK1.7 引入的雙向鏈表的無界并發(fā)隊(duì)列。它與 ConcurrentLinkedQueue 的區(qū)別是 ConcurrentLinkedDeque 同時(shí)支持 FIFO 和 FILO 兩種操作方式,即可以從隊(duì)列的頭和尾同時(shí)操作(插入/刪除)。ConcurrentLinkedDeque 也支持 happen-before 原則。ConcurrentLinkedDeque 不允許空元素。
LinkedBlockingDeque
LinkedBlockingDeque 是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列,即可以從隊(duì)列的兩端插入和移除元素。雙向隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競爭。LinkedBlockingDeque 把初始容量和構(gòu)造函數(shù)綁定,這樣能夠有效過度拓展。初始容量如果沒有指定,就取的是 Integer.MAX_VALUE,這也是 LinkedBlockingDeque 的默認(rèn)構(gòu)造函數(shù)。
同步工具類
同步工具類可以是任何一個(gè)對(duì)象,只要它根據(jù)自身狀態(tài)來協(xié)調(diào)線程的控制流。阻塞隊(duì)列可以作為同步控制類,其他類型的同步工具類還包括 信號(hào)量(Semaphore)、柵欄(Barrier) 和 閉鎖(Latch)。下面我們就來一起認(rèn)識(shí)一下這些工具類
Semaphore
Semaphore 翻譯過來就是 信號(hào)量,信號(hào)量是什么?它其實(shí)就是一種信號(hào),在操作系統(tǒng)中,也有信號(hào)量的這個(gè)概念,在進(jìn)程間通信的時(shí)候,我們就會(huì)談到信號(hào)量進(jìn)行通信。還有在 Linux 操作系統(tǒng)采取中斷時(shí),也會(huì)向進(jìn)程發(fā)出中斷信號(hào),根據(jù)進(jìn)程的種類和信號(hào)的類型判斷是否應(yīng)該結(jié)束進(jìn)程。
在 Java 中,Semaphore(信號(hào)量)是用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。
Semaphore 管理著一組許可(permit),許可的初始數(shù)量由構(gòu)造函數(shù)來指定。在獲取某個(gè)資源之前,應(yīng)該先從信號(hào)量獲取許可(permit),以確保資源是否可用。當(dāng)線程完成對(duì)資源的操作后,會(huì)把它放在池中并向信號(hào)量返回一個(gè)許可,從而允許其他線程訪問資源,這叫做釋放許可。如果沒有許可的話,那么 acquire 將會(huì)阻塞直到有許可(中斷或者操作超時(shí))為止。release方法將返回一個(gè)許可信號(hào)量。
Semaphore 可以用來實(shí)現(xiàn)流量控制,例如常用的數(shù)據(jù)庫連接池,線程請(qǐng)求資源時(shí),如果數(shù)據(jù)庫連接池為空則阻塞線程,直接返回失敗,如果連接池不為空時(shí)解除阻塞。
CountDownLatch
閉鎖(Latch) 是一種同步工具類,它可以延遲線程的進(jìn)度以直到其到達(dá)終止?fàn)顟B(tài)。閉鎖的作用相當(dāng)于是一扇門,在閉鎖達(dá)到結(jié)束狀態(tài)前,門是一直關(guān)著的,沒有任何線程能夠通過。當(dāng)閉鎖到達(dá)結(jié)束狀態(tài)后,這扇門會(huì)打開并且允許任何線程通過,然后就一直保持打開狀態(tài)。
CountDownLatch 就是閉鎖的一種實(shí)現(xiàn)。它可以使一個(gè)或者多個(gè)線程等待一組事件的發(fā)生。閉鎖有一個(gè)計(jì)數(shù)器,閉鎖需要對(duì)計(jì)數(shù)器進(jìn)行初始化,表示需要等待的次數(shù),閉鎖在調(diào)用 await處進(jìn)行等待,其他線程在調(diào)用 countDown 把閉鎖 count 次數(shù)進(jìn)行遞減,直到遞減為 0 ,喚醒 await。如下代碼所示
- public class TCountDownLatch {
- public static void main(String[] args) {
- CountDownLatch latch = new CountDownLatch(5);
- Increment increment = new Increment(latch);
- Decrement decrement = new Decrement(latch);
- new Thread(increment).start();
- new Thread(decrement).start();
- try {
- Thread.sleep(6000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- class Decrement implements Runnable {
- CountDownLatch countDownLatch;
- public Decrement(CountDownLatch countDownLatch){
- this.countDownLatch = countDownLatch;
- }
- @Override
- public void run() {
- try {
- for(long i = countDownLatch.getCount();i > 0;i--){
- Thread.sleep(1000);
- System.out.println("countdown");
- this.countDownLatch.countDown();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- class Increment implements Runnable {
- CountDownLatch countDownLatch;
- public Increment(CountDownLatch countDownLatch){
- this.countDownLatch = countDownLatch;
- }
- @Override
- public void run() {
- try {
- System.out.println("await");
- countDownLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("Waiter Released");
- }
- }
Future
我們常見的創(chuàng)建多線程的方式有兩種,一種是繼承 Thread 類,一種是實(shí)現(xiàn) Runnable 接口。這兩種方式都沒有返回值。相對(duì)的,創(chuàng)建多線程還有其他三種方式,那就是使用 Callable接口、 Future 接口和 FutureTask 類。Callable 我們之前聊過,這里就不再描述了,我們主要來描述一下 Future 和 FutureTask 接口。
Future 就是對(duì)具體的 Runnable 或者 Callable 任務(wù)的執(zhí)行結(jié)果進(jìn)行一系列的操作,必要時(shí)可通過 get 方法獲取執(zhí)行結(jié)果,這個(gè)方法會(huì)阻塞直到執(zhí)行結(jié)束。Future 中的主要方法有
- public interface Future<V> {
- boolean cancel(boolean mayInterruptIfRunning);
- boolean isCancelled();
- boolean isDone();
- V get() throws InterruptedException, ExecutionException;
- V get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
- cancel(boolean mayInterruptIfRunning) : 嘗試取消任務(wù)的執(zhí)行。如果任務(wù)已經(jīng)完成、已經(jīng)被取消或者由于某些原因而無法取消,那么這個(gè)嘗試會(huì)失敗。如果取消成功,或者在調(diào)用 cancel 時(shí)此任務(wù)尚未開始,那么此任務(wù)永遠(yuǎn)不會(huì)執(zhí)行。如果任務(wù)已經(jīng)開始,那么 mayInterruptIfRunning 參數(shù)會(huì)確定是否中斷執(zhí)行任務(wù)以便于嘗試停止該任務(wù)。這個(gè)方法返回后,會(huì)對(duì) isDone 的后續(xù)調(diào)用也返回 true,如果 cancel 返回 true,那么后續(xù)的調(diào)用 isCancelled 也會(huì)返回 true。
- boolean isCancelled():如果此任務(wù)在正常完成之前被取消,則返回 true。
- boolean isDone():如果任務(wù)完成,返回 true。
- V get() throws InterruptedException, ExecutionException:等待必要的計(jì)算完成,然后檢索其結(jié)果
- V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException :必要時(shí)最多等待給定時(shí)間以完成計(jì)算,然后檢索其結(jié)果。
因?yàn)镕uture只是一個(gè)接口,所以是無法直接用來創(chuàng)建對(duì)象使用的,因此就有了下面的FutureTask。
FutureTask
FutureTask 實(shí)現(xiàn)了 RunnableFuture 接口,RunnableFuture 接口是什么呢?
RunnableFuture 接口又繼承了 Runnable 接口和 Future 接口。納尼?在 Java 中不是只允許單繼承么,是的,單繼承更多的是說的類與類之間的繼承關(guān)系,子類繼承父類,擴(kuò)展父類的接口,這個(gè)過程是單向的,就是為了解決多繼承引起的過渡引用問題。而接口之間的繼承是接口的擴(kuò)展,在 Java 編程思想中也印證了這一點(diǎn)
對(duì) RunnableFuture 接口的解釋是:成功執(zhí)行的 run 方法會(huì)使 Future 接口的完成并允許訪問其結(jié)果。所以它既可以作為 Runnable 被線程執(zhí)行,又可以作為 Future 得到 Callable 的返回值。
FutureTask 也可以用作閉鎖,它可以處于以下三種狀態(tài)
- 等待運(yùn)行
- 正在運(yùn)行
- 運(yùn)行完成
FutureTask 在 Executor 框架中表示異步任務(wù),此外還可以表示一些時(shí)間較長的計(jì)算,這些計(jì)算可以在使用計(jì)算結(jié)果之前啟動(dòng)。
FutureTask 具體的源碼我后面會(huì)單獨(dú)出文章進(jìn)行描述。
Barrier
我們上面聊到了通過閉鎖來啟動(dòng)一組相關(guān)的操作,使用閉鎖來等待一組事件的執(zhí)行。閉鎖是一種一次性對(duì)象,一旦進(jìn)入終止?fàn)顟B(tài)后,就不能被 重置。
Barrier 的特點(diǎn)和閉鎖也很類似,它也是阻塞一組線程直到某個(gè)事件發(fā)生。柵欄與閉鎖的區(qū)別在于,所有線程必須同時(shí)到達(dá)柵欄的位置,才能繼續(xù)執(zhí)行,就像我們上面操作系統(tǒng)給出的這幅圖一樣。
ABCD 四條線程,必須同時(shí)到達(dá) Barrier,然后 手牽手一起走過幸福的殿堂。
當(dāng)線程到達(dá) Barrier 的位置時(shí)會(huì)調(diào)用 await 方法,這個(gè)方法會(huì)阻塞直到所有線程都到達(dá) Barrier 的位置,如果所有線程都到達(dá) Barrier 的位置,那么 Barrier 將會(huì)打開使所有線程都被釋放,而 Barrier 將被重置以等待下次使用。如果調(diào)用 await 方法導(dǎo)致超時(shí),或者 await 阻塞的線程被中斷,那么 Barrier 就被認(rèn)為被打破,所有阻塞的 await 都會(huì)拋出 BrokenBarrierException 。如果成功通過柵欄后,await 方法返回一個(gè)唯一索引號(hào),可以利用這些索引號(hào)選舉一個(gè)新的 leader,來處理一下其他工作。
- public class TCyclicBarrier {
- public static void main(String[] args) {
- Runnable runnable = () -> System.out.println("Barrier 1 開始...");
- Runnable runnable2 = () -> System.out.println("Barrier 2 開始...");
- CyclicBarrier barrier1 = new CyclicBarrier(2,runnable);
- CyclicBarrier barrier2 = new CyclicBarrier(2,runnable2);
- CyclicBarrierRunnable b1 = new CyclicBarrierRunnable(barrier1,barrier2);
- CyclicBarrierRunnable b2 = new CyclicBarrierRunnable(barrier1,barrier2);
- new Thread(b1).start();
- new Thread(b2).start();
- }
- }
- class CyclicBarrierRunnable implements Runnable {
- CyclicBarrier barrier1;
- CyclicBarrier barrier2;
- public CyclicBarrierRunnable(CyclicBarrier barrier1,CyclicBarrier barrier2){
- this.barrier1 = barrier1;
- this.barrier2 = barrier2;
- }
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- System.out.println(Thread.currentThread().getName() + "等待 barrier1" );
- barrier1.await();
- Thread.sleep(1000);
- System.out.println(Thread.currentThread().getName() + "等待 barrier2" );
- barrier2.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() +
- " 做完了!");
- }
- }
Exchanger
與 Barrier 相關(guān)聯(lián)的還有一個(gè)工具類就是 Exchanger, Exchanger 是一個(gè)用于線程間協(xié)作的工具類。Exchanger用于進(jìn)行線程間的數(shù)據(jù)交換。
它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn)兩個(gè)線程可以交換彼此的數(shù)據(jù)。這兩個(gè)線程通過exchange 方法交換數(shù)據(jù), 如果第一個(gè)線程先執(zhí)行 exchange方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行 exchange,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對(duì)方。因此使用Exchanger 的重點(diǎn)是成對(duì)的線程使用 exchange() 方法,當(dāng)有一對(duì)線程達(dá)到了同步點(diǎn),就會(huì)進(jìn)行交換數(shù)據(jù)。因此該工具類的線程對(duì)象是成對(duì)的。
下面通過一段例子代碼來講解一下
- public class TExchanger {
- public static void main(String[] args) {
- Exchanger exchanger = new Exchanger();
- ExchangerRunnable exchangerRunnable = new ExchangerRunnable(exchanger,"A");
- ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger,"B");
- new Thread(exchangerRunnable).start();
- new Thread(exchangerRunnable2).start();
- }
- }
- class ExchangerRunnable implements Runnable {
- Exchanger exchanger;
- Object object;
- public ExchangerRunnable(Exchanger exchanger,Object object){
- this.exchanger = exchanger;
- this.object = object;
- }
- @Override
- public void run() {
- Object previous = object;
- try {
- object = this.exchanger.exchange(object);
- System.out.println(
- Thread.currentThread().getName() + "改變前是" + previous + "改變后是" + object);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
總結(jié)本篇文章我們從同步容器類入手,主要講了 fail-fast 和 fail-safe 機(jī)制,這兩個(gè)機(jī)制在并發(fā)編程中非常重要。然后我們從操作系統(tǒng)的角度,聊了聊操作系統(tǒng)層面實(shí)現(xiàn)安全性的幾種方式,然后從操作系統(tǒng) -> 并發(fā)我們聊了聊 Java 中的并發(fā)工具包有哪些,以及構(gòu)建并發(fā)的幾種工具類。
本文轉(zhuǎn)載自微信公眾號(hào)「 程序員cxuan」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系 程序員cxuan公眾號(hào)。