Kotlin Coroutine详解

- 14 mins

前言

本文主要介绍一下Kotlin是如何实现Coroutine的,对于具体的用法推荐参考一下官方文档,讲得还是比较详细的

什么是 Coroutine

概念上来说类似于线程,拥有自己的栈、本地变量、指令指针等,需要一段代码块来运行并且拥有类似的生命周期。但是和线程不同,coroutine并不和某一个特定的线程绑定,它可以再线程A中执行,并在某一个时刻暂停(suspend),等下次调度到恢复执行的时候在线程B中执行。不同于线程,coroutine是协作式的,即子程序可以通过在函数中有不同的入口点来实现暂停、恢复,从而让出线程资源。

实战演练

首先看一个简单的小demo,来看看Kotlin的Coroutine是具体适合使用的:

    @Test
    fun async() {
        async {
            delay(1000)
            print("World!")
        }
        print("Hello ")
        Thread.sleep(2000)
    }

上面这段代码会输出Hello World!, 那么下面我们看看具体是如何工作的.

原理剖析

asyn()这里是一个函数,下面是它的源码:

public val DefaultDispatcher: CoroutineDispatcher = CommonPool

public fun <T> async(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.initParentJob(context[Job])
    start(block, coroutine, coroutine)
    return coroutine
}

这个函数有三个参数,其中两个都有默认值,也就是默认不需要传递,context是指coroutine的上下文,默认是DefaultDispatcher,DefaultDispatcher当前的实现是CommonPool,由于目前还是experimental,后面说不定会更改成其他的实现。

object CommonPool : CoroutineDispatcher() {
    private var usePrivatePool = false

    @Volatile
    private var _pool: Executor? = null

    private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }

    private fun createPool(): ExecutorService {
        val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
            ?: return createPlainPool()
        if (!usePrivatePool) {
            Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
                ?.let { return it }
        }
        Try { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? ExecutorService }
            ?. let { return it }
        return createPlainPool()
    }

    private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        return Executors.newFixedThreadPool(defaultParallelism()) {
            Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
        }
    }
}

CommonPool默认会使用ForkJoinPool作为coroutine的调度策略,如果不存在则fallback到线程池的策略,ForkJoinPool实现了work-stealing算法,当前线程的工作完成后从其他线程的待执行任务中窃取,具体的解释推荐直接看其注释,比网上的博客清晰的多。 而第二个参数CoroutineStart 指的是coroutine启动的选项,总的来说有四种:

/*
 * * [DEFAULT] -- immediately schedules coroutine for execution according to its context;
 * * [LAZY] -- starts coroutine lazily, only when it is needed;
 * * [ATOMIC] -- atomically (non-cancellably) schedules coroutine for execution according to its context;
 * * [UNDISPATCHED] -- immediately executes coroutine until its first suspension point _in the current thread_.
 */

第三个就是实际的coroutine要执行的代码段了,下面我们看看具体的执行流程:

public fun <T> async(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    /**
   *  初始化上下文,例如名字(方便调试)
   */
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    //这里是DeferredCoroutine,并且不存在父任务
    coroutine.initParentJob(context[Job])
    //Kotlin的运算符重载,会转化为对应参数的invoke方法
   //https://kotlinlang.org/docs/reference/operator-overloading.html
    start(block, coroutine, coroutine)
    return coroutine
}

public fun newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val debug = if (DEBUG) context + CoroutineId(COROUTINE_ID.incrementAndGet()) else context
    return if (context !== DefaultDispatcher && context[ContinuationInterceptor] == null)
        debug + DefaultDispatcher else debug
}

下面就会调用CoroutineStart中对应的invoke方法:

    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
        when (this) {
            //类似java的switch语句(TABLESWITCH/lookupswitch)
            //https://kotlinlang.org/docs/reference/control-flow.html
            CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
            CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            CoroutineStart.LAZY -> Unit // will start lazily
        }

接着会去调用startCoroutineCancellable方法:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    createCoroutineUnchecked(receiver, completion).resumeCancellable(Unit)

