ArrayBlockingQueue 数组阻塞队列 源码阅读

1. 概述

数组阻塞队列

  • 有界的阻塞数组, 容量一旦创建, 无法修改
  • 阻塞队列, 队列满的时候, 往队列put数据会被阻塞, 队列空, 取数据也会被阻塞
  • 并发安全

2. 数据结构

/** 存储队列元素的数组 */
/** 存储队列元素的数组 */
final Object[] items;

/** 队首位置,下一次 take, poll, peek, remove 方法在 items 中的位置  */
int takeIndex;

/** 队末位置,下一次 put, offer, add 方法在 items 中的位置 */
int putIndex;

/** 队列中元素的数量 */
int count;

// 锁 保证队列的并发安全性
final ReentrantLock lock;

/** 队列不为空的监视器 */
private final Condition notEmpty;

/** 不满(队列元素数量小于items.size())的监视器 */
private final Condition notFull;

3. 初始化 构造函数

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // 数组
        this.items = new Object[capacity];
        // 队列公平不公平指的是用的ReentrantLock
        lock = new ReentrantLock(fair);
        // 创建队列不为空,不满的监视器
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;

        // 加锁
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                // 集合元素一个个入队
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

4. 方法

4.1 核心方法

核心方法入队,出队,移除元素,调用这些方法都加锁的。

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        // 数组满了,putIndex重新从0开始
        if (++putIndex == items.length)
            putIndex = 0;
        // 添加元素count+1
        count++;
        // 唤醒队列不为空的wait
        notEmpty.signal();
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 添加元素count-1
        count--;
        // 如果迭代器不为空维护一下
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒队列不满的wait
        notFull.signal();
        return x;
    }

   void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            // 如果要移除的是就是要出队的元素
            // removing front item; just advance
            // 元素设置为null
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove
            // slide over all others up through putIndex.
            // 用removeIndex后面的元素,向前移动一位
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }

4.2 常用外部调用的方法

offer(),入队,队列满了直接返回 false,不会阻塞,入队成功返回true

   public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

add,其实调用了 offer

    public boolean add(E e) {
        return super.add(e);
    }

    // 父类
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

put lock可以被中断,队列满了会阻塞, notFull.await(),等队列元素出队notFull.signal() 唤醒

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
        notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

在put基础上,等待timeout个unit时间

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

取出元素,列表为空直接返回null,不阻塞

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

取元素,队列空阻塞等待

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
        notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

取元素,队列空等待参数指定的时间

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

查看将要取出的元素,元素不出队

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}

移除某个元素,移除找到的第一个这个元素,并返回true

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                // 遍历
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) {
                        // 找到了移除对应元素
                        removeAt(i);
                        return true;
                    } 
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

  public boolean contains(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i]))
                        // 找到了返回true
                        return true;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

清理队列

public void clear() {
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int k = count;
        if (k > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                items[i] = null;
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
            takeIndex = putIndex;
            count = 0;
            if (itrs != null)
                itrs.queueIsEmpty();
            for (; k > 0 && lock.hasWaiters(notFull); k--)
            notFull.signal();
        }
    } finally {
        lock.unlock();
    }
}

将队列中的元素移动到c集合中

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

/**
 * @throws UnsupportedOperationException {@inheritDoc}
 * @throws ClassCastException            {@inheritDoc}
 * @throws NullPointerException          {@inheritDoc}
 * @throws IllegalArgumentException      {@inheritDoc}
 */
public int drainTo(Collection<? super E> c, int maxElements) {
    checkNotNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // n取 maxElements 和 集合元素的最小值
        int n = Math.min(maxElements, count);
        int take = takeIndex;
        int i = 0;
        try {
            while (i < n) {
                @SuppressWarnings("unchecked")
                E x = (E) items[take];
                c.add(x);
                items[take] = null;
                if (++take == items.length)
                    take = 0;
                i++;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                count -= i;
                // 修改队首位置
                takeIndex = take;
                if (itrs != null) {
                    if (count == 0)
                        itrs.queueIsEmpty();
                    else if (i > take)
                        itrs.takeIndexWrapped();
                }
                for (; i > 0 && lock.hasWaiters(notFull); i--)
                notFull.signal();
            }
        }
    } finally {
        lock.unlock();
    }
}

