自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

我工作三年了,該懂并發(fā)了

系統(tǒng)
本文的組織形式如下,主要會(huì)介紹到同步容器類,操作系統(tǒng)的并發(fā)工具,Java 開發(fā)工具包(只是簡單介紹一下,后面會(huì)有源碼分析)。同步工具類有哪些。

 [[340486]]

本文的組織形式如下,主要會(huì)介紹到同步容器類,操作系統(tǒng)的并發(fā)工具,Java 開發(fā)工具包(只是簡單介紹一下,后面會(huì)有源碼分析)。同步工具類有哪些。

 

下面我們就來介紹一下 Java 并發(fā)中都涉及哪些模塊,這些并發(fā)模塊都是 Java 并發(fā)類庫所提供的。

同步容器類

同步容器主要包括兩類,一種是本來就是線程安全實(shí)現(xiàn)的容器,這類容器有 Vector、Hashtable、Stack,這類容器的方法上都加了 synchronized 鎖,是線程安全的實(shí)現(xiàn)。

“Vector、Hashtable、Stack 這些容器我們現(xiàn)在幾乎都不在使用,因?yàn)檫@些容器在多線程環(huán)境下的效率不高。

還有一類是由 Collections.synchronizedxxx 實(shí)現(xiàn)的非線程安全的容器,使用 Collections.synchronized 會(huì)把它們封裝起來編程線程安全的容器,舉出兩個(gè)例子

  • Collections.synchronizedList
  • Collections.synchronizedMap

我們可以通過 Collections 源碼可以看出這些線程安全的實(shí)現(xiàn)

 

要不為啥要稱 Collections 為集合工具類呢?Collections 會(huì)把這些容器類的狀態(tài)封裝起來,并對(duì)每個(gè)同步方法進(jìn)行同步,使得每次只有一個(gè)線程能夠訪問容器的狀態(tài)。

其中每個(gè) synchronized xxx都是相當(dāng)于創(chuàng)建了一個(gè)靜態(tài)內(nèi)部類。

 

雖然同步容器類都是線程安全的,但是在某些情況下需要額外的客戶端加鎖來保證一些復(fù)合操作的安全性,復(fù)合操作就是有兩個(gè)及以上的方法組成的操作,比如最典型的就是 若沒有則添加,用偽代碼表示則是

  1. if(a == null){ 
  2.   a = get(); 

比如可以用來判斷 Map 中是否有某個(gè) key,如果沒有則添加進(jìn) Map 中。這些復(fù)合操作在沒有客戶端加鎖的情況下是線程安全的,但是當(dāng)多個(gè)線程并發(fā)修改容器時(shí),可能會(huì)表現(xiàn)出意料之外的行為。例如下面這段代碼

  1. public class TestVector implements Runnable{ 
  2.  
  3.     static Vector vector = new Vector(); 
  4.     static void addVector(){ 
  5.         for(int i = 0;i < 10000;i++){ 
  6.             vector.add(i); 
  7.         } 
  8.     } 
  9.  
  10.     static Object getVector(){ 
  11.         int index = vector.size() - 1; 
  12.         return vector.get(index); 
  13.     } 
  14.  
  15.     static void removeVector(){ 
  16.         int index = vector.size() - 1; 
  17.         vector.remove(index); 
  18.     } 
  19.  
  20.     @Override 
  21.     public void run() { 
  22.         getVector(); 
  23.     } 
  24.  
  25.     public static void main(String[] args) { 
  26.         TestVector testVector = new TestVector(); 
  27.         testVector.addVector(); 
  28.         Thread t1 = new Thread(() -> { 
  29.             for(int i = 0;i < vector.size();i++){ 
  30.                 getVector(); 
  31.             } 
  32.         }); 
  33.  
  34.         Thread t2 = new Thread(() -> { 
  35.             for(int i = 0;i < vector.size();i++){ 
  36.                 removeVector(); 
  37.             } 
  38.         }); 
  39.  
  40.         t1.start(); 
  41.         t2.start(); 
  42.     } 
  43.  

這些方法看似沒有問題,因?yàn)?Vector 能夠保證線程安全性,無論多少個(gè)線程訪問 Vector 也不會(huì)造成 Vector 的內(nèi)部產(chǎn)生破壞,但是從整個(gè)系統(tǒng)來說,是存在線程安全性的,事實(shí)上你運(yùn)行一下,也會(huì)發(fā)現(xiàn)報(bào)錯(cuò)。

會(huì)出現(xiàn)

 

如果線程 A 在包含這么多元素的基礎(chǔ)上調(diào)用 getVector 方法,會(huì)得到一個(gè)數(shù)值,getVector 只是取得該元素,而并不是從 vector 中移除,removeVector 方法是得到一個(gè)元素進(jìn)行移除,這段代碼的不安全因素就是,因?yàn)榫€程的時(shí)間片是亂序的,而且 getVector 和 removeVector 并不會(huì)保證互斥,所以在 removeVector 方法把某個(gè)值比如 6666 移除后,vector 中就不存在這個(gè) 6666 的元素,此時(shí) getVector 方法取得 6666 ,就會(huì)拋出數(shù)組越界異常。為什么是數(shù)組越界異常呢?可以看一下 vector 的源碼

 

如果用圖表示的話,則會(huì)是下面這樣。

 

