自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

阿里架構(gòu)師教你JUC-Future與FutureTask原理詳解

開發(fā) 前端
Future 表示一個任務(wù)的生命周期,是一個可取消的異步運(yùn)算。提供了相應(yīng)的方法來判斷任務(wù)狀態(tài)(完成或取消),以及獲取任務(wù)的結(jié)果和取消任務(wù)等。適合具有可取消性和執(zhí)行時間較長的異步任務(wù)。

 

[[350087]]

1 Future

 

Future 表示一個任務(wù)的生命周期,是一個可取消的異步運(yùn)算。提供了相應(yīng)的方法來判斷任務(wù)狀態(tài)(完成或取消),以及獲取任務(wù)的結(jié)果和取消任務(wù)等。適合具有可取消性和執(zhí)行時間較長的異步任務(wù)。

并發(fā)包中許多異步任務(wù)類都繼承自Future,其中最典型的就是 FutureTask

1.1 介紹

Future 表示異步計(jì)算的結(jié)果。它提供了檢查計(jì)算是否完成的方法,以等待計(jì)算的完成,并獲取計(jì)算的結(jié)果。計(jì)算完成后只能使用get方法來獲取結(jié)果,如有必要,計(jì)算完成前可以阻塞此方法。取消則由 cancel 方法來執(zhí)行。還提供了其他方法,以確定任務(wù)是正常完成還是被取消了。一旦計(jì)算完成,就不能再取消計(jì)算。如果為了可取消性而使用 Future 但又不提供可用的結(jié)果,則可以聲明 Future 形式類型、并返回 null 作為底層任務(wù)的結(jié)果。

也就是說Future具有這樣的特性

  • 異步執(zhí)行,可用 get 方法獲取執(zhí)行結(jié)果
  • 如果計(jì)算還沒完成,get 方法是會阻塞的,如果完成了,是可以多次獲取并立即得到結(jié)果的
  • 如果計(jì)算還沒完成,是可以取消計(jì)算的
  • 可以查詢計(jì)算的執(zhí)行狀態(tài)

 

2 FutureTask

 

FutureTask 為 Future 提供了基礎(chǔ)實(shí)現(xiàn),如獲取任務(wù)執(zhí)行結(jié)果(get)和取消任務(wù)(cancel)等。如果任務(wù)尚未完成,獲取任務(wù)執(zhí)行結(jié)果時將會阻塞。一旦執(zhí)行結(jié)束,任務(wù)就不能被重啟或取消(除非使用runAndReset執(zhí)行計(jì)算)。

FutureTask 常用來封裝 Callable 和 Runnable,也可作為一個任務(wù)提交到線程池中執(zhí)行。除了作為一個獨(dú)立的類,此類也提供創(chuàng)建自定義 task 類使用。FutureTask 的線程安全由CAS保證。

 

FutureTask 內(nèi)部維護(hù)了一個由volatile修飾的int型變量—state,代表當(dāng)前任務(wù)的運(yùn)行狀態(tài)

  • NEW:新建
  • COMPLETING:完成
  • NORMAL:正常運(yùn)行
  • EXCEPTIONAL:異常退出
  • CANCELLED:任務(wù)取消
  • INTERRUPTING:線程中斷中
  • INTERRUPTED:線程已中斷

 

在這七種狀態(tài)中,有四種任務(wù)終止?fàn)顟B(tài):NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED。各種狀態(tài)的轉(zhuǎn)化如下:

數(shù)據(jù)結(jié)構(gòu)及核心參數(shù)

  1. //內(nèi)部持有的callable任務(wù),運(yùn)行完畢后置空 
  2. private Callable<V> callable; 
  3.  
  4. //從get()中返回的結(jié)果或拋出的異常 
  5. private Object outcome; // non-volatile, protected by state reads/writes 
  6.  
  7. //運(yùn)行callable的線程,在 run 時進(jìn)行 CAS 操作 
  8. private volatile Thread runner; 
  9.  
  10. //使用Treiber棧保存等待線程 
  11. private volatile WaitNode waiters; 

