欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

结构化取消 Kotlin 并发程序 - 取消并发程序

最编程 2024-10-14 08:03:39
...

官方文档

协程的取消和线程取消的套路是一样的,都是通过发送一个取消的信号,然后在合适的时机检测这个信号,做出相应的处理来实现取消。

在线程中

  1. 我们通过 thread.interrupt() 方法来给线程发送一个中断信号,线程的 isInterrupted 属性会变为 true
  2. 然后在合适的时机检测中断信号(isInterrupted),做出相应的处理来实现线程的取消。

在协程中

  1. 我们通过 job.cancel() 方法来传递一个取消的信号,协程的 isActive 属性会变为 false
    ,同时,改协程的所有子协程的cancel()方法也会被调用。
  2. 然后在合适的时机检测协程的活跃状态(isActive),做出相应的处理来实现协程的取消。

Job.cancel()

Job 接口提供了一个 cancel() 方法,用于取消协程的执行,取消后,协程的 isActive 属性会变为 false

我们还是先来看一个不配合取消的例子:

fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        while (true) {
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("coroutine is running:${nextPrintTime}")
                nextPrintTime += 500L
            }
        }
    }
    delay(2000)
    println("取消协程")
    job.cancel()//取消协程
    job.join()//等待协程执行完毕
    println("协程执行完毕")
}

运行结果:
在这里插入图片描述
可以看到,我们调用了 job.cancel() 方法,但是协程并没有停止,原因也很简单,跟线程的套路一样,是因为我们没有配合取消,因此,代码还是会继续执行下去。

配合 isActive 取消

下面再来看看如何配合取消:

fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        while (true) {
            if (!isActive){
                //isActive是协程的一个扩展属性,用来判断协程是否处于活跃状态,结束协程的执行跟线程不一样,结束协程是通过抛出CancellationException异常来实现的
                throw kotlinx.coroutines.CancellationException()
            }

            if (System.currentTimeMillis() >= nextPrintTime) {
                println("coroutine is running:${nextPrintTime}")
                nextPrintTime += 500L
            }
        }
    }
    delay(2000)
    println("取消协程")
    job.cancel()//取消协程
    job.join()//等待协程执行完毕
    println("协程执行完毕")
}

这里要注意的是 直接在 scope 中调用的 isActive 实际上是一个扩展属性,本质上还是调用的 job.isActive,用于判断协程是否处于活跃状态。

在这里插入图片描述
运行结果:
在这里插入图片描述

可以看到,协程的取消跟线程的取消套路是一样的,协程是要主动检测一下是否处于活跃状态,然后做出相应的处理来配合取消的。

但是,跟线程不一样的是,结束协程是要通过抛 CancellationException 的方式来结构化的取消协程,而不是仅仅结束掉当前代码块,这一点一定需要注意。

可以到这里会有疑问,为啥要通过抛 CancellationException 来结束协程呢?还有,都抛异常了,又没有对异常做处理,为什么程序没有崩溃呢?

实际上,这个 CancellationException 是一个特殊地异常,它不会导致程序崩溃,而是会被协程内部捕获,然后做出取消的相关处理。

取消流程可以看作是特殊的异常流程,异常流程比较复杂,下一篇文章会详细讲解。
感兴趣的可以先看下官方文档:协程的异常 了解一下。

方便的配合取消:ensureActive()

kotlin 协程提供了一个更方便的检查协程状态并配合取消的方法: ensureActive()

它的作用就是检查协程是否处于活跃状态,如果不是,那么就抛出一个 CancellationException 异常。

ensureActive() 是一个扩展方法,可以通过下面几种方式调用:

  • CoroutineScope.ensureActive()
  • coroutineContext.ensureActive()
  • Job.ensureActive()

我们来看下源码:
在这里插入图片描述
上面说到,我们为了配合 cancel(),流程基本都是检查协程是否处于活跃状态,如果不是活跃状态,那么就抛出一个 CancellationException 异常的方式来取消协程。

这正是 ensureActive() 内部的逻辑,所以,我们可以使用 ensureActive() 来简化我们的代码。
例如:

fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        while (true) {
            //检查协程的取消状态
            ensureActive()
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("coroutine is running:${nextPrintTime}")
                nextPrintTime += 500L
            }
        }
    }
    delay(2000)
    println("取消协程")
    job.cancel()//取消协程
    job.join()//等待协程执行完毕
    println("协程执行完毕")
}

