RabbitMQ - 以 MQ 为例,手写一个 RPC 框架 demo

 

目录

前言 

一、再谈自定义应用层协议

二、再谈 BrokerServer

三、再谈 Connection、Channel

四、Demo

a)启动服务器

b)客户端连接


前言 


本篇文章来自于笔者之前写过的一个系列 —— “根据源码,模拟实现 RabbitMQ” 系列,不妨可以去看看~

一、再谈自定义应用层协议

a)这个自定义应用层协议实际上就是在描述将来 客户端 和 服务器 之间通讯的消息格式长啥样

b)首先是一个 Int 类型的 type,描述了这个消息到底是用来干什么的(要调用服务器这边的哪一个服务).

c)然后就是 payload 的数据载荷,承载着将来调用 VirtualHost 中的具体的服务所需要的参数(例如创建交换机所需要的参数就有:交换机名字、交换机类型、是否自动删除、是否持久化、扩展参数).

因为 TCP 是面向字节流的(IO 流中主要提供的就是二进制数据的读写),因此这里不太适合使用 JSON 格式数据进行网络传输(可读性不好,效率不高),因此这里 payload 是一个 字节数组,将具体的数据序列化成 byte 数组放进来.

d)这里要注意的一点是,TCP 是面向字节流的,因此会出现粘包问题,那么为了解决这个问题,由两种办法,第一种就是约定分割符(读到指定分隔符就截止),第二种就是描述好 payload 的长度.

这里我采用的就是第二种办法,只需要在协议里面在添加一个 length 字段,用来描述 payload 的长度.

import java.io.Serializable

//Socket 自定义应用层协议(请求)
data class Request(
    val type: Int,
    val length: Int,
    val payload: ByteArray,
): Serializable

//Socket 自定义应用层协议(响应)
data class Response(
    val type: Int,
    val length: Int,
    val payload: ByteArray,
): Serializable

//基本参数(每个请求都会携带的参数,这里进行了一个封住)
open class ReqBaseArguments(
    open val rid: String = "",
    open val channelId: String = "",
): Serializable

//基本响应参数(每个响应都会携带的参数),主要是为了应对 mq 回调响应处理
open class RespBaseArguments(
    open val rid: String,
    open val channelId: String,
    open val ok: Boolean,
): Serializable

//主要的请求: 创建交换机、删除交换机、创建队列
data class ExchangeDeclareReq(
    val name: String,
    val type: ExchangeType,
    val durable: Boolean,
    val autoDelete: Boolean,
    val arguments: MutableMap<String, Any>,
    override val rid: String,
    override val channelId: String,
): ReqBaseArguments(), Serializable

data class ExchangeDeleteReq(
    val name: String,
    override val rid: String,
    override val channelId: String,
): ReqBaseArguments(), Serializable

data class QueueDeclareReq(
    val name: String,
    val durable: Boolean,
    val exclusive: Boolean,
    val autoDelete: Boolean,
    val arguments: MutableMap<String, Any>,
    override val rid: String,
    override val channelId: String,
): ReqBaseArguments(), Serializable

 

二、再谈 BrokerServer

a)BrokerServer 就是一个中间服务,也可以简单理解为 VirtualHost 的代理(BrokerServer 接收客户端请求,调用 VirtualHost 中具体的服务).

b)BrokerServer 启动的时候,就会通过 accept 阻塞等待客户端这边的 TCP 连接,连接成功之后只需要为该客户端其分配一个线程,处理之后的任务.

c)此时这个线程就会处于一个死循环循环,通过 IO 流读取到 客户端 请求中的 type、length、payload ,并按照约定的格式进行解析 payload,得到具体数据(这里不仅包含了 VirtualHost 服务中所需要的具体的参数,还携带了 channelId 和 rid)

d)此时,只需要根据 IO 流中读取出的 type,调用对应 VirtualHost 中的服务即可.

e)最后再将 VirtualHost 处理后得到的响应封装成 我们约定的应用层协议格式,通过 IO 写入到流中,让客户端去读取.

