阿里架構(gòu)師教你JUC-Future與FutureTask原理詳解
1 Future
Future 表示一個任務(wù)的生命周期,是一個可取消的異步運(yùn)算。提供了相應(yīng)的方法來判斷任務(wù)狀態(tài)(完成或取消),以及獲取任務(wù)的結(jié)果和取消任務(wù)等。適合具有可取消性和執(zhí)行時間較長的異步任務(wù)。
并發(fā)包中許多異步任務(wù)類都繼承自Future,其中最典型的就是 FutureTask
1.1 介紹
Future 表示異步計(jì)算的結(jié)果。它提供了檢查計(jì)算是否完成的方法,以等待計(jì)算的完成,并獲取計(jì)算的結(jié)果。計(jì)算完成后只能使用get方法來獲取結(jié)果,如有必要,計(jì)算完成前可以阻塞此方法。取消則由 cancel 方法來執(zhí)行。還提供了其他方法,以確定任務(wù)是正常完成還是被取消了。一旦計(jì)算完成,就不能再取消計(jì)算。如果為了可取消性而使用 Future 但又不提供可用的結(jié)果,則可以聲明 Future 形式類型、并返回 null 作為底層任務(wù)的結(jié)果。
也就是說Future具有這樣的特性
- 異步執(zhí)行,可用 get 方法獲取執(zhí)行結(jié)果
- 如果計(jì)算還沒完成,get 方法是會阻塞的,如果完成了,是可以多次獲取并立即得到結(jié)果的
- 如果計(jì)算還沒完成,是可以取消計(jì)算的
- 可以查詢計(jì)算的執(zhí)行狀態(tài)
2 FutureTask
FutureTask 為 Future 提供了基礎(chǔ)實(shí)現(xiàn),如獲取任務(wù)執(zhí)行結(jié)果(get)和取消任務(wù)(cancel)等。如果任務(wù)尚未完成,獲取任務(wù)執(zhí)行結(jié)果時將會阻塞。一旦執(zhí)行結(jié)束,任務(wù)就不能被重啟或取消(除非使用runAndReset執(zhí)行計(jì)算)。
FutureTask 常用來封裝 Callable 和 Runnable,也可作為一個任務(wù)提交到線程池中執(zhí)行。除了作為一個獨(dú)立的類,此類也提供創(chuàng)建自定義 task 類使用。FutureTask 的線程安全由CAS保證。
FutureTask 內(nèi)部維護(hù)了一個由volatile修飾的int型變量—state,代表當(dāng)前任務(wù)的運(yùn)行狀態(tài)
- NEW:新建
- COMPLETING:完成
- NORMAL:正常運(yùn)行
- EXCEPTIONAL:異常退出
- CANCELLED:任務(wù)取消
- INTERRUPTING:線程中斷中
- INTERRUPTED:線程已中斷
在這七種狀態(tài)中,有四種任務(wù)終止?fàn)顟B(tài):NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED。各種狀態(tài)的轉(zhuǎn)化如下:
數(shù)據(jù)結(jié)構(gòu)及核心參數(shù)
- //內(nèi)部持有的callable任務(wù),運(yùn)行完畢后置空
- private Callable<V> callable;
- //從get()中返回的結(jié)果或拋出的異常
- private Object outcome; // non-volatile, protected by state reads/writes
- //運(yùn)行callable的線程,在 run 時進(jìn)行 CAS 操作
- private volatile Thread runner;
- //使用Treiber棧保存等待線程
- private volatile WaitNode waiters;
FutureTask 繼承了Runnale和Future,本身也作為一個線程運(yùn)行,可以提交給線程池執(zhí)行。維護(hù)了一個內(nèi)部類WaitNode,使用簡單的Treiber棧(無鎖并發(fā)棧)實(shí)現(xiàn),用于存儲等待線程。FutureTask 只有一個自定義的同步器 Sync 的屬性,所有的方法都是委派給此同步器來實(shí)現(xiàn)。這也是JUC里使用AQS的通用模式。
源碼解析
FutureTask 的同步器 由于Future在任務(wù)完成后,可以多次自由獲取結(jié)果,因此,用于控制同步的AQS使用共享模式。
FutureTask 底層任務(wù)的執(zhí)行狀態(tài)保存在AQS的狀態(tài)里。AQS是否允許線程獲取(是否阻塞)是取決于任務(wù)是否執(zhí)行完成,而不是具體的狀態(tài)值。
- private final class Sync extends AbstractQueuedSynchronizer {
- // 定義表示任務(wù)執(zhí)行狀態(tài)的常量。由于使用了位運(yùn)算進(jìn)行判斷,所以狀態(tài)值分別是2的冪。
- // 表示任務(wù)已經(jīng)準(zhǔn)備好了,可以執(zhí)行
- private static final int READY = 0;
- // 表示任務(wù)正在執(zhí)行中
- private static final int RUNNING = 1;
- // 表示任務(wù)已執(zhí)行完成
- private static final int RAN = 2;
- // 表示任務(wù)已取消
- private static final int CANCELLED = 4;
- // 底層的表示任務(wù)的可執(zhí)行對象
- private final Callable<V> callable;
- // 表示任務(wù)執(zhí)行結(jié)果,用于get方法返回。
- private V result;
- // 表示任務(wù)執(zhí)行中的異常,用于get方法調(diào)用時拋出。
- private Throwable exception;
- /*
- * 用于執(zhí)行任務(wù)的線程。在 set/cancel 方法后置為空,表示結(jié)果可獲取。
- * 必須是 volatile的,用于確保完成后(result和exception)的可見性。
- * (如果runner不是volatile,則result和exception必須都是volatile的)
- */
- private volatile Thread runner;
- /**
- * 已完成或已取消 時成功獲取
- */
- protected int tryAcquireShared( int ignore) {
- return innerIsDone() ? 1 : -1;
- }
- /**
- * 在設(shè)置最終完成狀態(tài)后讓AQS總是通知,通過設(shè)置runner線程為空。
- * 這個方法并沒有更新AQS的state屬性,
- * 所以可見性是通過對volatile的runner的寫來保證的。
- */
- protected boolean tryReleaseShared( int ignore) {
- runner = null;
- return true;
- }
- // 執(zhí)行任務(wù)的方法
- void innerRun() {
- // 用于確保任務(wù)不會重復(fù)執(zhí)行
- if (!compareAndSetState(READY, RUNNING))
- return;
- // 由于Future一般是異步執(zhí)行,所以runner一般是線程池里的線程。
- runner = Thread.currentThread();
- // 設(shè)置執(zhí)行線程后再次檢查,在執(zhí)行前檢查是否被異步取消
- // 由于前面的CAS已把狀態(tài)設(shè)置RUNNING,
- if (getState() == RUNNING) { // recheck after setting thread
- V result;
- //
- try {
- result = callable.call();
- } catch (Throwable ex) {
- // 捕獲任務(wù)執(zhí)行過程中拋出的所有異常
- setException(ex);
- return;
- }
- set(result);
- } else {
- // 釋放等待的線程
- releaseShared(0); // cancel
- }
- }
- // 設(shè)置結(jié)果
- void innerSet(V v) {
- // 放在循環(huán)里進(jìn)行是為了失敗后重試。
- for (;;) {
- // AQS初始化時,狀態(tài)值默認(rèn)是 0,對應(yīng)這里也就是 READY 狀態(tài)。
- int s = getState();
- // 已完成任務(wù)不能設(shè)置結(jié)果
- if (s == RAN)
- return;
- // 已取消 的任務(wù)不能設(shè)置結(jié)果
- if (s == CANCELLED) {
- // releaseShared 會設(shè)置runner為空,
- // 這是考慮到與其他的取消請求線程 競爭中斷 runner
- releaseShared(0);
- return;
- }
- // 先設(shè)置已完成,免得多次設(shè)置
- if (compareAndSetState(s, RAN)) {
- result = v;
- releaseShared(0); // 此方法會更新 runner,保證result的可見性
- done();
- return;
- }
- }
- }
- // 獲取異步計(jì)算的結(jié)果
- V innerGet() throws InterruptedException, ExecutionException {
- acquireSharedInterruptibly(0);// 獲取共享,如果沒有完成則會阻塞。
- // 檢查是否被取消
- if (getState() == CANCELLED)
- throw new CancellationException();
- // 異步計(jì)算過程中出現(xiàn)異常
- if (exception != null)
- throw new ExecutionException(exception);
- return result;
- }
- // 取消執(zhí)行任務(wù)
- boolean innerCancel( boolean mayInterruptIfRunning) {
- for (;;) {
- int s = getState();
- // 已完成或已取消的任務(wù)不能再次取消
- if (ranOrCancelled(s))
- return false;
- // 任務(wù)處于 READY 或 RUNNING
- if (compareAndSetState(s, CANCELLED))
- break;
- }
- // 任務(wù)取消后,中斷執(zhí)行線程
- if (mayInterruptIfRunning) {
- Thread r = runner;
- if (r != null)
- r.interrupt();
- }
- releaseShared(0); // 釋放等待的訪問結(jié)果的線程
- done();
- return true;
- }
- /**
- * 檢查任務(wù)是否處于完成或取消狀態(tài)
- */
- private boolean ranOrCancelled( int state) {
- return (state & (RAN | CANCELLED)) != 0;
- }
- // 其他方法省略
- }
從 innerCancel 方法可知,取消操作只是改變了任務(wù)對象的狀態(tài)并可能會中斷執(zhí)行線程。如果任務(wù)的邏輯代碼沒有響應(yīng)中斷,則會一直異步執(zhí)行直到完成,只是最終的執(zhí)行結(jié)果不會被通過get方法返回,計(jì)算資源的開銷仍然是存在的。
總的來說,F(xiàn)uture 是線程間協(xié)調(diào)的一種工具。
AbstractExecutorService.submit(Callable task)
FutureTask 內(nèi)部實(shí)現(xiàn)方法都很簡單,先從線程池的submit分析。submit方法默認(rèn)實(shí)現(xiàn)在AbstractExecutorService,幾種實(shí)現(xiàn)源碼如下:
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
- public <T> Future<T> submit(Runnable task, T result) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task, result);
- execute(ftask);
- return ftask;
- }
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new FutureTask<T>(runnable, value);
- }
- public FutureTask(Runnable runnable, V result) {
- this.callable = Executors.callable(runnable, result);
- this.state = NEW; // ensure visibility of callable
- }
首先調(diào)用newTaskFor方法構(gòu)造FutureTask,然后調(diào)用execute把任務(wù)放進(jìn)線程池中,返回FutureTask
FutureTask.run()
- public void run() {
- //新建任務(wù),CAS替換runner為當(dāng)前線程
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset,
- null, Thread.currentThread()))
- return;
- try {
- Callable<V> c = callable;
- if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- result = c.call();
- ran = true;
- } catch (Throwable ex) {
- result = null;
- ran = false;
- setException(ex);
- }
- if (ran)
- set(result);//設(shè)置執(zhí)行結(jié)果
- }
- } finally {
- // runner must be non-null until state is settled to
- // prevent concurrent calls to run()
- runner = null;
- // state must be re-read after nulling runner to prevent
- // leaked interrupts
- int s = state;
- if (s >= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);//處理中斷邏輯
- }
- }
運(yùn)行任務(wù),如果任務(wù)狀態(tài)為NEW狀態(tài),則利用CAS修改為當(dāng)前線程。執(zhí)行完畢調(diào)用set(result)方法設(shè)置執(zhí)行結(jié)果。 set(result)源碼如下
首先利用cas修改state狀態(tài)為
設(shè)置返回結(jié)果,然后使用 lazySet(UNSAFE.putOrderedInt)的方式設(shè)置state狀態(tài)為
結(jié)果設(shè)置完畢后,調(diào)用finishCompletion()喚醒等待線程
- private void finishCompletion() {
- for (WaitNode q; (q = waiters) != null;) {
- if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待線程
- for (;;) {//自旋遍歷等待線程
- Thread t = q.thread;
- if (t != null) {
- q.thread = null;
- LockSupport.unpark(t);//喚醒等待線程
- }
- WaitNode next = q.next;
- if (next == null)
- break;
- q.next = null; // unlink to help gc
- q = next;
- }
- break;
- }
- }
- //任務(wù)完成后調(diào)用函數(shù),自定義擴(kuò)展
- done();
- callable = null; // to reduce footprint
- }
回到run方法,如果在 run 期間被中斷,此時需要調(diào)用handlePossibleCancellationInterrupt處理中斷邏輯,確保任何中斷(例如cancel(true))只停留在當(dāng)前run或runAndReset的任務(wù)中
- private void handlePossibleCancellationInterrupt(int s) {
- //在中斷者中斷線程之前可能會延遲,所以我們只需要讓出CPU時間片自旋等待
- if (s == INTERRUPTING)
- while (state == INTERRUPTING)
- Thread.yield(); // wait out pending interrupt
- }
FutureTask.runAndReset()
runAndReset是 FutureTask另外一個任務(wù)執(zhí)行的方法,它不會返回執(zhí)行結(jié)果,而且在任務(wù)執(zhí)行完之后會重置stat的狀態(tài)為NEW,使任務(wù)可以多次執(zhí)行。 runAndReset的典型應(yīng)用是在 ScheduledThreadPoolExecutor 中,周期性的執(zhí)行任務(wù)。
FutureTask.get()
FutureTask 通過get()獲取任務(wù)執(zhí)行結(jié)果。如果任務(wù)處于未完成的狀態(tài)(state <= COMPLETING),就調(diào)用awaitDone等待任務(wù)完成。任務(wù)完成后,通過report獲取執(zhí)行結(jié)果或拋出執(zhí)行期間的異常。
awaitDone(boolean timed, long nanos)
- private int awaitDone(boolean timed, long nanos)
- throws InterruptedException {
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- WaitNode q = null;
- boolean queued = false;
- for (;;) {//自旋
- if (Thread.interrupted()) {//獲取并清除中斷狀態(tài)
- removeWaiter(q);//移除等待WaitNode
- throw new InterruptedException();
- }
- int s = state;
- if (s > COMPLETING) {
- if (q != null)
- q.thread = null;//置空等待節(jié)點(diǎn)的線程
- return s;
- }
- else if (s == COMPLETING) // cannot time out yet
- Thread.yield();
- else if (q == null)
- q = new WaitNode();
- else if (!queued)
- //CAS修改waiter
- queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
- q.next = waiters, q);
- else if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- removeWaiter(q);//超時,移除等待節(jié)點(diǎn)
- return state;
- }
- LockSupport.parkNanos(this, nanos);//阻塞當(dāng)前線程
- }
- else
- LockSupport.park(this);//阻塞當(dāng)前線程
- }
- }
awaitDone用于等待任務(wù)完成,或任務(wù)因?yàn)橹袛嗷虺瑫r而終止。返回任務(wù)的完成狀態(tài)。
1.如果線程被中斷,首先清除中斷狀態(tài),調(diào)用removeWaiter移除等待節(jié)點(diǎn),然后拋InterruptedException。removeWaiter源碼如下:
- private void removeWaiter(WaitNode node) {
- if (node != null) {
- node.thread = null;//首先置空線程
- retry:
- for (;;) { // restart on removeWaiter race
- //依次遍歷查找
- for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
- s = q.next;
- if (q.thread != null)
- pred = q;
- else if (pred != null) {
- pred.next = s;
- if (pred.thread == null) // check for race
- continue retry;
- }
- else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s)) //cas替換
- continue retry;
- }
- break;
- }
- }
- }
2.如果當(dāng)前為結(jié)束態(tài)(state>COMPLETING),則根據(jù)需要置空等待節(jié)點(diǎn)的線程,并返回 Future 狀態(tài)
3.如果當(dāng)前為正在完成(COMPLETING),說明此時 Future 還不能做出超時動作,為任務(wù)讓出CPU執(zhí)行時間片
4.如果state為NEW,先新建一個WaitNode,然后CAS修改當(dāng)前waiters
5.如果等待超時,則調(diào)用removeWaiter移除等待節(jié)點(diǎn),返回任務(wù)狀態(tài);如果設(shè)置了超時時間但是尚未超時,則park阻塞當(dāng)前線程
6.其他情況直接阻塞當(dāng)前線程
FutureTask.cancel(boolean mayInterruptIfRunning)
- public boolean cancel(boolean mayInterruptIfRunning) {
- //如果當(dāng)前Future狀態(tài)為NEW,根據(jù)參數(shù)修改Future狀態(tài)為INTERRUPTING或CANCELLED
- if (!(state == NEW &&
- UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
- mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
- return false;
- try { // in case call to interrupt throws exception
- if (mayInterruptIfRunning) {//可以在運(yùn)行時中斷
- try {
- Thread t = runner;
- if (t != null)
- t.interrupt();
- } finally { // final state
- UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
- }
- }
- } finally {
- finishCompletion();//移除并喚醒所有等待線程
- }
- return true;
- }
說明:嘗試取消任務(wù)。如果任務(wù)已經(jīng)完成或已經(jīng)被取消,此操作會失敗。如果當(dāng)前Future狀態(tài)為NEW,根據(jù)參數(shù)修改Future狀態(tài)為INTERRUPTING或CANCELLED。如果當(dāng)前狀態(tài)不為NEW,則根據(jù)參數(shù)mayInterruptIfRunning決定是否在任務(wù)運(yùn)行中也可以中斷。中斷操作完成后,調(diào)用finishCompletion移除并喚醒所有等待線程。
示例
小結(jié)
本章重點(diǎn):FutureTask 結(jié)果返回機(jī)制,以及內(nèi)部運(yùn)行狀態(tài)的轉(zhuǎn)變