所以,從系統(tǒng)的層面來看,上面這段代碼也要保證線程安全性才可以,也就是在客戶端加鎖實(shí)現(xiàn),只要我們讓復(fù)合操作使用一把鎖,那么這些操作就和其他單獨(dú)的操作一樣都是原子性的。如下面例子所示

  1. static Object getVector(){ 
  2.   synchronized (vector){ 
  3.     int index = vector.size() - 1; 
  4.     return vector.get(index); 
  5.   } 
  6.  
  7. static void removeVector(){ 
  8.   synchronized (vector) { 
  9.     int index = vector.size() - 1; 
  10.     vector.remove(index); 
  11.   } 

也可以通過鎖住 .class 來保證原子性操作,也能達(dá)到同樣的效果。

  1. static Object getVector(){ 
  2.   synchronized (TestVector.class){ 
  3.     int index = vector.size() - 1; 
  4.     return vector.get(index); 
  5.   } 
  6.  
  7. static void removeVector(){ 
  8.   synchronized (TestVector.class) { 
  9.     int index = vector.size() - 1; 
  10.     vector.remove(index); 
  11.   } 

在調(diào)用 size 和 get 之間,Vector 的長度可能會(huì)發(fā)生變化,這種變化在對(duì) Vector 進(jìn)行排序時(shí)出現(xiàn),如下所示

  1. for(int i = 0;i< vector.size();i++){ 
  2.   doSomething(vector.get(i)); 

這種迭代的操作正確性取決于運(yùn)氣,即在調(diào)用 size 和 get 之間會(huì)修改 Vector,在單線程環(huán)境中,這種假設(shè)完全成立,但是再有其他線程并發(fā)修改 Vector 時(shí),則可能會(huì)導(dǎo)致麻煩。

我們?nèi)耘f可以通過客戶端加鎖的方式來避免這種情況

  1. synchronized(vector){ 
  2.   for(int i = 0;i< vector.size();i++){ 
  3.     doSomething(vector.get(i)); 
  4.   }   

這種方式為客戶端的可靠性提供了保證,但是犧牲了伸縮性,而且這種在遍歷過程中進(jìn)行加鎖,也不是我們所希望看到的。

fail-fast

針對(duì)上面這種情況,很多集合類都提供了一種 fail-fast 機(jī)制,因?yàn)榇蟛糠旨蟽?nèi)部都是使用 Iterator 進(jìn)行遍歷,在循環(huán)中使用同步鎖的開銷會(huì)很大,而 Iterator 的創(chuàng)建是輕量級(jí)的,所以在集合內(nèi)部如果有并發(fā)修改的操作,集合會(huì)進(jìn)行快速失敗,也就是 fail-fast。當(dāng)他們發(fā)現(xiàn)容器在迭代過程中被修改時(shí),會(huì)拋出 ConcurrentModificationException 異常,這種快速失敗不是一種完備的處理機(jī)制,而只是 善意的捕獲并發(fā)錯(cuò)誤。

如果查看過 ConcurrentModificationException 的注解,你會(huì)發(fā)現(xiàn),ConcurrentModificationException 拋出的原則由兩種,如下

 

造成這種異常的原因是由于多個(gè)線程在遍歷集合的同時(shí)對(duì)集合類內(nèi)部進(jìn)行了修改,這也就是 fail-fast 機(jī)制。

該注解還聲明了另外一種方式

 

這個(gè)問題也是很經(jīng)典的一個(gè)問題,我們使用 ArrayList 來舉例子。如下代碼所示

  1. public static void main(String[] args) { 
  2.   List<String> list = new ArrayList<>(); 
  3.   for (int i = 0 ; i < 10 ; i++ ) { 
  4.     list.add(i + ""); 
  5.   } 
  6.   Iterator<String> iterator = list.iterator(); 
  7.   int i = 0 ; 
  8.   while(iterator.hasNext()) { 
  9.     if (i == 3) { 
  10.       list.remove(3); 
  11.     } 
  12.     System.out.println(iterator.next()); 
  13.     i ++; 
  14.   } 

該段代碼會(huì)發(fā)生異常,因?yàn)樵?ArrayList 內(nèi)部,有兩個(gè)屬性,一個(gè)是 modCount ,一個(gè)是 expectedModCount ,ArrayList 在 remove 等對(duì)集合結(jié)構(gòu)的元素造成數(shù)量上的操作會(huì)有 checkForComodification 的判斷,如下所示,這也是這段代碼的錯(cuò)誤原因。

 

fail-safe

fail-safe 是 Java 中的一種 安全失敗 機(jī)制,它表示的是在遍歷時(shí)不是直接在原集合上進(jìn)行訪問,而是先復(fù)制原有集合內(nèi)容,在拷貝的集合上進(jìn)行遍歷。由于迭代時(shí)是對(duì)原集合的拷貝進(jìn)行遍歷,所以在遍歷過程中對(duì)原集合所作的修改并不能被迭代器檢測到,所以不會(huì)觸發(fā) ConcurrentModificationException。java.util.concurrent 包下的容器都是安全失敗的,可以在多線程條件下使用,并發(fā)修改。

比如 CopyOnWriteArrayList, 它就是一種 fail-safe 機(jī)制的集合,它就不會(huì)出現(xiàn)異常,例如如下操作

  1. List<Integer> integers = new CopyOnWriteArrayList<>(); 
  2. integers.add(1); 
  3. integers.add(2); 
  4. integers.add(3); 
  5. Iterator<Integer> itr = integers.iterator(); 
  6. while (itr.hasNext()) { 
  7.     Integer a = itr.next(); 
  8.     integers.remove(a); 

CopyOnWriteArrayList 就是 ArrayList 的一種線程安全的變體,CopyOnWriteArrayList 中的所有可變操作比如 add 和 set 等等都是通過對(duì)數(shù)組進(jìn)行全新復(fù)制來實(shí)現(xiàn)的。

操作系統(tǒng)中的并發(fā)工具

講到并發(fā)容器,就不得不提操作系統(tǒng)級(jí)別實(shí)現(xiàn)了哪些進(jìn)程/線程間的并發(fā)容器,說白了其實(shí)就是數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)。下面我們就來一起看一下操作系統(tǒng)級(jí)別的并發(fā)工具

信號(hào)量

信號(hào)量是 E.W.Dijkstra 在 1965 年提出的一種方法,它使用一個(gè)整形變量來累計(jì)喚醒次數(shù),以供之后使用。在他的觀點(diǎn)中,有一個(gè)新的變量類型稱作 信號(hào)量(semaphore)。一個(gè)信號(hào)量的取值可以是 0 ,或任意正數(shù)。0 表示的是不需要任何喚醒,任意的正數(shù)表示的就是喚醒次數(shù)。

Dijkstra 提出了信號(hào)量有兩個(gè)操作,現(xiàn)在通常使用 down 和 up(分別可以用 sleep 和 wakeup 來表示)。down 這個(gè)指令的操作會(huì)檢查值是否大于 0 。如果大于 0 ,則將其值減 1 ;若該值為 0 ,則進(jìn)程將睡眠,而且此時(shí) down 操作將會(huì)繼續(xù)執(zhí)行。檢查數(shù)值、修改變量值以及可能發(fā)生的睡眠操作均為一個(gè)單一的、不可分割的 原子操作(atomic action) 完成。

互斥量

如果不需要信號(hào)量的計(jì)數(shù)能力時(shí),可以使用信號(hào)量的一個(gè)簡單版本,稱為 mutex(互斥量)?;コ饬康膬?yōu)勢就在于在一些共享資源和一段代碼中保持互斥。由于互斥的實(shí)現(xiàn)既簡單又有效,這使得互斥量在實(shí)現(xiàn)用戶空間線程包時(shí)非常有用。

互斥量是一個(gè)處于兩種狀態(tài)之一的共享變量:解鎖(unlocked) 和 加鎖(locked)。這樣,只需要一個(gè)二進(jìn)制位來表示它,不過一般情況下,通常會(huì)用一個(gè) 整型(integer) 來表示。0 表示解鎖,其他所有的值表示加鎖,比 1 大的值表示加鎖的次數(shù)。

mutex 使用兩個(gè)過程,當(dāng)一個(gè)線程(或者進(jìn)程)需要訪問關(guān)鍵區(qū)域時(shí),會(huì)調(diào)用 mutex_lock 進(jìn)行加鎖。如果互斥鎖當(dāng)前處于解鎖狀態(tài)(表示關(guān)鍵區(qū)域可用),則調(diào)用成功,并且調(diào)用線程可以自由進(jìn)入關(guān)鍵區(qū)域。

另一方面,如果 mutex 互斥量已經(jīng)鎖定的話,調(diào)用線程會(huì)阻塞直到關(guān)鍵區(qū)域內(nèi)的線程執(zhí)行完畢并且調(diào)用了 mutex_unlock 。如果多個(gè)線程在 mutex 互斥量上阻塞,將隨機(jī)選擇一個(gè)線程并允許它獲得鎖。

 

Futexes

隨著并行的增加,有效的同步(synchronization)和鎖定(locking) 對(duì)于性能來說是非常重要的。如果進(jìn)程等待時(shí)間很短,那么自旋鎖(Spin lock) 是非常有效;但是如果等待時(shí)間比較長,那么這會(huì)浪費(fèi) CPU 周期。如果進(jìn)程很多,那么阻塞此進(jìn)程,并僅當(dāng)鎖被釋放的時(shí)候讓內(nèi)核解除阻塞是更有效的方式。不幸的是,這種方式也會(huì)導(dǎo)致另外的問題:它可以在進(jìn)程競爭頻繁的時(shí)候運(yùn)行良好,但是在競爭不是很激烈的情況下內(nèi)核切換的消耗會(huì)非常大,而且更困難的是,預(yù)測鎖的競爭數(shù)量更不容易。

有一種有趣的解決方案是把兩者的優(yōu)點(diǎn)結(jié)合起來,提出一種新的思想,稱為 futex,或者是快速用戶空間互斥(fast user space mutex),是不是聽起來很有意思?

 

futex 是 Linux 中的特性實(shí)現(xiàn)了基本的鎖定(很像是互斥鎖)而且避免了陷入內(nèi)核中,因?yàn)閮?nèi)核的切換的開銷非常大,這樣做可以大大提高性能。futex 由兩部分組成:內(nèi)核服務(wù)和用戶庫。內(nèi)核服務(wù)提供了了一個(gè) 等待隊(duì)列(wait queue) 允許多個(gè)進(jìn)程在鎖上排隊(duì)等待。除非內(nèi)核明確的對(duì)他們解除阻塞,否則它們不會(huì)運(yùn)行。

 

Pthreads 中的互斥量

