Kotlin筆記之協(xié)程工作原理
協(xié)程的狀態(tài)機(jī)
這一章會(huì)以下面的代碼為例解析一下協(xié)程啟動(dòng),掛起以及恢復(fù)的流程:
- private suspend fun getId(): String {
- return GlobalScope.async(Dispatchers.IO) {
- delay(1000)
- "hearing"
- }.await()
- }
- private suspend fun getAvatar(id: String): String {
- return GlobalScope.async(Dispatchers.IO) {
- delay(1000)
- "avatar-$id"
- }.await()
- }
- fun main() {
- GlobalScope.launch {
- val id = getId()
- val avatar = getAvatar(id)
- println("${Thread.currentThread().name} - $id - $avatar")
- }
- }
上面 main 方法中,GlobalScope.launch 啟動(dòng)的協(xié)程體在執(zhí)行到 getId 后,協(xié)程體會(huì)掛起,直到 getId 返回可用結(jié)果,才會(huì) resume launch 協(xié)程,執(zhí)行到 getAvatar 也是同樣的過(guò)程。協(xié)程內(nèi)部實(shí)現(xiàn)使用狀態(tài)機(jī)來(lái)處理不同的掛起點(diǎn),將 GlobalScope.launch 協(xié)程體字節(jié)碼反編譯成 Java 代碼,大致如下(有所刪減):
- BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null,
- (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
- int label;
- public final Object invokeSuspend(
- Object $result) {
- Object var10000;
- String id;
- label17: {
- CoroutineScope $this$launch;
- switch(this.label) {
- case 0: // a
- ResultKt.throwOnFailure($result);
- $this$launch = this.p$;
- this.label = 1; // label置為1
- var10000 = getId(this);
- if (var10000 == COROUTINE_SUSPENDED) {
- return COROUTINE_SUSPENDED;
- }
- // 若此時(shí)已經(jīng)有結(jié)果,則不掛起,直接break
- break;
- case 1: // b
- ResultKt.throwOnFailure($result);
- var10000 = $result;
- break;
- case 2: // d
- id = (String)this.L$1;
- ResultKt.throwOnFailure($result);
- var10000 = $result;
- break label17; // 退出label17
- default:
- throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
- }
- // c
- id = (String)var10000;
- this.L$1 = id; // 將id賦給L$1
- this.label = 2; // label置為2
- var10000 = getAvatar(id, this);
- if (var10000 == COROUTINE_SUSPENDED) {
- return COROUTINE_SUSPENDED;
- }
- }
- // e
- String avatar = (String)var10000;
- String var5 = var9.append(var10001.getName()).append(" - ").append(id).append(" - ").append(avatar).toString();
- System.out.println(var5);
- return Unit.INSTANCE;
- }
- public final Continuation create(
- Object value,
- Continuation completion) {
- Intrinsics.checkParameterIsNotNull(completion, "completion");
- Function2 var3 = new <anonymous constructor>(completion);
- var3.p$ = (CoroutineScope)value;
- return var3;
- }
- public final Object invoke(Object var1, Object var2) {
- return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
- }
- }
這里我們根據(jù)上面的注釋以及字母標(biāo)簽來(lái)看一下執(zhí)行流程(invokeSuspend 方法會(huì)在協(xié)程體中的 suspend 函數(shù)得到結(jié)果后被調(diào)用,具體是在哪里被調(diào)用的稍后會(huì)講到):
- a: launch 協(xié)程體剛執(zhí)行到 getId 方法時(shí),getId 方法的返回值將是 COROUTINE_SUSPENDED, 此時(shí)直接 return, 則 launch 協(xié)程體中 getId 后面的代碼暫時(shí)不會(huì)執(zhí)行,即 launch 協(xié)程體被掛起(非阻塞, 該線程依舊會(huì)做其它工作)。這里將 label 置為了 1. 而若此時(shí) getId 已經(jīng)有結(jié)果(內(nèi)部沒(méi)有調(diào)用 delay 之類(lèi)的 suspend 函數(shù)等),則不掛起,而是直接 break。
- b: 若上面 a 中 getId 返回 COROUTINE_SUSPENDED, 則當(dāng) getId 有可用結(jié)果返回后,會(huì)重新執(zhí)行 launch 協(xié)程體的 invokeSuspend 方法,根據(jù)上面的 label==1, 會(huì)執(zhí)行到這里檢查一下 result 沒(méi)問(wèn)題的話就 break, 此時(shí) id 賦值給了 var10000。
- c: 在 a 中若直接 break 或 在 b 中得到 getId 的結(jié)果然后 break 后,都會(huì)執(zhí)行到這里,得到 id 的值并把 label 置為2。然后調(diào)用 getAvatar 方法,跟 getId 類(lèi)似,若其返回 COROUTINE_SUSPENDED 則 return,協(xié)程被掛起,等到下次 invokeSuspend 被執(zhí)行,否則離開(kāi) label17 接著執(zhí)行后續(xù)邏輯。
- d: 若上面 c 中 getAvatar 返回 COROUTINE_SUSPENDED, 則當(dāng) getAvatar 有可用結(jié)果返回后會(huì)重新調(diào)用 launch 協(xié)程體的 invokeSuspend 方法,此時(shí)根據(jù) label==2 來(lái)到這里并取得之前的 id 值,檢驗(yàn) result(即avatar),然后break label17。
- e: c 中直接返回了可用結(jié)果 或 d 中 break label17 后,launch 協(xié)程體中的 suspend 函數(shù)都執(zhí)行完畢了,這里會(huì)執(zhí)行剩下的邏輯。
suspend 函數(shù)不會(huì)阻塞線程,且 suspend 函數(shù)不一定會(huì)掛起協(xié)程,如果相關(guān)調(diào)用的結(jié)果已經(jīng)可用,則繼續(xù)運(yùn)行而不掛起,例如 async{} 返回值 Deferred 的結(jié)果已經(jīng)可用時(shí),await()掛起函數(shù)可以直接返回結(jié)果,不用再掛起協(xié)程。
這一節(jié)看了一下 launch 協(xié)程體反編譯成 Java 后的代碼邏輯,關(guān)于 invokeSuspend 是何時(shí)怎么被調(diào)用的,將會(huì)在下面講到。
協(xié)程的創(chuàng)建與啟動(dòng)
這一節(jié)以 CoroutineScope.launch {} 默認(rèn)參數(shù)為例,從源碼角度看看 Kotlin 協(xié)程是怎樣創(chuàng)建與啟動(dòng)的:
- public fun CoroutineScope.launch(
- context: CoroutineContext = EmptyCoroutineContext,
- start: CoroutineStart = CoroutineStart.DEFAULT,
- block: suspend CoroutineScope.() -> Unit
- ): Job {
- val newContext = newCoroutineContext(context)
- val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true)
- coroutine.start(start, coroutine, block)
- return coroutine
- }
- // AbstractCoroutine.kt
- // receiver: StandaloneCoroutine
- // block: suspend StandaloneCoroutine.() -> Unit
- // private open class StandaloneCoroutine(...) : AbstractCoroutine<Unit>(...) {}
- // public abstract class AbstractCoroutine<in T>(...) : JobSupport(active), Job, Continuation<T>, CoroutineScope {}
- public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
- // 調(diào)用 CoroutineStart 中的 invoke 方法
- start(block, receiver, this)
- }
- public enum class CoroutineStart {
- // block - StandaloneCoroutine.() -> Unit
- // receiver - StandaloneCoroutine
- // completion - StandaloneCoroutine<Unit>
- public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
- when (this) {
- // 根據(jù) start 參數(shù)的類(lèi)型調(diào)用不同的方法
- DEFAULT -> block.startCoroutineCancellable(receiver, completion)
- ATOMIC -> block.startCoroutine(receiver, completion)
- UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
- LAZY -> Unit // will start lazily
- }
- }
接下來(lái)看看 startCoroutineCancellable 方法:
- // receiver - StandaloneCoroutine
- // completion - StandaloneCoroutine<Unit>
- internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
- runSafely(completion) {
- createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
- }
createCoroutineUnintercepted 方法創(chuàng)建了一個(gè) Continuation 類(lèi)型(協(xié)程)的實(shí)例,即創(chuàng)建了一個(gè)協(xié)程:
- public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
- receiver: R, completion: Continuation<T>
- ): Continuation<Unit> {
- return if (this is BaseContinuationImpl) create(receiver, completion) else // ...
- }
調(diào)用的是 (suspend (R) -> T) 的 createCoroutineUnintercepted 方法,(suspend (R) -> T) 就是協(xié)程體。直接看上面示例代碼中 GlobalScope.launch 編譯后的字節(jié)碼,可以發(fā)現(xiàn) CoroutineScope.launch 傳入的 lambda 表達(dá)式被編譯成了繼承 SuspendLambda 的子類(lèi):
- final class Main$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2
其繼承關(guān)系為: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation, 因此走 create(receiver, completion) 方法,從上面反編譯出的 Java 代碼可以看到 create 方法創(chuàng)建了一個(gè) Continuation 實(shí)例,再看一下 Kotlin 代碼編譯后的字節(jié)碼(包名已省略):
- public final create(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
- // ...
- NEW Main$main$1
從上面可以看到,create 方法創(chuàng)建了 Main$main$1 實(shí)例,而其繼承自 SuspendLambda, 因此 create 方法創(chuàng)建的 Continuation 是一個(gè) SuspendLambda 對(duì)象。
即 createCoroutineUnintercepted 方法創(chuàng)建了一個(gè) SuspendLambda 實(shí)例。然后看看 intercepted 方法:
- public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
- // 如果是ContinuationImpl類(lèi)型,則調(diào)用intercepted方法,否則返回自身
- // 這里的 this 是 Main$main$1 實(shí)例 - ContinuationImpl的子類(lèi)
- (this as? ContinuationImpl)?.intercepted() ?: this
- // ContinuationImpl
- public fun intercepted(): Continuation<Any?> =
- // context[ContinuationInterceptor]是 CoroutineDispatcher 實(shí)例
- // 需要線程調(diào)度 - 返回 DispatchedContinuation,其 continuation 參數(shù)值為 SuspendLambda
- // 不需要線程調(diào)度 - 返回 SuspendLambda
- intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }
- // CoroutineDispatcher
- // continuation - SuspendLambda -> ContinuationImpl -> BaseContinuationImpl
- public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
- DispatchedContinuation(this, continuation)
接下來(lái)看看 resumeCancellableWith 是怎么啟動(dòng)協(xié)程的,這里還涉及到Dispatchers線程調(diào)度的邏輯:
- internal class DispatchedContinuation<in T>(
- val dispatcher: CoroutineDispatcher,
- val continuation: Continuation<T>
- ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
- public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
- // 進(jìn)行線程調(diào)度,最后也會(huì)執(zhí)行到continuation.resumeWith方法
- is DispatchedContinuation -> resumeCancellableWith(result)
- // 直接執(zhí)行continuation.resumeWith方法
- else -> resumeWith(result)
- }
- inline fun resumeCancellableWith(result: Result<T>) {
- val state = result.toState()
- // 判斷是否需要線程調(diào)度
- if (dispatcher.isDispatchNeeded(context)) {
- _state = state
- resumeMode = MODE_CANCELLABLE
- // 需要調(diào)度則先進(jìn)行調(diào)度
- dispatcher.dispatch(context, this)
- } else {
- executeUnconfined(state, MODE_CANCELLABLE) {
- if (!resumeCancelled()) {
- // 不需要調(diào)度則直接在當(dāng)前線程執(zhí)行協(xié)程
- resumeUndispatchedWith(result)
- }
- }
- }
- }
- inline fun resumeUndispatchedWith(result: Result<T>) {
- withCoroutineContext(context, countOrElement) {
- continuation.resumeWith(result)
- }
- }
- }
- 當(dāng)需要線程調(diào)度時(shí),則在調(diào)度后會(huì)調(diào)用 DispatchedContinuation.continuation.resumeWith 來(lái)啟動(dòng)協(xié)程,其中 continuation 是 SuspendLambda 實(shí)例;
- 當(dāng)不需要線程調(diào)度時(shí),則直接調(diào)用 SuspendLambda.resumeWith 來(lái)啟動(dòng)協(xié)程。
resumeWith 方法調(diào)用的是父類(lèi) BaseContinuationImpl 中的 resumeWith 方法:
- internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {
- public final override fun resumeWith(result: Result<Any?>) {
- // ...
- val outcome = invokeSuspend(param)
- // ...
- }
- }
因此,協(xié)程的啟動(dòng)是通過(guò) BaseContinuationImpl.resumeWith 方法調(diào)用到了子類(lèi) SuspendLambda.invokeSuspend 方法,然后通過(guò)狀態(tài)機(jī)來(lái)控制順序運(yùn)行。
協(xié)程的掛起和恢復(fù)
Kotlin 編譯器會(huì)為 協(xié)程體 生成繼承自 SuspendLambda 的子類(lèi),協(xié)程的真正運(yùn)算邏輯都在其 invokeSuspend 方法中。上一節(jié)介紹了 launch 是怎么創(chuàng)建和啟動(dòng)協(xié)程的,在這一節(jié)我們?cè)倏纯串?dāng)協(xié)程代碼執(zhí)行到 suspend 函數(shù)后,協(xié)程是怎么被掛起的 以及 當(dāng) suspend 函數(shù)執(zhí)行完成得到可用結(jié)果后是怎么恢復(fù)協(xié)程的。
Kotlin 協(xié)程的內(nèi)部實(shí)現(xiàn)使用了 Kotlin 編譯器的一些編譯技術(shù),當(dāng) suspend 函數(shù)被調(diào)用時(shí),都有一個(gè)隱式的參數(shù)額外傳入,這個(gè)參數(shù)是 Continuation 類(lèi)型,封裝了協(xié)程 resume 后執(zhí)行的代碼邏輯。
- private suspend fun getId(): String {
- return GlobalScope.async(Dispatchers.IO) {
- delay(1000)
- "hearing"
- }.await()
- }
- // Decompile成Java
- final Object getId(
- Continuation $completion) {
- // ...
- }
其中傳入的 $completion 參數(shù),從上一節(jié)可以看到是調(diào)用 getId 方法所在的協(xié)程體對(duì)象,也就是一個(gè) SuspendLambda 對(duì)象。Continuation的定義如下:
- public interface Continuation<in T> {
- public val context: CoroutineContext
- public fun resumeWith(result: Result<T>)
- }
將 getId 方法編譯后的字節(jié)碼反編譯成 Java 代碼如下(為便于閱讀,刪減及修改了部分代碼):
- final Object getId(
- Continuation $completion) {
- // 新建與啟動(dòng)協(xié)程
- return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
- int label;
- public final Object invokeSuspend(
- Object $result) {
- switch(this.label) {
- case 0:
- ResultKt.throwOnFailure($result);
- this.label = 1;
- if (DelayKt.delay(1000L, this) == COROUTINE_SUSPENDED) {
- return COROUTINE_SUSPENDED;
- }
- break;
- case 1:
- ResultKt.throwOnFailure($result);
- break;
- default:
- throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
- }
- return "hearing";
- }
- // ...
- }), 2, (Object)null).await($completion); // 調(diào)用 await() suspend 函數(shù)
- }
結(jié)合協(xié)程的狀態(tài)機(jī)一節(jié),當(dāng)上面的 launch 協(xié)程體執(zhí)行到 getId 方法時(shí), 會(huì)根據(jù)其返回值是否為 COROUTINE_SUSPENDED 來(lái)決定是否掛起,由于 getId 的邏輯是通過(guò) async 啟動(dòng)一個(gè)新的協(xié)程,協(xié)程體內(nèi)調(diào)用了 suspend delay 方法,然后通過(guò) await suspend 函數(shù)等待結(jié)果,當(dāng) async 協(xié)程沒(méi)完成時(shí), await 會(huì)返回 COROUTINE_SUSPENDED, 因此 launch 協(xié)程體的 invokeSuspend 方法直接 return COROUTINE_SUSPENDED 值執(zhí)行完成,此時(shí) launch 啟動(dòng)的協(xié)程處于掛起狀態(tài)但不阻塞所處線程,而 async 啟動(dòng)的協(xié)程開(kāi)始執(zhí)行。
我們看一下 async 的源碼:
- public fun <T> CoroutineScope.async(...): Deferred<T> {
- val newContext = newCoroutineContext(context)
- val coroutine = if (start.isLazy) LazyDeferredCoroutine(newContext, block) else
- DeferredCoroutine<T>(newContext, active = true)
- coroutine.start(start, coroutine, block)
- return coroutine
- }
默認(rèn)情況下,上面的 coroutine 取 DeferredCoroutine 實(shí)例,于是我們看一下其 await 方法以及在 async 協(xié)程執(zhí)行完成后,是怎么恢復(fù) launch 協(xié)程的:
- private open class DeferredCoroutine<T>(
- parentContext: CoroutineContext, active: Boolean
- ) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
- override suspend fun await(): T = awaitInternal() as T
- }
- // JobSupport
- internal suspend fun awaitInternal(): Any? {
- while (true) { // lock-free loop on state
- val state = this.state
- if (state !is Incomplete) {
- // 已經(jīng)完成,則直接返回結(jié)果
- if (state is CompletedExceptionally) { // Slow path to recover stacktrace
- recoverAndThrow(state.cause)
- }
- return state.unboxState()
- }
- // 不需要重試時(shí)直接break,執(zhí)行awaitSuspend
- if (startInternal(state) >= 0) break
- }
- return awaitSuspend() // slow-path
- }
- // suspendCoroutineUninterceptedOrReturn: 獲取當(dāng)前協(xié)程,且掛起當(dāng)前協(xié)程(返回COROUTINE_SUSPENDED)或不掛起直接返回結(jié)果
- private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
- val cont = AwaitContinuation(uCont.intercepted(), this)
- cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler))
- cont.getResult()
- }
上面 awaitInternal 的大致邏輯是當(dāng)掛起函數(shù)已經(jīng)有結(jié)果時(shí)則直接返回,否則掛起父協(xié)程,然后 invokeOnCompletion 方法將 ResumeAwaitOnCompletion 插入一個(gè)隊(duì)列(state.list)中,源碼就不再貼出了。接著看看在 async 執(zhí)行完成后是怎么調(diào)用 ResumeAwaitOnCompletion 來(lái) resume 被掛起的協(xié)程的。注意:不要繞進(jìn) async 協(xié)程體中 delay 是怎么掛起和恢復(fù) async 協(xié)程的這一邏輯,我們不需要關(guān)注這一層!
接著 async 協(xié)程的執(zhí)行往下看,從前面可知它會(huì)調(diào)用 BaseContinuationImpl.resumeWith 方法來(lái)執(zhí)行協(xié)程邏輯,我們?cè)敿?xì)看一下這個(gè)方法,在這里會(huì)執(zhí)行該協(xié)程的 invokeSuspend 函數(shù):
- internal abstract class BaseContinuationImpl(
- public val completion: Continuation<Any?>?
- ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
- public final override fun resumeWith(result: Result<Any?>) {
- var current = this
- var param = result
- while (true) {
- with(current) {
- val completion = completion!! // fail fast when trying to resume continuation without completion
- val outcome: Result<Any?> =
- try {// 調(diào)用 invokeSuspend 方法執(zhí)行協(xié)程邏輯
- val outcome = invokeSuspend(param)
- // 協(xié)程掛起時(shí)返回的是 COROUTINE_SUSPENDED,即協(xié)程掛起時(shí),resumeWith 執(zhí)行結(jié)束
- // 再次調(diào)用 resumeWith 時(shí)協(xié)程掛起點(diǎn)之后的代碼才能繼續(xù)執(zhí)行
- if (outcome === COROUTINE_SUSPENDED) return
- Result.success(outcome)
- } catch (exception: Throwable) {
- Result.failure(exception)
- }
- releaseIntercepted() // this state machine instance is terminating
- if (completion is BaseContinuationImpl) {
- // unrolling recursion via loop
- current = completion
- param = outcome
- } else {
- // top-level completion reached -- invoke and return
- completion.resumeWith(outcome)
- return
- }
- }
- }
- }
- }
我們從上面的源碼可以看到,在 createCoroutineUnintercepted 方法中創(chuàng)建的 SuspendLambda 實(shí)例是 BaseContinuationImpl 的子類(lèi)對(duì)象,其 completion 參數(shù)為下:
- launch: if (isLazy) LazyStandaloneCoroutine else StandaloneCoroutine
- async: if (isLazy) LazyDeferredCoroutine else DeferredCoroutine
上面這幾個(gè)類(lèi)都是 AbstractCoroutine 的子類(lèi)。而根據(jù) completion 的類(lèi)型會(huì)執(zhí)行不同的邏輯:
- BaseContinuationImpl: 執(zhí)行協(xié)程邏輯
- 其它: 調(diào)用 resumeWith 方法,處理協(xié)程的狀態(tài),協(xié)程掛起后的恢復(fù)即與它有關(guān)
在上面的例子中 async 啟動(dòng)的協(xié)程,它也會(huì)調(diào)用其 invokeSuspend 方法執(zhí)行 async 協(xié)程邏輯,假設(shè) async 返回的結(jié)果已經(jīng)可用時(shí),即非 COROUTINE_SUSPENDED 值,此時(shí) completion 是 DeferredCoroutine 對(duì)象,因此會(huì)調(diào)用 DeferredCoroutine.resumeWith 方法,然后返回,父協(xié)程的恢復(fù)邏輯便是在這里。
- // AbstractCoroutine
- public final override fun resumeWith(result: Result<T>) {
- val state = makeCompletingOnce(result.toState())
- if (state === COMPLETING_WAITING_CHILDREN) return
- afterResume(state)
- }
在 makeCompletingOnce 方法中,會(huì)根據(jù) state 去處理協(xié)程狀態(tài),并執(zhí)行上面插入 state.list 隊(duì)列中的 ResumeAwaitOnCompletion.invoke 來(lái)恢復(fù)父協(xié)程,必要的話還會(huì)把 async 的結(jié)果給它,具體代碼實(shí)現(xiàn)太多就不貼了,不是本節(jié)的重點(diǎn)。直接看 ResumeAwaitOnCompletion.invoke 方法:
- private class ResumeAwaitOnCompletion<T>(
- job: JobSupport, private val continuation: CancellableContinuationImpl<T>
- ) : JobNode<JobSupport>(job) {
- override fun invoke(cause: Throwable?) {
- val state = job.state
- assert { state !is Incomplete }
- if (state is CompletedExceptionally) {
- // Resume with with the corresponding exception to preserve it
- continuation.resumeWithException(state.cause)
- } else {
- // resume 被掛起的協(xié)程
- continuation.resume(state.unboxState() as T)
- }
- }
- }
這里的 continuation 就是 launch 協(xié)程體,也就是 SuspendLambda 對(duì)象,于是 invoke 方法會(huì)再一次調(diào)用到 BaseContinuationImpl.resumeWith 方法,接著調(diào)用 SuspendLambda.invokeSuspend, 然后根據(jù) label 取值繼續(xù)執(zhí)行接下來(lái)的邏輯!
suspendCoroutineUninterceptedOrReturn
接下來(lái)我們看一下怎么將一個(gè)基于回調(diào)的方法改造成一個(gè)基于協(xié)程的 suspend 方法,要實(shí)現(xiàn)這個(gè)需求,重點(diǎn)在于 suspendCoroutineUninterceptedOrReturn 方法,根據(jù)注釋?zhuān)@個(gè)方法的作用是: Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension. 即獲取當(dāng)前協(xié)程的實(shí)例,并且掛起當(dāng)前協(xié)程或不掛起直接返回結(jié)果。函數(shù)定義如下:
- public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
- // ...
- }
根據(jù) block 的返回值,有兩種情況:
- 如果 block 返回 COROUTINE_SUSPENDED, 意味著 suspend 函數(shù)會(huì)掛起當(dāng)前協(xié)程而不會(huì)立即返回結(jié)果。這種情況下, block 中的 Continuation 需要在結(jié)果可用后調(diào)用 Continuation.resumeWith 來(lái) resume 協(xié)程。
- 如果 block 返回的 T 是 suspend 函數(shù)的結(jié)果,則協(xié)程不會(huì)被掛起, block 中的 Continuation 不會(huì)被調(diào)用。
調(diào)用 Continuation.resumeWith 會(huì)直接在調(diào)用者的線程 resume 協(xié)程,而不會(huì)經(jīng)過(guò) CoroutineContext 中可能存在的 ContinuationInterceptor。建議使用更安全的 suspendCoroutine 方法,在其 block 中可以同步或在異步線程調(diào)用 Continuation.resume 和 Continuation.resumeWithException:
- public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
- contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
- return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
- // 調(diào)用攔截器
- val safe = SafeContinuation(c.intercepted())
- block(safe)
- safe.getOrThrow()
- }
- }
此外除了 suspendCoroutine 方法,還有 suspendCancellableCoroutine, suspendAtomicCancellableCoroutine, suspendAtomicCancellableCoroutineReusable 等方法都可以用來(lái)將異步回調(diào)的方法封裝成 suspend 函數(shù)。
下面來(lái)看一個(gè)例子來(lái)介紹怎么將異步回調(diào)函數(shù)封裝成 suspend 函數(shù):
- class NetFetcher {
- // 將下面的 request 方法封裝成 suspend 方法
- suspend fun requestSuspend(id: Int): String = suspendCoroutine { continuation ->
- request(id, object : OnResponseListener {
- override fun onResponse(response: String) {
- continuation.resume(response)
- }
- override fun onError(error: String) {
- continuation.resumeWithException(Exception(error))
- }
- })
- }
- fun request(id: Int, listener: OnResponseListener) {
- Thread.sleep(5000)
- if (id % 2 == 0) {
- listener.onResponse("success")
- } else {
- listener.onError("error")
- }
- }
- interface OnResponseListener {
- fun onResponse(response: String)
- fun onError(error: String)
- }
- }
- object Main {
- fun main() {
- requestByCoroutine()
- }
- // 使用回調(diào)
- private fun requestByCallback() {
- NetFetcher().request(21, object : NetFetcher.OnResponseListener {
- override fun onResponse(response: String) {
- println("result = $response")
- }
- override fun onError(error: String) {
- println("result = $error")
- }
- })
- }
- // 使用協(xié)程
- private fun requestByCoroutine() {
- GlobalScope.launch(Dispatchers.Main) {
- val result = withContext(Dispatchers.IO) {
- try {
- NetFetcher().requestSuspend(22)
- } catch (e: Exception) {
- e.message
- }
- }
為加深理解,再介紹一下 Kotlin 提供的兩個(gè)借助 suspendCancellableCoroutine 實(shí)現(xiàn)的掛起函數(shù): delay & yield。
delay
delay 方法借助了 suspendCancellableCoroutine 方法來(lái)掛起協(xié)程:
- public suspend fun delay(timeMillis: Long) {
- if (timeMillis <= 0) return // don't delay
- return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
- cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
- }
- }
- override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- postDelayed(Runnable {
- with(continuation) { resumeUndispatched(Unit) }
- }, timeMillis)
- }
可以看出這里 delay 的邏輯類(lèi)似于 Handle 機(jī)制,將 resumeUndispatched 封裝的 Runnable 放到一個(gè)隊(duì)列中,在延遲的時(shí)間到達(dá)便會(huì)執(zhí)行 resume 恢復(fù)協(xié)程。
yield
yield 方法作用是掛起當(dāng)前協(xié)程,這樣可以讓該協(xié)程所在線程運(yùn)行其他邏輯,當(dāng)其他協(xié)程執(zhí)行完成或也調(diào)用 yield 讓出執(zhí)行權(quán)時(shí),之前的協(xié)程可以恢復(fù)執(zhí)行。
- launch(Dispatchers.Main) {
- repeat(3) {
- println("job1 $it")
- yield()
- }
- }
- launch(Dispatchers.Main) {
- repeat(3) {
- println("job2 $it")
- yield()
- }
- }
- // output
- job1 0
- job2 0
- job1 1
- job2 1
- job1 2
- job2 2
看一下 yield 的源碼:
- public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
- val context = uCont.context
- // 如果協(xié)程沒(méi)有調(diào)度器,或者像 Unconfined 一樣沒(méi)有進(jìn)行調(diào)度則直接返回
- val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
- if (cont.dispatcher.isDispatchNeeded(context)) {
- // this is a regular dispatcher -- do simple dispatchYield
- cont.dispatchYield(context, Unit)
- } else {
- // This is either an "immediate" dispatcher or the Unconfined dispatcher
- // ...
- }
- COROUTINE_SUSPENDED
- }
- // DispatchedContinuation
- internal fun dispatchYield(context: CoroutineContext, value: T) {
- _state = value
- resumeMode = MODE_CANCELLABLE
- dispatcher.dispatchYield(context, this)
- }
- public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
可知 dispatchYield 會(huì)調(diào)用到 dispatcher.dispatch 方法將協(xié)程分發(fā)到調(diào)度器隊(duì)列中,這樣線程可以執(zhí)行其他協(xié)程,等到調(diào)度器再次執(zhí)行到該協(xié)程時(shí),會(huì) resume 該協(xié)程。
總結(jié)
通過(guò)上面協(xié)程的工作原理解析,可以從源碼中發(fā)現(xiàn) Kotlin 中的協(xié)程存在著三層包裝:
- 第一層包裝: launch & async 返回的 Job, Deferred 繼承自 AbstractCoroutine, 里面封裝了協(xié)程的狀態(tài),提供了 cancel 等接口;
- 第二層包裝: 編譯器生成的 SuspendLambda 子類(lèi),封裝了協(xié)程的真正執(zhí)行邏輯,其繼承關(guān)系為 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 參數(shù)就是第一層包裝實(shí)例;
- 第三層包裝: DispatchedContinuation, 封裝了線程調(diào)度邏輯,它的 continuation 參數(shù)就是第二層包裝實(shí)例。
這三層包裝都實(shí)現(xiàn)了 Continuation 接口,通過(guò)代理模式將協(xié)程的各層包裝組合在一起,每層負(fù)責(zé)不同的功能,如下圖: