Kotlin 协程基础知识总结二 —— 启动与取消

协程启动与取消的主要内容:

  • 启动协程:启动构建器、启动模式、作用域构建器、Job 生命周期
  • 取消协程:协程的取消、CPU 密集型任务取消、协程取消的副作用、超时任务

1、协程构建器

(P20)launch 与 aysnc 两种协程构建器的返回值不同:

  • launch 返回的是一个 Job 接口的实例,这个 Job 就表示协程任务本身
  • async 返回的是一个 Deferred 接口实例,是 Job 的子接口,可以通过 await() 获取任务执行结果

Job 和 Deferred 源码上有大量的注释,解释了很多基础内容,你可以直接查看源码注释,也可以查看 Kotlin 的官方文档中对 Job 和 Deferred 的介绍,内容是一样的。

1.1 Job

基础知识:

  • 创建 Job 实例的方式通常有两种:
    • 通过协程启动器创建,代码块运行完协程也就完成了
    • 通过 CompletableJob 工厂函数创建,需要调用 CompletableJob.complete() 完成
  • Job 可取消,生命周期以任务完成为终点
  • Job 可形成父子层次结构,父任务取消会递归取消所有子任务。子任务失败并且出现 CancellationException 以外的异常(CancellationException 在取消任务时抛出,会被静默处理),会停止父任务,进而取消其他子任务
  • Job 的执行没有结果值(子接口 Deferred 才有)

作业状态:

2024-08-18.Job状态

各个状态之间的转换图如下:

                                          wait children
    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
    | New | -----> | Active | ---------> | Completing  | -------> | Completed |
    +-----+        +--------+            +-------------+          +-----------+
                     |  cancel / fail       |
                     |     +----------------+
                     |     |
                     V     V
                 +------------+                           finish  +-----------+
                 | Cancelling | --------------------------------> | Cancelled |
                 +------------+                                   +-----------+

状态描述:

  • 新建状态(New):通常,Job 是在活动状态下创建的(被创建并启动),但如果将协程构建器参数设置为 CoroutineStart.LAZY 就会使其处于新建状态,通过调用 start 或 join 才会进入活动状态
  • 活动状态(Active):协程在工作时或直到 CompletableJob 完成,或者直到失败或取消,Job 都处于活动状态
  • 取消(Cancelling)和已取消(Cancelled):
    • 一个处于活动状态的 Job 会因异常导致失败而处于取消状态
    • Job 可以随时通过 cancel() 取消并立即进入取消状态
    • 当 Job 本身的工作及其所有子 Job 都完成后会处于已取消状态(官方文档说的就是 “The job becomes cancelled when it finishes executing its work and all its children complete.”,但是个人结合状态机理解,应该是所有的 Job 都被取消后,而不是正常完成,才进入去取消状态)
  • 完成(Completing)和已完成(Completed):
    • 活动状态的协程体完成或者调用 CompletableJob.complete 将会让 Job 处于完成状态。Job 会在完成状态等待,直到其所有子 Job 完成才会进入已完成状态
    • 完成状态是针对 Job 内部的,对于外部观察者而言,这个完成状态的 Job 还是处于活动状态,它只是在内部等待其所有子 Job 完成

CoroutineContext 中的 Job 表示协程本身。

取消原因:

  • 异常完成:协程体抛出异常时,该 Job 就为异常完成,CompletableJob 可以通过 completeExceptionally() 实现异常完成
  • 取消原因:异常完成的 Job 会被取消,该异常就成为该 Job 的取消原因
  • 正常取消与失败的区别:在于取消原因的类型,抛出 Cancellation 的协程被视为正常取消,其他类型被视为失败:
    • Job 失败时,其父 Job 也会因为相同异常而被取消
    • 调用 cancel() 会使 Job 正常取消,而不会导致其父 Job 取消。这样 Job 可以取消子 Job 或递归取消所有子 Job 而无需取消自身

Job 接口及其派生的所有接口上的所有函数都是线程安全的,并且可以从并发协程中安全地调用,而无需外部同步。

1.2 Deferred

Deferred 就是一个有结果的 Job,也是非阻塞可取消的 Future。

Deferred 由 aysnc 构建器或 CompletableDeferred 的构造函数创建,在计算值时处于活动状态。

Deferred 具有与 Job 相同的状态机,并提供方法检索执行的计算结果成功或失败。Deferred 的结果在其完成后可通过 await() 检索,如果任务失败该方法会抛出异常(如果不调用 await,这个异常虽然也会抛出但是不会中断程序运行)。这里注意处于已取消状态的 Deferred 也被视为已完成。可以通过 getCompletionExceptionOrNull() 从已完成的 Deferred 中检索异常。

因为继承自 Job 且与其有相同状态机,所以 Deferred 通常也是在活动状态创建,只有设置 CoroutineStart.LAZY 时才处于新建状态,可以通过 start、join、await 使 Deferred 进入活动状态。

1.3 示例代码

	fun test01() = runBlocking {
        // launch 返回 Job,没有结果
        val job1 = launch {
            delay(200)
            println("job1 finished.")
        }

        // async 返回 Deferred 通过 await() 获取结果
        val job2 = async {
            delay(200)
            println("job2 finished.")
            // 任务结果
            "job2 result"
        }

        // 通过 await() 获取任务结果
        println("job2 result: ${job2.await()}")
    }

