目录

Kotlin Coroutine 入门(一) - suspend

Kotlin 协程入门(一) - suspend

1. 挂起是什么?

首先我们定义三个耗时任务, work1 需要 work0 的返回值作为参数, work2 需要 work1 的返回值作为参数:

1
2
3
suspend fun work0(): Int { largeWork(); return 0 }
suspend fun work1(input: Int): Int { largeWork(); return input + 1 }
suspend fun work2(input: Int): Int { largeWork(); return input + 2 }

在 kotlin 中, 我们会这么写:

1
2
3
4
5
6
7
8
suspend fun coroutineTest() {
    val work0 = work0()
    println(work0)
    val work1 = work1(work0)
    println(work1)
    val work2 = work2(work1)
    println(work2)
}

一切是那么的顺畅! 而在 Java 中, 面对这种耗时任务, 我们往往会这么定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
void work0(Consumer<Integer> consumer) {
    largeWork(); // 耗时任务
    consumer.accept(0);
}

void work1(int input, Consumer<Integer> consumer) {
    largeWork(); // 耗时任务
    consumer.accept(input + 1);
}

void work2(int input, Consumer<Integer> consumer) {
    largeWork(); // 耗时任务
    consumer.accept(input + 2);
}

然后这么调用, 这里使用了 Java8 的 Lambda 语法

1
2
3
4
5
6
7
void javaCallbackHell() {
    work0(
      workRes0 -> work1(workRes0,
        workRes1 -> work2(workRes1,
          workRes2 -> {
          })));
}

于是, 出现了回调地狱, 已经开始有点害怕了, 而且这仅仅是包含请求成功的情况, 如果涉及到请求失败, 线程调度, 线程同步, 那事情就变得更加恐怖了

而 Kotlin 的挂起, 就是为了解决这个问题而做的一个语法糖, 让我们写的更爽.

Kotlin 挂起的本质, 就是类似于上面的回调, 而被挂起, 本质上就是将后续全部的函数加入回调中.

这被称为 CPS 转换(Continuation-Passing-Style Transformation), 不信? 我们来分析一下 suspend 这个关键字干了什么:

2. suspend 干了什么

suspend 做了什么? 写一个空的 suspend 我们就知道了:

1
suspend fun coroutineTestFun(): Int = 1

反编译之后得到:

1
2
3
public final Object coroutineTestFun(@NotNull Continuation $completion) {
   return Boxing.boxInt(1);
}

事实上只是多了一个 Continuation 对象, 返回值变成 Object 类型, 即将 suspend () -> Int 类型转换为了(Continuation) -> Any?

为什么是 Object 而不是 Integer 呢? 我们不妨看一下调用这个函数的地方做了怎样的判断:

在调用它的地方会变成类似下面这样, coroutineTestFun 如果返回 COROUTINE_SUSPENDED 表示它被挂起,不会立即返回结果

1
2
3
4
5
6
7
val var10000: Any = coroutineTestFun($completion)
if (var10000 === IntrinsicsKt.COROUTINE_SUSPENDED)
	return var10000
else {
    // 挂起函数执行结束, 继续后续的函数
    processNext(var10000)
}

而当 coroutineTestFun 说它被挂起的时候, 包裹其的外层 suspend 函数也被挂起, 不会立即返回结果, 因此: return var10000

这里我们就可以知道为什么要返回一个 Object 类型了吧, 因为 suspend 函数的返回值有三种:

  1. COROUTINE_SUSPENDED: 代表当前函数被挂起
  2. 包裹着执行结果的对象(此对象事实上通过 Continuation 传递, 后面会说到)
  3. 挂起函数内部并没有执行挂起, 直接返回实际结果

当然, 如果 suspend 内部没有实际挂起(没有直接调用其他 suspend 函数), Continuation 对象也没有任何作用.

这时, 我们想看看 suspend 具体是怎么实现这些的, 但是 suspend 函数的字节码较为复杂, 我们不妨先了解一下 Continuation

3. Continuation

很简单的一个接口, 只有一个参数, 代表协程执行结束后会在哪个 CoroutineContext 继续执行, 只有一个方法 resumeWith, 代表得到执行结果之后要干什么

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

