Android Bolts-更簡單的完成線程調度和任務管理
尤塞恩·圣利奧·博爾特 Usain St Leo Bolt ,牙買加短跑運動員,男子100米、男子200米以及男子400米接力賽的世界紀錄保持人,同時是以上三項賽事的連續(xù)三屆奧運金牌得主。
使用 Bolts 可以將一個完整的操作拆分成多個子任務,這些子任務可以自由的拆分、組合和替換,每個任務作為整個任務鏈的一環(huán)可以運行在指定線程中,同時既能從上行任務中獲取任務結果,又可以向下行任務發(fā)布當前任務的結果,而不必考慮線程之間的交互。
前言
一個關于線程調度的簡單需求,在子線程從網(wǎng)絡下載圖片,并返回下載的圖片,在主線程使用該圖片更新到 UI,同時返回當前 UI 的狀態(tài) json,在子線程將 json 數(shù)據(jù)保存到本地文件,完成后在主線程彈出提示,這中間涉及到了 4 次線程切換,同時后面的任務需要前面任務完成后的返回值作為參數(shù)。
使用 Thread + Handler 實現(xiàn),線程調度很不靈活,代碼可讀性差,不美觀,擴展性差,錯誤處理異常麻煩。
- String url = "http://www.baidu.com";
- Handler handler = new Handler(Looper.getMainLooper());
- new Thread(() -> {
- // 下載
- Bitmap bitmap = downloadBitmap(url);
- handler.post(() -> {
- // 更新 UI
- String json = updateUI(bitmap);
- new Thread(() -> {
- // 向存儲寫入UI狀態(tài)
- saveUIState(json);
- // 保存成功后,提示
- handler.post(() -> toastMsg("save finish."));
- }).start();
- });
- }).start();
使用 RxJava 實現(xiàn),線程調度非常靈活,鏈式調用,代碼清晰,擴展性好,有統(tǒng)一的異常處理機制,不過 Rx 是一個很強大的庫,如果只用來做線程調度的話, Rx 就顯得有點太重了。
- Observable.just(URL)
- // 下載
- .map(this::downloadBitmap)
- .subscribeOn(Schedulers.newThread())
- // 更新UI
- .observeOn(AndroidSchedulers.mainThread())
- .map(this::updateUI)
- // 存儲 UI 狀態(tài)
- .observeOn(Schedulers.io())
- .map(this::saveUIState)
- // 顯示提示
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(rst -> toastMsg("save to " + rst),
- // handle error
- Throwable::printStackTrace);
使用 bolts 實現(xiàn),線程調度靈活,鏈式調用,代碼清晰,具有良好的擴展性,具有統(tǒng)一的異常處理機制,雖然沒有 Rx 那么豐富的操作符,但是勝在類庫非常非常小,只有 38 KB。
- Task
- .forResult(URL)
- // 下載
- .onSuccess(task -> downloadBitmap(task.getResult()), Task.BACKGROUND_EXECUTOR)
- // 更新UI
- .onSuccess(task -> updateUI(task.getResult()), Task.UI_THREAD_EXECUTOR)
- // 存儲UI狀態(tài)
- .onSuccess(task -> saveUIState(task.getResult()), Task.BACKGROUND_EXECUTOR)
- // 提示
- .onSuccess(task -> toastMsg("save to " + task.getResult()), Task.UI_THREAD_EXECUT
- // handle error
- .continueWith(task -> {
- if (task.isFaulted()) {
- task.getError().printStackTrace();
- return false;
- }
- return true;
- });
線程調度器
共有 4 種類型執(zhí)行線程,將任務分發(fā)到指定線程執(zhí)行,分別是
- backgroud - 后臺線程池,可以并發(fā)執(zhí)行任務。
- scheduled - 單線程池,只有一個線程,主要用來執(zhí)行 delay 操作。
- immediate - 即時線程,如果線程調用棧小于 15,則在當前線程執(zhí)行,否則代理給 background 。
- uiThread - 針對 Android 設計,使用 Handler 發(fā)送到主線程執(zhí)行。
backgroud
主要用來在后臺并發(fā)執(zhí)行多任務
- public static final ExecutorService BACKGROUND_EXECUTOR = BoltsExecutors.background();
在 Android 平臺下根據(jù) CPU 核數(shù)創(chuàng)建線程池,其他情況下,創(chuàng)建緩存線程池。
- background = !isAndroidRuntime()
- ? java.util.concurrent.Executors.newCachedThreadPool()
- : AndroidExecutors.newCachedThreadPool();
scheduled
主要用于任務之間做 delay 操作,并不實際執(zhí)行任務。
- scheduled = Executors.newSingleThreadScheduledExecutor();
immediate
主要用來簡化那些不指定運行線程的方法,默認在當前線程去執(zhí)行任務,使用 ThreadLocal 保存每個線程調用棧的深度,如果深度不超過 15,則在當前線程執(zhí)行,否則代理給 backgroud 執(zhí)行。
- private static final Executor IMMEDIATE_EXECUTOR = BoltsExecutors.immediate();
- // 關鍵方法
- @Override
- public void execute(Runnable command) {
- int depth = incrementDepth();
- try {
- if (depth <= MAX_DEPTH) {
- command.run();
- } else {
- BoltsExecutors.background().execute(command)
- }
- } finally {
- decrementDepth();
- }
- }
uiThread
為 Android 專門設計,在主線程執(zhí)行任務。
- public static final Executor UI_THREAD_EXECUTOR = AndroidExecutors.uiThread();
- private static class UIThreadExecutor implements Executor {
- @Override
- public void execute(Runnable command) {
- new Handler(Looper.getMainLooper()).post(command);
- }
- }
核心類
Task ,最核心的類,每個子任務都是一個 Task ,它們負責自己需要執(zhí)行的任務。每個 Task 具有 3 種狀態(tài) Result 、 Error 和 Cancel ,分別代表成功、異常和取消。
Continuation ,是一個接口,它就像鏈接子任務每一環(huán)的鎖扣,把一個個獨立的任務鏈接在一起。
通過 Task - Continuation - Task - Continuation ... 的形式組成完整的任務鏈,順序在各自線程執(zhí)行。
創(chuàng)建 Task
根據(jù) Task 的 3 種狀態(tài),創(chuàng)建簡單的 Task ,會復用已有的任務對象
- public static <TResult> Task<TResult> forResult(TResult value)
- public static <TResult> Task<TResult> forError(Exception error)
- public static <TResult> Task<TResult> cancelled()
使用 delay 方法,延時執(zhí)行并創(chuàng)建 Task
- public static Task delay(long delay)
- public static Task delay(long delay, CancellationToken cancellationToken)
使用 whenAny 方法,執(zhí)行多個任務,當任意任務返回結果時,保存這個結果
- public static Task> whenAnyResult(Collection> tasks)
- public static Task> whenAny(Collection> tasks)
使用 whenAll 方法,執(zhí)行多個任務,當全部任務執(zhí)行完后,返回結果
- public static Task whenAll(Collection> tasks)
- public static Task> whenAllResult(final Collection> tasks)
使用 call 方法,執(zhí)行一個任務,同時創(chuàng)建 Task
- public static Task call(final Callable callable, Executor executor,
- final CancellationToken ct)
鏈接子任務
使用 continueWith 方法,鏈接一個子任務,如果前行任務已經(jīng)執(zhí)行完成,則立即執(zhí)行當前任務,否則加入隊列中,等待。
- public <TContinuationResult> Task<TContinuationResult> continueWith(
- final Continuation<TResult, TContinuationResult> continuation, final Executor executor,
- final CancellationToken ct)
使用 continueWithTask 方法,在當前任務之后鏈接另一個任務鏈,這種做法是為了滿足那種將部分任務組合在一起分離出去,作為公共任務的場景,他接受將另外一個完全獨立的任務鏈,追加在當前執(zhí)行的任務后面。
- public <TContinuationResult> Task<TContinuationResult> continueWithTask(
- final Continuation<TResult, Task<TContinuationResult>> continuation, final Executor executor,
- final CancellationToken ct)
使用 continueWhile 方法鏈接子任務,與 continueWith 區(qū)別在于,他有一個 predicate 表達式,只有當表達式成立時,才會追加子任務,這樣做是在執(zhí)行任務前可以做一個攔截操作,也是為了不破環(huán)鏈式調用的整體風格。
- public Task<Void> continueWhile(final Callable<Boolean> predicate,
- final Continuation<Void, Task<Void>> continuation, final Executor executor,
- final CancellationToken ct)
使用 onSuccess 和 onSuccessTask 鏈接單個任務個任務鏈,區(qū)別于 continueWith 在于, onSuccess 方法,前行任務如果失敗了,后行的任務也會直接失敗,不會再執(zhí)行,但是 continueWith 的各個子任務之間沒有關聯(lián),就算前行任務失敗,后行任務也會執(zhí)行。
- public <TContinuationResult> Task<TContinuationResult> onSuccess(
- final Continuation<TResult, TContinuationResult> continuation, Executor executor,
- final CancellationToken ct)
取消任務
- CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
- CancellationToken token = cancellationTokenSource.getToken();
- Task.call((Callable<String>) () -> null,
- Task.BACKGROUND_EXECUTOR,
- token);
- // 取消任務
- cancellationTokenSource.cancel();
異常的處理
關于異常的處理,整個機制下來,每個任務作為一個獨立的單位,異常會被統(tǒng)一捕捉,因此不必針對任務中的方法進行單獨的處理。
如果使用了 continueWith 鏈接任務,那么當前任務的的異常信息,將會保存在當前 Task 中在下行任務中進行處理,下行任務也可以不處理這個異常,直接執(zhí)行任務,那么這個異常就到這里停止了,不會再向下傳遞,也就是說,只有下行任務才知道當前任務的結果,不管是成功還是異常。
當然了,如果任務之間有關聯(lián),由于上行任務的異常極大可能造成當前任務的異常,那么當前任務異常的信息,又會向下傳遞,但是上行任務的異常就到這里為止了。
如果使用 onSuccess 之類的方法,如果上行任務異常了,那么下行任務根本不會執(zhí)行,而是直接將異常往下面?zhèn)鬟f,直到被處理掉。
任務的分離和組合
我們可以將一個完整的操作細分成多個任務,每個任務都遵循單一職責的原則而盡量簡單,這樣可以在任務之間再穿插新的任務,或者將部分任務分離出來組合到一起等。
擴展性
我們可以在兩個細分的任務之間添加一個新的操作,而不影響上行和下行任務,如我們給文章開頭的需求中更新 UI 之前,將 Bitmap 先保存到本地。
- Task
- .forResult(URL)
- // 下載
- .onSuccess(task -> downloadBitmap(task.getResult()), Task.BACKGROUND_EXECUTOR)
- // 保存在本地
- .onSuccess(task -> saveBitmapToFile(task.getResult()),Task.BACKGROUND_EXECUTOR)
- // 更新UI
- .onSuccess(task -> updateUI(task.getResult()), Task.UI_THREAD_EXECUTOR)
- ...
復用性
對一些公共的操作,可以單獨分離成新的任務,當需要做類似操作時,即可復用這部份功能,如可以將 下載圖片并更新 UI 、 保存狀態(tài)并彈出提示 兩塊功能分離出來,作為公共的任務。
- // 下載圖片->更新UI
- public Continuation<String, Task<String>> downloadImageAndUpdateUI() {
- return task ->
- Task.call(() -> downloadBitmap(task.getResult()), Task.BACKGROUND_EXECUTOR)
- .continueWith(taskWithBitmap -> updateUI(taskWithBitmap.getResult()), Task.UI_THREAD_EXECUTOR);
- }
- // 保存狀態(tài)->提示信息
- public Continuation<String, Task<Boolean>> saveStateAndToast() {
- return task ->
- Task.call(() -> saveUIState(task.getResult()), Task.BACKGROUND_EXECUTOR)
- .continueWith(taskWithPath -> toastMsg("save to " + taskWithPath.getResult()));
- }
使用分離的任務
- Task
- .forResult(URL)
- .continueWithTask(downloadImageAndUpdateUI())
- .continueWithTask(saveStateAndToast())
- ...
總結
在 Task 中有一個 continuations 是當前任務后面追加的任務列表,當當前任務成功、異?;蛘呷∠麜r,會去執(zhí)行列表中的后續(xù)任務。
通常情況下,我們使用鏈式調用構建任務鏈,結果就是一條沒有分支的任務鏈。
添加任務時:每次添加一個 Continuation ,就會生成一個 Task ,加到上行任務的 continuations 列表中,等待執(zhí)行,同時返回當前的 Task ,以便后面的任務可以鏈接到當前任務后面。
執(zhí)行任務時:當前任務執(zhí)行完之后,結果可能有 3 種,都會被保存到當前的 Task 中,然后檢查 continuations 列表中的后續(xù)任務,而當前的 Task 就會作為參數(shù),傳遞到后續(xù)鏈接的任務中,來讓后面的任務得知上行任務的結果。