效果跟我们手动检查 isActive 然后抛出 CancellationException 异常是一样的。

CancellationException 异常处理

在上面线程的示例中,我们还提到了 InterruptedException 异常,在线程中例如 sleepwaitjoin 等方法都会抛出 InterruptedException 异常,而且我们需要对 InterruptedException 异常进行处理,否则可能会导致程序崩溃。

那么,协程中是否也有类似的概念呢?

答案是肯定的,上面已经提到了,就是 CancellationException 异常。

在协程中 除了之前文章提到过的 suspendCoroutine 函数之外,其它所有的 挂起函数(suspend)
内部都会在关键位置检查协程的状态,如果协程处于非活跃状态,则抛出 CancellationException异常。

例如: delaywithTimeoutwithContext ,suspendCancellableCoroutine,job.join,deffer.await 等等。

这里以job.join为例,来看下源码:

在这里插入图片描述

可以看到,join 方法内部会主动调用 ensureActive() 方法来检查协程是否处于活跃状态。其它的挂起函数也都有类似的逻辑。

但是协程内部是对 CancellationException异常进行了特殊处理,所以我们在协程中调用这些挂起函数时,在没有需求的情况下,不需要我们手动处理 CancellationException
,因为 CancellationException 只会导致协程结构化的取消而不会导致程序崩溃,这点会比线程要方便很多了。

代码示例:

/*挂起函数自动会配合取消*/
fun main() = runBlocking {
    val job = launch(Dispatchers.IO) {
        while (true) {
            println("coroutine is running")/*
            * delay函数是一个挂起函数,它会检查协程的取消状态,如果协程被取消了,那么delay函数会抛出CancellationException异常。
            * 这个异常协程内部做了特殊处理,不会导致程序崩溃,只是用于取消协程吗,所以一般无需通过try-catch来捕获这个异常
            * */
            delay(500)
        }
    }
    delay(2000)
    println("cancel coroutine")
    job.cancel()//取消协程
}

执行结果:

在这里插入图片描述

可以看到,我们没有手动的处理 CancellationException 异常,也没有手动的配合 cancel
但是协程内部对这个异常做了处理,所以我们不需要担心这个异常会导致程序崩溃。

反而如果我们做了异常处理,可能会导致一些问题,例如:

fun main() = runBlocking {
    val job = launch(Dispatchers.IO) {
        while (true) {
            println("coroutine is running")
            try {
                delay(500)
            } catch (e: CancellationException) {
                println("coroutine is canceled:${Random.nextInt()}")
            }
        }
    }
    delay(2000)
    println("cancel coroutine")
    job.cancel()//取消协程
}

运行结果:
在这里插入图片描述
可以看到,手动捕获了 CancellationException 异常,反而会导致协程无法被取消而且 delay
也失去作用了,所以,如果我们在协程中调用挂起函数,一般不需要手动处理 CancellationException 异常。

如果业务需要处理异常情况,可以在 finally 中处理:

例如:

fun main() = runBlocking {
    val job = launch(Dispatchers.IO) {
        while (true) {
            println("coroutine is running")
            try {
                delay(500)
            } finally {
                //do something
            }
        }
    }
    delay(2000)
    println("cancel coroutine")
    job.cancel()//取消协程
}

协程的结构化取消

所谓的结构化取消是指:当一个协程被取消时,它的所有子协程也会被取消

这个特性在协程中是非常重要的,因为协程是一个树形结构,一个协程可以有多个子协程,子协程又可以有多个子协程。
在之前的文章中,我们提到过,协程会等待所有的子协程执行完毕后才会结束,这就是协程所谓的结构化结束

所以,如果一个协程被取消了,那么它的所有子协程也应该都被取消,这样才能保证协程可以正常的结构化的结束,这也就是协程的结构化取消

来看一个简单的示例:

fun main() = runBlocking {
    val job = launch {
        println("parent coroutine is start")
        //子协程
        launch {
            println("child coroutine1 is start")
            delay(1000)
            println("child coroutine1 is end")
        }
        //子协程
        launch {
            println("child coroutine2 is start")
            delay(1000)
            println("child coroutine2 is end")
        }

        println("parent coroutine is end")
    }
    delay(500)
    println("cancel coroutine")
    job.cancel()//取消协程
}