5. 单元测试


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueStudy {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
        queue.put("1");
        queue.put("2");

        // true
        System.out.println(queue.offer("3"));
        // false
        System.out.println(queue.offer("4"));

        // 抛出异常
        // Exception in thread "main" java.lang.IllegalStateException: Queue full
        // at java.util.AbstractQueue.add(AbstractQueue.java:98)
        // at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
        //  at arrayBlockingQueue.ArrayBlockingQueueStudy.main(ArrayBlockingQueueStudy.java:18)
        // System.out.println(queue.add("4"));

        // 阻塞
        // queue.put("4");

        // 等待1s 返回false
        System.out.println(queue.offer("4", 1, TimeUnit.SECONDS));

        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        // null
        System.out.println(queue.poll());

        // 阻塞
        // System.out.println(queue.take());

        // 等待1s 返回null
        System.out.println(queue.poll(1, TimeUnit.SECONDS));

        queue.put("1");
        // 1
        System.out.println(queue.peek());
        // 1
        System.out.println(queue.peek());

        queue.put("1");
        System.out.println(queue.remove("1"));
        // 1
        System.out.println(queue.peek());
        System.out.println(queue.remove("1"));
        // null
        System.out.println(queue.peek());

        queue.put("1");
        queue.put("1");
        queue.clear();
        // null
        System.out.println(queue.peek());

        List<String> c = new ArrayList<>();
        queue.put("1");
        queue.put("1");
        queue.drainTo(c);
        // 2 ["1","1"]
        System.out.println(c.size());
        // null
        System.out.println(queue.peek());
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/425208.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

D2BNet

task-specific contextual information {M l _l l​} that we split into { M l m , M l a M_l^m,M_l^a Mlm​,Mla​} 辅助信息 作者未提供代码

基于Spring Boot+Vue的旅游网站

末尾获取源码作者介绍&#xff1a;大家好&#xff0c;我是墨韵&#xff0c;本人4年开发经验&#xff0c;专注定制项目开发 更多项目&#xff1a;CSDN主页YAML墨韵 学如逆水行舟&#xff0c;不进则退。学习如赶路&#xff0c;不能慢一步。 目录 一、项目简介 二、开发技术与环…

Fastwhisper + Pyannote 实现 ASR + 说话者识别

文章目录 前言一、faster-whisper简单介绍二、pyannote.audio介绍三、faster-whisper pyannote.audio 实现语者识别四、多说几句 前言 最近在研究ASR相关的业务&#xff0c;也是调研了不少模型&#xff0c;踩了不少坑&#xff0c;ASR这块&#xff0c;目前中文普通话效果最好的…

ASM处理字节码流程

ASM处理字节码流程 ASM是一个操作Java字节码类库&#xff0c;其操作的对象是字节码数据&#xff0c;处理字节码方式是“拆分-修改-合并”。 将.class文件拆分成多个部分 对某一个部分的信息进行修改 将多个部分重新组织成一个新的class文件

机器学习-4

文章目录 前言数组创建切片索引索引遍历切片编程练习 总结 前言 本篇将介绍数据处理 Numpy 库的一些基本使用技巧&#xff0c;主要内容包括 Numpy 数组的创建、切片与索引、基本运算、堆叠等等。 数组创建 在 Python 中创建数组有许多的方法&#xff0c;这里我们使用 Numpy 中…

校企合作项目总结

校企合作总结 前言项目框架开发待办水平越权总结 前言 寒假里小组给了校企合作的项目&#xff0c;分配给我的工作量总共也就两三套crud&#xff0c;虽然工作很少&#xff0c;但还是从里面学到了很多东西&#xff0c;收获了大量的实习经验&#xff0c;在这里总结记录一下。 项…

LLM@本地大语言模型@Gemma的安装与使用@dockerDesktop的安装和启动

文章目录 准备refsollama安装过程2b模型的效果小结&#x1f47a; ollama的进一步使用帮助文档查看ollama安装了哪些模型使用皮肤来使聊天更易用 使用Chatbot UI皮肤安装docker&#x1f47a;启动docker载入和退出dockerchatbot 网页版皮肤 使用命令行聊天小结&#x1f47a; 准备…

vulhub中JBoss 4.x JBossMQ JMS 反序列化漏洞复现(CVE-2017-7504)

Red Hat JBoss Application Server&#xff08;AS&#xff0c;也称WildFly&#xff09;是美国红帽&#xff08;Red Hat&#xff09;公司的一款基于JavaEE的开源的应用服务器&#xff0c;它具有启动超快、轻量、模块化设计、热部署和并行部署、简洁管理、域管理及第一类元件等特…

R750 install AMD MI210GPU

一、 查看服务器GPU卡信息 可以首先在服务器上check 当前GPU的详细信息是否匹配 二、安装 Ubuntu22.04操作系统 服务器CHECK 安装的AMD GPU 是否被系统识别 #lspci | grep AMD 查看GPU信息 可以看到已经识别成功 三、安装AMD GPU驱动 https://rocm.docs.amd.com/projec…

使用query请求数据出现500的报错

我在写项目的时候遇到了一个问题&#xff0c;就是在存商品id的时候我将它使用了JSON.stringify的格式转换了&#xff01;&#xff01;&#xff01;于是便爆出了500这个错误&#xff01;&#xff01;&#xff01; 我将JSON.stringify的格式去除之后&#xff0c;它就正常显示了&…

数的范围 刷题笔记

思路 寻找第一个大于等于目标的 数 因为该数组是升序的 所以 我们可以采用二分的方式 逼近答案 定义一个左指针和一个右指针 当左右指针重合时 就是我们要找的答案 当我们寻找第一个大于等于x的数时 a[mid]>x,答案在mid处 或者在mid的左边 因此让rmid继续逼近 如果…

【Python】进阶学习:pandas--groupby()用法详解

&#x1f4ca;【Python】进阶学习&#xff1a;pandas–groupby()用法详解 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448;…

【机器学习】有监督学习算法之:支持向量机

支持向量机 1、引言2、决策树2.1 定义2.2 原理2.3 实现方式2.4 算法公式2.5 代码示例 3、总结 1、引言 小屌丝&#xff1a;鱼哥&#xff0c;泡澡啊。 小鱼&#xff1a;不去 小屌丝&#xff1a;… 此话当真&#xff1f; 小鱼&#xff1a;此话不假 小屌丝&#xff1a;到底去还是…

ssm666社区流浪动物救助领养系统的设计与开发

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 一 、设计说明 1.1 课题…

Window下编写的sh文件在Linux/Docker中无法使用

Window下编写的sh文件在Linux/Docker中无法使用 一、sh文件目的1.1 初始状态1.2 目的 二、过程与异常2.1 首先获取标准ubuntu20.04 - 正常2.2 启动ubuntu20.04容器 - 正常2.3 执行windows下写的preInstall文件 - 报错 三、检查和处理3.1 评估异常3.2 处理异常3.3 调整后运行测试…

笔记本hp6930p安装Android-x86补记

在上一篇日记中&#xff08;笔记本hp6930p安装Android-x86避坑日记-CSDN博客&#xff09;提到hp6930p安装Android-x86-9.0&#xff0c;无法正常启动&#xff0c;本文对此再做尝试&#xff0c;原因是&#xff1a;Android-x86-9.0-rc2不支持无线网卡&#xff0c;需要在BIOS中关闭…

前端学习第六天-css浮动和定位

达标要求 了解浮动的意义 掌握浮动的样式属性 熟练应用清除浮动 熟练掌握定位的三种方式 能够说出网页布局的不同方式的意义 1. 浮动(float) 1.1 CSS 布局的三种机制 网页布局的核心——就是用 CSS 来摆放盒子。CSS 提供了 3 种机制来设置盒子的摆放位置&#xff0c;分…

【推荐算法系列十七】:GBDT+LR 排序算法

排序算法经典中的经典 参考 推荐系统之GBDTLR 极客时间 手把手带你搭建推荐系统 课程 逻辑回归&#xff08;LR&#xff09;模型 逻辑回归&#xff08;LR,Logistic Regression&#xff09;是一种传统机器学习分类模型&#xff0c;也是一种比较重要的非线性回归模型&#xff…

0.8秒一张图40hx矿卡stable diffusion webui 高质极速出图组合(24.3.3)

新消息是。经过三个月的等待&#xff0c;SD Webui (automatic1111)终于推出了新版本1.8.0&#xff0c;本次版本最大的更新&#xff0c;可能就是pytorch更新到2.1.2, 不过还是晚了pytorch 2.2.2版。 不过这版的一些更新&#xff0c;在forget分支上早就实现了&#xff0c;所以。…

快递批量查询高手:轻松管理物流信息,提升工作效率

快递批量查询高手&#xff1a;轻松管理物流信息&#xff0c;提升工作效率着 电商市场的不断壮大&#xff0c;物流行业的发展也日新月异。在如此繁忙的物流环境中&#xff0c;如何高效地管理物流信息成为了一个重要的课题。而在这个背景下&#xff0c;一款名为“快递批量查询高…