为了更深入学习协程的底层实现原理,了解协程线程切换的根本本质。也为了以后在工作中可以根据不同的需求场景,更加随心所欲的使用不同的协程。
今天通过 launch 跟踪一下协程的执行流程。
fun getData() {
Trace.beginSection("getData");
Log.e(TAG, "getData before " + Thread.currentThread().name)
val demoScope: suspend CoroutineScope.() -> Unit = {
Trace.beginSection("DispatchersIO");
Log.e(TAG, "getData IO 1 " + Thread.currentThread().name)
Thread.sleep(1000)
Log.e(TAG, "getData IO 2 " + Thread.currentThread().name)
Trace.endSection();
}
viewModelScope.launch(Dispatchers.IO, block = demoScope)
}
1. 流程图
1.1 从 launch 源码开始
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//1,先通过参数Context构造一个新的CoroutineContext
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
launch 方法有三个参数
-
context:常用的一般是 Dispatchers.Default,Dispatchers.Main,Dispatchers.Unconfined,Dispatchers.IO。
-
start:枚举类型共四种:DEFAULT,LAZY,ATOMIC,UNDISPATCHED
-
block:就是 launch 执行的协程体
1.2 我们来看 newCoroutineContext 方法
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context//1
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
刚开始看到代码 1 的+号,头都是蒙的,这是什么鬼?不是数字类型,为啥能加?
其实本质就是调用了 CoroutineContext 的 plus,是操作符的重载
/**
* Returns a context containing elements from this context and elements from other [context].
* The elements from this context with the same key as in the other one are dropped.
*/
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
//operation函数体。。。。。。。
}
fold 函数比较难理解,我们先说结论,就是把参数 this 内部与 context 的 key 一样的 CoroutineContext 移除后,剩下的 CoroutineContext 与 context 组成新的 CoroutineContext 对象。下边慢慢分析
CoroutineContext 的子类重写 fold 函数的一共有三个 EmptyCoroutineContext,CombinedContext,Element
-
上述代码第 6 行已经判断过 context 是 EmptyCoroutineContext。所以当前的 context 不可能是 EmptyCoroutineContext。其 fold 方法直接返回 this。如下:
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
-
是 Element 时。acc 就是 fold 函数参数。element 就是 fold 函数调用者
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
-
是 CombinedContext 比较复杂
internal class CombinedContext(
private val left: CoroutineContext,
private val element: Element
) : CoroutineContext, Serializable {
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(left.fold(initial, operation), element)
}
要递归调用 fold 函数,并重复调用 operation 函数。直到最后调用 Element,或者 EmptyCoroutineContext 的 fold 函数。
最终需要分析的都是 Element 的 fold 函数执行情况
context.fold(this) { acc, element ->//acc就是fold函数参数。element就是fold函数调用者,当前就是Dispatchers.IO
//如果acc的key和element的key是相同,就返回新的EmptyCoroutineContext
//否则就返回acc
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
//此时removed为acc的left,也就是SupervisorJob
//获得removed里key为ContinuationInterceptor.key的分发器。当前为null
val interceptor = removed[ContinuationInterceptor]
//合并removed和element。也就是SupervisorJob+Dispatchers.IO
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
小结下:
newCoroutineContext 其实就是给自己传递的 context 添加一些附加技能。但是 key 相同的技能只包含一个
比如 ViewModel 中 viewModelScope 的 coroutineContext 的默认值 SupervisorJob() + Dispatchers.Main.immediate。默认主线程执行,并保证如果其中的某个子协程出现异常,不会影响子协程
比如切换 dispatcher,当前父协程 dispatcher 为 Dispatchers.Main.immediate,切换为 Dispatchers.IO
1.3 下面分析 StandaloneCoroutine 的 start 方法
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
internal fun initParentJob() {
//当前的parentContext[job]就是SupervisorJob
initParentJobInternal(parentContext[Job])
}
/**
* Initializes parent job.
* It shall be invoked at most once after construction after all other initialization.
*/
internal fun initParentJobInternal(parent: Job?) {
assert { parentHandle == null }
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
//start保证parent状态为isActive
parent.start() // make sure the parent is
//...
}
CoroutineStart 的 start 就是如下的 invoke 函数
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
通过这里可以大概猜测一下几种 start 的区别。当前我们只看 DEFAULT
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
//runSafely就是添加了一个try catch
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
createCoroutineUnintercepted 在文件 kotlin.coroutines.intrinsics.intrinsicsJvm.kt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
//当前对象是BaseContinuationImpl的子类
return if (this is BaseContinuationImpl)
//这个方法在哪?
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
create 方法在哪?需要反编译代码才能看的到
public final class MainViewModel extends ViewModel {
public static final Companion Companion = new Companion(null);
private static final String TAG = "MainViewModel";
public final void getData() {
Trace.beginSection("getData");
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("getData before ");
stringBuilder.append(Thread.currentThread().getName());
Log.e("MainViewModel", stringBuilder.toString());
MainViewModel$getData$eeeee$1 mainViewModel$getData$eeeee$1 = new MainViewModel$getData$eeeee$1(null);
BuildersKt.launch$default(ViewModelKt.getViewModelScope(this), (CoroutineContext)Dispatchers.getIO(), null, mainViewModel$getData$eeeee$1, 2, null);
}
@Metadata(d1 = {"\000\022\n\002\030\002\n\002\020\000\n\002\b\002\n\002\020\016\n\000\b\003\030\0002\0020\001B\007\b\002¢\006\002\020\002R\016\020\003\032\0020\004XT¢\006\002\n\000¨\006\005"}, d2 = {"Lcom/haier/uhome/coroutine/ui/main/MainViewModel$Companion;", "", "()V", "TAG", "", "coroutine_debug"}, k = 1, mv = {1, 6, 0}, xi = 48)
public static final class Companion {
private Companion() {}
}
@Metadata(d1 = {"\000\n\n\000\n\002\020\002\n\002\030\002\020\000\032\0020\001*\0020\002H@"}, d2 = {"<anonymous>", "", "Lkotlinx/coroutines/CoroutineScope;"}, k = 3, mv = {1, 6, 0}, xi = 48)
@DebugMetadata(c = "com.haier.uhome.coroutine.ui.main.MainViewModel$getData$eeeee$1", f = "MainViewModel.kt", i = {}, l = {}, m = "invokeSuspend", n = {}, s = {})
static final class MainViewModel$getData$eeeee$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
int label;
MainViewModel$getData$eeeee$1(Continuation<? super MainViewModel$getData$eeeee$1> param1Continuation) {
super(2, param1Continuation);
}
public final Continuation<Unit> create(Object param1Object, Continuation<?> param1Continuation) {
return (Continuation<Unit>)new MainViewModel$getData$eeeee$1((Continuation)param1Continuation);
}
//。。。。。。。
}
}
可以看到我们的协程体其实是一个基础 SuspendLambda 的 class 对象。当调用 create 时,用参数 probeCompletion 又构造了一个新的协程体对象
SuspendLambda 的继承关系如下:
SuspendLambda-->ContinuationImpl-->BaseContinuationImpl-->Continuation<Any?>, CoroutineStackFrame, Serializable
所以 intercepted()方法就是调用 ContinuationImpl 内部实现的
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
context[ContinuationInterceptor]此时获得的就是 Dispatchers.IO,
其 interceptContinuation 方法如下
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
把 continuation 封装成了 DispatchedContinuation。其继承关系如下:
DispatchedContinuation-->DispatchedTask-->SchedulerTask-->Task-->Runnable
需要注意的是 continuation 就是协程体。就是我们要执行的内容
1.4 继续看 resumeCancellableWith 方法
在文件 kotlinx.coroutines.internal.DispatchedContinuation.kt
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//dispatcher就是协程代码传入的分发器,
//判断是否需要切换通过dispatcher执行,当前dispatcher.io,isDispatchNeeded是直接返回true
if (dispatcher.isDispatchNeeded(context)) {//代码1
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
dispatcher.dispatch()方法就把上边生成的 runnable 放到了线程池队列中
文件 kotlinx.coroutines.scheduling.Dispatcher.kt#LimitingDispatcher
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
// Commit in-flight tasks slot
val inFlight = inFlightTasks.incrementAndGet()
// Fast path, if parallelism limit is not reached, dispatch task and return
if (inFlight <= parallelism) {
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
//....
}
}
2. dispatche 具体是什么呢?
流程图如下
2.1 其实是在 Dispatchers.IO 实例化时的参数,DefaultScheduler 对象
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO: CoroutineDispatcher = LimitingDispatcher(
//这里实例化调度器对象
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
//....
}
而 DefaultScheduler 内部实例化了一个线程池
2.2 在文件 kotlinx.coroutines.scheduling.Dispatcher.kt
//kotlinx.coroutines.scheduling.Dispatcher.kt#ExperimentalCoroutineDispatcher
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
dispatcher.dispatchWithContext,就是调用线程池的 dispatch,把任务放到 globalQueue 队列里,我们看一下
在文件 kotlinx.coroutines.scheduling.CoroutineScheduler.kt
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
try {
//coroutineScheduler就是线程池
coroutineScheduler.dispatch(block, context, tailDispatch)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
// TaskContext shouldn't be lost here to properly invoke before/after task
DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
}
}
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
//当前block就继承之Task
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
//当前线程池不是work,所以此时currentWorker返回为null
val currentWorker = currentWorker()
//local放置失败
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
//放到global队列里
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
}
3. 任务具体如何执行?
时序图如下:
3.1 我们来看 kotlinx.coroutines.scheduling.CoroutineScheduler 文件
private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
//通过上一步可以知道任务没有放置到local队列,mayHaveLocalTasks为false
val task = findTask(mayHaveLocalTasks)
// Task found. Execute and repeat
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}
//。。。。。。
}
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
/*
* Anti-starvation mechanism: probabilistically poll either local
* or global queue to ensure progress for both external and internal tasks.
*/
if (scanLocalQueue) {
val globalFirst = nextInt(2 * corePoolSize) == 0
if (globalFirst) pollGlobalQueues()?.let { return it }
localQueue.poll()?.let { return it }
if (!globalFirst) pollGlobalQueues()?.let { return it }
} else {
//从glocal中取出任务
pollGlobalQueues()?.let { return it }
}
return trySteal(blockingOnly = false)
}
private fun pollGlobalQueues(): Task? {
if (nextInt(2) == 0) {
globalCpuQueue.removeFirstOrNull()?.let { return it }
return globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()?.let { return it }
return globalCpuQueue.removeFirstOrNull()
}
}
//参数task就是一个runnable
private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
//执行task里的run方法
runSafely(task)
afterTask(taskMode)
}
3.2 Task 的 run 方法的实现在 kotlinx.coroutines.DispatchedTask 里
public final override fun run() {
// should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
//...
withCoroutineContext(context, delegate.countOrElement) {
//。。。。
continuation.resume(getSuccessfulResult(state))
//。。。。。
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}
3.3 continuation.resume 在 kotlin.coroutines.Continuation.kt 文件
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
3.4 最终执行内容在文件:kotlin.coroutines.jvm.internal.ContinuationImpl 里
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
//执行协程体内容
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
3.5 invokeSuspend 在哪呢?还是找不到!同样需要反编译查看。就是
public final class MainViewModel extends ViewModel {
public static final Companion Companion = new Companion(null);
private static final String TAG = "MainViewModel";
public final void getData() {
Trace.beginSection("getData");
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("getData before ");
stringBuilder.append(Thread.currentThread().getName());
Log.e("MainViewModel", stringBuilder.toString());
MainViewModel$getData$eeeee$1 mainViewModel$getData$eeeee$1 = new MainViewModel$getData$eeeee$1(null);
BuildersKt.launch$default(ViewModelKt.getViewModelScope(this), (CoroutineContext)Dispatchers.getIO(), null, mainViewModel$getData$eeeee$1, 2, null);
}
@Metadata(d1 = {"\000\n\n\000\n\002\020\002\n\002\030\002\020\000\032\0020\001*\0020\002H@"}, d2 = {"<anonymous>", "", "Lkotlinx/coroutines/CoroutineScope;"}, k = 3, mv = {1, 6, 0}, xi = 48)
@DebugMetadata(c = "com.haier.uhome.coroutine.ui.main.MainViewModel$getData$eeeee$1", f = "MainViewModel.kt", i = {}, l = {}, m = "invokeSuspend", n = {}, s = {})
static final class MainViewModel$getData$eeeee$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
int label;
public final Object invokeSuspend(Object param1Object) {
IntrinsicsKt.getCOROUTINE_SUSPENDED();
if (this.label == 0) {
ResultKt.throwOnFailure(param1Object);
Trace.beginSection("DispatchersIO");
param1Object = new StringBuilder();
param1Object.append("getData IO 1 ");
param1Object.append(Thread.currentThread().getName());
Log.e("MainViewModel", param1Object.toString());
Thread.sleep(1000L);
param1Object = new StringBuilder();
param1Object.append("getData IO 2 ");
param1Object.append(Thread.currentThread().getName());
Log.e("MainViewModel", param1Object.toString());
Trace.endSection();
return Unit.INSTANCE;
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
}
到此处协程 launch 内容就执行完了。
4. 总结
其底层使用的就是对线程池的封装,把协程体封装到 runnable 里,放到线程池执行。使用了的线程池线程复用,不必频繁的创建,销毁线程等优点。提升了性能
其他的 Dispatcher,我就不一一跟踪了,有兴趣的同学可以自己跟踪一下。这里简单介绍下我的理解:
Dispatchers.Main:其内部使用的 MainCoroutineDispatcher,把任务放到主线程的 handler 顺序执行
Dispatchers.Default:是一个使用 DefaultScheduler 的线程池,据说比较适合做逻辑性任务(这个我看不出来😋)
Dispatchers.Unconfined:跟随父协程的 context,直接执行,不做线程切换
launch 主要逻辑不是很复杂,主要就是线程池的调度。难以跟踪的原因大概是因为源码中到处在使用函数扩展。再加上协程体的具体实现是 kotlin 编译过程中生成的。所以花的时间比较多,需要有耐心!
5. 团队介绍
「三翼鸟数字化技术平台-场景设计交互平台」主要负责设计工具的研发,包括营销设计工具、家电VR设计和展示、水电暖通前置设计能力,研发并沉淀素材库,构建家居家装素材库,集成户型库、全品类产品库、设计方案库、生产工艺模型,打造基于户型和风格的AI设计能力,快速生成算量和报价;同时研发了门店设计师中心和项目中心,包括设计师管理能力和项目经理管理能力。实现了场景全生命周期管理,同时为水,空气,厨房等产业提供商机管理工具,从而实现了以场景贯穿的B端C端全流程系统。