运行结果:

在这里插入图片描述
可以看到,当我们取消了父协程后,它的所有子协程也都被取消了,这就是协程的 结构化取消

源码分析

了解了协程是如何取消的,以及协程取消的一些特性,那么接下来我们就来看看协程的取消是如何实现的。

Job.cancel()

cancel 方法主要做了两件事:

  1. 设置协程的状态为取消状态:isActive = false
  2. 递归的调用子协程的 cancel 方法

我们可以跟一下 cancel 方法的源码: 链路比较长,我们只看关键的部分:

先看下 job.cancel 的源码:

cancel 方法接收一个 CancelletionException 类型的参数,这个参数是用来指定取消的原因的,如果不传,那么默认会通过 defaultCancellationException() 方法来创建一个默认的 CancellationException 异常。

在这里插入图片描述

然后 CancellationException 异常会传递到 cancelImpl 方法中继续处理。
在这里插入图片描述
cancelImpl 中会根据协程状态做对应的处理,这里我们重点看 COMPLETING_ALREADY(未完成状态)时,会走 makeCancelling 逻辑。

makeCancelling 中,又执行了:tryMakeCancelling
在这里插入图片描述
tryMakeCancellingIncomplete 类型的 state 就包含了协程的 isActive 属性

在这里插入图片描述
在后续的执行中,会把协程的 isActive 的值设置为 false,然后接着走取消流程。
在这里插入图片描述
notifyCancelling 中,会调用子协程的 cancel 方法,子协程又会再调它的子协程的 cancel 方法, 这样就实现了协程的结构化取消。

同时,还会调用 cancelParent() 方法尝试去取消父协程,因为上面也提到了,CaclletionException 毕竟是一个异常,代码这里实际上是异常的处理流程,其它的异常是会导致父协程也被取消的,这个在协程的异常这篇文章中会详细讲解。

在这里插入图片描述
先来看下为什么 CaclletionException 不会导致父协程被取消,重点看下 cancelParent 方法:

cancelParent 的返回值是一个 Boolean 类型,如果返回 true,表示父协程自行处理了异常,不会再向上传递异常,false 则表示异常会继续向上传递。

在这里插入图片描述

cancelParent 方法中最后会调用 parent.childCanceled 方法。

该方法中会判断异常是否是 CacellationExecption,如果是,则返回 true,不会调用cancelImpl 走取消的实现了,同时告知父协程异常已经处理了,不需要再向上传递。
在这里插入图片描述
至此,协程的取消流程大体上就搞清楚了,这也是为什么 CacellationExecption 不会导致程序崩溃,以及只会影响当前协程以及子协程的原因。

不配合取消:NonCancellable

上面说到了,协程的取消是结构化的,一个协程被取消了,它的所有子协程也会被取消,这是协程的默认行为。

但是,有时候我们希望子协程中的一些代码不要受到父协程的取消影响,针对于这样的场景,协程提供了一个东西是 NonCancellable

先来看看 NonCancellable 的源码:
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

可以看到,NonCancellable 实际上就是一个单例的 Job ,它有以下特点:

  • isActive 永远是 true
  • cancel 方法被重写了,不会做任何处理
  • parentnull
  • childrenemptySequence

也就是说,NonCancellable 是一个特殊的 Job,它的状态永远是活跃的,它也不存在父协程和子协程,相当于切断了父子协程之间的联系,因此,它也就不存在结构化取消的问题。

如何使用注释中也已经给出了示例,这里我们再来看一个简单的示例:

来简单用一下:

fun main() = runBlocking {
    val job = launch {
        println("parent coroutine is start")
        //子协程
        launch {
            try {
                println("child coroutine1 is start")
                delay(1000)
                println("child coroutine1 is end")
            } finally {
                //例如一些清理工作是必须要执行完成的
                withContext(NonCancellable) {
                    println("nonCancellableJob is start")
                    delay(300)
                    println("nonCancellableJob is end")

                }
            }
            println("write file end")
        }
        //子协程
        launch {
            println("child coroutine2 is start")
            delay(1000)
            println("child coroutine2 is end")
        }

        println("parent coroutine is end")
    }
    delay(500)
    println("cancel coroutine")
    job.cancel()//取消协程
}