结果如下:

job1 finished.
job2 finished.
job2 result: job2 result

注意 runBlocking 函数会创建顶层协程,该协程会阻塞当前线程直到其作用域内所有协程都执行完。一般在项目开发中不太使用 runBlocking,因为它会阻塞(主)线程,这里用它主要是因为它作为顶层函数可以无需特定的协程上下文就创建并运行协程,且会等待内部所有协程运行完。但如果是一般的父协程,父协程本身的任务与其子协程的运行顺序是不确定的,父协程有可能先运行完而不等待子协程是否运行完。

1.4 等待协程作业

join 与 await

(P21)等待协程作业完成再继续向下执行,可以通过 join() 或 await(),二者的注释信息如下:

  • join():挂起协程直到作业完成。当作业因任何原因完成且调用协程的作业仍处于活动状态时,此调用会正常恢复(无异常):
    • 如果作业仍处于新建状态(通过 Lazy 创建的协程初始是在 New 状态),此函数还会启动相应的协程
    • 此挂起函数可以取消,并且始终检查调用协程的作业是否已经取消。如果调用此挂起函数或挂起函数执行期间,调用此函数的作业被取消或完成,则此函数会抛出 CancellationException
    • 如果父协程在子协程上调用 join,并且子协程失败时,父协程会抛出 CancellationException。因为默认情况下,子协程失败会取消父协程,除非子协程是从 supervisorScope 中启动的
    • 调用 join 时可以选择性的与 onJoin 一起使用,使用 isCompleted 来检查此作业的完成情况而无需等待
    • 还有一个 cancelAndJoin 函数结合了 cancel 和 join
  • await():不阻塞线程,等待 Deferred 的结果值完成计算,并且在 Deferred 计算完成时继续其他代码的执行。返回结果值,或者在 Deferred 被取消的情况下抛出相应的异常:
    • 此挂起函数可以取消,并且始终检查调用协程的作业是否已经取消。如果调用此挂起函数或挂起函数执行期间,调用此函数的作业被取消或完成,则此函数会抛出 CancellationException
    • 这里有一个快速取消的保证。如果在此函数被挂起时作业被取消,它将无法成功恢复(resume)。详细信息参考 suspendCancellableCoroutine
    • 此函数可以在选择性调用中与 onAwait 子句一起使用。使用 isCompleted 来检查此延迟值的完成情况而无需等待

作业也就是我们前面说的 Job。

需要注意作业仅在其所有子作业完成时才会完成。

join 用于等待普通的 Job 作业,而 await() 用于等待 Job 的子类 Deferred 作业。

先看 Job,按照如下代码,虽然 job1 在前,但是由于它要挂起 500ms,因此 job1 实际上是最后才执行完的:

    fun test02() = runBlocking {
        val job1 = launch {
            delay(500)
            println("job1 finished.")
        }

        val job2 = launch {
            delay(200)
            println("job2 finished.")
        }

        val job3 = launch {
            delay(200)
            println("job3 finished.")
        }
    }

输出如下:

job2 finished.
job3 finished.
job1 finished.

假如想让 job1 先执行完,再执行后续任务,使用 join():

	fun test02() = runBlocking {
        val job1 = launch {
            delay(500)
            println("job1 finished.")
        }

        // 在其他协程启动前调用,如果放在 job2 和 job3 之后,由于后两个协程
        // 已经启动,则 job2 和 job3 无法等待 job1 结束
        job1.join()

        val job2 = launch {
            delay(200)
            println("job2 finished.")
        }

        val job3 = launch {
            delay(200)
            println("job3 finished.")
        }
    }

async 与 await 也是类似的:

	fun test03() = runBlocking {
        val job1 = async {
            delay(500)
            println("job1 finished.")
            "job1 result"
        }

        // Deferred 作为 Job 的子类肯定也能使用 join 方法,只不过没有返回值罢了
        job1.await()

        val job2 = async {
            delay(200)
            println("job2 finished.")
            // 任务结果
            "job2 result"
        }

        val job3 = async {
            delay(200)
            println("job3 finished.")
            // 任务结果
            "job3 result"
        }
    }

组合并发

(P22)使用 async + await 可以实现组合并发,即让两个异步任务并发执行,最后通过 await() 同步获取这两个并发任务的结果。

先看一般情况下,在非协程环境中执行两个挂起函数,需要顺序执行,总执行时间为两个函数各自执行时间之和:

	fun test05() = runBlocking {
        val time = measureTimeMillis {
            val one = doOne()
            val two = doTwo()
            println("The result:${one + two}")
        }
        // Completed in 2030 ms
        println("Completed in $time ms")
    }

    private suspend fun doOne(): Int {
        delay(1000)
        return 14
    }

    private suspend fun doTwo(): Int {
        delay(1000)
        return 25
    }

现在使用 async 将两个函数分别放在两个协程中并发执行,最后通过 await 同步收集结果,再看执行时间大概就是一个函数的执行时间:

	fun test05() = runBlocking {
        val time = measureTimeMillis {
            // 开一个协程,执行挂起函数 doOne,挂起时继续执行下面代码
            val deferred1 = async { doOne() }
            // 也开一个协程,指定挂起函数 doTwo,挂起时继续执行下面代码
            val deferred2 = async { doTwo() }
            // 由于两个 Deferred 都调用 await 等待结果,所以等到两个结果后才进行相加
            // 并输出结果,由于 deferred1 在执行时,deferred2 也紧随其后执行,所以
            // 两个任务算是并发执行的,而不是串行执行的,节省了大量时间
            println("The result:${deferred1.await() + deferred2.await()}")
        }
        // Completed in 1029 ms
        println("Completed in $time ms")
    }

这样缩短了任务执行时间,但是需要注意不要写成如下这种形式:

	fun test05() = runBlocking {
        val time = measureTimeMillis {
            val one = async { doOne() }.await()
            val two = async { doTwo() }.await()
            println("The result:${one + two}")
        }
        // Completed in 2043 ms
        println("Completed in $time ms")
    }

这种就又是同步代码了,因为在 async 内执行 doOne 后加了 await 等待协程结果,只有等到了该结果后才会继续向下执行 doTwo。

2、协程启动模式与作用域构建器

2.1 启动模式

(P23)协程启动模式,实际与 Job 生命周期有紧密联系。

通过 async 和 launch 这些协程构建器构建协程对象时,可以传入参数指定协程的启动模式:

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    // 启动模式,默认为 DEFAULT 模式
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

CoroutineStart 枚举类中定义了四种协程启动模式:

  1. DEFAULT:默认情况,协程创建后立即开始调度。执行前如果协程被取消,其将直接进入取消响应的状态
  2. LAZY:只有协程被需要时,包括主动调用协程的 start、join、await 等函数才会开始调度,如果调度前被取消,协程会直接进入异常结束状态
  3. ATOMIC:协程创建后立即开始调度,协程执行到第一个挂起点之前不响应取消
  4. UNDISPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正挂起的点

稍作解释:

  • DEFAULT:结合本文 1.1 节中 Job 的生命周期和状态机图例,DEFAULT 模式创建的协程在创建后会直接进入活动(Active)状态。调度就是指安排线程来执行这个协程任务,如果在执行任务前取消协程,协程会先进入取消/取消响应(Cancelling)状态,然后才进入已取消(Cancelled)状态

  • LAZY:创建后协程进入新建(New)状态,在需要启动协程时,手动调用 start、join、await 等函数才会开启调用以运行协程

  • ATOMIC:与 Default 类似,只不过在执行到第一个挂起点(挂起函数)之前不会响应取消

  • UNDISPATCHED:与前面几种不同的是,前面说的都是调度任务,调度不代表立即执行,而 UNDISPATCHED 就是在当前函数调用栈中立即执行。看如下示例代码:

    	fun test06() = runBlocking {
            val job = async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
                // thread:Test worker @coroutine#2
                println("thread:${Thread.currentThread().name}")
                delay(1000)
                // thread:DefaultDispatcher-worker-1 @coroutine#2
                println("thread:${Thread.currentThread().name}")
            }
        }
    

    由于 runBlocking 是在测试环境中运行,因此 aysnc 内第一次打印出的就是测试环境的线程,然后执行 delay 遇到了第一个挂起点再切换到 Dispatchers.IO 指定的子线程中。假如你是在实际项目的主线程中为协程指定 CoroutineStart.UNDISPATCHED 这种启动模式,那么第一次打印会输出主线程 main @coroutine#2

2.2 作用域构建器

(P24)协程作用域构建器实际上是结构化并发的相关知识,Kotlin 提供了两个协程作用域构建器,主要用于管理协程作用域(控制协程之间的关系)和异常处理:

  1. coroutineScopecoroutineScope 是一个挂起函数,用于创建一个协程作用域,它会等待所有启动的子协程执行完毕。如果其中任何一个子协程失败,coroutineScope 会取消其余的子协程
  2. supervisorScopesupervisorScope 也是一个挂起函数,用于创建一个作用域,其中的子协程可以独立失败而不会影响其他子协程。当一个子协程由于异常而失败时,supervisorScope 会继续执行其它子协程

coroutineScope 与 runBlocking 对比:

  • runBlocking 是常规函数,coroutineScope 是挂起函数
  • 二者都会等待自身的协程体以及所有子协程结束,主要区别在于 runBlocking 会阻塞当前线程一直等待,而 coroutineScope 只是挂起,会释放底层线程用于其他用途

3、取消

3.1 Job 的生命周期

讲取消之前先了解 Job 的生命周期

(P25)Job 生命周期

实际上 1.1 一节中已经贴出的官方的 Job 生命周期和状态机,这里算是再总结一下:

  • launch 和 async 创建协程会返回 Job 实例,该实例是协程的唯一标识,负责管理协程的生命周期
  • Job 的状态:新建(New)、活动(Active)、完成中(Completing)、已完成(Completed)、取消中(Cancelling)、已取消(Cancelled),这些状态无法直接访问,但是可以通过 Job 的属性获取:isActive、isCancelled、isCompleted