//
public fun <R, T> (suspend R.() -> T).createCoroutineUnchecked(
        receiver: R,
        completion: Continuation<T>
): Continuation<Unit> =
        if (this !is kotlin.coroutines.experimental.jvm.internal.CoroutineImpl)
            buildContinuationByInvokeCall(completion) {
                @Suppress("UNCHECKED_CAST")
                (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, completion)
            }
        else
            //这里create方法会去创建CoroutineImpl,但是我们看到CoroutineImpl中这方法会直接抛异常
            //wtf?实际上这里传递进来的this是编译器根据async中的lambda动态生产的类的实例,因此
            //也就是说实际的调用是哪个动态类
            (this.create(receiver, completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade

上面说到编译器会生成内部类,那么我们看看这里到底有什么黑魔法,下面贴一下具体的结构 image 反编译之后,先只看create方法:

static final class CoroutineTest.launch
extends CoroutineImpl
implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    private CoroutineScope p$;
    
    @NotNull
    public final Continuation<Unit> create(@NotNull CoroutineScope $receiver, @NotNull Continuation<? super Unit> $continuation) {
        Intrinsics.checkParameterIsNotNull((Object)$receiver, (String)"$receiver");
        Intrinsics.checkParameterIsNotNull($continuation, (String)"$continuation");
        CoroutineImpl coroutineImpl = new ;
        CoroutineScope coroutineScope = coroutineImpl.p$ = $receiver;
        return coroutineImpl;
    }
}

创建完需要的上下文之后,会去调用拿到Continuation后,就去调用resumeCancellable方法:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    createCoroutineUnchecked(receiver, completion).resumeCancellable(Unit)
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
    is DispatchedContinuation -> resumeCancellable(value)
    else -> resume(value)
}

    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellable(value: T) {
        val context = continuation.context
        if (dispatcher.isDispatchNeeded(context))
            dispatcher.dispatch(context, DispatchTask(continuation, value, exception = false, cancellable = true))
        else
            resumeUndispatched(value)
    }

image 可以看到最终就是丢到CommonPool中(ForkJoinPool),不过在那之前会包装成一个DispatchTask:

internal class DispatchTask<in T>(
    private val continuation: Continuation<T>,
    private val value: Any?, // T | Throwable
    private val exception: Boolean,
    private val cancellable: Boolean
) : Runnable {
    @Suppress("UNCHECKED_CAST")
    override fun run() {
        val context = continuation.context
        val job = if (cancellable) context[Job] else null
        withCoroutineContext(context) {
            when {
                job != null && !job.isActive -> continuation.resumeWithException(job.getCancellationException())
                exception -> continuation.resumeWithException(value as Throwable)
                else -> continuation.resume(value as T)
            }
        }
    }

    override fun toString(): String =
        "DispatchTask[$value, cancellable=$cancellable, ${continuation.toDebugString()}]"
}

在我们的场景下最终会调用:Continuation#public fun resume(value: T),这里的实际会调用:

abstract class CoroutineImpl(
        arity: Int,
        @JvmField
        protected var completion: Continuation<Any?>?
) : Lambda(arity), Continuation<Any?> {
    override fun resume(value: Any?) {
        processBareContinuationResume(completion!!) {
            doResume(value, null)
        }
    }
}
@kotlin.internal.InlineOnly
internal inline fun processBareContinuationResume(completion: Continuation<*>, block: () -> Any?) {
    try {
        val result = block()
        if (result !== COROUTINE_SUSPENDED) {
            @Suppress("UNCHECKED_CAST")
            (completion as Continuation<Any?>).resume(result)
        }
    } catch (t: Throwable) {
        completion.resumeWithException(t)
    }
}