Pthreads 提供了一些功能用來同步線程。最基本的機(jī)制是使用互斥量變量,可以鎖定和解鎖,用來保護(hù)每個(gè)關(guān)鍵區(qū)域。希望進(jìn)入關(guān)鍵區(qū)域的線程首先要嘗試獲取 mutex。如果 mutex 沒有加鎖,線程能夠馬上進(jìn)入并且互斥量能夠自動(dòng)鎖定,從而阻止其他線程進(jìn)入。如果 mutex 已經(jīng)加鎖,調(diào)用線程會(huì)阻塞,直到 mutex 解鎖。如果多個(gè)線程在相同的互斥量上等待,當(dāng)互斥量解鎖時(shí),只有一個(gè)線程能夠進(jìn)入并且重新加鎖。這些鎖并不是必須的,程序員需要正確使用它們。

下面是與互斥量有關(guān)的函數(shù)調(diào)用

 

和我們想象中的一樣,mutex 能夠被創(chuàng)建和銷毀,扮演這兩個(gè)角色的分別是Phread_mutex_init 和 Pthread_mutex_destroy。mutex 也可以通過 Pthread_mutex_lock 來進(jìn)行加鎖,如果互斥量已經(jīng)加鎖,則會(huì)阻塞調(diào)用者。還有一個(gè)調(diào)用Pthread_mutex_trylock 用來嘗試對(duì)線程加鎖,當(dāng) mutex 已經(jīng)被加鎖時(shí),會(huì)返回一個(gè)錯(cuò)誤代碼而不是阻塞調(diào)用者。這個(gè)調(diào)用允許線程有效的進(jìn)行忙等。最后,Pthread_mutex_unlock 會(huì)對(duì) mutex 解鎖并且釋放一個(gè)正在等待的線程。

除了互斥量以外,Pthreads 還提供了第二種同步機(jī)制:條件變量(condition variables) 。mutex 可以很好的允許或阻止對(duì)關(guān)鍵區(qū)域的訪問。條件變量允許線程由于未滿足某些條件而阻塞。絕大多數(shù)情況下這兩種方法是一起使用的。下面我們進(jìn)一步來研究線程、互斥量、條件變量之間的關(guān)聯(lián)。

下面再來重新認(rèn)識(shí)一下生產(chǎn)者和消費(fèi)者問題:一個(gè)線程將東西放在一個(gè)緩沖區(qū)內(nèi),由另一個(gè)線程將它們?nèi)〕觥H绻a(chǎn)者發(fā)現(xiàn)緩沖區(qū)沒有空槽可以使用了,生產(chǎn)者線程會(huì)阻塞起來直到有一個(gè)線程可以使用。生產(chǎn)者使用 mutex 來進(jìn)行原子性檢查從而不受其他線程干擾。但是當(dāng)發(fā)現(xiàn)緩沖區(qū)已經(jīng)滿了以后,生產(chǎn)者需要一種方法來阻塞自己并在以后被喚醒。這便是條件變量做的工作。

下面是一些與條件變量有關(guān)的最重要的 pthread 調(diào)用

 

上表中給出了一些調(diào)用用來創(chuàng)建和銷毀條件變量。條件變量上的主要屬性是Pthread_cond_wait 和 Pthread_cond_signal。前者阻塞調(diào)用線程,直到其他線程發(fā)出信號(hào)為止(使用后者調(diào)用)。阻塞的線程通常需要等待喚醒的信號(hào)以此來釋放資源或者執(zhí)行某些其他活動(dòng)。只有這樣阻塞的線程才能繼續(xù)工作。條件變量允許等待與阻塞原子性的進(jìn)程。Pthread_cond_broadcast 用來喚醒多個(gè)阻塞的、需要等待信號(hào)喚醒的線程。

“需要注意的是,條件變量(不像是信號(hào)量)不會(huì)存在于內(nèi)存中。如果將一個(gè)信號(hào)量傳遞給一個(gè)沒有線程等待的條件變量,那么這個(gè)信號(hào)就會(huì)丟失,這個(gè)需要注意

管程

為了能夠編寫更加準(zhǔn)確無誤的程序,Brinch Hansen 和 Hoare 提出了一個(gè)更高級(jí)的同步原語叫做 管程(monitor)。管程有一個(gè)很重要的特性,即在任何時(shí)候管程中只能有一個(gè)活躍的進(jìn)程,這一特性使管程能夠很方便的實(shí)現(xiàn)互斥操作。管程是編程語言的特性,所以編譯器知道它們的特殊性,因此可以采用與其他過程調(diào)用不同的方法來處理對(duì)管程的調(diào)用。通常情況下,當(dāng)進(jìn)程調(diào)用管程中的程序時(shí),該程序的前幾條指令會(huì)檢查管程中是否有其他活躍的進(jìn)程。如果有的話,調(diào)用進(jìn)程將被掛起,直到另一個(gè)進(jìn)程離開管程才將其喚醒。如果沒有活躍進(jìn)程在使用管程,那么該調(diào)用進(jìn)程才可以進(jìn)入。

進(jìn)入管程中的互斥由編譯器負(fù)責(zé),但是一種通用做法是使用 互斥量(mutex) 和 二進(jìn)制信號(hào)量(binary semaphore)。由于編譯器而不是程序員在操作,因此出錯(cuò)的幾率會(huì)大大降低。在任何時(shí)候,編寫管程的程序員都無需關(guān)心編譯器是如何處理的。他只需要知道將所有的臨界區(qū)轉(zhuǎn)換成為管程過程即可。絕不會(huì)有兩個(gè)進(jìn)程同時(shí)執(zhí)行臨界區(qū)中的代碼。

即使管程提供了一種簡單的方式來實(shí)現(xiàn)互斥,但在我們看來,這還不夠。因?yàn)槲覀冞€需要一種在進(jìn)程無法執(zhí)行被阻塞。在生產(chǎn)者-消費(fèi)者問題中,很容易將針對(duì)緩沖區(qū)滿和緩沖區(qū)空的測試放在管程程序中,但是生產(chǎn)者在發(fā)現(xiàn)緩沖區(qū)滿的時(shí)候該如何阻塞呢?

解決的辦法是引入條件變量(condition variables) 以及相關(guān)的兩個(gè)操作 wait 和 signal。當(dāng)一個(gè)管程程序發(fā)現(xiàn)它不能運(yùn)行時(shí)(例如,生產(chǎn)者發(fā)現(xiàn)緩沖區(qū)已滿),它會(huì)在某個(gè)條件變量(如 full)上執(zhí)行 wait 操作。這個(gè)操作造成調(diào)用進(jìn)程阻塞,并且還將另一個(gè)以前等在管程之外的進(jìn)程調(diào)入管程。在前面的 pthread 中我們已經(jīng)探討過條件變量的實(shí)現(xiàn)細(xì)節(jié)了。另一個(gè)進(jìn)程,比如消費(fèi)者可以通過執(zhí)行 signal 來喚醒阻塞的調(diào)用進(jìn)程。

通過臨界區(qū)自動(dòng)的互斥,管程比信號(hào)量更容易保證并行編程的正確性。但是管程也有缺點(diǎn),我們前面說到過管程是一個(gè)編程語言的概念,編譯器必須要識(shí)別管程并用某種方式對(duì)其互斥作出保證。C、Pascal 以及大多數(shù)其他編程語言都沒有管程,所以不能依靠編譯器來遵守互斥規(guī)則。

與管程和信號(hào)量有關(guān)的另一個(gè)問題是,這些機(jī)制都是設(shè)計(jì)用來解決訪問共享內(nèi)存的一個(gè)或多個(gè) CPU 上的互斥問題的。通過將信號(hào)量放在共享內(nèi)存中并用 TSL 或 XCHG 指令來保護(hù)它們,可以避免競爭。但是如果是在分布式系統(tǒng)中,可能同時(shí)具有多個(gè) CPU 的情況,并且每個(gè) CPU 都有自己的私有內(nèi)存呢,它們通過網(wǎng)絡(luò)相連,那么這些原語將會(huì)失效。因?yàn)樾盘?hào)量太低級(jí)了,而管程在少數(shù)幾種編程語言之外無法使用,所以還需要其他方法。

消息傳遞

上面提到的其他方法就是 消息傳遞(messaage passing)。這種進(jìn)程間通信的方法使用兩個(gè)原語send 和 receive ,它們像信號(hào)量而不像管程,是系統(tǒng)調(diào)用而不是語言級(jí)別。示例如下

  1. send(destination, &message); 
  2.  
  3. receive(source, &message); 