class BrokerServer(
    port: Int
) {

    private val socket = ServerSocket(port)
    private val clientPool = Executors.newFixedThreadPool(5)

    //key: channelId ,value: Socket
    //注意:这里的 Channel 只表示一个 "逻辑" 上的连接(创建,销毁 channel),这个 Map 是为了后台信息统计
    private val channelSession = ConcurrentHashMap<String, Socket>()

    private val virtualHost = VirtualHost()

    fun start() {
        println("[BrokerServer] 启动!")
        while (true) {
            val client = socket.accept()
            clientPool.submit {
                clientProcess(client)
            }
        }
    }

    private fun clientProcess(client: Socket) {
        println("[BrokerServer] 客户端上线!ip: ${client.inetAddress}, port: ${client.port}")
        try {
            client.getInputStream().use { inputStream ->
                client.getOutputStream().use { outputStream ->
                    DataInputStream(inputStream).use { dataInputStream ->
                        DataOutputStream(outputStream).use { dataOutputStream ->
                            while (true) {
                                val request = readRequest(dataInputStream)
                                val response = process(request, client)
                                writeResponse(response, dataOutputStream)
                            }
                        }
                    }
                }
            }
        } catch (e: EOFException) {
            println("[BrokerServer] 客户端正常下线!ip: ${client.inetAddress}, port: ${client.port}")
        } catch (e: Exception) {
            println("[BrokerServer] 客户端连接异常!ip: ${client.inetAddress}, port: ${client.port}")
        } finally {
            client.close()
            removeChannelSession(client)
        }
    }

    private fun process(request: Request, client: Socket) = with(request) {
        //1.解析请求
        val req = BinaryTool.bytesToAny(payload)
        //2.获取请求中的 channelId,记录和 Socket 的关系(让每个 channel 都对应自己的 Socket,类似于 Session)
        val reqBase = req as ReqBaseArguments
        //3.根据 type 类型执行不同的服务(创建 Channel、销毁 Channel、创建交换机、删除交换机...)
        val ok = when(type) {
            1 -> {
                channelSession[reqBase.channelId] = client
                println("[BrokerServer] channel 创建成功!channelId: ${reqBase.channelId}")
                true
            }
            2 -> {
                channelSession.remove(reqBase.channelId)
                println("[BrokerServer] channel 销毁成功!channelId: ${reqBase.channelId}")
                true
            }
            3 -> virtualHost.exchangeDeclare(req as ExchangeDeclareReq)
            4 -> virtualHost.exchangeDelete(req as ExchangeDeleteReq)
            5 -> virtualHost.queueDeclare(req as QueueDeclareReq)
            //...
            else -> throw RuntimeException("[BrokerServer] 客户端请求 type 非法!type: $type")
        }
        //4.返回响应
        val respBase = RespBaseArguments(reqBase.rid, reqBase.channelId, ok)
        val payload = BinaryTool.anyToBytes(respBase)
        Response(type, payload.size, payload)
    }

    /**
     * 读取客户端请求
     * 使用 DataInputStream 的主要原因就是有多种读取方式,例如 readInt()、readLong(),这些都是原生 InputStream 没有的
     */
    private fun readRequest(dataInputStream: DataInputStream) = with(dataInputStream) {
        val type = readInt()
        val length = readInt()
        val payload = ByteArray(length)
        val n = read(payload)
        if (n != length) throw RuntimeException("[BrokerServer] 读取客户端请求异常!")
        Request(type, length, payload)
    }

    /**
     * 将响应写回给客户端
     */
    private fun writeResponse(response: Response, outputStream: DataOutputStream) = with(outputStream) {
        writeInt(response.type)
        writeInt(response.length)
        write(response.payload)
        flush()
    }

    //删除所有和这个 clientSocket 有关的 Channel
    private fun removeChannelSession(client: Socket) {
        val channelIdList = mutableListOf<String>()
        //这里不能直接删除,会破坏迭代器结构
        for (entry in channelSession) {
            if (entry.value == client) channelIdList.add(entry.key)
        }
        for (channelId in channelIdList) {
            channelSession.remove(channelId)
        }
    }

}

class VirtualHost {

    fun exchangeDeclare(req: ExchangeDeclareReq): Boolean {
        //执行业务逻辑
        //...
        println("[VirtualHost] 创建交换机成功!")
        return true
    }

    fun exchangeDelete(req: ExchangeDeleteReq): Boolean {
        //执行业务逻辑
        //...
        println("[VirtualHost] 删除交换机成功!")
        return true
    }

