AQS应用之BlockingQueue详解

概要

AQS全称是 AbstractQueuedSynchronizer,中文译为抽象队列式同步器。BlockingQueue,是java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制,在许多生产场景里都可以看到这个工具的身影。

队列类型

  1. 无限队列 (unbounded queue ) - 几乎可以无限增长
  2. 有限队列 ( bounded queue ) - 定义了最大容量

队列数据结构

队列实质就是一种存储数据的结构

  • 通常用链表或者数组实现
  • 一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列
  • 主要操作:入队(EnQueue)与出队(Dequeue)

常见的4种阻塞队列

  • ArrayBlockingQueue 由数组支持的有界队列
  • LinkedBlockingQueue 由链接节点支持的可选有界队列
  • PriorityBlockingQueue 由优先级堆支持的无界优先级队列
  • DelayQueue 由优先级堆支持的、基于时间的调度队列

ArrayBlockingQueue

队列基于数组实现,容量大小在创建ArrayBlockingQueue对象时已定义好

数据结构如下图:

队列创建:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();

应用场景

在线程池中有比较多的应用,生产者消费者场景

工作原理

基于ReentrantLock保证线程安全,根据Condition实现队列满时的阻塞

LinkedBlockingQueue

是一个基于链表的无界队列(理论上有界)

BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

上面这段代码中,blockingQueue 的容量将设置为 Integer.MAX_VALUE 。

向无限队列添加元素的所有操作都将永远不会阻塞,[注意这里不是说不会加锁保证线程安全],因此它可以增长到非常大的容量。

使用无限 BlockingQueue 设计生产者 - 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息 。否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。

DelayQueue

由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组的扩容实现。

队列创建:

BlockingQueue<String> blockingQueue = new DelayQueue();

要求

入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口

应用场景

电影票

工作原理:

队列内部会根据时间优先级进行排序。延迟类线程池周期执行。

BlockingQueue API

BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。

添加元素

方法

说明

add()

如果插入成功则返回 true,否则抛出 IllegalStateException 异常

put()

将指定的元素插入队列,如果队列满了,那么会阻塞直到有空间插入

offer()

如果插入成功则返回 true,否则返回 false

offer(E e, long timeout, TimeUnit unit)

尝试将元素插入队列,如果队列已满,那么会阻塞直到有空间插入

检索元素

方法

说明

take()

获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用

poll(long timeout, TimeUnit unit)

检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null

在构建生产者 - 消费者程序时,这些方法是 BlockingQueue 接口中最重要的构建块。

多线程生产者-消费者示例

接下来我们用“迟早药丸”的场景创建一个由两部分组成的程序 - 生产者 ( Producer ) 和消费者 ( Consumer ) 。

生产者将生成一个 0 到 100 的随机数(十全大补丸的编号),并将该数字放在 BlockingQueue 中。我们将创建 16 个线程(潘金莲)用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。

需要记住的重要一点是,我们需要阻止我们的消费者线程无限期地等待元素出现在队列中。

从生产者(潘金莲)向消费者(武大郎)发出信号的好方法是,不需要处理消息,而是发送称为毒 ( poison ) 丸 ( pill ) 的特殊消息。 我们需要发送尽可能多的毒 ( poison ) 丸 ( pill ) ,因为我们有消费者(武大郎)。然后当消费者从队列中获取特殊的毒 ( poison ) 丸 ( pill )消息时,它将优雅地完成执行。

以下生产者的代码:

@Slf4j
public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
            log.info("潘金莲-{}号,给武大郎的泡药!",Thread.currentThread().getId());
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
            log.info("潘金莲-{}号,往武大郎的药里放入第{}颗毒丸!",Thread.currentThread().getId(),j+1);
        }
    }
}

我们的生成器构造函数将 BlockingQueue 作为参数,用于协调生产者和使用者之间的处理。我们看到方法 generateNumbers() 将 100 个元素(生产100副药给武大郎吃)放入队列中。它还需要有毒 ( poison ) 丸 ( pill ) (潘金莲给武大郎下毒)消息,以便知道在执行完成时放入队列的消息类型。该消息需要将 poisonPillPerProducer 次放入队列中。

每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素。从队列中取出一个 Integer 后,它会检查该消息是否是毒 ( poison ) 丸 ( pill )(武大郎看潘金莲有没有下毒) ,如果是,则完成一个线程的执行。否则,它将在标准输出上打印出结果以及当前线程的名称。