send 方法用于向一個(gè)給定的目標(biāo)發(fā)送一條消息,receive 從一個(gè)給定的源接收一條消息。如果沒有消息,接受者可能被阻塞,直到接收一條消息或者帶著錯(cuò)誤碼返回。

消息傳遞系統(tǒng)現(xiàn)在面臨著許多信號(hào)量和管程所未涉及的問題和設(shè)計(jì)難點(diǎn),尤其對(duì)那些在網(wǎng)絡(luò)中不同機(jī)器上的通信狀況。例如,消息有可能被網(wǎng)絡(luò)丟失。為了防止消息丟失,發(fā)送方和接收方可以達(dá)成一致:一旦接受到消息后,接收方馬上回送一條特殊的 確認(rèn)(acknowledgement) 消息。如果發(fā)送方在一段時(shí)間間隔內(nèi)未收到確認(rèn),則重發(fā)消息。

現(xiàn)在考慮消息本身被正確接收,而返回給發(fā)送著的確認(rèn)消息丟失的情況。發(fā)送者將重發(fā)消息,這樣接受者將收到兩次相同的消息。

 

對(duì)于接收者來說,如何區(qū)分新的消息和一條重發(fā)的老消息是非常重要的。通常采用在每條原始消息中嵌入一個(gè)連續(xù)的序號(hào)來解決此問題。如果接受者收到一條消息,它具有與前面某一條消息一樣的序號(hào),就知道這條消息是重復(fù)的,可以忽略。

消息系統(tǒng)還必須處理如何命名進(jìn)程的問題,以便在發(fā)送或接收調(diào)用中清晰的指明進(jìn)程。身份驗(yàn)證(authentication) 也是一個(gè)問題,比如客戶端怎么知道它是在與一個(gè)真正的文件服務(wù)器通信,從發(fā)送方到接收方的信息有可能被中間人所篡改。

屏障

最后一個(gè)同步機(jī)制是準(zhǔn)備用于進(jìn)程組而不是進(jìn)程間的生產(chǎn)者-消費(fèi)者情況的。在某些應(yīng)用中劃分了若干階段,并且規(guī)定,除非所有的進(jìn)程都就緒準(zhǔn)備著手下一個(gè)階段,否則任何進(jìn)程都不能進(jìn)入下一個(gè)階段,可以通過在每個(gè)階段的結(jié)尾安裝一個(gè) 屏障(barrier) 來實(shí)現(xiàn)這種行為。當(dāng)一個(gè)進(jìn)程到達(dá)屏障時(shí),它會(huì)被屏障所攔截,直到所有的屏障都到達(dá)為止。屏障可用于一組進(jìn)程同步,如下圖所示

 

在上圖中我們可以看到,有四個(gè)進(jìn)程接近屏障,這意味著每個(gè)進(jìn)程都在進(jìn)行運(yùn)算,但是還沒有到達(dá)每個(gè)階段的結(jié)尾。過了一段時(shí)間后,A、B、D 三個(gè)進(jìn)程都到達(dá)了屏障,各自的進(jìn)程被掛起,但此時(shí)還不能進(jìn)入下一個(gè)階段呢,因?yàn)檫M(jìn)程 B 還沒有執(zhí)行完畢。結(jié)果,當(dāng)最后一個(gè) C 到達(dá)屏障后,這個(gè)進(jìn)程組才能夠進(jìn)入下一個(gè)階段。

避免鎖:讀-復(fù)制-更新

最快的鎖是根本沒有鎖。問題在于沒有鎖的情況下,我們是否允許對(duì)共享數(shù)據(jù)結(jié)構(gòu)的并發(fā)讀寫進(jìn)行訪問。答案當(dāng)然是不可以。假設(shè)進(jìn)程 A 正在對(duì)一個(gè)數(shù)字?jǐn)?shù)組進(jìn)行排序,而進(jìn)程 B 正在計(jì)算其平均值,而此時(shí)你進(jìn)行 A 的移動(dòng),會(huì)導(dǎo)致 B 會(huì)多次讀到重復(fù)值,而某些值根本沒有遇到過。

然而,在某些情況下,我們可以允許寫操作來更新數(shù)據(jù)結(jié)構(gòu),即便還有其他的進(jìn)程正在使用。竅門在于確保每個(gè)讀操作要么讀取舊的版本,要么讀取新的版本,例如下面的樹

 

上面的樹中,讀操作從根部到葉子遍歷整個(gè)樹。加入一個(gè)新節(jié)點(diǎn) X 后,為了實(shí)現(xiàn)這一操作,我們要讓這個(gè)節(jié)點(diǎn)在樹中可見之前使它"恰好正確":我們對(duì)節(jié)點(diǎn) X 中的所有值進(jìn)行初始化,包括它的子節(jié)點(diǎn)指針。然后通過原子寫操作,使 X 稱為 A 的子節(jié)點(diǎn)。所有的讀操作都不會(huì)讀到前后不一致的版本

 

在上面的圖中,我們接著移除 B 和 D。首先,將 A 的左子節(jié)點(diǎn)指針指向 C 。所有原本在 A 中的讀操作將會(huì)后續(xù)讀到節(jié)點(diǎn) C ,而永遠(yuǎn)不會(huì)讀到 B 和 D。也就是說,它們將只會(huì)讀取到新版數(shù)據(jù)。同樣,所有當(dāng)前在 B 和 D 中的讀操作將繼續(xù)按照原始的數(shù)據(jù)結(jié)構(gòu)指針并且讀取舊版數(shù)據(jù)。所有操作均能正確運(yùn)行,我們不需要鎖住任何東西。而不需要鎖住數(shù)據(jù)就能夠移除 B 和 D 的主要原因就是 讀-復(fù)制-更新(Ready-Copy-Update,RCU),將更新過程中的移除和再分配過程分離開。

Java 并發(fā)工具包JDK 1.5 提供了許多種并發(fā)容器來改進(jìn)同步容器的性能,同步容器將所有對(duì)容器狀態(tài)的訪問都串行化,以實(shí)現(xiàn)他們之間的線程安全性。這種方法的代價(jià)是嚴(yán)重降低了并發(fā)性能,當(dāng)多個(gè)線程爭搶容器鎖的同時(shí),嚴(yán)重降低吞吐量。

下面我們就來一起認(rèn)識(shí)一下 Java 中都用了哪些并發(fā)工具

Java 并發(fā)工具綜述

在 Java 5.0 中新增加了 ConcurrentHashMap 用來替代基于散列的 Map 容器;新增加了 CopyOnWriteArrayList 和 CopyOnWriteArraySet 來分別替代 ArrayList 和 Set 接口實(shí)現(xiàn)類;還新增加了兩種容器類型,分別是 Queue 和 BlockingQueue, Queue 是隊(duì)列的意思,它有一些實(shí)現(xiàn)分別是傳統(tǒng)的先進(jìn)先出隊(duì)列 ConcurrentLinkedQueue以及并發(fā)優(yōu)先級(jí)隊(duì)列 PriorityQueue。Queue 是一個(gè)先入先出的隊(duì)列,它的操作不會(huì)阻塞,如果隊(duì)列為空那么獲取元素的操作會(huì)返回空值。PriorityQueue 擴(kuò)展了 Queue,增加了可阻塞的插入和獲取等操作。如果隊(duì)列為空,那么獲取元素的操作將一直阻塞,直到隊(duì)列中出現(xiàn)一個(gè)可用的元素為止。如果隊(duì)列已滿,那么插入操作則一直阻塞,直到隊(duì)列中有可用的空間為止。

Java 6.0 還引入了 ConcurrentSkipListMap 和 ConcurrentSkipListSet 分別作為同步的 SortedMap 和 SortedSet 的并發(fā)替代品。下面我們就展開探討了,設(shè)計(jì)不到底層源碼,因?yàn)楸酒恼轮饕康木褪菫榱嗣枋鲆幌掠心男〇|西以及用了哪些東西。

ConcurrentHashMap

我們先來看一下 ConcurrentHashMap 在并發(fā)集合中的位置

 

可以看到,ConcurrentHashMap 繼承了 AbstractMap 接口并實(shí)現(xiàn)了 ConcurrentMap 和 Serializable 接口,AbstractMap 和 ConcurrentMap 都是 Map 的實(shí)現(xiàn)類,只不過 AbstractMap 是抽象實(shí)現(xiàn)。