执行结果:
在这里插入图片描述
可以看到,虽然父协程的取消导致两个子协程被取消了,但是withContext 中的代码块依旧能正常执行完毕。

至于其它的场景就不一一列举了,大家可以根据自己的业务场景来使用 NonCancellable

我们完全可以把 NonCancellable 看作是一个语义化的 Job,用来表示一些不能被取消的操作,这样就能更好地表达我们的业务逻辑。

实际上,使用 NonCancellable 和直接使用 Job() 能达到的效果是一样的

例如:
在这里插入图片描述
作用都是切断了父子协程之间的联系,使得代码块可以不受其它协程取消的影响,只是NonCancellable 更加语义化,更加直观并且更加严谨。

让阻塞任务也配合取消:runInterruptible

上面我们提到过,协程的取消是需要代码判断 isActive 来配合取消的。虽然kotlin中绝大部分挂起函数都是做了配合取消的,但是,总有一些代码是没有做配合取消的。

例如,我们需要调用一个三方的api,这个api是一个阻塞的耗时任务,但是,我们又希望协程取消时这个阻塞任务也能配合取消,这时候就需要用到 runInterruptible 了。

就以写文件为例吧,先来看看不使用 runInterruptible 时的情况:

/**
 * 模拟阻塞读取文件
 */
fun blockingFileRead(): String {
    println("开始阻塞任务")
    Thread.sleep(2000)
    println("阻塞任务结束")
    return "File Content"
}

fun main() = runBlocking {
    val timeMillis = measureTimeMillis {

        val job = launch {
            try {
                val result = withContext(Dispatchers.IO) {
                    blockingFileRead() //模拟阻塞读取文件
                }
                println("文件读取完成: $result")
            } catch (e: CancellationException) {
                println("任务被取消")
            }
        }

        // 模拟在 1 秒后取消任务
        delay(1000)
        println("取消job")
        job.cancelAndJoin() // 取消任务并等待它完成

    }
    println("耗时: $timeMillis")
}

执行结果:
在这里插入图片描述

可以看到,虽然我们在1秒中后取消了任务,但是,阻塞任务并没有被取消,而是执行完毕后,协程才被取消,从耗时也能看出来。
这也符合我们的预期,因为,阻塞代码块中并没有做配合取消的操作。

下面来换成 runInterruptible 看下:

/**
 * 模拟阻塞读取文件
 */
fun blockingFileRead(): String {
    println("开始阻塞任务")
    Thread.sleep(2000)
    println("阻塞任务结束")
    return "File Content"
}

fun main() = runBlocking {
    val timeMillis = measureTimeMillis {

        val job = launch {
            try {
                //使用 runInterruptible 来确保阻塞任务也可以配合取消
                val result = runInterruptible(Dispatchers.IO) {
                    blockingFileRead() //模拟阻塞读取文件
                }
                println("文件读取完成: $result")
            } catch (e: CancellationException) {
                println("任务被取消")
            }
        }

        // 模拟在 1 秒后取消任务
        delay(1000)
        println("取消job")
        job.cancelAndJoin() // 取消任务并等待它完成

    }
    println("耗时: $timeMillis")
}

执行结果

在这里插入图片描述

可以看到,runInterruptible 可以使得阻塞任务能够配合取消,这是怎么做到的呢?

源码分析

主要关注两个点:

  1. 在执行代码块之前,会先创建 ThreadState,并通过调用 threadState.setup() 注册一个取消回调,用于在协程取消时中断正在执行的线程。
  2. 如果出现了 InterruptedException,则会抛出 CancellationException
    在这里插入图片描述

关键逻辑在 ThreadState 中,我们来看一下:

首先是注册取消回调,invokeOnCompletion 是协程中的一个结束回调,不管是正常结束还是异常结束,都会执行该方法。异常结束时会携带 Throwable

在这里插入图片描述

然后在取消回调中会检查协程状态,如果发现协程被取消了,那么就执行线程的 interrupt 操作。

在这里插入图片描述
线程的 interrupt 会导致 InterruptedException 异常,InterruptedException 又会被捕获并抛出 CancellationException,至此,协程的取消就完成了。

推荐阅读