@Slf4j
public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;

    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                log.info("武大郎-{}号,喝药-编号:{}",Thread.currentThread().getId(),number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

需要注意的重要事项是队列的使用。与生成器构造函数中的相同,队列作为参数传递。我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显式同步。

既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 10个元素。

我们创建4 个生产者线程,并且创建等于可用处理器数量的消费者线程:

public class Main {

    public static void main(String[] args) {
        int BOUND = 10;
        int N_PRODUCERS = 16;
        int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
        int poisonPill = Integer.MAX_VALUE;
        int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
        int mod = N_CONSUMERS % N_PRODUCERS;

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
        //潘金莲给武大郎熬药
        for (int i = 1; i < N_PRODUCERS; i++) {
            new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
        }
        //武大郎开始喝药
        for (int j = 0; j < N_CONSUMERS; j++) {
            new Thread(new NumbersConsumer(queue, poisonPill)).start();
        }
        //潘金莲开始投毒,武大郎喝完毒药GG
        new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
    }

}

BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者(武大郎)。我们将我们的毒 ( poison ) 丸 ( pill )消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。这里要注意的最重要的事情是 BlockingQueue 用于协调它们之间的工作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/305612.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

近似点梯度法

最优化笔记——Proximal Gradient Method 最优化笔记&#xff0c;主要参考资料为《最优化&#xff1a;建模、算法与理论》 文章目录 最优化笔记——Proximal Gradient Method一、邻近算子&#xff08;1&#xff09;定义 二、近似点梯度法&#xff08;1&#xff09;迭代格式&…

4.4 媒资管理模块 - 分布式任务处理介绍、视频处理技术方案

媒资管理模块 - 视频处理 文章目录 媒资管理模块 - 视频处理一、视频转码1.1 视频转码介绍1.2 FFmpeg 基本使用1.2.1 下载安装配置1.2.2 转码测试 1.3 工具类1.3.1 VideoUtil1.3.2 Mp4VideoUtil1.3.3 测试工具类 二、分布式任务处理2.1 分布式任务调度2.2 XXL-JOB 配置执行器 中…

Java诊断利器Arthas

https://arthas.aliyun.com/doc/https://arthas.aliyun.com/doc/ 原理 利用java.lang.instrument(容器类) 做动态 Instrumentation(执行容器) 是 JDK5 的新特性。使用 Instrumentation&#xff0c;开发者可以构建一个独立于应用程序的代理程序&#xff08;Agent&#xff09;&…

three.js实现电子围栏效果(纹理贴图)

three.js实现电子围栏效果&#xff08;纹理贴图&#xff09; 实现步骤 围栏的坐标坐标转换为几何体顶点&#xff0c;uv顶点坐标加载贴图&#xff0c;移动 图例 代码 <template><div class"app"><div ref"canvesRef" class"canvas-…

自带恒压恒流环路的降压型单片车充专用芯片

一、基本概述 XL2009是一款高效降压型DC-DC转换器&#xff0c;固定180KHz开关频率&#xff0c;可以提供最高2.5A输出电流能力&#xff0c;具有低纹波&#xff0c;出色的线性调整率与负载调整率特点。XL2009内置固定频率振荡器与频率补偿电路&#xff0c;简化了电路设计。 PWM …

基于图像合成和注意力的深度神经网络从计算机断层扫描灌注图像中自动分割缺血性脑卒中病变

Automatic ischemic stroke lesion segmentation from computed tomography perfusion images by image synthesis and attention-based deep neural networks 基于图像合成和注意力的深度神经网络从计算机断层扫描灌注图像中自动分割缺血性脑卒中病变背景贡献实验Comparison o…

EMQX5设置客户端连接认证

文章目录 说明配置客户端连接认证配置1、访问控制-客户端认证-创建2、选择“密码”方式-下一步3、选择“内置数据库”-下一步4、账号类型选择“username”5、密码加密方式选择“plain”6、加盐方式“disable”-创建7、添加客户端连接的账密客户端连接验证 心得 说明 号外&…

旋变检测AD2s1205手册学习笔记

旋变故障检测故障表 信号丢失检测 检测原理&#xff1a;任一旋变输入(正弦或余弦)降至指定的LOS正弦/余弦阈值 以下时&#xff0c;器件会检测到信号丢失(LOS)。AD2S1205通过将 监视信号与固定最小值进行比较检测此点 丢失的效果表现&#xff1a;LOS由DOS和LOT引脚均闩锁为逻辑…

从文本(.txt)文件中读取数据时出现中文乱码

前言 当需要从记事本中读取数据时&#xff0c;发现读取的数据会出现中文乱码&#xff0c;我尝试了C和C读取文件&#xff0c;发现都是这样。 乱码原因 文本文件的保存默认使用UTF-8编码方式&#xff0c;而VS编译器的编码方式是GBK&#xff0c;所以不同的编码方式导致了乱码。…

OCP NVME SSD规范解读-5.命令超时限制-2

Sanitize清除的数据很彻底&#xff0c;对FTL映射表、User Data(包括已经写入NAND和仍在cache里的)、Meta Data、安全密匙、CMB中SQ/CQ相关信息、可能含有用户数据的log等等会全部清除。不过&#xff0c;sanitize操作不会改变RPMB、boot分区、不包含用户数据的cache等内容。 RP…

关于burpsuite对app(移动端)进行抓包的配置

可以使用手机模拟器&#xff0c;我这里以自己手机&#xff08;物理机&#xff09;演示配置过程 如果是使用的模拟器那么肯定和电脑是在同一局域网 如果使用物理机&#xff0c;那么可以通过连接同一WiFi确保在同一局域网环境下 查看电脑内网ip&#xff1a;192.168.1.105 &am…

Android 正圆

<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"wrap_content"android:layout_height"wrap_content"android:padding&…

SPL-cmcRVFL+

吐槽 作者未提供代码&#xff0c;还有图1敢再糊点吗&#xff1f;

个性化Python GUI计算器搭建

大家好&#xff0c;本文将介绍在Python中使用Tkinter几分钟内制作自己的全功能GUI计算器。 要完成所提到的功能&#xff0c;除了通常随Python标准库一起安装的Tkinter之外&#xff0c;不需要任何额外的库。 如果使用的是Linux系统&#xff0c;可能需要安装&#xff1a; $ pi…

springboot学生综合测评系统源码和论文

随着信息化时代的到来&#xff0c;管理系统都趋向于智能化、系统化&#xff0c;学生综合测评系统也不例外&#xff0c;但目前国内仍都使用人工管理&#xff0c;学校规模越来越大&#xff0c;同时信息量也越来越庞大&#xff0c;人工管理显然已无法应对时代的变化&#xff0c;而…

linux 内存管理

地址类型 一个虚拟内存系统, 意味着用户程序见到的地址不直接对应于硬件使用 的物理地址. 虚拟内存引入了一个间接层, 它允许了许多好事情. 有了虚拟内存, 系统重 运行的程序可以分配远多于物理上可用的内存; 确实, 即便一个单个进程可拥有一个虚拟 地址空间大于系统的物理内存…

【大厂算法面试冲刺班】day0:数据范围反推时间复杂度

常见算法的时间复杂度 规定n是数组的长度/树或图的节点数 二分查找&#xff1a;O(logn) 双指针/滑动窗口&#xff1a;O(n) DFS/BFS&#xff1a;O(n) 构建前缀和&#xff1a;O(n) 查找前缀和&#xff1a;O(1) 一维动态规划&#xff1a;O(n) 二维动态规划&#xff1a;O(n^2) 回溯…

第7章-第9节-Java中的Stream流(链式调用)

1、什么是Stream流 Lambda表达式&#xff0c;基于Lambda所带来的函数式编程&#xff0c;又引入了一个全新的Stream概念&#xff0c;用于解决集合类库既有的鼻端。 2、案例 假设现在有一个需求&#xff0c; 将list集合中姓张的元素过滤到一个新的集合中&#xff1b;然后将过滤…

做科技类的展台3d模型用什么材质比较好---模大狮模型网

对于科技类展台3D模型&#xff0c;以下是几种常用的材质选择&#xff1a; 金属材质&#xff1a;金属材质常用于科技展台的现代感设计&#xff0c;如不锈钢、铝合金或镀铬材质。金属材质可以赋予展台一个科技感和高档感&#xff0c;同时还可以反射光线&#xff0c;增加模型的真实…

Verilog 高级教程笔记——持续更新中

Verilog advanced tutorial 转换函数 调用系统任务任务描述int_val $rtoi( real_val ) ;实数 real_val 转换为整数 int_val 例如 3.14 -> 3real_val $itor( int_val ) ;整数 int_vla 转换为实数 real_val 例如 3 -> 3.0vec_val $realtobits( real_val ) ;实数转换为…