ConcurrentHashMap 和 Hashtable 的構(gòu)造非常相似,只不過 Hashtable 容器在激烈競爭的場景中會(huì)表現(xiàn)出效率低下的現(xiàn)象,這是因?yàn)樗性L問 Hashtable 的線程都想獲取同一把鎖,如果容器里面有多把鎖,并且每一把鎖都只用來鎖定一段數(shù)據(jù),那么當(dāng)多個(gè)線程訪問不同的數(shù)據(jù)段時(shí),就不存在競爭關(guān)系。這就是 ConcurreentHashMap 采用的 分段鎖 實(shí)現(xiàn)。在這種鎖實(shí)現(xiàn)中,任意數(shù)量的讀取線程可以并發(fā)的訪問 Map,執(zhí)行讀取操作的線程和執(zhí)行寫入的線程可以并發(fā)的訪問 Map,并且在讀取的同時(shí)也可以并發(fā)修改 Map。

 

ConcurrentHashMap 分段鎖實(shí)現(xiàn)帶來的結(jié)果是,在并發(fā)環(huán)境下可以實(shí)現(xiàn)更高的吞吐量,在單線程環(huán)境下只損失非常小的性能。

你知道 HashMap 是具有 fail-fast 機(jī)制的,也就是說它是一種強(qiáng)一致性的集合,在數(shù)據(jù)不一致的情況下會(huì)拋出 ConcurrentModificationException 異常,而 ConcurrentHashMap 是一種 弱一致性 的集合,在并發(fā)修改其內(nèi)部結(jié)構(gòu)時(shí),它不會(huì)拋出 ConcurrentModificationException 異常,弱一致性能夠容忍并發(fā)修改。

在 HashMap 中,我們一般使用的 size、empty、containsKey 等方法都是標(biāo)準(zhǔn)方法,其返回的結(jié)果是一定的,包含就是包含,不包含就是不包含,可以作為判斷條件;而 ConcurrentHashMap 中的這些方法只是參考方法,它不是一個(gè) 精確值,像是 size、empty 這些方法在并發(fā)場景下用處很小,因?yàn)樗麄兊姆祷刂悼偸窃诓粩嘧兓赃@些操作的需求就被弱化了。

 

在 ConcurrentHashMap 中沒有實(shí)現(xiàn)對(duì) Map 加鎖從而實(shí)現(xiàn)獨(dú)占訪問。在線程安全的 Map 實(shí)現(xiàn) Hashtable 和 Collections.synchronizedMap 中都實(shí)現(xiàn)了獨(dú)占訪問,因此只能單個(gè)線程修改 Map 。ConcurrentHashMap 與這些 Map 容器相比,具有更多的優(yōu)勢和更少的劣勢,只有當(dāng)需要獨(dú)占訪問的需求時(shí)才會(huì)使用 Hashtable 或者是 Collections.synchronizedMap ,否則其他并發(fā)場景下,應(yīng)該使用 ConcurrentHashMap。

ConcurrentMap

ConcurrentMap 是一個(gè)接口,它繼承了 Map 接口并提供了 Map 接口中四個(gè)新的方法,這四個(gè)方法都是 原子性 方法,進(jìn)一步擴(kuò)展了 Map 的功能。

  1. public interface ConcurrentMap<K, V> extends Map<K, V> { 
  2.   
  3.   // 僅當(dāng) key 沒有相應(yīng)的映射值時(shí)才插入 
  4.   V putIfAbsent(K key, V value); 
  5.    
  6.   // 僅當(dāng) key 被映射到 value 時(shí)才移除 
  7.   boolean remove(Object key, Object value); 
  8.    
  9.   // 僅當(dāng) key 被映射到 value 時(shí)才移除 
  10.   V replace(K key, V value); 
  11.    
  12.   // 僅當(dāng) key 被映射到 oldValue 時(shí)才替換為 newValue 
  13.   boolean replace(K key, V oldValue, V newValue); 
  14.    

ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap 類是 java.util.NavigableMap 的子類,它支持并發(fā)訪問,并且允許其視圖的并發(fā)訪問。

 

什么是視圖呢?視圖就是集合中的一段數(shù)據(jù)序列,ConcurrentNavigableMap 中支持使用 headMap、subMap、tailMap 返回的視圖。與其重新解釋一下 NavigableMap 中找到的所有方法,不如看一下 ConcurrentNavigableMap 中添加的方法

  • headMap 方法:headMap 方法返回一個(gè)嚴(yán)格小于給定鍵的視圖
  • tailMap 方法:tailMap 方法返回包含大于或等于給定鍵的視圖。
  • subMap 方法:subMap 方法返回給定兩個(gè)參數(shù)的視圖

ConcurrentNavigableMap 接口包含一些可能有用的其他方法

  • descendingKeySet()
  • descendingMap()
  • navigableKeySet()

更多關(guān)于方法的描述這里就不再贅述了,讀者朋友們可自行查閱 javadoc

ConcurrentSkipListMap

ConcurrentSkipListMap 是線程安全的有序的哈希表,適用于高并發(fā)的場景。

 

ConcurrentSkipListMap 的底層數(shù)據(jù)結(jié)構(gòu)是基于跳表實(shí)現(xiàn)的。ConcurrentSkipListMap 可以提供 Comparable 內(nèi)部排序或者是 Comparator 外部排序,具體取決于使用哪個(gè)構(gòu)造函數(shù)。

ConcurrentSkipListSet

ConcurrentSkipListSet 是線程安全的有序的集合,適用于高并發(fā)的場景。ConcurrentSkipListSet 底層是通過 ConcurrentNavigableMap 來實(shí)現(xiàn)的,它是一個(gè)有序的線程安全的集合。

ConcurrentSkipListSet有序的,基于元素的自然排序或者通過比較器確定的順序;

ConcurrentSkipListSet是線程安全的;

CopyOnWriteArrayList

CopyOnWriteArrayList 是 ArrayList 的變體,在 CopyOnWriteArrayList 中,所有可變操作比如 add、set 其實(shí)都是重新創(chuàng)建了一個(gè)副本,通過對(duì)數(shù)組進(jìn)行復(fù)制而實(shí)現(xiàn)的。

 

CopyOnWriteArrayList 其內(nèi)部有一個(gè)指向數(shù)組的引用,數(shù)組是不會(huì)被修改的,每次并發(fā)修改 CopyOnWriteArrayList 都相當(dāng)于重新創(chuàng)建副本,CopyOnWriteArrayList 是一種 fail-safe 機(jī)制的,它不會(huì)拋出 ConcurrentModificationException 異常,并且返回元素與迭代器創(chuàng)建時(shí)的元素相同。

每次并發(fā)寫操作都會(huì)創(chuàng)建新的副本,這個(gè)過程存在一定的開銷,所以,一般在規(guī)模很大時(shí),讀操作要遠(yuǎn)遠(yuǎn)多于寫操作時(shí),為了保證線程安全性,會(huì)使用 CopyOnWriteArrayList。

類似的,CopyOnWriteArraySet 的作用也相當(dāng)于替代了 Set 接口。

 

BlockingQueue

BlockingQueue 譯為 阻塞隊(duì)列,它是 JDK 1.5 添加的新的工具類,它繼承于 Queue 隊(duì)列,并擴(kuò)展了 Queue 的功能。

 

BlockingQueue 在檢索元素時(shí)會(huì)等待隊(duì)列變成非空,并在存儲(chǔ)元素時(shí)會(huì)等待隊(duì)列變?yōu)榭捎谩lockingQueue 的方法有四種實(shí)現(xiàn)形式,以不同的方式來處理。

  • 第一種是拋出異常
  • 特殊值:第二種是根據(jù)情況會(huì)返回 null 或者 false
  • 阻塞:第三種是無限期的阻塞當(dāng)前線程直到操作變?yōu)榭捎煤?/li>
  • 超時(shí):第四種是給定一個(gè)最大的超時(shí)時(shí)間,超過后才會(huì)放棄