    fun queueDeclare(req: QueueDeclareReq): Boolean {
        //执行业务逻辑
        //...
        println("[VirtualHost] 创建队列成功!")
        return true
    }

}

 

三、再谈 Connection、Channel

a)一个 Connection 就是一个 TCP 连接,因此频繁 建立/断开连接(三次握手、四次挥手...)的开销也是相当大的,因此就引入了 Channel. 

b)一个 Connection 下可以有多个 Channel(此处使用 map 来维护).  Channel 只是简单的表示一个逻辑上的连接,可以理解为一个大的项目下被拆分成的多个小的微服务. 实现了 TCP 连接的复用.

c)起初,我们需要先创建出 Connection 与服务端建立连接,初始化构造中只需要写一个死循环,不断的从服务端这边读取响应.

d)接着,通过 Connection 创建出 Channel 来完成具体的业务(Channel 中就提供了一系列方法,就像调用本地的方法一样,调用到远程服务器的接口).

e)例如 Channel 中提供的创建叫交换机方法(channel.exchangeDeclare(...)),这个方法中具体要做的就是将传入的参数,封装到一个对象中,序列化成 二进制 数据,这就是将来协议中要传输的 payload.   进一步的,协议 Request 就构造出来了,通过 IO 写到流中,供服务端读取.

d)为了能够让每次请求和响应都能对的上,Channel 这里我维护了一个 map(key 是 rid、value 是具体的响应),客户端和服务端之间的每个请求和响应都会携带上这个 rid 这个参数,这样将来 Connection 客户端接受到响应的时候,就可以直接把 响应中的 rid 提取出来,交给 Channel 的 map 中(响应来之前,Channel 一直阻塞等待,直到响应来了 -> 能通过 rid  从 map 中得到).

