【Kotlin】Channel简介

1 前言

        Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。

        当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。

        Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。

2 Channel 中 send 和 receive 案例

2.1 capacity 为 0

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            delay(10)
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            delay(100)
            var element = channel.receive()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
receive: 0
send: 1
receive: 1
send: 2
receive: 2

        说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。

2.2 capacity 大于 0

fun main() {
    var channel = Channel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            delay(10)
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            delay(100)
            var element = channel.receive()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive: 0
receive: 1
receive: 2

        说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。

3 Channel 中迭代器

3.1 iterator

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        var iterator = channel.iterator()
        while (iterator.hasNext()) {
            var element = iterator.next()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

3.2 for in

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        for (element in channel) {
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
receive: 0
send: 1
send: 2
receive: 1
receive: 2

4 Channel 中 produce 和 actor

        produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannel;actor 函数用于构造一个消费者协程,并返回一个 SendChannel。

4.1 produce

fun main() {
    var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者
        repeat(3) {
            println("send: $it")
            send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        for (element in receiveChannel) {
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

4.2 actor

fun main() {
    var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者
        repeat(3) {
            var element = receive()
            println("receive: $element")
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            println("send: $it")
            sendChannel.send(it)
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

5 Channel 的关闭

        对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。

fun main() {
    var channel = Channel<Int>(3)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
        channel.close()
        println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            var element = channel.receive()
            println("receive: $element")
        }
        println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
producter, isClosedForSend=true, isClosedForReceive=false
receive: 0
receive: 1
receive: 2
consumer, isClosedForSend=true, isClosedForReceive=true

6 BroadcastChannel

        Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。

6.1 Channel 中多个消费者

fun main() {
    var channel = Channel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            for (element in channel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive-0: 0
receive-0: 2
receive-1: 1

        说明:结果表明,Channel 中同一个元素只会被一个消费者读到。

6.2 BroadcastChannel 中多个消费者

6.2.1 BroadcastChannel

fun main() {
    var broadcastChannel = BroadcastChannel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            broadcastChannel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            var receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive-0: 0
receive-0: 1
receive-0: 2
receive-1: 0
receive-1: 1
receive-1: 2

        说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。

6.2.2 broadcast

fun main() {
    var channel = Channel<Int>()
    var broadcastChannel = channel.broadcast(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            broadcastChannel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            var receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive-1: 0
receive-1: 1
receive-1: 2
receive-0: 0
receive-0: 1
receive-0: 2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/579498.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PotatoPie 4.0 实验教程(41) —— FPGA实现RISC-V 扩展 GPIO UART Timer功能

TD工程介绍 我们提供的TD工程里的RISC-V核默认就开启了GPIO UART扩展&#xff0c;可以看到还有SPI和I2C扩展。因此后面的实验中TD的工程我们基本不怎么修改TD的内容&#xff0c;只需要修改TD工具中Soc_Top.v文件中的TCM0_INITFILE为FD生成的固件名称即可&#xff0c;主要修我以…

数据集市的详细建设方案!

▶ 什么是数据集市&#xff1f; 数据集市是处理单一事务的数据仓库的子集。它们通常由单个业务部门构建和管理。由于它们是面向主题的&#xff0c;因此通常仅从少数来源获取数据&#xff0c;这些来源可能是内部操作系统&#xff0c; 数据湖&#xff0c;一个集中的 数据存储库&a…

c++理论篇(一) ——浅谈tcp缓存与tcp的分包与粘包

介绍 在网络通讯中,Linux系统为每一个socket创建了接收缓冲区与发送缓冲区,对于TCP协议来说,这两个缓冲区是必须的.应用程序在调用send/recv函数时,Linux内核会把数据从应用进程拷贝到socket的发送缓冲区中,应用程序在调用recv/read函数时,内核把接收缓冲区中的数据拷贝到应用…

Android 设置头像 - 相册拍照

Android开发在个人信息管理中&#xff0c;如果设置头像&#xff0c;一般都提供了从相册选择和拍照两种方式。下午将针对设置用户头像相册和拍照两种方式的具体实现进行详细说明。 在实际实现过程中需要使用到权限管理&#xff0c;新版本的Android需要动态申请权限&#xff0c;权…

rabbitmq下载安装最新版本--并添加开机启动图文详解!!

一、简介 RabbitMQ是一个开源的遵循AMQP协议实现的消息中间件支持多种客户端语言,用于分布式系统中存储和转发消息, 这是 Release RabbitMQ 3.13.0 rabbitmq/rabbitmq-server GitHub 二、安装前准备 1、查看自己系统 确认操作系统版本兼容性 uname -a2、下载Erlang依赖包…

【12580无线通信技术】第十一章 Ad hoc网络无线通信技术期末复习自考复习

第十一章 Ad hoc网络无线通信技术 P283&#xff08;名词&#xff09;Ad hoc技术&#xff1a;是一种特定的无线网络结构&#xff0c;强调的是多跳、自组织、无中心的概念。P285&#xff08;简答&#xff09;Ad hoc网络的特点:①自组织和无中心特性&#xff1b;②网络拓补动态变…

SpringCloud系列(20)--Ribbon的简介及使用

1、Ribbon的简介 Spring Cloud Ribbon是基于Netflix Ribboh实现的一套客户端负载均衡的工具&#xff0c;简单的说&#xff0c;Ribbon是Netflix发布的开源项目&#xff0c;主要功能是提供客户端的软件负载均衡算法和服务调用。Ribbon客户端组件提供一系列完善的配置项如连接超时…

学习100个Unity Shader (14) ---透明效果

文章目录 渲染队列透明度测试&#xff08;Alpha Test&#xff09;效果Shader 透明度混合&#xff08;Alpha Blending&#xff09;效果Shader 参考 渲染队列 由”Queue“ 标签决定&#xff0c;索引号越小越早被渲染&#xff1a; 名称队列索引号Background1000Geometry2000Alph…

论文阅读之MMSD2.0: Towards a Reliable Multi-modal Sarcasm Detection System

文章目录 论文地址主要内容主要贡献模型图技术细节数据集改进多视图CLIP框架文本视图图像视图图像-文本交互视图 实验结果 论文地址 https://arxiv.org/pdf/2307.07135 主要内容 这篇文章介绍了一个名为MMSD2.0的多模态讽刺检测系统的构建&#xff0c;旨在提高现有讽刺检测系…

通过大模型(LLM)的多模态辩论的恶意表情包识别

Towards Explainable Harmful Meme Detection through Multimodal Debate between Large Language Models https://arxiv.org/abs/2401.13298https://arxiv.org/abs/2401.13298 1.概论 对于恶意表情包的识别,以往的研究方法没有能够深入表情包所隐含的复杂意义和文化背景,因…

vue-manage-system 更新,后台管理系统开发更简单

vue-manage-system 近期进行了一次版本升级&#xff0c;主要是支持了更多功能、升级依赖版本和优化样式&#xff0c;并且上线了官方文档网站&#xff0c;大部分功能都有文档或者使用示例&#xff0c;更加适合新手上手开发&#xff0c;只需要根据实际业务简单修改&#xff0c;就…

用fgets()替换fscanf()解决文件读取在小熊猫C++失败问题

fscanf&#xff08;&#xff09;遇到空格就结束读取&#xff0c;导致文件读取数据没完就退出读取以至于不能导入游戏地图工程。 看看到右侧小方块轨迹知晓采样区移动情况 也已经实现摄像机追随玩家效果 // 程序&#xff1a;2D RPG 地图编辑器与摄像机追随 // 作者&#xff1…

C语言自定义类型【联合体与枚举】

文章目录 1.联合体1.1联合体的声明1.2联合体的特点1.3联合体的大小计算联合体的使用案例 2.枚举2.1枚举类型的声明2.2枚举类型的优点(为什么使用枚举)2.3枚举类型的使用 结语 1.联合体 1.1联合体的声明 和结构体一样&#xff0c;联合体也是由一个或多个成员构成&#xff0c;同…

如何在 Visual Studio 中通过 NuGet 添加包

在安装之前要先确定Nuget的包源是否有问题。 Visual Studio中怎样更改Nuget程序包源-CSDN博客 1.图形界面安装 打开您的项目&#xff0c;并在解决方案资源管理器中选择您的项目。单击“项目”菜单&#xff0c;然后选择“管理 NuGet 程序包”选项。在“NuGet 包管理器”窗口中…

CTF(web方向)--md5的“===”和“==”的绕过

一、PHP弱类型说明 1.简介 php是一种弱类型语言&#xff0c;对数据的类型要求并不严格&#xff0c;可以让数据类型互相转换。 在php中有两种比较符号: 一种是 &#xff0c;另外一种是 &#xff0c;都是用来比较两个数值是否相等的操作符&#xff0c;但他们也是有区别的: &a…

大数据架构相关知识总结

一、大数据处理系统架构特性 1. 鲁棒性和容错性&#xff1a; 系统必须对游bug的程序写入的错误数据游足够的适应能力 2. 低延迟读取和更新能力 3. 横向扩容&#xff1a; 可以通过增加机器数量来维持性能 4. 通用性&#xff1a; 需要支持绝大多数应用程序 5. 延展性&#xff1a;…

前端工程化Vue使用Node.js设置国内高速npm镜像源(踩坑记录版)

前端工程化Vue使用Node.js设置国内高速npm镜像源&#xff08;踩坑记录版&#xff09; 此篇仅为踩坑记录&#xff0c;并未成功更换高速镜像源&#xff0c;实际解决方法见文末跳转链接。 1.自身源镜像 自身镜像源创建Vue项目下载速度感人 2.更改镜像源 2.1 通过命令行配置 前提…

【工作】程序员工作压力八个常见来源与建议缓解压力小窍门

目录 ​编辑 一. 程序员工作压力八个常见来源与建议 1&#xff09;目标职位不对 2&#xff09;工作任务描述不清晰 3&#xff09;快节奏的工作环境 4&#xff09;项目后期突然被添加新的要求 5&#xff09;计划外的工作事务会打断并破坏注意力 6&#xff09;个人问题 7…

MySQL第一次作业

解压完安装包 以管理员进入命令行 初始化并记住初始随机密码 创建服务名称 启动mysql 使用随机密码登录 修改密码 退出并重登服务器 MySQL创建数据库和表 创建数据库 创建表 1.进入数据库 创建表 向表中插入数据

鸿蒙OpenHarmony【小型系统 编译】(基于Hi3516开发板)

编译 OpenHarmony支持hb和build.sh两种编译方式。此处介绍hb方式&#xff0c;build.sh脚本编译方式请参考[使用build.sh脚本编译源码]。 使用build.sh脚本编译源码 进入源码根目录&#xff0c;执行如下命令进行版本编译。 ./build.sh --product-name name --ccache 说明&…