BlockingQueue 不允許添加 null 元素,在其實(shí)現(xiàn)類的方法 add、put 或者 offer 后時(shí)添加 null 會(huì)拋出空指針異常。BlockingQueue 會(huì)有容量限制。在任意時(shí)間內(nèi),它都會(huì)有一個(gè) remainCapacity,超過該值之前,任意 put 元素都會(huì)阻塞。

BlockingQueue 一般用于實(shí)現(xiàn)生產(chǎn)者 - 消費(fèi)者 隊(duì)列,如下圖所示

 

BlockingQueue 有多種實(shí)現(xiàn),下面我們一起來認(rèn)識(shí)一下這些容器。

其中 LinkedBlockingQueue 和 ArrayBlockingQueue 是 FIFO 先入先出隊(duì)列,二者分別和 LinkedList和 ArrayList 對(duì)應(yīng),比同步 List 具有更好的并發(fā)性能。PriorityBlockingQueue 是一個(gè)優(yōu)先級(jí)排序的阻塞隊(duì)列,如果你希望按照某種順序而不是 FIFO 處理元素時(shí)這個(gè)隊(duì)列將非常有用。正如其他有序的容器一樣,PriorityBlockingQueue 既可以按照自然順序來比較元素,也可以使用 Comparator 比較器進(jìn)行外部元素比較。SynchronousQueue 它維護(hù)的是一組線程而不是一組隊(duì)列,實(shí)際上它不是一個(gè)隊(duì)列,它的每個(gè) insert 操作必須等待其他相關(guān)線程的 remove 方法后才能執(zhí)行,反之亦然。

LinkedBlockingQueue

LinkedBlockingQueue 是一種 BlockingQueue 的實(shí)現(xiàn)。

 

它是一種基于鏈表的構(gòu)造、先入先出的有界阻塞隊(duì)列。隊(duì)列的 head 也就是頭元素是在隊(duì)列中等待時(shí)間最長的元素;隊(duì)列的 tail也就是隊(duì)尾元素是隊(duì)列中等待時(shí)間最短的元素。新的元素會(huì)被插入到隊(duì)尾中,檢索操作將獲取隊(duì)列中的頭部元素。鏈表隊(duì)列通常比基于數(shù)組的隊(duì)列具有更高的吞吐量,但是在大多數(shù)并發(fā)應(yīng)用程序中,可預(yù)測的性能較差。

ArrayBlockingQueue

ArrayBlockingQueue 是一個(gè)用數(shù)組實(shí)現(xiàn)的有界隊(duì)列,此隊(duì)列順序按照先入先出的原則對(duì)元素進(jìn)行排序。

默認(rèn)情況下不保證線程公平的訪問隊(duì)列,所謂公平訪問隊(duì)列指的是阻塞的線程,可以按照阻塞的先后順序訪問,即先阻塞線程先訪問隊(duì)列。非公平性是對(duì)先等待的線程是非公平的。有可能先阻塞的線程最后才訪問隊(duì)列。

PriorityBlockingQueue

PriorityBlockingQueue 是一個(gè)支持優(yōu)先級(jí)的阻塞隊(duì)列,默認(rèn)情況下的元素采取自然順序生序或者降序,也可以自己定義 Comparator 進(jìn)行外部排序。但需要注意的是不能保證同優(yōu)先級(jí)元素的順序。

DelayQueue

DelayQueue 是一個(gè)支持延時(shí)獲取元素的無阻塞隊(duì)列,其中的元素只能在延遲到期后才能使用,DelayQueue 中的隊(duì)列頭是延遲最長時(shí)間的元素,如果沒有延遲,則沒有 head 頭元素,poll 方法會(huì)返回 null。判斷的依據(jù)就是 getDelay(TimeUnit.NANOSECONDS) 方法返回一個(gè)值小于或者等于 0 就會(huì)發(fā)生過期。

TransferQueue

TransferQueue 繼承于 BlockingQueue,它是一個(gè)接口,一個(gè) BlockingQueue 是一個(gè)生產(chǎn)者可能等待消費(fèi)者接受元素,TransferQueue 則更進(jìn)一步,生產(chǎn)者會(huì)一直阻塞直到所添加到隊(duì)列的元素被某一個(gè)消費(fèi)者所消費(fèi),新添加的transfer 方法用來實(shí)現(xiàn)這種約束。

TransferQueue 有下面這些方法:兩個(gè) tryTransfer 方法,一個(gè)是非阻塞的,另一個(gè)是帶有 timeout 參數(shù)設(shè)置超時(shí)時(shí)間的。還有兩個(gè)輔助方法 hasWaitingConsumer 和 getWaitingConcusmerCount。

LinkedTransferQueue

一個(gè)無界的基于鏈表的 TransferQueue。這個(gè)隊(duì)列對(duì)任何給定的生產(chǎn)者進(jìn)行 FIFO 排序,head 是隊(duì)列中存在時(shí)間最長的元素。tail 是隊(duì)列中存在時(shí)間最短的元素。

BlockingDeque

與 BlockingQueue 相對(duì)的還有 BlockingDeque 和 Deque,它們是 JDK1.6 被提出的,分別對(duì) Queue 和 BlockingQueue 做了擴(kuò)展。

 

Deque 是一個(gè)雙端隊(duì)列,分別實(shí)現(xiàn)了在隊(duì)列頭和隊(duì)列尾的插入。Deque 的實(shí)現(xiàn)有 ArrayDeque、ConcurrentLinkedDeque,BlockingDeque 的實(shí)現(xiàn)有 LinkedBlockingDeque 。

阻塞模式一般用于生產(chǎn)者 - 消費(fèi)者隊(duì)列,而雙端隊(duì)列適用于工作密取。在工作密取的設(shè)計(jì)中,每個(gè)消費(fèi)者都有各自的雙端隊(duì)列,如果一個(gè)消費(fèi)者完成了自己雙端隊(duì)列的任務(wù),就會(huì)去其他雙端隊(duì)列的末尾進(jìn)行消費(fèi)。密取方式要比傳統(tǒng)的生產(chǎn)者 - 消費(fèi)者隊(duì)列具有更高的可伸縮性,這是因?yàn)槊總€(gè)工作密取的工作者都有自己的雙端隊(duì)列,不存在競爭的情況。

ArrayDeque

ArrayDeque 是 Deque 的可動(dòng)態(tài)調(diào)整大小的數(shù)組實(shí)現(xiàn),其內(nèi)部沒有容量限制,他們會(huì)根據(jù)需要進(jìn)行增長。ArrayDeque 不是線程安全的,如果沒有外部加鎖的情況下,不支持多線程訪問。ArrayDeque 禁止空元素,這個(gè)類作為棧使用時(shí)要比 Stack 快,作為 queue 使用時(shí)要比 LinkedList 快。

除了 remove、removeFirstOccurrence、removeLastOccurrence、contains、interator.remove 外,大部分的 ArrayDeque 都以恒定的開銷運(yùn)行。

“注意:ArrayDeque 是 fail-fast 的,如果創(chuàng)建了迭代器之后,卻使用了迭代器外部的 remove 等修改方法,那么這個(gè)類將會(huì)拋出 ConcurrentModificationException 異常。

ConcurrentLinkedDeque

ConcurrentLinkedDeque 是 JDK1.7 引入的雙向鏈表的無界并發(fā)隊(duì)列。它與 ConcurrentLinkedQueue 的區(qū)別是 ConcurrentLinkedDeque 同時(shí)支持 FIFO 和 FILO 兩種操作方式,即可以從隊(duì)列的頭和尾同時(shí)操作(插入/刪除)。ConcurrentLinkedDeque 也支持 happen-before 原則。ConcurrentLinkedDeque 不允許空元素。

LinkedBlockingDeque

LinkedBlockingDeque 是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列,即可以從隊(duì)列的兩端插入和移除元素。雙向隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競爭。LinkedBlockingDeque 把初始容量和構(gòu)造函數(shù)綁定,這樣能夠有效過度拓展。初始容量如果沒有指定,就取的是 Integer.MAX_VALUE,這也是 LinkedBlockingDeque 的默認(rèn)構(gòu)造函數(shù)。

同步工具類