class ConnectionFactory(
    private val host: String,
    private val port: Int,
) {

    fun newConnection() = Connection(host, port)

}
class Connection(
    ip: String,
    port: Int,
) {

    private val socket = Socket(ip, port)
    private val channelMap = ConcurrentHashMap<String, Channel>()
    //下述这样提前创建好,是为了将来 Channel 在读写请求的时候的方便(Channel 就不用获取输入输出流了)
    private val inputStream = socket.getInputStream()
    private val outputStream = socket.getOutputStream()
    private val dataInputStream = DataInputStream(inputStream)
    private val dataOutputStream = DataOutputStream(outputStream)

    init {
        //此线程负责不停的从服务器这边获取响应
         Thread {
             try {
                 while (!socket.isClosed) {
                     //读取服务器响应
                     val resp = readResp()
                     //将响应交给对应的 Channel
                     putRespToChannel(resp)
                 }
             } catch (e: SocketException) {
                 println("[Connection] 客户端正常断开连接")
             } catch (e: Exception) {
                 println("[Connection] 客户端异常断开连接")
                 e.printStackTrace()
             }
         }.start()
    }


    /**
     * 将客户端 Connection 接收到的请求,交给对应的 Channel 处理(此时 Channel 还在阻塞等待服务端响应)
     */
    private fun putRespToChannel(resp: Response) {
        //这里由于不涉及回调,所以每个 type 类型的响应都长一样,就按照一样的方式解析了
        val baseResp = BinaryTool.bytesToAny(resp.payload) as RespBaseArguments
        val channel = channelMap[baseResp.channelId]
            ?: throw RuntimeException("[Connection] 该响应对应的 Channel 不存在!channelId: ${baseResp.channelId}")
        //将响应交给 Channel
        channel.notifyResp(baseResp)
    }

    /**
     * 创建 Channel
     */
    fun createChannel(): Channel { //1.创建 Channel,保存到 map 种
        val channelId = "C-${UUID.randomUUID()}"
        val channel = Channel(channelId, this)
        channelMap[channelId] = channel
        //2.告知服务端 Channel 创建
        val ok = channel.createChannel()
        //3.如果 Channel 创建不成功,客户端这边也应该要删除对应的 Channel 信息
        if (!ok) channelMap.remove(channelId)
        return channel
    }

    private fun readResp() = with(dataInputStream) {
        val type = readInt()
        val length = readInt()
        val payload = ByteArray(length)
        val n = read(payload)
        if (n != length) throw RuntimeException("[Connection] 客户端读取响应异常!")
        Response(type, length, payload)
    }

    fun writeReq(request: Request) = with(dataOutputStream) {
        writeInt(request.type)
        writeInt(request.length)
        write(request.payload)
        flush()
    }

}
class Channel(
    private val channelId: String,
    private val connection: Connection,  //自己当前属于哪个 Channel
) {

    //key: rid(为了能让每个 Channel 对应上自己的响应)
    //value: RespBaseArguments(具体的响应)
    //当 Connection 的扫描线程接收到响应之后,就会将响应传给这个 map
    private val ridRespMap = ConcurrentHashMap<String, RespBaseArguments>()
    //这个锁是用来阻塞等待服务端响应的(避免轮询),当服务端传来响应时,Connection 就会唤醒锁
    private val locker = Object()

    private fun generateRid() = "R-${UUID.randomUUID()}"

    private fun waitResp(rid: String): RespBaseArguments {
        val respBase: RespBaseArguments
        while (ridRespMap[rid] == null) { // 如果为空,说明此时服务端还没有传来响应
            synchronized(locker) { //为了避免轮询,就让其阻塞等待
                locker.wait()
            }
        }
        //出了这个循环,那么 ridRespMap[rid] 一定不为空
        return ridRespMap[rid]!!
    }

    fun notifyResp(respBase: RespBaseArguments) {
        ridRespMap[respBase.rid] = respBase
        synchronized(locker) {
            //当前也不直到有多少线程在等待响应,就全部唤醒
            locker.notifyAll()
        }
    }

    /**
     * 创建 Channel
     */
    fun createChannel(): Boolean {
        //1.创建基本请求
        val reqBase = ReqBaseArguments(
            rid = generateRid(),
            channelId = channelId
        )
        //2.构造 TCP 通信请求
        val payload = BinaryTool.anyToBytes(reqBase)
        val req = Request(
            type = 1,
            length = payload.size,
            payload = payload
        )
        //3.发送请求
        connection.writeReq(req)
        //4.等待客户端响应
        val respBase = waitResp(reqBase.rid)
        return respBase.ok
    }

    fun removeChannel(): Boolean {
        //1.创建基本请求
        val reqBase = ReqBaseArguments(
            rid = generateRid(),
            channelId = channelId
        )
        //2.构造 TCP 通信请求
        val payload = BinaryTool.anyToBytes(reqBase)
        val req = Request(
            type = 2,
            length = payload.size,
            payload = payload
        )
        //3.发送请求
        connection.writeReq(req)
        //4.等待客户端响应
        val respBase = waitResp(reqBase.rid)
        return respBase.ok
    }

    fun exchangeDeclare(
        name: String,
        type: ExchangeType,
        durable: Boolean,
        autoDelete: Boolean,
        arguments: MutableMap<String, Any>,
    ): Boolean {
        val exchangeDeclareReq = ExchangeDeclareReq(
            name = name,
            type = type,
            durable = durable,
            autoDelete = autoDelete,
            arguments = arguments,
            rid = generateRid(),
            channelId = channelId,
        )
        val payload = BinaryTool.anyToBytes(exchangeDeclareReq)
        val req = Request(
            type = 3,
            length = payload.size,
            payload = payload,
        )
        connection.writeReq(req)
        val respBase = waitResp(exchangeDeclareReq.rid)
        return respBase.ok
    }

    fun exchangeDelete(name: String): Boolean {
        val exchangeDeleteReq = ExchangeDeleteReq(
            name = name,
            rid = generateRid(),
            channelId = channelId,
        )
        val payload = BinaryTool.anyToBytes(exchangeDeleteReq)
        val req = Request(
            type = 4,
            length = payload.size,
            payload = payload,
        )
        connection.writeReq(req)
        val respBase = waitResp(exchangeDeleteReq.rid)
        return respBase.ok
    }

    fun queueDeclare(
        name: String,
        durable: Boolean,
        exclusive: Boolean,
        autoDelete: Boolean,
        arguments: MutableMap<String, Any>,
    ): Boolean {
        val queueDeclareReq = QueueDeclareReq(
            name = name,
            durable = durable,
            exclusive = exclusive,
            autoDelete = autoDelete,
            arguments = arguments,
            rid = generateRid(),
            channelId = channelId,
        )
        val payload = BinaryTool.anyToBytes(queueDeclareReq)
        val req = Request(
            type = 5,
            length = payload.size,
            payload = payload,
        )
        connection.writeReq(req)
        val resp = waitResp(queueDeclareReq.rid)
        return resp.ok
    }

}

 

