【源码解析】聊聊阻塞队列之BlockingArrayQueue

阻塞队列

阻塞队列:顾名思义 首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下入所示。

  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
  • 当阻塞队列时满的时,往队列里添加元素的操作将会被阻塞。

试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。

同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程从队列中移除一个元素才可以插入队列中。

为什么使用阻塞队列 好处?

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。

为什么需要BlockingQueue

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BolckingQueue都给你一手包办了,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

阻塞队列接口和实现类

实现类特点
ArrayBlockingQueue由数组结构组成的有界阻塞队列
LinkedBlockingQueue由链表结构组成的有界(但大小默认值有Integer.MAX_VALUE)阻塞队列
PriorityBlockingQueue支持优先级排序的无界阻塞队列
DelayQueue使用优先级队列实现的延迟无界阻塞队列
SynchronousQueue不存储元素的阻塞队列,也既单个元素的队列
LinkedTransferQueue由链表结构组成的无界阻塞队列
LinkedBlockingDeque由链表结构组成的双向阻塞队列

BlockingQueue

//阻塞队列
public interface BlockingQueue<E> extends Queue<E> {
   
//   将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量
//  在成功时返回 true,如果此队列已满,则抛IllegalStateException。
    boolean add(E e);

    //插入队列的尾部
    boolean offer(E e);

 		void put(E e) throws InterruptedException;

    //插入队列的尾部,可以设置等待时间,不成功抛出异常
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //移除对头部元素。如果没有元素会阻塞
    E take() throws InterruptedException;

    //移除对头元素
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
}

插入方法:

add(E e) : 添加成功返回true,失败抛IllegalStateException异常
  offer(E e) : 成功返回 true,如果此队列已满,则返回 false。
  put(E e) :将元素插入此队列的尾部,如果该队列已满,则一直阻塞 // 推荐使用这个
删除方法:

remove(Object o) :移除指定元素,成功返回true,失败返回false
  poll() : 获取并移除此队列的头元素,若队列为空,则返回 null
  take():获取并移除此队列头元素,若没有元素则一直阻塞。 //推荐使用这个

ArrayBlockingQueue

内部是通过冲入锁reentrantLock和Condition条件队列实现的,即有公平访问和非公平访问两种方式。

一个测试demo

public class BlockingArrayQueue {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);
        new Thread(new Product(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }

}

class Product implements Runnable{

    private BlockingQueue<String> blockingQueue;

