真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Kotlin協(xié)程的工作原理是什么

這篇文章主要講解了“Kotlin協(xié)程的工作原理是什么”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Kotlin協(xié)程的工作原理是什么”吧!

成都創(chuàng)新互聯(lián)公司堅持“要么做到,要么別承諾”的工作理念,服務領域包括:做網(wǎng)站、成都做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣等服務,滿足客戶于互聯(lián)網(wǎng)時代的松陽網(wǎng)站設計、移動媒體設計的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡建設合作伙伴!

協(xié)程的狀態(tài)機

這一章會以下面的代碼為例解析一下協(xié)程啟動,掛起以及恢復的流程:

private suspend fun getId(): String {     return GlobalScope.async(Dispatchers.IO) {         delay(1000)         "hearing"     }.await() }  private suspend fun getAvatar(id: String): String {     return GlobalScope.async(Dispatchers.IO) {         delay(1000)         "avatar-$id"     }.await() }  fun main() {     GlobalScope.launch {         val id = getId()         val avatar = getAvatar(id)         println("${Thread.currentThread().name} - $id - $avatar")     } }

上面 main 方法中,GlobalScope.launch 啟動的協(xié)程體在執(zhí)行到 getId 后,協(xié)程體會掛起,直到 getId 返回可用結果,才會  resume launch 協(xié)程,執(zhí)行到 getAvatar 也是同樣的過程。協(xié)程內(nèi)部實現(xiàn)使用狀態(tài)機來處理不同的掛起點,將 GlobalScope.launch  協(xié)程體字節(jié)碼反編譯成 Java 代碼,大致如下(有所刪減):

BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null,     (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {     int label;      public final Object invokeSuspend(   Object $result) {         Object var10000;         String id;         label17: {             CoroutineScope $this$launch;             switch(this.label) {             case 0: // a                 ResultKt.throwOnFailure($result);                 $this$launch = this.p$;                 this.label = 1; // label置為1                 var10000 = getId(this);                 if (var10000 == COROUTINE_SUSPENDED) {                     return COROUTINE_SUSPENDED;                 }                 // 若此時已經(jīng)有結果,則不掛起,直接break                 break;             case 1: // b                 ResultKt.throwOnFailure($result);                 var10000 = $result;                 break;             case 2: // d                 id = (String)this.L$1;                 ResultKt.throwOnFailure($result);                 var10000 = $result;                 break label17; // 退出label17             default:                 throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");             }             // c             id = (String)var10000;             this.L$1 = id; // 將id賦給L$1             this.label = 2; // label置為2             var10000 = getAvatar(id, this);             if (var10000 == COROUTINE_SUSPENDED) {                 return COROUTINE_SUSPENDED;             }         }         // e         String avatar = (String)var10000;         String var5 = var9.append(var10001.getName()).append(" - ").append(id).append(" - ").append(avatar).toString();         System.out.println(var5);         return Unit.INSTANCE;     }             public final Continuation create(   Object value,    Continuation completion) {         Intrinsics.checkParameterIsNotNull(completion, "completion");         Function2 var3 = new (completion);         var3.p$ = (CoroutineScope)value;         return var3;     }      public final Object invoke(Object var1, Object var2) {         return (()this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);     } }

這里我們根據(jù)上面的注釋以及字母標簽來看一下執(zhí)行流程(invokeSuspend 方法會在協(xié)程體中的 suspend  函數(shù)得到結果后被調用,具體是在哪里被調用的稍后會講到):

  • a: launch 協(xié)程體剛執(zhí)行到 getId 方法時,getId 方法的返回值將是 COROUTINE_SUSPENDED, 此時直接 return,  則 launch 協(xié)程體中 getId 后面的代碼暫時不會執(zhí)行,即 launch 協(xié)程體被掛起(非阻塞, 該線程依舊會做其它工作)。這里將 label 置為了  1. 而若此時 getId 已經(jīng)有結果(內(nèi)部沒有調用 delay 之類的 suspend 函數(shù)等),則不掛起,而是直接 break。

  • b: 若上面 a 中 getId 返回 COROUTINE_SUSPENDED, 則當 getId 有可用結果返回后,會重新執(zhí)行 launch 協(xié)程體的  invokeSuspend 方法,根據(jù)上面的 label==1, 會執(zhí)行到這里檢查一下 result 沒問題的話就 break, 此時 id 賦值給了  var10000。

  • c: 在 a 中若直接 break 或 在 b 中得到 getId 的結果然后 break 后,都會執(zhí)行到這里,得到 id 的值并把 label  置為2。然后調用 getAvatar 方法,跟 getId 類似,若其返回 COROUTINE_SUSPENDED 則 return,協(xié)程被掛起,等到下次  invokeSuspend 被執(zhí)行,否則離開 label17 接著執(zhí)行后續(xù)邏輯。

  • d: 若上面 c 中 getAvatar 返回 COROUTINE_SUSPENDED, 則當 getAvatar 有可用結果返回后會重新調用  launch 協(xié)程體的 invokeSuspend 方法,此時根據(jù) label==2 來到這里并取得之前的 id 值,檢驗  result(即avatar),然后break label17。

  • e: c 中直接返回了可用結果 或 d 中 break label17 后,launch 協(xié)程體中的 suspend  函數(shù)都執(zhí)行完畢了,這里會執(zhí)行剩下的邏輯。

suspend 函數(shù)不會阻塞線程,且 suspend 函數(shù)不一定會掛起協(xié)程,如果相關調用的結果已經(jīng)可用,則繼續(xù)運行而不掛起,例如 async{} 返回值  Deferred 的結果已經(jīng)可用時,await()掛起函數(shù)可以直接返回結果,不用再掛起協(xié)程。

這一節(jié)看了一下 launch 協(xié)程體反編譯成 Java 后的代碼邏輯,關于 invokeSuspend 是何時怎么被調用的,將會在下面講到。

協(xié)程的創(chuàng)建與啟動

這一節(jié)以 CoroutineScope.launch {} 默認參數(shù)為例,從源碼角度看看 Kotlin 協(xié)程是怎樣創(chuàng)建與啟動的:

public fun CoroutineScope.launch(     context: CoroutineContext = EmptyCoroutineContext,     start: CoroutineStart = CoroutineStart.DEFAULT,     block: suspend CoroutineScope.() -> Unit ): Job {     val newContext = newCoroutineContext(context)     val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true)     coroutine.start(start, coroutine, block)     return coroutine }  // AbstractCoroutine.kt // receiver: StandaloneCoroutine // block: suspend StandaloneCoroutine.() -> Unit // private open class StandaloneCoroutine(...) : AbstractCoroutine(...) {} // public abstract class AbstractCoroutine(...) : JobSupport(active), Job, Continuation, CoroutineScope {} public fun  start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {     // 調用 CoroutineStart 中的 invoke 方法     start(block, receiver, this) }  public enum class CoroutineStart {     // block - StandaloneCoroutine.() -> Unit     // receiver - StandaloneCoroutine     // completion - StandaloneCoroutine     public operator fun  invoke(block: suspend R.() -> T, receiver: R, completion: Continuation): Unit =         when (this) {             // 根據(jù) start 參數(shù)的類型調用不同的方法             DEFAULT -> block.startCoroutineCancellable(receiver, completion)             ATOMIC -> block.startCoroutine(receiver, completion)             UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)             LAZY -> Unit // will start lazily         } }

接下來看看 startCoroutineCancellable 方法:

// receiver - StandaloneCoroutine // completion - StandaloneCoroutine internal fun  (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation) =     runSafely(completion) {         createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))     }

createCoroutineUnintercepted 方法創(chuàng)建了一個 Continuation 類型(協(xié)程)的實例,即創(chuàng)建了一個協(xié)程:

public actual fun  (suspend R.() -> T).createCoroutineUnintercepted(     receiver: R, completion: Continuation ): Continuation {     return if (this is BaseContinuationImpl) create(receiver, completion) else // ... }

調用的是 (suspend (R) -> T) 的 createCoroutineUnintercepted 方法,(suspend (R)  -> T) 就是協(xié)程體。直接看上面示例代碼中 GlobalScope.launch 編譯后的字節(jié)碼,可以發(fā)現(xiàn) CoroutineScope.launch  傳入的 lambda 表達式被編譯成了繼承 SuspendLambda 的子類:

final class Main$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2

其繼承關系為: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl ->  Continuation, 因此走 create(receiver, completion) 方法,從上面反編譯出的 Java 代碼可以看到 create  方法創(chuàng)建了一個 Continuation 實例,再看一下 Kotlin 代碼編譯后的字節(jié)碼(包名已省略):

public final create(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation; // ... NEW Main$main$1

從上面可以看到,create 方法創(chuàng)建了 Main$main$1 實例,而其繼承自 SuspendLambda, 因此 create 方法創(chuàng)建的  Continuation 是一個 SuspendLambda 對象。

即 createCoroutineUnintercepted 方法創(chuàng)建了一個 SuspendLambda 實例。然后看看 intercepted  方法:

public actual fun  Continuation.intercepted(): Continuation =     // 如果是ContinuationImpl類型,則調用intercepted方法,否則返回自身     // 這里的 this 是 Main$main$1 實例 - ContinuationImpl的子類     (this as? ContinuationImpl)?.intercepted() ?: this  // ContinuationImpl public fun intercepted(): Continuation =     // context[ContinuationInterceptor]是 CoroutineDispatcher 實例     // 需要線程調度 - 返回 DispatchedContinuation,其 continuation 參數(shù)值為 SuspendLambda     // 不需要線程調度 - 返回 SuspendLambda     intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }  // CoroutineDispatcher // continuation - SuspendLambda -> ContinuationImpl -> BaseContinuationImpl public final override fun  interceptContinuation(continuation: Continuation): Continuation =     DispatchedContinuation(this, continuation)

接下來看看 resumeCancellableWith 是怎么啟動協(xié)程的,這里還涉及到Dispatchers線程調度的邏輯:

internal class DispatchedContinuation(        val dispatcher: CoroutineDispatcher,        val continuation: Continuation ) : DispatchedTask(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation by continuation {     public fun  Continuation.resumeCancellableWith(result: Result): Unit = when (this) {         // 進行線程調度,最后也會執(zhí)行到continuation.resumeWith方法         is DispatchedContinuation -> resumeCancellableWith(result)         // 直接執(zhí)行continuation.resumeWith方法         else -> resumeWith(result)     }      inline fun resumeCancellableWith(result: Result) {         val state = result.toState()         // 判斷是否需要線程調度         if (dispatcher.isDispatchNeeded(context)) {             _state = state             resumeMode = MODE_CANCELLABLE             // 需要調度則先進行調度             dispatcher.dispatch(context, this)         } else {             executeUnconfined(state, MODE_CANCELLABLE) {                 if (!resumeCancelled()) {                     // 不需要調度則直接在當前線程執(zhí)行協(xié)程                     resumeUndispatchedWith(result)                 }             }         }     }      inline fun resumeUndispatchedWith(result: Result) {         withCoroutineContext(context, countOrElement) {             continuation.resumeWith(result)         }     } }
  • 當需要線程調度時,則在調度后會調用 DispatchedContinuation.continuation.resumeWith 來啟動協(xié)程,其中  continuation 是 SuspendLambda 實例;

  • 當不需要線程調度時,則直接調用 SuspendLambda.resumeWith 來啟動協(xié)程。

resumeWith 方法調用的是父類 BaseContinuationImpl 中的 resumeWith 方法:

internal abstract class BaseContinuationImpl(public val completion: Continuation?) : Continuation, CoroutineStackFrame, Serializable {     public final override fun resumeWith(result: Result) {         // ...         val outcome = invokeSuspend(param)         // ...     } }

因此,協(xié)程的啟動是通過 BaseContinuationImpl.resumeWith 方法調用到了子類  SuspendLambda.invokeSuspend 方法,然后通過狀態(tài)機來控制順序運行。

協(xié)程的掛起和恢復

Kotlin 編譯器會為 協(xié)程體 生成繼承自 SuspendLambda 的子類,協(xié)程的真正運算邏輯都在其 invokeSuspend  方法中。上一節(jié)介紹了 launch 是怎么創(chuàng)建和啟動協(xié)程的,在這一節(jié)我們再看看當協(xié)程代碼執(zhí)行到 suspend 函數(shù)后,協(xié)程是怎么被掛起的 以及 當  suspend 函數(shù)執(zhí)行完成得到可用結果后是怎么恢復協(xié)程的。

Kotlin 協(xié)程的內(nèi)部實現(xiàn)使用了 Kotlin 編譯器的一些編譯技術,當 suspend 函數(shù)被調用時,都有一個隱式的參數(shù)額外傳入,這個參數(shù)是  Continuation 類型,封裝了協(xié)程 resume 后執(zhí)行的代碼邏輯。

private suspend fun getId(): String {     return GlobalScope.async(Dispatchers.IO) {         delay(1000)         "hearing"     }.await() }  // Decompile成Java final Object getId(   Continuation $completion) {     // ... }

其中傳入的 $completion 參數(shù),從上一節(jié)可以看到是調用 getId 方法所在的協(xié)程體對象,也就是一個 SuspendLambda  對象。Continuation的定義如下:

public interface Continuation {     public val context: CoroutineContext      public fun resumeWith(result: Result) }

將 getId 方法編譯后的字節(jié)碼反編譯成 Java 代碼如下(為便于閱讀,刪減及修改了部分代碼):

final Object getId(   Continuation $completion) {     // 新建與啟動協(xié)程     return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {         int label;           public final Object invokeSuspend(   Object $result) {             switch(this.label) {             case 0:                 ResultKt.throwOnFailure($result);                 this.label = 1;                 if (DelayKt.delay(1000L, this) == COROUTINE_SUSPENDED) {                     return COROUTINE_SUSPENDED;                 }                 break;             case 1:                 ResultKt.throwOnFailure($result);                 break;             default:                 throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");             }             return "hearing";         }          // ...     }), 2, (Object)null).await($completion); // 調用 await() suspend 函數(shù) }

結合協(xié)程的狀態(tài)機一節(jié),當上面的 launch 協(xié)程體執(zhí)行到 getId 方法時, 會根據(jù)其返回值是否為 COROUTINE_SUSPENDED  來決定是否掛起,由于 getId 的邏輯是通過 async 啟動一個新的協(xié)程,協(xié)程體內(nèi)調用了 suspend delay 方法,然后通過 await  suspend 函數(shù)等待結果,當 async 協(xié)程沒完成時, await 會返回 COROUTINE_SUSPENDED, 因此 launch 協(xié)程體的  invokeSuspend 方法直接 return COROUTINE_SUSPENDED 值執(zhí)行完成,此時 launch  啟動的協(xié)程處于掛起狀態(tài)但不阻塞所處線程,而 async 啟動的協(xié)程開始執(zhí)行。

我們看一下 async 的源碼:

public fun  CoroutineScope.async(...): Deferred {     val newContext = newCoroutineContext(context)     val coroutine = if (start.isLazy) LazyDeferredCoroutine(newContext, block) else         DeferredCoroutine(newContext, active = true)     coroutine.start(start, coroutine, block)     return coroutine }

默認情況下,上面的 coroutine 取 DeferredCoroutine 實例,于是我們看一下其 await 方法以及在 async  協(xié)程執(zhí)行完成后,是怎么恢復 launch 協(xié)程的:

private open class DeferredCoroutine(     parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine(parentContext, active), Deferred, SelectClause1 {     override suspend fun await(): T = awaitInternal() as T }  // JobSupport internal suspend fun awaitInternal(): Any? {     while (true) { // lock-free loop on state         val state = this.state         if (state !is Incomplete) {             // 已經(jīng)完成,則直接返回結果             if (state is CompletedExceptionally) { // Slow path to recover stacktrace                 recoverAndThrow(state.cause)             }             return state.unboxState()         }         // 不需要重試時直接break,執(zhí)行awaitSuspend         if (startInternal(state) >= 0) break     }     return awaitSuspend() // slow-path }  // suspendCoroutineUninterceptedOrReturn: 獲取當前協(xié)程,且掛起當前協(xié)程(返回COROUTINE_SUSPENDED)或不掛起直接返回結果 private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->     val cont = AwaitContinuation(uCont.intercepted(), this)     cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler))     cont.getResult() }

上面 awaitInternal 的大致邏輯是當掛起函數(shù)已經(jīng)有結果時則直接返回,否則掛起父協(xié)程,然后 invokeOnCompletion 方法將  ResumeAwaitOnCompletion 插入一個隊列(state.list)中,源碼就不再貼出了。接著看看在 async 執(zhí)行完成后是怎么調用  ResumeAwaitOnCompletion 來 resume 被掛起的協(xié)程的。注意:不要繞進 async 協(xié)程體中 delay 是怎么掛起和恢復 async  協(xié)程的這一邏輯,我們不需要關注這一層!

接著 async 協(xié)程的執(zhí)行往下看,從前面可知它會調用 BaseContinuationImpl.resumeWith  方法來執(zhí)行協(xié)程邏輯,我們詳細看一下這個方法,在這里會執(zhí)行該協(xié)程的 invokeSuspend 函數(shù):

internal abstract class BaseContinuationImpl(     public val completion: Continuation? ) : Continuation, CoroutineStackFrame, Serializable {     public final override fun resumeWith(result: Result) {         var current = this         var param = result         while (true) {             with(current) {                 val completion = completion!! // fail fast when trying to resume continuation without completion                 val outcome: Result =                     try {// 調用 invokeSuspend 方法執(zhí)行協(xié)程邏輯                         val outcome = invokeSuspend(param)                         // 協(xié)程掛起時返回的是 COROUTINE_SUSPENDED,即協(xié)程掛起時,resumeWith 執(zhí)行結束                         // 再次調用 resumeWith 時協(xié)程掛起點之后的代碼才能繼續(xù)執(zhí)行                         if (outcome === COROUTINE_SUSPENDED) return                         Result.success(outcome)                     } catch (exception: Throwable) {                         Result.failure(exception)                     }                 releaseIntercepted() // this state machine instance is terminating                 if (completion is BaseContinuationImpl) {                     // unrolling recursion via loop                     current = completion                     param = outcome                 } else {                     // top-level completion reached -- invoke and return                     completion.resumeWith(outcome)                     return                 }             }         }     } }

我們從上面的源碼可以看到,在 createCoroutineUnintercepted 方法中創(chuàng)建的 SuspendLambda 實例是  BaseContinuationImpl 的子類對象,其 completion 參數(shù)為下:

  • launch: if (isLazy) LazyStandaloneCoroutine else StandaloneCoroutine

  • async: if (isLazy) LazyDeferredCoroutine else DeferredCoroutine

上面這幾個類都是 AbstractCoroutine 的子類。而根據(jù) completion 的類型會執(zhí)行不同的邏輯:

  • BaseContinuationImpl: 執(zhí)行協(xié)程邏輯

  • 其它: 調用 resumeWith 方法,處理協(xié)程的狀態(tài),協(xié)程掛起后的恢復即與它有關

在上面的例子中 async 啟動的協(xié)程,它也會調用其 invokeSuspend 方法執(zhí)行 async 協(xié)程邏輯,假設 async  返回的結果已經(jīng)可用時,即非 COROUTINE_SUSPENDED 值,此時 completion 是 DeferredCoroutine 對象,因此會調用  DeferredCoroutine.resumeWith 方法,然后返回,父協(xié)程的恢復邏輯便是在這里。

// AbstractCoroutine public final override fun resumeWith(result: Result) {     val state = makeCompletingOnce(result.toState())     if (state === COMPLETING_WAITING_CHILDREN) return     afterResume(state) }

在 makeCompletingOnce 方法中,會根據(jù) state 去處理協(xié)程狀態(tài),并執(zhí)行上面插入 state.list 隊列中的  ResumeAwaitOnCompletion.invoke 來恢復父協(xié)程,必要的話還會把 async  的結果給它,具體代碼實現(xiàn)太多就不貼了,不是本節(jié)的重點。直接看 ResumeAwaitOnCompletion.invoke 方法:

private class ResumeAwaitOnCompletion(     job: JobSupport, private val continuation: CancellableContinuationImpl ) : JobNode(job) {     override fun invoke(cause: Throwable?) {         val state = job.state         assert { state !is Incomplete }         if (state is CompletedExceptionally) {             // Resume with with the corresponding exception to preserve it             continuation.resumeWithException(state.cause)         } else {             // resume 被掛起的協(xié)程             continuation.resume(state.unboxState() as T)         }     } }

這里的 continuation 就是 launch 協(xié)程體,也就是 SuspendLambda 對象,于是 invoke 方法會再一次調用到  BaseContinuationImpl.resumeWith 方法,接著調用 SuspendLambda.invokeSuspend, 然后根據(jù) label  取值繼續(xù)執(zhí)行接下來的邏輯!

suspendCoroutineUninterceptedOrReturn

接下來我們看一下怎么將一個基于回調的方法改造成一個基于協(xié)程的 suspend 方法,要實現(xiàn)這個需求,重點在于  suspendCoroutineUninterceptedOrReturn 方法,根據(jù)注釋,這個方法的作用是: Obtains the current  continuation instance inside suspend functions and either suspends currently  running coroutine or returns result immediately without suspension.  即獲取當前協(xié)程的實例,并且掛起當前協(xié)程或不掛起直接返回結果。函數(shù)定義如下:

public suspend inline fun  suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation) -> Any?): T {     // ... }

根據(jù) block 的返回值,有兩種情況:

  • 如果 block 返回 COROUTINE_SUSPENDED, 意味著 suspend 函數(shù)會掛起當前協(xié)程而不會立即返回結果。這種情況下, block  中的 Continuation 需要在結果可用后調用 Continuation.resumeWith 來 resume 協(xié)程。

  • 如果 block 返回的 T 是 suspend 函數(shù)的結果,則協(xié)程不會被掛起, block 中的 Continuation 不會被調用。

調用 Continuation.resumeWith 會直接在調用者的線程 resume 協(xié)程,而不會經(jīng)過 CoroutineContext 中可能存在的  ContinuationInterceptor。建議使用更安全的 suspendCoroutine 方法,在其 block 中可以同步或在異步線程調用  Continuation.resume 和 Continuation.resumeWithException:

public suspend inline fun  suspendCoroutine(crossinline block: (Continuation) -> Unit): T {     contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }     return suspendCoroutineUninterceptedOrReturn { c: Continuation ->         // 調用攔截器         val safe = SafeContinuation(c.intercepted())         block(safe)         safe.getOrThrow()     } }

此外除了 suspendCoroutine 方法,還有 suspendCancellableCoroutine,  suspendAtomicCancellableCoroutine, suspendAtomicCancellableCoroutineReusable  等方法都可以用來將異步回調的方法封裝成 suspend 函數(shù)。

下面來看一個例子來介紹怎么將異步回調函數(shù)封裝成 suspend 函數(shù):

class NetFetcher {     // 將下面的 request 方法封裝成 suspend 方法     suspend fun requestSuspend(id: Int): String = suspendCoroutine { continuation ->         request(id, object : OnResponseListener {             override fun onResponse(response: String) {                 continuation.resume(response)             }              override fun onError(error: String) {                 continuation.resumeWithException(Exception(error))             }         })     }      fun request(id: Int, listener: OnResponseListener) {         Thread.sleep(5000)         if (id % 2 == 0) {             listener.onResponse("success")         } else {             listener.onError("error")         }     }      interface OnResponseListener {         fun onResponse(response: String)         fun onError(error: String)     } }  object Main {     fun main() {         requestByCoroutine()     }      // 使用回調     private fun requestByCallback() {         NetFetcher().request(21, object : NetFetcher.OnResponseListener {             override fun onResponse(response: String) {                 println("result = $response")             }              override fun onError(error: String) {                 println("result = $error")             }         })     }      // 使用協(xié)程     private fun requestByCoroutine() {         GlobalScope.launch(Dispatchers.Main) {             val result = withContext(Dispatchers.IO) {                 try {                     NetFetcher().requestSuspend(22)                 } catch (e: Exception) {                     e.message                 }             }

為加深理解,再介紹一下 Kotlin 提供的兩個借助 suspendCancellableCoroutine 實現(xiàn)的掛起函數(shù): delay &  yield。

delay

delay 方法借助了 suspendCancellableCoroutine 方法來掛起協(xié)程:

public suspend fun delay(timeMillis: Long) {     if (timeMillis <= 0) return // don't delay     return suspendCancellableCoroutine sc@ { cont: CancellableContinuation ->         cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)     } }  override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) {     postDelayed(Runnable {         with(continuation) { resumeUndispatched(Unit) }     }, timeMillis) }

可以看出這里 delay 的邏輯類似于 Handle 機制,將 resumeUndispatched 封裝的 Runnable  放到一個隊列中,在延遲的時間到達便會執(zhí)行 resume 恢復協(xié)程。

yield

yield 方法作用是掛起當前協(xié)程,這樣可以讓該協(xié)程所在線程運行其他邏輯,當其他協(xié)程執(zhí)行完成或也調用 yield  讓出執(zhí)行權時,之前的協(xié)程可以恢復執(zhí)行。

launch(Dispatchers.Main) {     repeat(3) {         println("job1 $it")         yield()     } } launch(Dispatchers.Main) {     repeat(3) {         println("job2 $it")         yield()     } }  // output job1 0 job2 0 job1 1 job2 1 job1 2 job2 2

看一下 yield 的源碼:

public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->     val context = uCont.context     // 如果協(xié)程沒有調度器,或者像 Unconfined 一樣沒有進行調度則直接返回     val cont = uCont.intercepted() as? DispatchedContinuation ?: return@sc Unit     if (cont.dispatcher.isDispatchNeeded(context)) {         // this is a regular dispatcher -- do simple dispatchYield         cont.dispatchYield(context, Unit)     } else {         // This is either an "immediate" dispatcher or the Unconfined dispatcher         // ...     }     COROUTINE_SUSPENDED }  // DispatchedContinuation internal fun dispatchYield(context: CoroutineContext, value: T) {     _state = value     resumeMode = MODE_CANCELLABLE     dispatcher.dispatchYield(context, this) }  public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)

可知 dispatchYield 會調用到 dispatcher.dispatch  方法將協(xié)程分發(fā)到調度器隊列中,這樣線程可以執(zhí)行其他協(xié)程,等到調度器再次執(zhí)行到該協(xié)程時,會 resume 該協(xié)程。

總結

通過上面協(xié)程的工作原理解析,可以從源碼中發(fā)現(xiàn) Kotlin 中的協(xié)程存在著三層包裝:

  • 第一層包裝: launch & async 返回的 Job, Deferred 繼承自 AbstractCoroutine,  里面封裝了協(xié)程的狀態(tài),提供了 cancel 等接口;

  • 第二層包裝: 編譯器生成的 SuspendLambda 子類,封裝了協(xié)程的真正執(zhí)行邏輯,其繼承關系為 SuspendLambda ->  ContinuationImpl -> BaseContinuationImpl, 它的 completion 參數(shù)就是第一層包裝實例;

  • 第三層包裝: DispatchedContinuation, 封裝了線程調度邏輯,它的 continuation 參數(shù)就是第二層包裝實例。

感謝各位的閱讀,以上就是“Kotlin協(xié)程的工作原理是什么”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對Kotlin協(xié)程的工作原理是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關知識點的文章,歡迎關注!


本文名稱:Kotlin協(xié)程的工作原理是什么
新聞來源:http://weahome.cn/article/jjogsj.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部