ArrayBlockingQueue与LinkedBlockingQueue底层原理

ArrayBlockingQueue与LinkedBlockingQueue底层原理

在线程池中,等待队列采用ArrayBlockingQueue或者LinkedBlockingDeque,那他们是怎么实现存放线程、阻塞、取出的呢?

一、ArrayBlockingQueue底层原理

1.1 简介

ArrayBlockingQueue是一个阻塞的队列,继承了AbstractBlockingQueue,间接的实现了Queue接口和Collection接口。 底层以数组的形式保存数据,所以它是基于数组的阻塞队列。ArrayBlockingQueue是有边界值的,在创建ArrayBlockingQueue时就要确定好该队列的大小,一旦创建,该队列大小不可更改。
内部的全局锁是使用的ReentrantLock

1.2 关系图谱

在这里插入图片描述

1.3 父类BlockingQueue的方法梳理
public interface BlockingQueue<E> extends Queue<E> {
    //将对象塞入队列,如果塞入成功返回true, 否则返回false。
    boolean add(E e);

    //将对象塞入到队列中,如果设置成功返回true, 否则返回false
    boolean offer(E e);

    //将元素塞入到队列中,如果队列中已经满了,
    //则该方法会一直阻塞,直到队列中有多余的空间。
    void put(E e) throws InterruptedException;

    //将对象塞入队列并设置时间
    //如果塞入成功返回 true, 否则返回 false.
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //从队列中取对象,如果队列中没有对象,
    //线程会一直阻塞,直到队列中有对象,并且该方法取得了该对象。
    E take() throws InterruptedException;

    //在给定的时间里,从队列中获取对象,
    //时间到了直接调用普通的poll方法,为null则直接返回null。
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //获取队列中剩余长度。
    int remainingCapacity();

    //从队列中移除指定的值。
    boolean remove(Object o);

    //判断队列中包含该对象。
    public boolean contains(Object o);

    //将队列中对象,全部移除,并加到传入集合中。
    int drainTo(Collection<? super E> c);

    //指定最多数量限制将队列中对,全部移除,并家到传入的集合中。
    int drainTo(Collection<? super E> c, int maxElements);
}
1.4 ArrayBlockingQueue源码解析
1.4.1 参数解释
	/** 队列中存放的值 */
    final Object[] items;

    /** 值的索引,这是取出位置的索引*/
    int takeIndex;

    /** 值的索引,这是插入位置的索引*/
    int putIndex;

    /** 队列中有多少个元素 */
    int count;

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes 取出时枷锁 */
    private final Condition notEmpty;

    /** Condition for waiting puts 存入时枷锁*/
    private final Condition notFull;
1.4.2 构造方法
	/**
  	* capacity 表示数组中最大容量,默认使用非公平锁
  	*/
   public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
 	/**
  	* capacity 表示数组中最大容量
  	* fair 为 false 时使用非公平锁,true 时使用公平锁
  	*/
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

	/**
  	* capacity 表示数组中最大容量
  	* fair 为 false 时使用非公平锁,true 时使用公平锁
  	* c 初始化时,可以加入将我们有的集合加入该队列中
 	*/
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e); //判空
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }
1.4.3 存入数据的put方法
public void put(E e) throws InterruptedException {
	//判空
    checkNotNull(e);
    //显示锁
    final ReentrantLock lock = this.lock;
    //可中断锁
    lock.lockInterruptibly();
    try {
    	//判断队列元素是否以及满了,满了就阻塞,如果队列满了await 是阻塞队列
        while (count == items.length)
            notFull.await();
            
        //队列未满,入队方法
        enqueue(e);
    } finally {
    //释放锁
        lock.unlock();
    }
}

数据入队方法

   private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        //判断队列是否以及满了
        if (++putIndex == items.length)
        	//满了就将下一个入队索引设置为 0 
            putIndex = 0;
        count++;
        //唤醒 其他阻塞的出队操作
        notEmpty.signal();
    }