状态机转换图:

                                          wait children
    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
    | New | -----> | Active | ---------> | Completing  | -------> | Completed |
    +-----+        +--------+            +-------------+          +-----------+
                     |  cancel / fail       |
                     |     +----------------+
                     |     |
                     V     V
                 +------------+                           finish  +-----------+
                 | Cancelling | --------------------------------> | Cancelled |
                 +------------+                                   +-----------+

协程在 Active 或 Completing 状态下,如果调用了 Job.cancel 取消函数或者运行出错,都会将 Job 置为取消中(Cancelling)状态,此时 isActive = false,isCancelled = true。当所有子协程完成后会进入已取消(Cancelled)状态,此时 isCompleted = true。因此我们有时候说协程完成,包含了正常的任务执行完毕,也包含 cancel/fail 这种异常完成。

3.2 协程的取消

协程的取消:

  • 取消作用域会取消其所有子协程
  • 取消单个子协程不会影响其余兄弟协程
  • 协程通过抛出一个特殊异常 CancellationException 来处理取消操作
  • 所有 kotlinx.coroutines 中的挂起函数(withContext、delay 等)都是可取消的

(P26)取消作用域

在进入取消作用域代码演示之前,先来看一段错误的演示代码(这段演示放在 P24 中才合适):

	@Test
	fun test07() = runBlocking {
        val coroutineScope = CoroutineScope(Dispatchers.Default)
        coroutineScope.launch {
            delay(1000)
            println("job1")
        }
        coroutineScope.launch {
            delay(1000)
            println("job2")
        }
    }

上述代码看起来似乎没什么问题,但是在运行时会抛出 InvalidTestClassError,原因是我们在单元测试类中写的这个函数,被测试的函数不能有返回值,必须都是 void 的,因此 runBlocking 的泛型必须显式的指定为 Unit:

	@Test
    fun test07() = runBlocking<Unit> {
        val coroutineScope = CoroutineScope(Dispatchers.Default)
        coroutineScope.launch {
            delay(1000)
            println("job1")
        }
        coroutineScope.launch {
            delay(1000)
            println("job2")
        }
    }

当然,这样写不会报错了,但是你会发现两个子协程的内容没有输出。这是因为主协程没有等待两个子协程执行完就结束了。

这里就会有两个疑问:

  1. 为什么之前的那么多示例不用显式为 runBlocking 指定 Unit?
  2. 为什么之前的示例,在 runBlocking 中开启的子协程就能够执行完?

第一个疑问,刚刚说过,被测试的方法不能有返回值,返回值类型为 void,Kotlin 中应为 Unit。由于 coroutineScope.launch 会返回 Job 对象,如不显式指定 Unit,Kotlin 会将 Job 推断为 runBlocking 的返回值进而报错。前面的测试方法没有报错,刚好是因为那些方法的 runBlocking 的最后一个语句没有返回值。

第二个疑问,是因为 runBlocking 内新建的作用域对象 coroutineScope 没有继承 runBlocking 的上下文,而是指定 Dispatchers.Default 作为上下文,所以 runBlocking 不会因为 coroutineScope 启动的两个子协程而阻塞线程。

CoroutineScope 构造函数传入的就是 CoroutineContext,而 Dispatchers 内定义的四种调度器都是 CoroutineContext 的子类。比如 Dispatchers.Default 是一个 DefaultScheduler,DefaultScheduler 继承自 SchedulerCoroutineDispatcher,后者又继承自抽象类 ExecutorCoroutineDispatcher,后续父类或接口依次为:CoroutineDispatcher、AbstractCoroutineContextElement、Element,其中 Element 接口就是 CoroutineContext 的子接口。

所以想要让 runBlocking 等待 coroutineScope 启动的子协程执行完毕,需要让它挂起一段时间:

	@Test
    fun test07() = runBlocking<Unit> {
        val coroutineScope = CoroutineScope(Dispatchers.IO)
        coroutineScope.launch {
            delay(1000)
            println("job1")
        }
        coroutineScope.launch {
            delay(1000)
            println("job2")
        }

        delay(1200)
    }

这样能看到两个协程的输出内容。想取消就直接调用 cancel 即可:

	@Test
    fun test07() = runBlocking<Unit> {
        val coroutineScope = CoroutineScope(Dispatchers.IO)
        val job1 = coroutineScope.launch {
            delay(1000)
            println("job1")
        }
        val job2 = coroutineScope.launch {
            delay(1000)
            println("job2")
        }

        // 取消该作用域下的所有协程
        coroutineScope.cancel()
        
        // 等待 job1 和 job2 执行完毕的正规方式应该是 job1.join() 和 job2.join()
        delay(1200)
    }

取消作用域会取消其所有子协程。

(P27)如果想取消兄弟协程,那么就调用 job1.cancel()job2.cancel(),取消其中一个 Job 不会影响兄弟协程。

coroutineScope 与 CoroutineScope 的区别:

  • CoroutineScope 是一个接口,用于定义协程作用域。它通常由具有协程作用域的类实现(如 GlobalScopeMainScope 等),用于管理协程的生命周期
  • coroutineScope 是一个挂起函数,用于在一个协程中(以方便的方式)创建一个新的协程作用域。它会继承外部协程的上下文,并等待作用域内的所有协程执行完毕才会继续执行

3.3 协程取消的异常