Continuation 的具体实现类是 BaseContinuationImpl:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            with(current) {
                // ...
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted()
                if (completion is BaseContinuationImpl) {
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
    protected open fun releaseIntercepted() {}
    //...
}

这里的 invokeSuspend 事实上就是我们说的挂起的本质: 回调

而当传入的 completion 不再是 BaseContinuationImpl 的时候, 这时候就是到了顶层的 Continuation了, 然后会调用其 resumeWith(result) 方法.

那么, 如果不是 BaseContinuationImpl, 那么顶层的 Continuation 是什么呢? 这就要说到协程是如何启动的了: Kotlin Coroutine 入门 - launch/async

4. suspend 的实现

现在, 以最简单的思路去展示在本文开头的例子中 suspend 干了什么, 可以看下面这个 gif:

suspendGif

具体实现在字节码中, 但是字节码阅读略微复杂, 这里给出与反编译的字节码大致等同的 Kotlin 代码:

伪代码

首先, 每个 suspend 函数都会创建一个内部的 Continuation 对象(内部实际上并不是 MyContinuation 这个名字, 这里只是个示例), 内部的 result 属性可能是实际的函数返回值, 也可能是 Result 对象.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
class MyContinuation(continuation: Continuation<*>) : ContinuationImpl(continuation) {
    lateinit var result: Any
    var label = 0

    override fun invokeSuspend(result: Result<*>): Any {
        this.result = result
        label = label or Int.MIN_VALUE // 这里用 |=, 后续流程会减掉
        return coroutineTestFun(this)
    }
}

ContinuationImplBaseContinuationImpl 的子类, 在分析 BaseContinuationImpl 的时候, 我们发现了其实 invokeSuspend 会被多次循环调用直到到达顶层 Continuation 对象.

suspend 内部会对 continuation 进行判断, 如果 continuation 是 MyContinuation 对象的话, 则会使用当前的, 否则就新创建一个:

1
2
3
val myContinuation = if (continuation !is MyContinuation) {
    MyContinuation(continuation)
} else continuation

接下来就进入了 suspend 函数中的状态机, 首先我们要看的是状态0状态1

事实上在字节码中我们能看到其实是通过 label 来控制的, 但是由于在 kotlin 中使用这种方式相当复杂, 有兴趣的可以去自行查看真实的字节码, 这里只是个思路.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
when (myContinuation.label) {
    0 -> myContinuation.apply {
        result.throwOnFailure()
        label = 1
        work = work0(this)
        if (work == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    1 -> myContinuation.apply {
        result.throwOnFailure()
        work = result
        val work0 = work as Int
        println(work0)
        label = 2
        work = work1(work0, this)
        if (work == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    2 -> // ...
    3 -> // ...
    else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
}

如果 work 返回的不是 COROUTINE_SUSPENDED, 那么代表函数已经得到了确切的返回结果, 比如我们的例子里面是返回 Int, 那么代表着返回了真实的 Int 类型, 直接向下处理即可.

如果返回了 COROUTINE_SUSPENDED, 则代表 work 被挂起了, 直接标记当前函数也是挂起状态即可, 等待下一次 invokeSuspend 方法被调用进入到下一个标签(label 1).

同理, 状态2/3也和状态0/1是同样的原理, 最后放上完整的伪代码(

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class MyContinuation(continuation: Continuation<*>) : ContinuationImpl(continuation) {
    lateinit var result: Any
    var label = 0

    override fun invokeSuspend(result: Result<*>): Any {
        this.result = result
        label = label or Int.MIN_VALUE // 这里用 |=, 后续流程会减掉
        return coroutineTestFun(this)
    }
}

fun coroutineTestFun(continuation: Continuation<*>): Any {
    val myContinuation = if (continuation !is MyContinuation) {
        MyContinuation(continuation)
    } else continuation

    var work: Any? = null
    // 说实话, 字节码中这个先 |=, 然后 -= 的操作有什么用呢?
    myContinuation.label -= Int.MIN_VALUE
    when (myContinuation.label) {
        0 -> myContinuation.apply {
            result.throwOnFailure()
            label = 1
            work = work0(this)
            if (work == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED
            }
        }
        1 -> myContinuation.apply {
            result.throwOnFailure()
            work = result
            val work0 = work as Int
            println(work0)
            label = 2
            work = work1(work0, this)
            if (work == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED
            }
        }
        2 -> myContinuation.apply {
            result.throwOnFailure()
            work = result
            val work1 = work as Int
            println(work1)
            label = 3
            work = work2(work1, this)
            if (work == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED
            }
        }
        3 -> myContinuation.apply {
            result.throwOnFailure()
            work = result
            val work2 = work as Int
            println(work2)
            return Unit
        }
        else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
    }
    return Unit
}

附: 真实的字节码

事实上的话, 字节码是有点复杂的, 但是看了上面的解释, 想必已经直到了大概的思路了.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Nullable
public static final Object coroutineTestFun(@NotNull Continuation var0) {
   Object $continuation;
   label37: {
      if (var0 instanceof <undefinedtype>) {
         $continuation = (<undefinedtype>)var0;
         if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
            ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
            break label37;
         }
      }

      $continuation = new ContinuationImpl(var0) {
         // $FF: synthetic field
         Object result;
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            this.result = $result;
            this.label |= Integer.MIN_VALUE;
            return CoroutineTestKt.coroutineTestFun(this);
         }
      };
   }

   Object var10000;
   label31: {
      Object var7;
      label30: {
         Object $result = ((<undefinedtype>)$continuation).result;
         var7 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         switch(((<undefinedtype>)$continuation).label) {
         case 0:
            ResultKt.throwOnFailure($result);
            ((<undefinedtype>)$continuation).label = 1;
            var10000 = work0((Continuation)$continuation);
            if (var10000 == var7) {
               return var7;
            }
            break;
         case 1:
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break;
         case 2:
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break label30;
         case 3:
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break label31;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }

         int work0 = ((Number)var10000).intValue();
         boolean var2 = false;
         System.out.println(work0);
         ((<undefinedtype>)$continuation).label = 2;
         var10000 = work1(work0, (Continuation)$continuation);
         if (var10000 == var7) {
            return var7;
         }
      }

      int work1 = ((Number)var10000).intValue();
      boolean var3 = false;
      System.out.println(work1);
      ((<undefinedtype>)$continuation).label = 3;
      var10000 = work2(work1, (Continuation)$continuation);
      if (var10000 == var7) {
         return var7;
      }
   }

   int work2 = ((Number)var10000).intValue();
   boolean var4 = false;
   System.out.println(work2);
   return Unit.INSTANCE;
}

最后

事实上本文没有讨论切换线程/协程时的状况, 只是分析了一下 suspend 关键字, 理解挂起的实现原理.