1.4.4 取出数据的take方法
  public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //可中断锁
        lock.lockInterruptibly();
        try {
            while (count == 0)
            	//如果队列数量为0,则阻塞取数据的锁
                notEmpty.await();
                //队列长度不为0,开始取数据
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

从队列取出数据

 private E dequeue() {
        //取得当前items对象
        final Object[] items = this.items;
        //获取数组最后一个数据
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        //取走后置空数组最后一个元素
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
         //唤醒 存入数据的锁
        notFull.signal();
        return x;
    }

二、LinkedBlockingQueue底层原理

2.1 主要参数 解释
//队列中元素个数
private final AtomicInteger count = new AtomicInteger();
//头节点
transient Node<E> head;
//尾节点
private transient Node<E> last;
//出队锁
private final ReentrantLock takeLock = new ReentrantLock();
//如果队列为空,出队就会陷入等待
private final Condition notEmpty = takeLock.newCondition();
//入队锁
private final ReentrantLock putLock = new ReentrantLock();
//如果队列满了,入队就陷入等待
private final Condition notFull = putLock.newCondition();
2.2 存入元素put
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    
    int c = -1;
    Node<E> node = new Node<E>(e);
    //利用ReentrantLock独占锁来加锁,保证同时只有一个线程来put
    final ReentrantLock putLock = this.putLock;
    //利用AtomicInteger来表示queue中的元素个数
    final AtomicInteger count = this.count;
    //可打断的加锁
    putLock.lockInterruptibly();
    try {
		// private final Condition notFull = putLock.newCondition();
        //如果队列满了,就调用notFull。await()。notFull是putLock的条件变量,当调用notFull.await()会将putLock释放,阻塞在等待队列notFull上
        while (count.get() == capacity) {
            notFull.await();
        }

        //入队,不用获得takeLock,因为与出队操作不涉及共享变量
        //从入队代码可以看出head是一个哨兵节点,不存放任何实际数据
        //last = last.next = node;
        enqueue(node);

        //count++
        c = count.getAndIncrement();
        //如果队列未满,唤醒被阻塞的入队线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    //如果c == 0,说明入队之前队列为空,唤醒出队的等待线程
    if (c == 0)
        signalNotEmpty();
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    //获取出队锁
    takeLock.lock();
    try {
        //唤醒出队等待线程
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

取出元素take

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //如果队列为空,放弃takeLock,阻塞在等待队列notEmpty上
        while (count.get() == 0) {
            notEmpty.await();
        }
        //出队
        x = dequeue();
        //count--;
        c = count.getAndDecrement();
        //如果队列不为空,唤醒出队等待线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //如果队列不为空,唤醒入队等待线程
    if (c == capacity)
        signalNotFull();
    return x;
}

private E dequeue() {
    //head是哨兵节点,不存放数据,实际的头节点是head.next
    Node<E> h = head;
    //head的next
    Node<E> first = h.next;
    h.next = h;
    //将head踢出
    head = first;
    //first的item才是第一个元素,head是哨兵节点
    E x = first.item;
    first.item = null;
    //从dequeue方法可以看出,queue中始终有一个哨兵head节点,不存储任何数据,queue中第一个元素是head.next
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //唤醒入队等待线程
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

三、入队出队总结

3.1 入队
方法做法
put如果队列满了,就阻塞,当队列不满的时候,会再执行入队操作
offer如果队列满了,返回false。未满就返回true
add如果队列满了,抛出异常,未满就返回true
3.2 出队
方法做法
take如果队列为空,就阻塞,当队列不空的时候,会再执行出队操作
poll如果队列空了,返回null
peek返回队列首元素,不会出队

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/459879.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

centos7 install rocketmq 宿主机快速搭建RocketMQ单机开发环境

为什么采用宿主机而不采用 Docker 方式快速搭建 在搭建 RocketMQ 测试环境时&#xff0c;我们可以选择在宿主机上直接安装和配置&#xff0c;也可以使用 Docker 容器来快速搭建。然而&#xff0c;为什么我们选择了在宿主机上安装而不是使用 Docker 方式呢&#xff1f; 调整配置…

电池的性能指标

1 电池基本概念 电池是通过电化学反应将化学能转换为电能的设备。根据是否可反复充电使用&#xff0c;可分为干电池和蓄电池两类。 电池中涉及电化学反应的部分为电极和电解质&#xff0c;电化学反应发生在电极和电解质接触的表面&#xff0c;电解质仅负责协助带电粒子的移动…

腾讯云服务器入站规则端口开放使用指南(CentOS系统)

第一步&#xff1a;开放安全组入站规则 来源处0.0.0.0/0是对IPv4开发&#xff0c;::/0是对IPv6开放&#xff1b; 协议端口按照提示填写即可。云服务器防火墙开放 第三步&#xff1a;本地防火墙开放 sudo firewall-cmd --zonepublic --add-port你的端口号/tcp --perma…

瑞典斯凯孚SKF激光对中仪维修TKSA40/60

SKF对中仪维修型号包括&#xff1a;TKSA40、TKSA20、TKSA31、TKSA41、TKSA51、TKSA71等 斯凯孚SKF激光对中仪维修产品技术参数及注意事项&#xff1a; 该设备有两个无线测量单元、大尺寸测位传感器和更大功率的激光器&#xff0c;即使在严苛的条件下也能实现测量。 显示器单元…

显著性检验P值...

显著性检验&#xff1a;P值和置信度_显著性p<0.05,p<0.01,p<0.001-CSDN博客 看论文里面一般在结果后面都会加上 虽然学过概率统计&#xff0c;但是一直不懂在结果这里加上这个代表什么含义&#xff0c;以及如何计算&#xff0c;参考上面链接进行学习。 P值指的是比较…

蓝桥杯刷题|03入门真题

目录 [蓝桥杯 2020 省 B1] 整除序列 题目描述 输入格式 输出格式 输入输出样例 说明/提示 代码及思路 [蓝桥杯 2020 省 AB3] 日期识别 题目描述 输入格式 输出格式 输入输出样例 说明/提示 代码及思路 [蓝桥杯 2019 省 B] 特别数的和 题目描述 输入格式 输出格…

Web本体语言OWL

语义网&#xff08;Semantic Web&#xff09;&#xff1a; 语义网是万维网联盟&#xff08;W3C&#xff09;提出的一种愿景&#xff0c;旨在增强现有Web的表达能力和智能处理能力&#xff0c;通过标准化的技术手段赋予网络数据更加精确和可计算的语义&#xff0c;使得机器能够…

分布式系统常见负载均衡实现模式

分布式系统常见负载均衡实现模式 1. 4层负载均衡1.1. 负载均衡的常见需求1.2. 负载均衡的实现模式1.2.1 DR模式1.2.2 TUN模式1.2.3 NAT模式1.2.4 FULLNAT模式1.2.5 4种模式的差异 1.3. 负载均衡的均衡算法1.3.1 静态负载均衡1.3.2 轮询法1.3.3 加权循环法1.3.4 IP 哈希法1.3.5 …

数式Oinone应邀参加国有企业数字化优秀案例分享会,现场演讲引发热议

活动背景 为了贯彻落实党的二十大精神,加快中国数字化发展的步伐,以及推动国有资本和国有企业的高质量发展,广东省首席信息官协会于3月14日举办了“第七期国有企业数字化优秀案例分享会”。 此次会议吸引了广深两地:广东建科院、广州白云山中一药业有限公司、越秀交通、中国能…

腾讯云4核8G服务器支持多少人在线?CPU性能如何?

腾讯云轻量4核8G12M服务器配置446元一年&#xff0c;646元12个月&#xff0c;腾讯云轻量应用服务器具有100%CPU性能&#xff0c;系统盘为180GB SSD盘&#xff0c;12M带宽下载速度1536KB/秒&#xff0c;月流量2000GB&#xff0c;折合每天66.6GB流量&#xff0c;超出月流量包的流…

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:Progress)

进度条组件&#xff0c;用于显示内容加载或操作处理等进度。 说明&#xff1a; 该组件从API version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 子组件 无 接口 Progress(options: ProgressOptions<Type>) 创建进度组件&a…

操作多级(一、二、三级)指针才是我们的该有的姿态~

Hello&#xff0c;很有缘在这篇文章上我们相遇了&#xff0c;那么我就用题目巩固我们多级指针的知识&#xff0c;当然这里的题目是比较有点难度的&#xff0c;我们需要有点基础呀&#xff0c;如果你能轻松理解题目那说明你对指针的了解已经很有基础了呢&#xff0c;那废话不多说…

this是什么?为什么要改变this?怎么改变 this 指向?

目录 this 是什么&#xff1f; 箭头函数中的 this 为什么要改变 this 指向&#xff1f; 改变 this 指向的三种方法 call(无数个参数) apply(两个参数) bind(无数个参数) this 是什么&#xff1f; 在对象方法中&#xff0c;this 指的是所有者对象&#xff08;方法的拥有者…

鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:Navigator)

路由容器组件&#xff0c;提供路由跳转能力。 说明&#xff1a; 该组件从API Version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 子组件 可以包含子组件。 接口 Navigator(value?: {target: string, type?: NavigationType}) …

网络安全之URL过滤

知识改变命运&#xff0c;技术就是要分享&#xff0c;有问题随时联系&#xff0c;免费答疑&#xff0c;欢迎联系&#xff01; URL过滤是一种针对用户的URL请求进行上网控制的技术&#xff0c;通过允许或禁止用户访问某些网页资源&#xff0c;达到规范上网行为和降低安全风险…

前端项目(vue3)自动化部署(Gitlab CI/CD)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

VUE 运行NPM 报错:npm ERR! code CERT_HAS_EXPIRED 解决方案

现象 由于各种原因需要调试一下VUE代码&#xff0c;用Git拉下来运行不了&#xff08;之前是可以正常运行的&#xff09;&#xff0c;报错为&#xff1a;npm ERR! code CERT_HAS_EXPIRED........... 原因 NPM 证书签名过期了 解决方法 第一步&#xff1a;CMD 命令 查看NPM代理源…

计算机设计大赛 题目:基于卷积神经网络的手写字符识别 - 深度学习

文章目录 0 前言1 简介2 LeNet-5 模型的介绍2.1 结构解析2.2 C1层2.3 S2层S2层和C3层连接 2.4 F6与C5层 3 写数字识别算法模型的构建3.1 输入层设计3.2 激活函数的选取3.3 卷积层设计3.4 降采样层3.5 输出层设计 4 网络模型的总体结构5 部分实现代码6 在线手写识别7 最后 0 前言…

文件操作与IO流

文章目录 File文件操作类IO流原理及流的分类节点流FileInputStreamFileOutputStreamFileReaderFileWriter 处理流BufferedReaderBufferedWriterBufferedInputStreamBufferedOutputStreamObjectInputStreamObjectOutputStreamPrintStreamPrintWriter 标准输入输出流 Properties …

Sublime Text简介、下载、安装、汉化、常用插件和激活——《跟老吕学Python编程》附录资料

Sublime Text简介、下载、安装、汉化、常用插件和激活——《跟老吕学Python编程》附录资料 Sublime Text 简介Sublime Text 下载、安装、汉化、常用插件和激活Sublime Text 官网Sublime Text 下载Sublime Text 安装1.安装2.右键菜单3.启动安装4.耐心等待5.安装完成 Sublime Tex…