FutureTask 繼承了Runnale和Future,本身也作為一個線程運(yùn)行,可以提交給線程池執(zhí)行。維護(hù)了一個內(nèi)部類WaitNode,使用簡單的Treiber棧(無鎖并發(fā)棧)實(shí)現(xiàn),用于存儲等待線程。FutureTask 只有一個自定義的同步器 Sync 的屬性,所有的方法都是委派給此同步器來實(shí)現(xiàn)。這也是JUC里使用AQS的通用模式。

源碼解析

FutureTask 的同步器 由于Future在任務(wù)完成后,可以多次自由獲取結(jié)果,因此,用于控制同步的AQS使用共享模式。

 

FutureTask 底層任務(wù)的執(zhí)行狀態(tài)保存在AQS的狀態(tài)里。AQS是否允許線程獲取(是否阻塞)是取決于任務(wù)是否執(zhí)行完成,而不是具體的狀態(tài)值。

  1. private final class Sync extends AbstractQueuedSynchronizer { 
  2.     // 定義表示任務(wù)執(zhí)行狀態(tài)的常量。由于使用了位運(yùn)算進(jìn)行判斷,所以狀態(tài)值分別是2的冪。 
  3.  
  4.     // 表示任務(wù)已經(jīng)準(zhǔn)備好了,可以執(zhí)行 
  5.     private static final int READY     = 0; 
  6.  
  7.     // 表示任務(wù)正在執(zhí)行中 
  8.     private static final int RUNNING   = 1; 
  9.  
  10.     // 表示任務(wù)已執(zhí)行完成 
  11.     private static final int RAN       = 2; 
  12.  
  13.     // 表示任務(wù)已取消 
  14.     private static final int CANCELLED = 4; 
  15.  
  16.  
  17.     // 底層的表示任務(wù)的可執(zhí)行對象 
  18.     private final Callable<V> callable; 
  19.  
  20.     // 表示任務(wù)執(zhí)行結(jié)果,用于get方法返回。 
  21.     private V result; 
  22.  
  23.     // 表示任務(wù)執(zhí)行中的異常,用于get方法調(diào)用時拋出。 
  24.     private Throwable exception; 
  25.  
  26.      /* 
  27.      * 用于執(zhí)行任務(wù)的線程。在 set/cancel 方法后置為空,表示結(jié)果可獲取。 
  28.      * 必須是 volatile的,用于確保完成后(result和exception)的可見性。 
  29.      * (如果runner不是volatile,則result和exception必須都是volatile的) 
  30.      */ 
  31.     private volatile Thread runner; 
  32.  
  33.  
  34.      /** 
  35.      * 已完成或已取消 時成功獲取 
  36.      */ 
  37.     protected int tryAcquireShared( int ignore) { 
  38.         return innerIsDone() ? 1 : -1; 
  39.     } 
  40.  
  41.     /** 
  42.      * 在設(shè)置最終完成狀態(tài)后讓AQS總是通知,通過設(shè)置runner線程為空。 
  43.      * 這個方法并沒有更新AQS的state屬性, 
  44.      * 所以可見性是通過對volatile的runner的寫來保證的。 
  45.      */ 
  46.     protected boolean tryReleaseShared( int ignore) { 
  47.         runner = null
  48.         return true
  49.     } 
  50.  
  51.  
  52.      // 執(zhí)行任務(wù)的方法 
  53.     void innerRun() { 
  54.         // 用于確保任務(wù)不會重復(fù)執(zhí)行 
  55.         if (!compareAndSetState(READY, RUNNING)) 
  56.             return
  57.  
  58.         // 由于Future一般是異步執(zhí)行,所以runner一般是線程池里的線程。 
  59.         runner = Thread.currentThread(); 
  60.  
  61.         // 設(shè)置執(zhí)行線程后再次檢查,在執(zhí)行前檢查是否被異步取消 
  62.         // 由于前面的CAS已把狀態(tài)設(shè)置RUNNING, 
  63.         if (getState() == RUNNING) { // recheck after setting thread 
  64.             V result; 
  65.             // 
  66.             try { 
  67.                 result = callable.call(); 
  68.             } catch (Throwable ex) { 
  69.                 // 捕獲任務(wù)執(zhí)行過程中拋出的所有異常 
  70.                 setException(ex); 
  71.                 return
  72.             } 
  73.             set(result); 
  74.         } else { 
  75.       // 釋放等待的線程 
  76.             releaseShared(0); // cancel 
  77.         } 
  78.     } 
  79.  
  80.     // 設(shè)置結(jié)果 
  81.     void innerSet(V v) { 
  82.         // 放在循環(huán)里進(jìn)行是為了失敗后重試。 
  83.         for (;;) { 
  84.             // AQS初始化時,狀態(tài)值默認(rèn)是 0,對應(yīng)這里也就是 READY 狀態(tài)。 
  85.             int s = getState(); 
  86.  
  87.             // 已完成任務(wù)不能設(shè)置結(jié)果 
  88.             if (s == RAN) 
  89.                 return
  90.  
  91.             // 已取消 的任務(wù)不能設(shè)置結(jié)果 
  92.             if (s == CANCELLED) { 
  93.                 // releaseShared 會設(shè)置runner為空, 
  94.                 // 這是考慮到與其他的取消請求線程 競爭中斷 runner 
  95.                 releaseShared(0); 
  96.                 return
  97.             } 
  98.  
  99.             // 先設(shè)置已完成,免得多次設(shè)置 
  100.             if (compareAndSetState(s, RAN)) { 
  101.                 result = v; 
  102.                 releaseShared(0); // 此方法會更新 runner,保證result的可見性 
  103.                 done(); 
  104.                 return
  105.             } 
  106.         } 
  107.     } 
  108.  
  109.     // 獲取異步計(jì)算的結(jié)果 
  110.     V innerGet() throws InterruptedException, ExecutionException { 
  111.         acquireSharedInterruptibly(0);// 獲取共享,如果沒有完成則會阻塞。 
  112.  
  113.         // 檢查是否被取消 
  114.         if (getState() == CANCELLED) 
  115.             throw new CancellationException(); 
  116.  
  117.         // 異步計(jì)算過程中出現(xiàn)異常 
  118.         if (exception != null
  119.             throw new ExecutionException(exception); 
  120.  
  121.         return result; 
  122.     } 
  123.  
  124.     // 取消執(zhí)行任務(wù) 
  125.     boolean innerCancel( boolean mayInterruptIfRunning) { 
  126.         for (;;) { 
  127.             int s = getState(); 
  128.  
  129.             // 已完成或已取消的任務(wù)不能再次取消 
  130.             if (ranOrCancelled(s)) 
  131.                 return false
  132.  
  133.             // 任務(wù)處于 READY 或 RUNNING 
  134.             if (compareAndSetState(s, CANCELLED)) 
  135.                 break; 
  136.         } 
  137.         // 任務(wù)取消后,中斷執(zhí)行線程 
  138.         if (mayInterruptIfRunning) { 
  139.             Thread r = runner; 
  140.             if (r != null
  141.                 r.interrupt(); 
  142.         } 
  143.         releaseShared(0); // 釋放等待的訪問結(jié)果的線程 
  144.         done(); 
  145.         return true
  146.     } 
  147.  
  148.     /** 
  149.      * 檢查任務(wù)是否處于完成或取消狀態(tài) 
  150.      */ 
  151.     private boolean ranOrCancelled( int state) { 
  152.         return (state & (RAN | CANCELLED)) != 0; 
  153.     } 
  154.  
  155.      // 其他方法省略 

從 innerCancel 方法可知,取消操作只是改變了任務(wù)對象的狀態(tài)并可能會中斷執(zhí)行線程。如果任務(wù)的邏輯代碼沒有響應(yīng)中斷,則會一直異步執(zhí)行直到完成,只是最終的執(zhí)行結(jié)果不會被通過get方法返回,計(jì)算資源的開銷仍然是存在的。

總的來說,F(xiàn)uture 是線程間協(xié)調(diào)的一種工具。

AbstractExecutorService.submit(Callable task)

 

FutureTask 內(nèi)部實(shí)現(xiàn)方法都很簡單,先從線程池的submit分析。submit方法默認(rèn)實(shí)現(xiàn)在AbstractExecutorService,幾種實(shí)現(xiàn)源碼如下:

  1. public Future<?> submit(Runnable task) { 
  2.     if (task == null) throw new NullPointerException(); 
  3.     RunnableFuture<Void> ftask = newTaskFor(task, null); 
  4.     execute(ftask); 
  5.     return ftask; 
  6. public <T> Future<T> submit(Runnable task, T result) { 
  7.     if (task == null) throw new NullPointerException(); 
  8.     RunnableFuture<T> ftask = newTaskFor(task, result); 
  9.     execute(ftask); 
  10.     return ftask; 
  11. public <T> Future<T> submit(Callable<T> task) { 
  12.     if (task == null) throw new NullPointerException(); 
  13.     RunnableFuture<T> ftask = newTaskFor(task); 
  14.     execute(ftask); 
  15.     return ftask; 
  16. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
  17.     return new FutureTask<T>(runnable, value); 
  18. public FutureTask(Runnable runnable, V result) { 
  19.     this.callable = Executors.callable(runnable, result); 
  20.     this.state = NEW;       // ensure visibility of callable 

首先調(diào)用newTaskFor方法構(gòu)造FutureTask,然后調(diào)用execute把任務(wù)放進(jìn)線程池中,返回FutureTask

FutureTask.run()

  1. public void run() { 
  2.     //新建任務(wù),CAS替換runner為當(dāng)前線程 
  3.     if (state != NEW || 
  4.         !UNSAFE.compareAndSwapObject(this, runnerOffset, 
  5.                                      null, Thread.currentThread())) 
  6.         return
  7.     try { 
  8.         Callable<V> c = callable; 
  9.         if (c != null && state == NEW) { 
  10.             V result; 
  11.             boolean ran; 
  12.             try { 
  13.                 result = c.call(); 
  14.                 ran = true
  15.             } catch (Throwable ex) { 
  16.                 result = null
  17.                 ran = false
  18.                 setException(ex); 
  19.             } 
  20.             if (ran) 
  21.                 set(result);//設(shè)置執(zhí)行結(jié)果 
  22.         } 
  23.     } finally { 
  24.         // runner must be non-null until state is settled to 
  25.         // prevent concurrent calls to run() 
  26.         runner = null
  27.         // state must be re-read after nulling runner to prevent 
  28.         // leaked interrupts 
  29.         int s = state; 
  30.         if (s >= INTERRUPTING) 
  31.             handlePossibleCancellationInterrupt(s);//處理中斷邏輯 
  32.     } 

運(yùn)行任務(wù),如果任務(wù)狀態(tài)為NEW狀態(tài),則利用CAS修改為當(dāng)前線程。執(zhí)行完畢調(diào)用set(result)方法設(shè)置執(zhí)行結(jié)果。 set(result)源碼如下

首先利用cas修改state狀態(tài)為

設(shè)置返回結(jié)果,然后使用 lazySet(UNSAFE.putOrderedInt)的方式設(shè)置state狀態(tài)為

結(jié)果設(shè)置完畢后,調(diào)用finishCompletion()喚醒等待線程

  1. private void finishCompletion() { 
  2.     for (WaitNode q; (q = waiters) != null;) { 
  3.         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待線程 
  4.             for (;;) {//自旋遍歷等待線程 
  5.                 Thread t = q.thread; 
  6.                 if (t != null) { 
  7.                     q.thread = null
  8.                     LockSupport.unpark(t);//喚醒等待線程 
  9.                 } 
  10.                 WaitNode next = q.next
  11.                 if (next == null
  12.                     break; 
  13.                 q.next = null; // unlink to help gc 
  14.                 q = next
  15.             } 
  16.             break; 
  17.         } 
  18.     } 
  19.     //任務(wù)完成后調(diào)用函數(shù),自定義擴(kuò)展 
  20.     done(); 
  21.     callable = null;        // to reduce footprint 

回到run方法,如果在 run 期間被中斷,此時需要調(diào)用handlePossibleCancellationInterrupt處理中斷邏輯,確保任何中斷(例如cancel(true))只停留在當(dāng)前run或runAndReset的任務(wù)中

  1. private void handlePossibleCancellationInterrupt(int s) { 
  2.     //在中斷者中斷線程之前可能會延遲,所以我們只需要讓出CPU時間片自旋等待 
  3.     if (s == INTERRUPTING) 
  4.         while (state == INTERRUPTING) 
  5.             Thread.yield(); // wait out pending interrupt 

FutureTask.runAndReset()

runAndReset是 FutureTask另外一個任務(wù)執(zhí)行的方法,它不會返回執(zhí)行結(jié)果,而且在任務(wù)執(zhí)行完之后會重置stat的狀態(tài)為NEW,使任務(wù)可以多次執(zhí)行。 runAndReset的典型應(yīng)用是在 ScheduledThreadPoolExecutor 中,周期性的執(zhí)行任務(wù)。

 

FutureTask.get()

FutureTask 通過get()獲取任務(wù)執(zhí)行結(jié)果。如果任務(wù)處于未完成的狀態(tài)(state <= COMPLETING),就調(diào)用awaitDone等待任務(wù)完成。任務(wù)完成后,通過report獲取執(zhí)行結(jié)果或拋出執(zhí)行期間的異常。

awaitDone(boolean timed, long nanos)

  1. private int awaitDone(boolean timed, long nanos) 
  2.     throws InterruptedException { 
  3.     final long deadline = timed ? System.nanoTime() + nanos : 0L; 
  4.     WaitNode q = null
  5.     boolean queued = false
  6.     for (;;) {//自旋 
  7.         if (Thread.interrupted()) {//獲取并清除中斷狀態(tài) 
  8.             removeWaiter(q);//移除等待WaitNode 
  9.             throw new InterruptedException(); 
  10.         } 
  11.  
  12.         int s = state; 
  13.         if (s > COMPLETING) { 
  14.             if (q != null
  15.                 q.thread = null;//置空等待節(jié)點(diǎn)的線程 
  16.             return s; 
  17.         } 
  18.         else if (s == COMPLETING) // cannot time out yet 
  19.             Thread.yield(); 
  20.         else if (q == null
  21.             q = new WaitNode(); 
  22.         else if (!queued) 
  23.             //CAS修改waiter 
  24.             queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 
  25.                                                  q.next = waiters, q); 
  26.         else if (timed) { 
  27.             nanos = deadline - System.nanoTime(); 
  28.             if (nanos <= 0L) { 
  29.                 removeWaiter(q);//超時,移除等待節(jié)點(diǎn) 
  30.                 return state; 
  31.             } 
  32.             LockSupport.parkNanos(this, nanos);//阻塞當(dāng)前線程 
  33.         } 
  34.         else 
  35.             LockSupport.park(this);//阻塞當(dāng)前線程 
  36.     } 

awaitDone用于等待任務(wù)完成,或任務(wù)因?yàn)橹袛嗷虺瑫r而終止。返回任務(wù)的完成狀態(tài)。

1.如果線程被中斷,首先清除中斷狀態(tài),調(diào)用removeWaiter移除等待節(jié)點(diǎn),然后拋InterruptedException。removeWaiter源碼如下:

  1. private void removeWaiter(WaitNode node) { 
  2.     if (node != null) { 
  3.         node.thread = null;//首先置空線程 
  4.         retry: 
  5.         for (;;) {          // restart on removeWaiter race 
  6.             //依次遍歷查找 
  7.             for (WaitNode pred = null, q = waiters, s; q != null; q = s) { 
  8.                 s = q.next
  9.                 if (q.thread != null
  10.                     pred = q; 
  11.                 else if (pred != null) { 
  12.                     pred.next = s; 
  13.                     if (pred.thread == null) // check for race 
  14.                         continue retry; 
  15.                 } 
  16.                 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s)) //cas替換 
  17.                     continue retry; 
  18.             } 
  19.             break; 
  20.         } 
  21.     } 

2.如果當(dāng)前為結(jié)束態(tài)(state>COMPLETING),則根據(jù)需要置空等待節(jié)點(diǎn)的線程,并返回 Future 狀態(tài)

3.如果當(dāng)前為正在完成(COMPLETING),說明此時 Future 還不能做出超時動作,為任務(wù)讓出CPU執(zhí)行時間片

4.如果state為NEW,先新建一個WaitNode,然后CAS修改當(dāng)前waiters

5.如果等待超時,則調(diào)用removeWaiter移除等待節(jié)點(diǎn),返回任務(wù)狀態(tài);如果設(shè)置了超時時間但是尚未超時,則park阻塞當(dāng)前線程

6.其他情況直接阻塞當(dāng)前線程

 

FutureTask.cancel(boolean mayInterruptIfRunning)

  1. public boolean cancel(boolean mayInterruptIfRunning) { 
  2.     //如果當(dāng)前Future狀態(tài)為NEW,根據(jù)參數(shù)修改Future狀態(tài)為INTERRUPTING或CANCELLED 
  3.     if (!(state == NEW && 
  4.           UNSAFE.compareAndSwapInt(this, stateOffset, NEW, 
  5.               mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) 
  6.         return false
  7.     try {    // in case call to interrupt throws exception 
  8.         if (mayInterruptIfRunning) {//可以在運(yùn)行時中斷 
  9.             try { 
  10.                 Thread t = runner; 
  11.                 if (t != null
  12.                     t.interrupt(); 
  13.             } finally { // final state 
  14.                 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 
  15.             } 
  16.         } 
  17.     } finally { 
  18.         finishCompletion();//移除并喚醒所有等待線程 
  19.     } 
  20.     return true

說明:嘗試取消任務(wù)。如果任務(wù)已經(jīng)完成或已經(jīng)被取消,此操作會失敗。如果當(dāng)前Future狀態(tài)為NEW,根據(jù)參數(shù)修改Future狀態(tài)為INTERRUPTING或CANCELLED。如果當(dāng)前狀態(tài)不為NEW,則根據(jù)參數(shù)mayInterruptIfRunning決定是否在任務(wù)運(yùn)行中也可以中斷。中斷操作完成后,調(diào)用finishCompletion移除并喚醒所有等待線程。

 

示例

小結(jié)
本章重點(diǎn):FutureTask 結(jié)果返回機(jī)制,以及內(nèi)部運(yùn)行狀態(tài)的轉(zhuǎn)變

 

 

責(zé)任編輯:姜華 來源: JavaEdge
相關(guān)推薦

2020-10-26 09:02:45

如何校驗(yàn)參數(shù)

2019-02-22 10:00:45

Java開發(fā)代碼

2022-06-02 11:12:10

CallableFuture

2019-10-24 11:03:56

HadoopGoogle硬件

2019-10-24 15:15:19

Hadoop框架數(shù)據(jù)

2020-01-16 15:35:00

高并發(fā)架構(gòu)服務(wù)器

2021-10-25 09:41:04

架構(gòu)運(yùn)維技術(shù)

2020-10-26 11:41:47

kill代碼

2020-01-14 14:37:29

JVMJava體系

2009-02-26 16:32:58

SaaS開發(fā)SaaS應(yīng)用Open API

2020-06-28 14:15:52

前端架構(gòu)師互聯(lián)網(wǎng)

2020-12-07 09:40:19

Future&Futu編程Java

2019-07-22 22:22:02

架構(gòu)運(yùn)維技術(shù)

2019-08-22 10:54:05

分布式系統(tǒng)架構(gòu)

2021-02-01 07:40:55

架構(gòu)師阿里技專家

2021-06-07 09:35:11

架構(gòu)運(yùn)維技術(shù)

2020-06-28 08:34:07

架構(gòu)師阿里軟件

2021-08-20 07:53:07

Android動態(tài)換膚

2019-07-31 07:36:12

架構(gòu)運(yùn)維技術(shù)

2019-10-31 09:52:03

Android代碼規(guī)范
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號