(P28)协程取消会抛出 CancellationException:

	fun test08() = runBlocking {
        val job = GlobalScope.launch {
            delay(1000)
            println("job1")
        }

        job.cancel()

        // GlobalScope 启动的协程也不在 runBlocking 的上下文中,
        // 所以需要通过 join 让主线程等待 job 执行完毕
        job.join()
    }

以上代码会正常的取消 job 表示的协程,而不会抛出异常。因为外部在处理时会认为 CancellationException 是取消时的正常情况,因此就静默处理了。如果想拿出 CancellationException 看一下的话,可以通过 try-catch:

	fun test08() = runBlocking {
        val job = GlobalScope.launch {
            try {
                delay(1000)
                println("job1")
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }

        // 挂起一小段时间再取消,不马上取消
        delay(100)
        job.cancel()

        // GlobalScope 启动的协程也不在 runBlocking 的上下文中,
        // 所以需要通过 join 让主线程等待 job 执行完毕
        job.join()
    }

这样运行代码会看到如下异常信息:

kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@1ce92cf7
	at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1579)
	at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:183)
	at com.coroutine.basic.CoroutineTest01$test08$1.invokeSuspend(CoroutineTest01.kt:158)
	(Coroutine boundary)
	at com.coroutine.basic.CoroutineTest01$test08$1$job$1.invokeSuspend(CoroutineTest01.kt:148)
...