同步工具類可以是任何一個(gè)對(duì)象,只要它根據(jù)自身狀態(tài)來協(xié)調(diào)線程的控制流。阻塞隊(duì)列可以作為同步控制類,其他類型的同步工具類還包括 信號(hào)量(Semaphore)、柵欄(Barrier) 和 閉鎖(Latch)。下面我們就來一起認(rèn)識(shí)一下這些工具類

Semaphore

Semaphore 翻譯過來就是 信號(hào)量,信號(hào)量是什么?它其實(shí)就是一種信號(hào),在操作系統(tǒng)中,也有信號(hào)量的這個(gè)概念,在進(jìn)程間通信的時(shí)候,我們就會(huì)談到信號(hào)量進(jìn)行通信。還有在 Linux 操作系統(tǒng)采取中斷時(shí),也會(huì)向進(jìn)程發(fā)出中斷信號(hào),根據(jù)進(jìn)程的種類和信號(hào)的類型判斷是否應(yīng)該結(jié)束進(jìn)程。

在 Java 中,Semaphore(信號(hào)量)是用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。

Semaphore 管理著一組許可(permit),許可的初始數(shù)量由構(gòu)造函數(shù)來指定。在獲取某個(gè)資源之前,應(yīng)該先從信號(hào)量獲取許可(permit),以確保資源是否可用。當(dāng)線程完成對(duì)資源的操作后,會(huì)把它放在池中并向信號(hào)量返回一個(gè)許可,從而允許其他線程訪問資源,這叫做釋放許可。如果沒有許可的話,那么 acquire 將會(huì)阻塞直到有許可(中斷或者操作超時(shí))為止。release方法將返回一個(gè)許可信號(hào)量。

Semaphore 可以用來實(shí)現(xiàn)流量控制,例如常用的數(shù)據(jù)庫連接池,線程請(qǐng)求資源時(shí),如果數(shù)據(jù)庫連接池為空則阻塞線程,直接返回失敗,如果連接池不為空時(shí)解除阻塞。

CountDownLatch

閉鎖(Latch) 是一種同步工具類,它可以延遲線程的進(jìn)度以直到其到達(dá)終止?fàn)顟B(tài)。閉鎖的作用相當(dāng)于是一扇門,在閉鎖達(dá)到結(jié)束狀態(tài)前,門是一直關(guān)著的,沒有任何線程能夠通過。當(dāng)閉鎖到達(dá)結(jié)束狀態(tài)后,這扇門會(huì)打開并且允許任何線程通過,然后就一直保持打開狀態(tài)。