这里doResume就是上文提到Kotlin编译器生成的内部类:

    @Nullable
    public final Object doResume(@Nullable Object var1_1, @Nullable Throwable var2_2) {
        var5_3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (var0.label) {
            case 0: {
                v0 = var2_2;
                if (v0 != null) {
                    throw v0;
                }
                var3_4 = this.p$;
                this.label = 1;
                v1 = DelayKt.delay$default((long)1000, (TimeUnit)null, (Continuation)this, (int)2, (Object)null);
                if (v1 == var5_3) {
                    return var5_3;
                }
                ** GOTO lbl18
            }
            case 1: {
                v2 = throwable;
                if (v2 != null) {
                    throw v2;
                }
                v1 = data;
lbl18: // 2 sources:
                var4_5 = "World!";
                System.out.print((Object)var4_5);
                return Unit.INSTANCE;
            }
        }
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

可以看到实际上Kotlin使用状态机实现的Coroutine,根据label的状态决定要执行的代码块,Kotlin会在编译时根据可suspend的方法插入相应的label,从而实现主动让出线程资源,并且将本地变量保存到Continuation的实例变量中,等到下次得到调度的时候,根据label来决定要执行的代码块,等到函数实际执行完的时候则直接返回对应的返回值(没有的话则是默认值). 比如看下面这段代码:

val a = a()
val y = foo(a).await() // suspension point #1
b()
val z = bar(a, y).await() // suspension point #2
c(z)

这段代码总共有三种状态:

  1. 初始状态,在所有的suspension point点之前
  2. 第一个暂停点
  3. 第二个暂停点

每一个状态都是continuation的入口点之一,这段代码会编译为一个实现了状态机的匿名类,有一个状态变量用于保存当前状态机的状态,以及用于保存当前coroutine的本地变量,编译后的代码用Java代码类似下面:

class <anonymous_for_state_machine> extends CoroutineImpl<...> implements Continuation<Object> {
    // 状态机当前的状态
    int label = 0
    
    // coroutine的本地变量
    A a = null
    Y y = null
    
    void resume(Object data) {
        if (label == 0) goto L0
        if (label == 1) goto L1
        if (label == 2) goto L2
        else throw IllegalStateException()
        
      L0:
        a = a()
        label = 1
        data = foo(a).await(this) // 'this' 默认会被传递给await方法
        if (data == COROUTINE_SUSPENDED) return //如果需要暂停则返回
      L1:
        //重新得到调度
        y = (Y) data//保存本地变量
        b()
        label = 2
        data = bar(a, y).await(this) 
        if (data == COROUTINE_SUSPENDED) return 
      L2:
        Z z = (Z) data
        c(z)
        label = -1 // No more steps are allowed
        return
    }          
}    

当coroutine开始执行的时候,默认label为0,那么就会跳转到L0,然后执行一个耗时的业务逻辑,将label设置为1,调用await,如果coroutine的执行需要暂停的那么就返回掉。当需要继续执行的时候就再次调用resume(),这次就会跳转到L1, 执行完业务逻辑后,将label设置为2,调用await并根据是否需要暂停来return,下次的继续调度,这次会从L2开始执行,然后label设置为-1,意味着不需要执行完了,不需要再调度了。

回到最初的代码段,我们首先调用了delay方法,这个方法默认使用ScheduledExecutorService,从而将当前的coroutine上下文包装到DispatchTask,再对应的延迟时间之后再恢复执行,恢复执行之后,这时候label是1,那么就会直接进入第二段代码,输出World!

    @Test
    fun async() {
        async {
            //在另外的线程池中执行,通过保存当前的执行上下文(本地变量、状态机的状态位等),并丢到
            //ScheduledExecutorService中延迟执行
            delay(1000)
            print("World!")
        }
        //主线程中直接输出
        print("Hello ")
        Thread.sleep(2000)
    }

165021507294969_ pic_hd

总结

本文大致讲解了一些Kotlin中Coroutine的实现原理,当然对于协程,很多编程语言都有相关的实现,推荐都看一下文档,实际使用对比看看。 image

参考资料

  1. https://github.com/Kotlin/kotlin-coroutines
  2. https://en.wikipedia.org/wiki/Coroutine
  3. https://www.lua.org/pil/9.html
  4. https://golang.org/doc/effective_go.html#concurrency
  5. https://www.youtube.com/watch?v=4W3ruTWUhpw
  6. https://www.youtube.com/watch?v=EMv_8dxSqdE
comments powered by Disqus
rss facebook twitter github youtube mail spotify lastfm instagram linkedin google google-plus pinterest medium vimeo stackoverflow reddit quora quora