四、Demo

a)启动服务器

fun main() {
    val server = BrokerServer(9000)
    server.start()
}

b)客户端连接

class Test2 {
}

fun main() {
    val factory = ConnectionFactory("127.0.0.1", 9000)
    val connection = factory.newConnection()
    val channel = connection.createChannel()

    val ok1 = channel.createChannel()
    val ok2 = channel.exchangeDeclare("e1", ExchangeType.DIRECT, false, false, mutableMapOf())
    val ok3 = channel.removeChannel()

    println("ok1: $ok1, ok2: $ok2, ok3: $ok3")
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/613477.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

WPF之工具栏菜单栏功能区。

1&#xff0c;菜单栏&#xff0c;工具栏&#xff0c;状态栏。 1.1&#xff0c;Menu中可添加菜单分隔条<Separator></Separator>作为分割线&#xff0c;使用Separator可以通过改变其template来自定义&#xff0c;Separator是无焦点的&#xff0c;如果简单的在MenuIt…

c++ 获取机器码

看到网上代码代码都没什么好的&#xff0c;自己备用一个 #include <iostream> #include <string> #include <sstream> #include <iomanip> #include <Windows.h> #include <iphlpapi.h> // 包含这个头文件以获取 PIP_ADAPTER_INFO #inclu…

面试官:SPA(单页应用)首屏加载速度慢怎么解决

一、什么是首屏加载 首屏时间&#xff08;First Contentful Paint&#xff09;&#xff0c;指的是浏览器从响应用户输入网址地址&#xff0c;到首屏内容渲染完成的时间&#xff0c;此时整个网页不一定要全部渲染完成&#xff0c;但需要展示当前视窗需要的内容 首屏加载可以说是…

Macbook2024电脑必备系统优化软件CleanMyMacX

随着时间的推移&#xff0c;你可能会发现你的MacBook运行速度变慢&#xff0c;甚至在执行一些基本任务时也会感觉到卡顿。这不仅影响了工作效率&#xff0c;也大大降低了使用体验。特别是当你运行大型应用程序&#xff0c;比如视频编辑软件或图形设计工具时&#xff0c;卡顿现象…

Python计算器程序代码

from tkinter import * import random class App: def __init__(self, master): self.master master self.initwidgets() #表达式的值 self.expr None def initwidgets(self): #定义一个输入组件 self.show Label(relief SUNKEN, font (Courier New, 24), width 25, bg …

[JAVASE] 类和对象(二)

目录 一. 封装 1.1 面向对象的三大法宝 1.2 封装的基本定义与实现 二. 包 2.1 包的定义 2.2 包的作用 2.3 包的使用 2.3.1 导入类 2.3.2 导入静态方法 三. static 关键字 (重要) 3.1 static 的使用 (代码例子) 3.1.1 3.1.2 3.1.3 3.1.4 四. 总结 一. 封装 1.1 面向对象…

反了!美国假冒邮政服务钓鱼网站访问量竟然超过正规官网

美国邮政是美国主要的包裹信件投递机构之一&#xff0c;长期以来该单位都是网络钓鱼和诈骗的针对目标。对美国公民来说&#xff0c;在假期通常都会收到声称来自美国邮政的诈骗。美国邮政甚至单独建设的网页提醒消费者警惕诈骗信息&#xff1a; 专用提醒网页 Akamai 的研究人员…

IP证书签发申请

IP证书签发申请 IP证书的全称是IP SSL证书&#xff0c;其主要的作用是为IP实现https访问&#xff0c;且IP SSL证书可以完美的解决企业对于IP地址实现https加密需求。 这种类型的证书特别适合于那些没有域名只有公网IP或者不方便使用域名的企业或个人。证书允许通过特定的IP地…

简单的表单初始密码验证的实现

目录 简单示例&#xff1a;表单初始密码验证 1.1准备工作(图1&#xff09; 1.2 index部分 1.3 css部分 1.3.1先把css部分链接到index.html中&#xff0c;注意链接的地址。 1.3.2添加样式 1.4 JS部分 1.4.1 先把js部分链接到index.html中&am…

LAE SHOW 2024 大湾区国际低空经济产业博览会

LAE SHOW 2024 大湾区国际低空经济产业博览会 2024 CHN GBA Intl Low-Altitude Economy Industrial Show ◎ 展会基本信息&#xff1a; 展览时间&#xff1a;2024年12月4日-6日 展览地点&#xff1a;深圳国际会展中心&#xff08;宝安新馆&#xff09; 展览面积&#xff1a…

2万字干货:如何从0到1搭建一套会员体系(2)

2.用户等级 还是一样&#xff0c;我们为什么要搭建用户等级&#xff1f; 一个国家有几亿人口的时候你怎么来管理&#xff1f;老祖宗秦始皇给出了我们答案&#xff1a;郡县制。发展到现在则演进成了省-市-区县-乡镇(街道)-村(社区)5层行政治理结构。 产品同理&#xff0c;当你…

人脸识别技术在访客管理中的应用

访客办理体系&#xff0c;能够使用于政府、戎行、企业、医院、写字楼等众多场所。在办理时&#xff0c;需求对来访人员身份进行精确认证&#xff0c;才能保证来访人员的进入对被访单位不被外来风险入侵。在核实身份时&#xff0c;比较好的方法就是选用人脸辨认技能&#xff0c;…

QT 小项目:登录注册账号和忘记密码(下一章实现远程登录)

一、环境搭建 参考上一章环境 二、项目工程目录 三、主要源程序如下&#xff1a; registeraccountwindow.cpp 窗口初始化&#xff1a; void registeraccountWindow::reginit() {//去掉&#xff1f;号this->setWindowFlags(windowFlags() & ~Qt::WindowContextHelpButt…

针对 % 号 | 引起的 不安全情况

把网站开放的课程都检索下来了 一、情况1 org.apache.tomcat.util.http.Parameters processParameters 信息: Character decoding failed. Parameter [Mac] with value [%%%] has been ignored. Note that the name and value quoted here may be corrupted due to the failed…

陪诊陪护小程序基于ThinkPHP + FastAdmin + 微信小程序开发(源码搭建/上线/运营/售后/更新

支持多运营区&#xff0c;陪护师、推广者等完整闭环功能&#xff0c;快速搭建陪护业务平台。 消息通知&#xff1a;系统可以向用户发送订单状态变更、陪诊员信息更新等通知&#xff0c;确保用户及时了解相关信息&#xff0c;提高用户体验。 订单管理&#xff1a;患者可以查看自…

C++运算符重载(操作符重载)

运算符重载 1. 运算符重载基础1.1 运算符重载语法1.2 运算符重载细节补充1.3 更多的运算符重载 2. 重载单目运算符3. 如何直接输入输出对象类型——重载运算符 << 和 >>3.1 单个对象实现 cou <<3.2 多个对象实现 cout<<3.3 右移运算符 输入 cin >&g…

python输出希腊字母

有时候在绘制一些函数图像时&#xff0c;需要坐标轴和图例显示希腊字母 plt.xlabel(r’ ϵ \epsilon ϵ’)

土壤多参数检测仪在农业生产有哪些优势?

在现代农业的快速发展中&#xff0c;土壤多参数检测仪凭借其独特的功能和优势&#xff0c;为农业生产注入了新的活力。以下是其在农业生产中的几个显著优势&#xff1a; 一、全面精准的检测能力 土壤多参数检测仪能够全面、精准地检测土壤中的多种参数&#xff0c;如pH值、电…

【机器学习】线性回归:以房价预测为例

线性回归&#xff1a;揭秘房价预测的黑科技 一、引言二、线性回归概述三、房价预测实例数据收集与预处理特征选择与建模模型评估与优化 四、总结与展望 一、引言 在数字化时代&#xff0c;数据科学已成为推动社会进步的重要引擎。其中&#xff0c;线性回归作为数据科学中的基础…

Android 面试之Kotlin 协程上下文和异常处理

本文首发于公众号“AntDream”&#xff0c;欢迎微信搜索“AntDream”或扫描文章底部二维码关注&#xff0c;和我一起每天进步一点点 上下文是什么 CoroutineContext是一组用于定义协程行为的元素&#xff0c;包括以下几部分&#xff1a; Job&#xff1a;控制协程的生命周期Co…