CountDownLatch 就是閉鎖的一種實(shí)現(xiàn)。它可以使一個(gè)或者多個(gè)線程等待一組事件的發(fā)生。閉鎖有一個(gè)計(jì)數(shù)器,閉鎖需要對(duì)計(jì)數(shù)器進(jìn)行初始化,表示需要等待的次數(shù),閉鎖在調(diào)用 await處進(jìn)行等待,其他線程在調(diào)用 countDown 把閉鎖 count 次數(shù)進(jìn)行遞減,直到遞減為 0 ,喚醒 await。如下代碼所示

  1. public class TCountDownLatch { 
  2.  
  3.     public static void main(String[] args) { 
  4.         CountDownLatch latch = new CountDownLatch(5); 
  5.         Increment increment = new Increment(latch); 
  6.         Decrement decrement = new Decrement(latch); 
  7.  
  8.         new Thread(increment).start(); 
  9.         new Thread(decrement).start(); 
  10.  
  11.         try { 
  12.             Thread.sleep(6000); 
  13.         } catch (InterruptedException e) { 
  14.             e.printStackTrace(); 
  15.         } 
  16.     } 
  17. class Decrement implements Runnable { 
  18.  
  19.     CountDownLatch countDownLatch; 
  20.  
  21.     public Decrement(CountDownLatch countDownLatch){ 
  22.         this.countDownLatch = countDownLatch; 
  23.     } 
  24.  
  25.     @Override 
  26.     public void run() { 
  27.         try { 
  28.  
  29.             for(long i = countDownLatch.getCount();i > 0;i--){ 
  30.                 Thread.sleep(1000); 
  31.                 System.out.println("countdown"); 
  32.                 this.countDownLatch.countDown(); 
  33.             } 
  34.  
  35.         } catch (InterruptedException e) { 
  36.             e.printStackTrace(); 
  37.         } 
  38.     } 
  39. class Increment implements Runnable { 
  40.  
  41.     CountDownLatch countDownLatch; 
  42.  
  43.     public Increment(CountDownLatch countDownLatch){ 
  44.         this.countDownLatch = countDownLatch; 
  45.     } 
  46.  
  47.     @Override 
  48.     public void run() { 
  49.         try { 
  50.             System.out.println("await"); 
  51.             countDownLatch.await(); 
  52.         } catch (InterruptedException e) { 
  53.             e.printStackTrace(); 
  54.         } 
  55.         System.out.println("Waiter Released"); 
  56.     } 

Future

我們常見的創(chuàng)建多線程的方式有兩種,一種是繼承 Thread 類,一種是實(shí)現(xiàn) Runnable 接口。這兩種方式都沒有返回值。相對(duì)的,創(chuàng)建多線程還有其他三種方式,那就是使用 Callable接口、 Future 接口和 FutureTask 類。Callable 我們之前聊過,這里就不再描述了,我們主要來描述一下 Future 和 FutureTask 接口。

 

Future 就是對(duì)具體的 Runnable 或者 Callable 任務(wù)的執(zhí)行結(jié)果進(jìn)行一系列的操作,必要時(shí)可通過 get 方法獲取執(zhí)行結(jié)果,這個(gè)方法會(huì)阻塞直到執(zhí)行結(jié)束。Future 中的主要方法有

  1. public interface Future<V> { 
  2.     boolean cancel(boolean mayInterruptIfRunning); 
  3.     boolean isCancelled(); 
  4.     boolean isDone(); 
  5.     V get() throws InterruptedException, ExecutionException; 
  6.     V get(long timeout, TimeUnit unit) 
  7.         throws InterruptedException, ExecutionException, TimeoutException; 
  • cancel(boolean mayInterruptIfRunning) : 嘗試取消任務(wù)的執(zhí)行。如果任務(wù)已經(jīng)完成、已經(jīng)被取消或者由于某些原因而無法取消,那么這個(gè)嘗試會(huì)失敗。如果取消成功,或者在調(diào)用 cancel 時(shí)此任務(wù)尚未開始,那么此任務(wù)永遠(yuǎn)不會(huì)執(zhí)行。如果任務(wù)已經(jīng)開始,那么 mayInterruptIfRunning 參數(shù)會(huì)確定是否中斷執(zhí)行任務(wù)以便于嘗試停止該任務(wù)。這個(gè)方法返回后,會(huì)對(duì) isDone 的后續(xù)調(diào)用也返回 true,如果 cancel 返回 true,那么后續(xù)的調(diào)用 isCancelled 也會(huì)返回 true。
  • boolean isCancelled():如果此任務(wù)在正常完成之前被取消,則返回 true。
  • boolean isDone():如果任務(wù)完成,返回 true。
  • V get() throws InterruptedException, ExecutionException:等待必要的計(jì)算完成,然后檢索其結(jié)果
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException :必要時(shí)最多等待給定時(shí)間以完成計(jì)算,然后檢索其結(jié)果。

因?yàn)镕uture只是一個(gè)接口,所以是無法直接用來創(chuàng)建對(duì)象使用的,因此就有了下面的FutureTask。

FutureTask

FutureTask 實(shí)現(xiàn)了 RunnableFuture 接口,RunnableFuture 接口是什么呢?

RunnableFuture 接口又繼承了 Runnable 接口和 Future 接口。納尼?在 Java 中不是只允許單繼承么,是的,單繼承更多的是說的類與類之間的繼承關(guān)系,子類繼承父類,擴(kuò)展父類的接口,這個(gè)過程是單向的,就是為了解決多繼承引起的過渡引用問題。而接口之間的繼承是接口的擴(kuò)展,在 Java 編程思想中也印證了這一點(diǎn)

 

對(duì) RunnableFuture 接口的解釋是:成功執(zhí)行的 run 方法會(huì)使 Future 接口的完成并允許訪問其結(jié)果。所以它既可以作為 Runnable 被線程執(zhí)行,又可以作為 Future 得到 Callable 的返回值。

FutureTask 也可以用作閉鎖,它可以處于以下三種狀態(tài)

  • 等待運(yùn)行
  • 正在運(yùn)行
  • 運(yùn)行完成

FutureTask 在 Executor 框架中表示異步任務(wù),此外還可以表示一些時(shí)間較長的計(jì)算,這些計(jì)算可以在使用計(jì)算結(jié)果之前啟動(dòng)。

FutureTask 具體的源碼我后面會(huì)單獨(dú)出文章進(jìn)行描述。

Barrier

我們上面聊到了通過閉鎖來啟動(dòng)一組相關(guān)的操作,使用閉鎖來等待一組事件的執(zhí)行。閉鎖是一種一次性對(duì)象,一旦進(jìn)入終止?fàn)顟B(tài)后,就不能被 重置。

Barrier 的特點(diǎn)和閉鎖也很類似,它也是阻塞一組線程直到某個(gè)事件發(fā)生。柵欄與閉鎖的區(qū)別在于,所有線程必須同時(shí)到達(dá)柵欄的位置,才能繼續(xù)執(zhí)行,就像我們上面操作系統(tǒng)給出的這幅圖一樣。

 

ABCD 四條線程,必須同時(shí)到達(dá) Barrier,然后 手牽手一起走過幸福的殿堂。

當(dāng)線程到達(dá) Barrier 的位置時(shí)會(huì)調(diào)用 await 方法,這個(gè)方法會(huì)阻塞直到所有線程都到達(dá) Barrier 的位置,如果所有線程都到達(dá) Barrier 的位置,那么 Barrier 將會(huì)打開使所有線程都被釋放,而 Barrier 將被重置以等待下次使用。如果調(diào)用 await 方法導(dǎo)致超時(shí),或者 await 阻塞的線程被中斷,那么 Barrier 就被認(rèn)為被打破,所有阻塞的 await 都會(huì)拋出 BrokenBarrierException 。如果成功通過柵欄后,await 方法返回一個(gè)唯一索引號(hào),可以利用這些索引號(hào)選舉一個(gè)新的 leader,來處理一下其他工作。

  1. public class TCyclicBarrier { 
  2.  
  3.     public static void main(String[] args) { 
  4.  
  5.         Runnable runnable = () -> System.out.println("Barrier 1 開始..."); 
  6.  
  7.         Runnable runnable2 = () -> System.out.println("Barrier 2 開始..."); 
  8.  
  9.         CyclicBarrier barrier1 = new CyclicBarrier(2,runnable); 
  10.         CyclicBarrier barrier2 = new CyclicBarrier(2,runnable2); 
  11.  
  12.         CyclicBarrierRunnable b1 = new CyclicBarrierRunnable(barrier1,barrier2); 
  13.         CyclicBarrierRunnable b2 = new CyclicBarrierRunnable(barrier1,barrier2); 
  14.  
  15.         new Thread(b1).start(); 
  16.         new Thread(b2).start(); 
  17.     } 
  18.  
  19.  
  20. class CyclicBarrierRunnable implements Runnable { 
  21.  
  22.     CyclicBarrier barrier1; 
  23.     CyclicBarrier barrier2; 
  24.  
  25.     public CyclicBarrierRunnable(CyclicBarrier barrier1,CyclicBarrier barrier2){ 
  26.         this.barrier1 = barrier1; 
  27.         this.barrier2 = barrier2; 
  28.     } 
  29.  
  30.     @Override 
  31.     public void run() { 
  32.  
  33.         try { 
  34.             Thread.sleep(1000); 
  35.             System.out.println(Thread.currentThread().getName() + "等待 barrier1" ); 
  36.             barrier1.await(); 
  37.  
  38.             Thread.sleep(1000); 
  39.             System.out.println(Thread.currentThread().getName() + "等待 barrier2" ); 
  40.             barrier2.await(); 
  41.  
  42.         } catch (InterruptedException | BrokenBarrierException e) { 
  43.             e.printStackTrace(); 
  44.         } 
  45.  
  46.         System.out.println(Thread.currentThread().getName() + 
  47.                 " 做完了!"); 
  48.  
  49.     } 

Exchanger

與 Barrier 相關(guān)聯(lián)的還有一個(gè)工具類就是 Exchanger, Exchanger 是一個(gè)用于線程間協(xié)作的工具類。Exchanger用于進(jìn)行線程間的數(shù)據(jù)交換。

 

它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn)兩個(gè)線程可以交換彼此的數(shù)據(jù)。這兩個(gè)線程通過exchange 方法交換數(shù)據(jù), 如果第一個(gè)線程先執(zhí)行 exchange方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行 exchange,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對(duì)方。因此使用Exchanger 的重點(diǎn)是成對(duì)的線程使用 exchange() 方法,當(dāng)有一對(duì)線程達(dá)到了同步點(diǎn),就會(huì)進(jìn)行交換數(shù)據(jù)。因此該工具類的線程對(duì)象是成對(duì)的。

下面通過一段例子代碼來講解一下

  1. public class TExchanger { 
  2.  
  3.     public static void main(String[] args) { 
  4.  
  5.         Exchanger exchanger = new Exchanger(); 
  6.  
  7.         ExchangerRunnable exchangerRunnable = new ExchangerRunnable(exchanger,"A"); 
  8.         ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger,"B"); 
  9.  
  10.         new Thread(exchangerRunnable).start(); 
  11.         new Thread(exchangerRunnable2).start(); 
  12.     } 
  13.  
  14.  
  15. class ExchangerRunnable implements Runnable { 
  16.  
  17.     Exchanger exchanger; 
  18.     Object object; 
  19.  
  20.     public ExchangerRunnable(Exchanger exchanger,Object object){ 
  21.         this.exchanger = exchanger; 
  22.         this.object = object; 
  23.     } 
  24.  
  25.  
  26.     @Override 
  27.     public void run() { 
  28.  
  29.         Object previous = object; 
  30.  
  31.         try { 
  32.             object = this.exchanger.exchange(object); 
  33.             System.out.println( 
  34.                     Thread.currentThread().getName() + "改變前是" + previous + "改變后是" + object); 
  35.         } catch (InterruptedException e) { 
  36.             e.printStackTrace(); 
  37.         } 
  38.  
  39.     } 

總結(jié)本篇文章我們從同步容器類入手,主要講了 fail-fast 和 fail-safe 機(jī)制,這兩個(gè)機(jī)制在并發(fā)編程中非常重要。然后我們從操作系統(tǒng)的角度,聊了聊操作系統(tǒng)層面實(shí)現(xiàn)安全性的幾種方式,然后從操作系統(tǒng) -> 并發(fā)我們聊了聊 Java 中的并發(fā)工具包有哪些,以及構(gòu)建并發(fā)的幾種工具類。

本文轉(zhuǎn)載自微信公眾號(hào)「 程序員cxuan」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系 程序員cxuan公眾號(hào)。

 

責(zé)任編輯:武曉燕 來源: 程序員cxuan
相關(guān)推薦

2012-07-31 09:19:02

程序員

2024-07-02 17:43:26

2013-07-17 09:13:19

2017-10-30 12:00:05

2018-01-03 10:34:20

創(chuàng)業(yè)公司事業(yè)

2020-03-13 09:38:45

離職創(chuàng)業(yè)感受

2022-05-27 12:20:18

微信智能手機(jī)

2018-09-17 09:46:41

華為狼性文化

2010-02-26 10:22:11

IPv4

2019-05-22 13:28:05

機(jī)器人人工智能系統(tǒng)

2020-04-22 15:00:03

iPhone蘋果劉海

2021-05-18 06:55:07

客戶端HTML容器

2020-09-14 15:30:23

開發(fā)技能代碼

2025-04-11 00:33:00

2018-10-20 10:30:30

WindowsWindows 10Windows 10.

2019-06-27 22:23:56

谷歌Android開發(fā)者

2022-02-16 09:55:39

Elastic 8開源索引

2023-04-23 13:30:19

ERP廠商華為

2019-06-03 10:53:49

MySQL硬盤數(shù)據(jù)庫

2016-01-08 10:32:48

虛擬現(xiàn)實(shí)頭盔
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)