    public Product(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("添加元素了");
                blockingQueue.put(String.valueOf(UUID.randomUUID()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{

    private BlockingQueue<String> blockingQueue;

    public Consumer(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String take = blockingQueue.take();
                System.out.println("消费元素了  "+take);
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

构造方法

    //默认是非公平锁的方式
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

	  //可以通过参数进行设置是否使用公平锁
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //使用的是 ReentrantLock 去判断是否使用公平锁
        lock = new ReentrantLock(fair);
        //使用的condition 非空和非满的条件
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

属性

    /** The queued items */
    // 存储数据的数组 使用的是一个object数组
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    //获取数据的索引
    int takeIndex;

    /** items index for next put, offer, or add */
    //添加数据的索引
    int putIndex;

    /** Number of elements in the queue */
    //队列元素的个数
    int count;

    /** Main lock guarding all access */
    //控制并发访问的锁ReentrantLock
    final ReentrantLock lock;

    // notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作
    /** Condition for waiting takes */
    private final Condition notEmpty;

    // notFull条件对象,用于通知put方法队列未满,可执行添加操作
    /** Condition for waiting puts */
    private final Condition notFull;

    //迭代器
    transient Itrs itrs = null;

添加

   //add方法 其实调用的是offer()
    //加入数据成功 返回true
    //加入失败就抛出异常
    public boolean add(E e) {
        return super.add(e);
    }

    public boolean offer(E e) {
        //判断是否为空
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //加锁操作
        lock.lock();
        try {
            //如果当前队列满 返回false
            if (count == items.length)
                return false;
            else {
                // 添加元素到队列中
                enqueue(e);
                return true;
            }
        } finally {
            //释放锁
            lock.unlock();
        }
    }


    //阻塞方法
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //该方法可被中断
        try {
            //当队列元素和数组长度相等  当前线程挂起,添加到notFull条件队列中等待唤醒
            while (count == items.length)
                notFull.await();
            //不满则,直接添加元素
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

获取

   public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //队列元素为空 返回null
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                //如果没有元素 会进行阻塞
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    
    //直接返回队列头的元素 不进行修改
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

在这里插入图片描述
put和take是阻塞方法,所以在实际的编程中使用阻塞队列的话,其实更多的还是使用者一组。

至于阻塞和唤醒的原因,其实就是当put 队列满了之后,await 阻塞,队列如果取数据的话,就自动唤醒使用notFull.signal。
同理take()也是一样的,如果获取的元素队列是空,那么就会等待。如果有元素入队列,notEmpty.singal()
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/223529.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

湖南麒麟下默认使用串口输出系统日志

有时候为了调试方便&#xff0c;需要将系统日志通过CPU的串口进行输出&#xff0c;以下是针对至强E5V4处理器上安装湖南麒麟操作系统后将日志通过串口输出的配置。 首先在bios中打开串口重定向功能&#xff0c;这里的BIOS是AMI的BIOS 内部配置如下&#xff0c;波特率115200配置…

Python实现FA萤火虫优化算法优化XGBoost分类模型(XGBClassifier算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 萤火虫算法&#xff08;Fire-fly algorithm&#xff0c;FA&#xff09;由剑桥大学Yang于2009年提出 , …

校园教务管理系统

学年论文&#xff08;课程设计&#xff09; 题目&#xff1a; 信息管理系统 校园教务管理系统 摘要&#xff1a;数据库技术是现代信息科学与技术的重要组成部分&#xff0c;是计算机数据处理与信息管理系统的核心&#xff0c;随着计算机技术的发展&#xff0c;数据库技…

Android的前台服务

概述 前台服务是用户主动意识到的一种服务&#xff0c;因此在内存不足时&#xff0c;系统也不会考虑将其终止。前台服务必须为状态栏提供通知&#xff0c;将其放在运行中的标题下方。这意味着除非将服务停止或从前台移除&#xff0c;否则不能清除该通知。 在 Android 8.0&…

【使用高德开放平台API和js的Ajax代码实现定位并获得城市的天气情况】

使用高德开放平台API和js的Ajax代码实现定位并获得城市的天气情况 1、注册高德开放平台账号&#xff0c;免费获得Web服务API应用key 高德开放平台Web服务API 按照API点击申请KEY 登录后进入应用管理 新建应用&#xff08;随意起名&#xff09; 然后添加key提交即可 然后就可…

MySQL生僻字修改编码utf8mb4

1、查看你编码 SHOW VARIABLES WHERE Variable_name LIKE character_set_% OR Variable_name LIKE collation%;&#xff08;如果不是下图则继续&#xff09; 2、修改默认参数 /etc/my.cnf [mysqld] datadir/usr/local/mysql/data basedir/usr/local/mysql socket/usr/local/my…

zookeeper集群 +kafka集群

1.zookeeper kafka3.0之前依赖于zookeeper zookeeper是一个开源&#xff0c;分布式的架构&#xff0c;提供协调服务&#xff08;Apache项目&#xff09; 基于观察者模式涉及的分布式服务管理架构 存储和管理数据&#xff0c;分布式节点上的服务接受观察者的注册&#xff0c…

【Linux】冯诺依曼体系结构(硬件)、操作系统(软件)、系统调用和库函数 --- 概念篇

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前正在学习c和Linux还有算法 ✈️专栏&#xff1a;Linux &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章有啥瑕疵&#xff0c;希望大佬指点一二 …

Vcenter 6.7 VCSA证书过期问题处理

1. 故障现象 2022年10月25日&#xff0c;登陆VC报错。 按照报错信息&#xff0c;结合官方文档&#xff0c;判断为STS证书过期导致。 vCenter Server Appliance (VCSA) 6.5.x, 6.7.x or vCenter Server 7.0.x 在/var/log/vmware/vpxd-svcs/vpxd-svcs.log看到类似报错: ERRO…

深度探索 Python Pyramid 框架

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com Pyramid是一个灵活且强大的Python web框架&#xff0c;广泛用于构建各种规模的Web应用程序。本文将深度探索Pyramid框架&#xff0c;介绍其核心概念、应用场景以及一些高级特性。 安装与基础用法 首先&#xf…

Python if else条件语句详解

if 分支使用布尔表达式或布尔值作为分支条件来进行分支控制。Python 的 if 分支既可作为语句使用&#xff0c;也可作为表达式使用。下面先介绍 if 分支作为语句使用的情形。 if 语句可使用任意表达式作为分支条件来进行分支控制。Python 的 if 语句有如下三种形式&#xff1a;…

通过仿真理解信道化接收机分析过程

概要 信道化从子信道带宽划分上可分为临界抽取和非临界抽取两种&#xff0c;从各子信道中心频率布局上可分为偶型排列和奇型排列&#xff0c;从处理流程上可分为信道化分析与信道化综合过程。本文主要通过仿真来理解偶型排列/临界抽取/信道化分析过程。 基本原理 常规的数字…

maven生命周期回顾

目录 文章目录 **目录**两种最常用打包方法&#xff1a;生命周期&#xff1a; 两种最常用打包方法&#xff1a; 1.先 clean&#xff0c;然后 package2.先 clean&#xff0c;然后install 生命周期&#xff1a; 根据maven生命周期&#xff0c;当你执行mvn install时&#xff0c…

Python中字符串拼接及其应用场景

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 字符串拼接是Python中常见而重要的操作&#xff0c;它涉及到将多个字符串连接成一个字符串。本文将深入探讨Python中字符串拼接的不同方式、性能比较、以及在实际应用中的场景和最佳实践。 常见的字符串拼接方法…

【Pytorch使用自制数据集,Dataloader】

数据集结构 话不多说&#xff0c;直接上核心代码 myDataset.py from collections import Counter from torch.utils.data import Dataset import os from PIL import Imageclass MyDataset(Dataset):"""读取自制的数据集args:- image_dir: 图片的地址- labe…

腾讯云轻量应用服务器怎么安装BT宝塔面板?

腾讯云轻量应用服务器宝塔面板怎么用&#xff1f;轻量应用服务器如何安装宝塔面板&#xff1f;在镜像中选择宝塔Linux面板腾讯云专享版&#xff0c;在轻量服务器防火墙中开启8888端口号&#xff0c;然后远程连接到轻量服务器执行宝塔面板账号密码查询命令&#xff0c;最后登录和…

数据结构 | 查漏补缺之哈希表、最短路径、二叉树与森林的转换

哈希表是什么&#xff1f; 或者说 设图采用邻接表的存储结构&#xff0c;写对图的删除顶点和删除边的算法步骤 删除边 删除点 最短路径问题 判断一个有向图是否有环&#xff08;回路&#xff09;可以用下列哪些办法&#xff08;&#xff09; 存在拓扑序列&#xff0c;就有回路…

JVM之基本概念(一)

(1) 基本概念&#xff1a; JVM 是可运行 Java 代码的假想计算机 &#xff0c;包括一套字节码指令集、一组寄存器、一个栈、一个垃圾回收&#xff0c;堆 和 一个存储方法域。JVM 是运行在操作系统之上的&#xff0c;它与硬件没有直接的交互。 (2) 运行过程&#xff1a; 我们都…

关于PDE频率的问题

讨论&#xff1a;关于PDE频率的问题 关于PDE频率的问题问题复现讨论解决方法 关于PDE频率的问题 问题 在这篇文章下有人提出一个问题&#xff1a; 在使用物理信息神经网络对固定求解区域内进行物理场的预测&#xff0c;具体来说是在求解传热问题。在定义域为1或者大于0.4的正…

Python Opencv实践 - 简单的AR项目

这个简单的AR项目效果是&#xff0c;通过给定一张静态图片作为要视频中要替换的目标物品&#xff0c;当在视频中检测到图片中的物体时&#xff0c;通过单应矩阵做投影&#xff0c;将视频中的物体替换成一段视频播放。这个项目的所有素材来自自己的手机拍的视频。 静态图片&…