cancel 可以传一个 CancellationException 参数,那么取消时就抛出该异常对象:

	fun test08() = runBlocking {
        val job = GlobalScope.launch {
            try {
                delay(1000)
                println("job1")
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }

        // 挂起一小段时间再取消,不马上取消
        delay(100)
        job.cancel(CancellationException("取消"))

        // GlobalScope 启动的协程也不在 runBlocking 的上下文中,
        // 所以需要通过 join 让主线程等待 job 执行完毕
        job.join()
    }

catch 到的异常如下:

java.util.concurrent.CancellationException: 取消
	at com.coroutine.basic.CoroutineTest01$test08$1.invokeSuspend(CoroutineTest01.kt:158)
	(Coroutine boundary)
	at com.coroutine.basic.CoroutineTest01$test08$1$job$1.invokeSuspend(CoroutineTest01.kt:149)
...

需要说明的是,调用 cancel 后的 join 还是需要的,因为 runBlocking 中后续可能还会执行其他的代码,这里加一个 join 意思就是你等待我取消完成后再执行其他代码。当然,Kotlin 官方提供了 cancelAndJoin 函数将 cancel 和 join 操作组合起来了:

public suspend fun Job.cancelAndJoin() {
    cancel()
    return join()
}

3.4 CPU 密集型任务取消

CPU 密集型任务取消会用到以下三个属性或函数:

  • (P29)isActive 是 CoroutineScope 的扩展属性,用于检查 Job 是否处于活动状态(该属性实际上是 Job 的 isActive 属性的快捷方式)
  • (P30)ensureActive() 如果在 Job 处于非活动状态时被调用会立即抛出异常
  • (P31)yield() 会检查所在协程的状态,如果已取消,则抛出 CancellationException 予以响应。此外,它还会尝试让出线程的执行权,给其他协程提供执行机会

CPU 密集型任务不是简单调用一下 cancel 就能取消的,来看个例子:

	fun test09() = runBlocking {
        val startTime = System.currentTimeMillis()
        val job = launch(Dispatchers.Default) {
            var nextPrintTime = startTime
            var i = 0
            while (i < 5) {
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("job: I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
            }
        }

        delay(1300)
        println("main: I'm tired of waiting!")
        job.cancelAndJoin()
        println("main: Now I can quit.")
    }

运行程序发现没能取消:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.

这里没取消掉的根本原因是协程内部没有调用任何挂起函数,只有调用挂起函数才有状态检查,才能顺利取消协程。比如在协程中调用一个挂起函数 delay(1):

	fun test09() = runBlocking {
        val startTime = System.currentTimeMillis()
        val job = launch(Dispatchers.Default) {
            var nextPrintTime = startTime
            var i = 0
            while (i < 5) {
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("job: I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
                delay(1)
            }
        }

        delay(1300)
        println("main: I'm tired of waiting!")
        job.cancelAndJoin()
        println("main: Now I can quit.")
    }

那么无需其他额外操作即可取消协程:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

那么什么情况下需要将函数声明为挂起函数呢?

  • 与异步操作相关:当函数需要执行涉及异步操作的代码时,比如网络请求、文件 I/O、定时器等,通常需要将函数声明为挂起函数,以便在异步操作完成前暂停执行
  • 与协程相关:如果函数需要调用其他挂起函数,或者需要在协程中执行(并不是所有在协程中执行的函数都是挂起函数),通常也需要将函数声明为挂起函数。这样可以保证在函数内部可以使用协程构建器(如 launchasync)来创建新的协程
  • 与延迟操作相关:当函数需要使用 delay 函数来引入延迟,暂停协程的执行一段时间后再恢复时,通常需要将函数声明为挂起函数
  • 与协程作用域相关:当函数需要在协程作用域内执行,并且需要等待作用域内的其他协程完成后再继续执行时,应该将函数声明为挂起函数

所以你能看到,进行大量数据计算的 CPU 密集型任务无需声明为挂起函数,所以 test09 的 launch 中运行的代码可以近似的认为在进行大量数据计算,是一个 CPU 密集型任务。取消这种任务除了调用 cancel 还需额外操作(因为 cancel 就取消协程太过简单,Kotlin 想要保护正在计算的数据,因此增加了取消协程的操作)。

第一种额外操作就是手动增加 isActive 的判断,意思是只有在处于活动状态时才进行数据操作,如处于非活动状态则不进行操作:

	fun test09() = runBlocking {
        val startTime = System.currentTimeMillis()
        val job = launch(Dispatchers.Default) {
            var nextPrintTime = startTime
            var i = 0
            while (i < 5 && isActive) {
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("job: I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
            }
        }

        delay(1300)
        println("main: I'm tired of waiting!")
        job.cancelAndJoin()
        println("main: Now I can quit.")
    }

第二种额外操作是使用 ensureActive():

	fun test09() = runBlocking {
        val startTime = System.currentTimeMillis()
        val job = launch(Dispatchers.Default) {
            var nextPrintTime = startTime
            var i = 0
            while (i < 5) {
                ensureActive()
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("job: I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
            }
        }

        delay(1300)
        println("main: I'm tired of waiting!")
        job.cancelAndJoin()
        println("main: Now I can quit.")
    }

ensureActive() 实际上也是在判断 isActive 属性,只不过它会在 isActive = false 时抛出 CancellationException:

// CoroutineScope.kt
public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive()

// Job.kt
public fun CoroutineContext.ensureActive() {
    get(Job)?.ensureActive()
}

public fun Job.ensureActive(): Unit {
    if (!isActive) throw getCancellationException()
}

前面说过,CancellationException 会被静默处理,因此通过 ensureActive() 也可以取消 CPU 密集型任务,只不过提供了对 CancellationException 的进一步处理。

最后一种方式是 yield,会出让线程的执行权,适用于 CPU 计算特别密集,可能会耗尽 CPU 资源的情况,此时出让调度器所在的线程(池)执行权,让所有线程(包括自己)重新竞争 CPU。同时,由于 yield 内部会调用 ensureActive(),因此它也有与 ensureActive() 相同的功能:

	fun test09() = runBlocking {
        val startTime = System.currentTimeMillis()
        val job = launch(Dispatchers.Default) {
            var nextPrintTime = startTime
            var i = 0
            while (i < 5) {
                yield()
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("job: I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
            }
        }

        delay(1300)
        println("main: I'm tired of waiting!")
        job.cancelAndJoin()
        println("main: Now I can quit.")
    }

三种方式的最终输出都如下:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

3.5 协程取消的副作用

(P32)取消协程时,如果协程运行的是网络或者 IO 任务,取消后需要释放相关资源,可以在 finally 中释放:

	fun test10() = runBlocking {
        val job = launch(Dispatchers.Default) {
            try {
                repeat(1000) { i ->
                    println("job: I'm sleeping $i ...")
                    delay(500L)
                }
            } finally {
                // 在这里释放资源
                println("job: I'm running finally")
            }
        }

        delay(1300L)
        println("main: I'm tired of waiting!")
        job.cancelAndJoin()
        println("main: Now I can quit.")
    }

输出结果:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.

(P33)标准函数 use 可以用于资源释放,它只能被实现了 Closeable 的对象使用,程序结束时会自动调用 close 方法,适合文件对象:

	fun test11() = runBlocking {
        withContext(Dispatchers.IO) {
            BufferedReader(FileReader("D:\\test.txt")).use {
                var line: String?
                while (true) {
                    line = it.readLine() ?: break
                    print(line)
                }
            }
        }
    }

以上只是协程取消造成的副作用之一,其实还会有其他副作用,具体取决于正在执行的操作以及代码的实现方式。以下是一些常见的协程取消可能带来的副作用:

  1. 资源泄漏:如果协程被取消时没有正确释放资源(如文件描述符、网络连接、锁等),可能会导致资源泄漏问题。
  2. 不一致的状态:取消可能会导致对象或数据结构处于未定义的状态,这可能会导致应用程序的错误行为。
  3. 线程安全问题:如果取消导致共享数据结构的不一致性,可能会引发线程安全问题,如竞态条件或死锁。
  4. 未完成的操作:如果协程在执行某些操作时被取消,可能会导致这些操作无法完成,这可能会影响应用程序的正确性。
  5. 回调泄漏:如果协程在执行回调时被取消,可能会导致回调泄漏,即回调持有对已释放资源的引用。
  6. 异常处理:取消可能导致未捕获的异常,这可能会导致应用程序崩溃或未定义的行为。
  7. 性能开销:频繁的取消可能会引起性能开销,因为协程框架需要管理取消操作并清理相关资源。

3.6 不能被取消的代码块

(P34)不能被取消的任务,课程命名不严谨,应该是不能被取消的代码块,因为前面在关闭资源时会使用 finally 代码块。一般情况下,良好的关闭操作都是非阻塞的,不涉及挂起函数。但极少数情况下可能需要在已取消的协程中挂起:

	fun test12() = runBlocking {
        val job = launch(Dispatchers.Default) {
            try {
                repeat(1000) { i ->
                    println("job: I'm sleeping $i ...")
                    delay(500L)
                }
            } finally {
                println("job: I'm running finally")
                // 在已取消的协程中挂起时,会抛出 CancellationException
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }

        delay(1300L)
        println("main: I'm tired of waiting!")
        job.cancelAndJoin()
        println("main: Now I can quit.")
    }

并不会输出 finally 中 delay 之后的 Log,因为在已取消的协程中挂起会抛出 CancellationException,虽然该异常被静默处理,但是之后的代码都无法被执行:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.

解决办法是使用 withContext(NonCancellable) 来包装相应代码:

	fun test12() = runBlocking {
        val job = launch(Dispatchers.Default) {
            try {
                repeat(1000) { i ->
                    println("job: I'm sleeping $i ...")
                    delay(500L)
                }
            } finally {
                withContext(NonCancellable) {
                    println("job: I'm running finally")
                    delay(1000L)
                    println("job: And I've just delayed for 1 sec because I'm non-cancellable")
                }
            }
        }

        delay(1300L)
        println("main: I'm tired of waiting!")
        job.cancelAndJoin()
        println("main: Now I can quit.")
    }

输出如下:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
job: And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.

withContext 的作用就是使用给定的协程上下文执行闭包代码(该闭包是一个挂起函数)。

简言之,协程取消后还想调用挂起函数需要置于 NonCancellable 这个协程上下文环境中。

3.7 超时任务

(P35)很多情况下协程取消的理由是其有可能超时,通过 withTimeout 或 withTimeoutOrNull 可以设置协程超时时间:

	fun test13() = runBlocking {
        withTimeout(1300) {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        }
    }

withTimeout 会在到时后抛出 TimeoutCancellationException:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...

Timed out waiting for 1300 ms
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
	(Coroutine boundary)
	at com.coroutine.basic.CoroutineTest01$test13$1$1.invokeSuspend(CoroutineTest01.kt:256)
	at com.coroutine.basic.CoroutineTest01$test13$1.invokeSuspend(CoroutineTest01.kt:253)

而 withTimeoutOrNull 在到时后不会抛出异常,而是返回 null:

	fun test14() = runBlocking {
        val result = withTimeoutOrNull(1300) {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
            "Done"
        }
        
        println("result:$result")
    }

输出结果:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
result:null

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/943608.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

kong网关使用pre-function插件,改写接口的返回数据

一、背景 kong作为api网关&#xff0c;除了反向代理后端服务外&#xff0c;还可对接口进行预处理。 比如本文提及的一个小功能&#xff0c;根据http header某个字段的值&#xff0c;等于多少的时候&#xff0c;返回一个固定的报文。 使用到的kong插件是pre-function。 除了上…

群落生态学研究进展▌Hmsc包对于群落生态学假说的解读、Hmsc包开展单物种和多物种分析的技术细节及Hmsc包的实际应用

HMSC&#xff08;Hierarchical Species Distribution Models&#xff09;是一种用于预测物种分布的统计模型。它在群落生态学中的应用广泛&#xff0c;可以帮助科学家研究物种在不同环境条件下的分布规律&#xff0c;以及预测物种在未来环境变化下的潜在分布范围。 举例来说&a…

MacroSan 2500_24A配置

双控制器电源同时按下,切记/切记/切记 默认信息 默认地址:192.168.0.210 输入ODSP授权后设置密码## 配置端口 物理资源–>设备–>网口–>eth-1:0:0或eth-2:0:0 创建存储池 存储资源–>存储池 介质类型:混合(支持机械及SSD)全闪(仅支持SSD) RAID类型:CRAID-P(基于磁…

SQL-leetcode-180. 连续出现的数字

180. 连续出现的数字 表&#xff1a;Logs -------------------- | Column Name | Type | -------------------- | id | int | | num | varchar | -------------------- 在 SQL 中&#xff0c;id 是该表的主键。 id 是一个自增列。 找出所有至少连续出现三次的数字。 返回的…

ISDP010_基于DDD架构实现收银用例主成功场景

信息系统开发实践 &#xff5c; 系列文章传送门 ISDP001_课程概述 ISDP002_Maven上_创建Maven项目 ISDP003_Maven下_Maven项目依赖配置 ISDP004_创建SpringBoot3项目 ISDP005_Spring组件与自动装配 ISDP006_逻辑架构设计 ISDP007_Springboot日志配置与单元测试 ISDP008_SpringB…

ElementPlus 自定义封装 el-date-picker 的快捷功能

文章目录 需求分析 需求 分析 我们看到官网上给出的案例如下&#xff0c;但是不太满足我们用户想要的快捷功能&#xff0c;因为不太多&#xff0c;因此需要我们自己封装一些&#xff0c;方法如下 外部自定义该组件的快捷内容 export const getPickerOptions () > {cons…

怎么模仿磁盘 IO 慢的情况?并用于MySQL进行测试

今天给大家分享一篇在测试环境或者是自己想检验自己MySQL性能的文章 实验环境&#xff1a; Rocky Linux 8 镜像&#xff1a;Rocky-8.6-x86_64-dvd.iso 1. 创建一个大文件作为虚拟磁盘 [rootlocalhost ~] dd if/dev/zero of/tmp/slowdisk.img bs1M count100 记录了1000 的读入…

C++--------继承

一、继承的基本概念 继承是 C 中的一个重要特性&#xff0c;它允许一个类&#xff08;派生类或子类&#xff09;继承另一个类&#xff08;基类或父类&#xff09;的属性和方法。这样可以实现代码的重用和建立类之间的层次关系。 #include <iostream>// 基类 class Base…

Ubuntu24.04安装NVIDIA驱动及工具包

Ubuntu24.04安装NVIDIA驱动及工具包 安装nvidia显卡驱动安装cuda驱动安装cuDNN安装Anaconda 安装nvidia显卡驱动 NVIDIA 驱动程序&#xff08;NVIDIA Driver&#xff09;是专为 NVIDIA 图形处理单元&#xff08;GPU&#xff09;设计的软件&#xff0c;它充当操作系统与硬件之间…

探秘“香水的 ChatGPT”:AI 开启嗅觉奇幻之旅!

你没有看错&#xff0c;AI也能闻到味道了&#xff01;这是一家名为Osmo公司公布的信息&#xff0c;他们成功创造出了由AI生成的李子味道&#xff0c;快跟着小编一探究竟吧~ 【图片来源于网络&#xff0c;侵删】 Osmo公司的这项技术&#xff0c;通过分析香味的化学成分和人类嗅…

vue之axios基本使用

文章目录 1. axios 网络请求库2. axiosvue 1. axios 网络请求库 <body> <input type"button" value"get请求" class"get"> <input type"button" value"post请求" class"post"> <!-- 官网提供…

高通 ISP pipeline

目录 ISP理解&#xff1a; 1. IFE : Image front-end engine&#xff08;图像前端引擎&#xff09; 1.1 相关特点 1.2 IFE作用 2. BPS : Bayer processing segment&#xff08;拜耳加工段&#xff09; 2.1 相关特点 2.2 BPS基本概念 2.3 BPS与IFE区别&#xff1a; 3. …

Linux知识点回顾(期末提分篇)

前言&#xff1a;本篇文章为WK学子量身打造&#xff0c;其余读者也可根据题目进行巩固提升。 目录 前言&#xff1a;本篇文章为WK学子量身打造&#xff0c;其余读者也可根据题目进行巩固提升。 一、Linux的内核版本每一部分的含义 二、查看当前系统中所有用户的详细信息的文…

【图像处理lec10】图像压缩

目录 一、图像压缩基础 1、图像压缩的基本概念 2、数据冗余与压缩比 3、三种主要的数据冗余类型 4、保真度评估标准&#xff08;Fidelity Criteria&#xff09; 5、应用与实践 二、图像压缩模型 1、图像压缩模型概述 &#xff08;1&#xff09;压缩系统的结构 &#…

Java和Go语言的优劣势对比

文章目录 Java和Go语言的优劣势对比一、引言二、设计哲学与语法特性1、设计哲学2、语法特性 三、性能与内存管理1、性能2、内存管理和垃圾回收 四、并发编程模型五、使用示例1、Go语言示例代码2、Java语言示例代码 六、对比表格七、总结 Java和Go语言的优劣势对比 一、引言 在…

CH340系列芯片驱动电路·CH340系列芯片驱动!!!

目录 CH340基础知识 CH340常见类型 CH340引脚功能讲解 CH340驱动电路 CH340系列芯片数据手册 编写不易&#xff0c;仅供学习&#xff0c;请勿搬运&#xff0c;感谢理解 常见元器件驱动电路文章专栏连接 LM7805系列降压芯片驱动电路降压芯片驱动电路详解-CSDN博客 ME62…

[Python3] Sanic中间件

在 Sanic 中&#xff0c;中间件&#xff08;middleware&#xff09;是指在请求和响应之间执行的代码。它们是一个非常强大的工具&#xff0c;用于处理请求的预处理、响应的后处理、全局错误处理、日志记录、认证、权限校验、跨域资源共享&#xff08;CORS&#xff09;等任务。中…

pikachu靶场搭建详细步骤

一、靶场下载 点我去下载 二、靶场安装 需要的环境&#xff1a; mysqlApaches&#xff08;直接使用小皮面板Phpstudy&#xff1a;https://www.xp.cn/&#xff09;&#xff0c;启动他们 设置网站&#xff0c;把靶场的路径对应过来 对应数据库的信息 由于没有核对数据库的信…

Goland 安装与使用

GoLand安装 官方网址&#xff1a; JetBrains GoLand&#xff1a;不只是 Go IDE 1. 进入官网&#xff0c;点击下载&#xff1a; ​ 2. 如下图一步步安装 ​ ​ ​ ​ ​ 3. 如下图一步步安装

计算属性 简写和 完整写法

计算属性渲染不加上括号 methods方法和computed属性区别&#xff1a; computed只计算一次&#xff0c;然后缓存&#xff0c;后续直接拿出来使用&#xff0c;而methods每次使用每次计算&#xff0c;不会缓存 计算属性完整写法&#xff1a; 既获取又设置 slice 截取 成绩案例 …