从源码全面解析LinkedBlockingQueue的来龙去脉

一、引言

并发编程在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在并发编程的使用和原理方面对小伙伴们进行 360° 的刁难。

二、使用

对于阻塞队列,想必大家应该都不陌生,我们这里简单的介绍一下,对于 Java 里面的阻塞队列,其使用了 生产者和消费者 的模型

对于生产者来说,主要有以下几部分:

add(E)     	// 添加数据到队列,如果队列满了,无法存储,抛出异常
offer(E)    // 添加数据到队列,如果队列满了,返回false
offer(E,timeout,unit)   // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回false
put(E)      // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!
复制代码

对于消费者来说,主要有以下几部分:

remove()    // 从队列中移除数据,如果队列为空,抛出异常
poll()      // 从队列中移除数据,如果队列为空,返回null,么的数据
poll(timeout,unit)   // 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者扔数据,再获取
take()     // 从队列中移除数据,如果队列为空,线程挂起,一直等到生产者扔数据,再获取
复制代码

我们本篇来讲讲堵塞队列中的第二员猛将,LinkedBlockingQueue 的故事

我们先来看其基本使用

public class LinkedBlockingQueueTest {
    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue queue = new LinkedBlockingQueue();

        // 生产者扔数据
        queue.add("1");
        queue.offer("2");
        queue.offer("3", 2, TimeUnit.SECONDS);
        queue.put("2");

        // 消费者取数据
        System.out.println(queue.remove());
        System.out.println(queue.poll());
        System.out.println(queue.poll(2, TimeUnit.SECONDS));
        System.out.println(queue.take());
    }
}
复制代码

三、源码

1、初始化

由于我们的 LinkedBlockingQueue 底层是链表实现的,所以我们初始化的时候不需要指定其大小

LinkedBlockingQueue queue = new LinkedBlockingQueue();

// 如果我们不指定容量大小的话,这里的容量默认为Integer.MAX_VALUE
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    // 如果容量传进来是小于等于0的,直接抛异常
    if (capacity <= 0){
        throw new IllegalArgumentException();
    }
    // 当前的容量赋值
    this.capacity = capacity;
    // 这里其实和我们的AQS有点像
    // 搞一个虚拟的头结点,减少后面的判空
    last = head = new Node<E>(null);
}
复制代码

当然,除了我们初始化的这些成员变量,我们还有一部分:

class Node<E> {
    // 当前的数据
    E item;
    // 指向下一个数据的指针
    Node<E> next;
    Node(E x) {
        item = x;
    }
}

// 当前链表中存在的数据数量
private final AtomicInteger count = new AtomicInteger();

// 读锁
private final ReentrantLock takeLock = new ReentrantLock();

// 唤醒消费者线程
private final Condition notEmpty = takeLock.newCondition();

// 写锁
private final ReentrantLock putLock = new ReentrantLock();

// 唤醒生产者线程
private final Condition notFull = putLock.newCondition();
复制代码

这里可能有的小伙伴有点懵逼,为什么这哥们(LinkedBlockingQueue)用了两个锁呢?为什么我 ArrayBlockingQueue 只能用一把锁?

不要急,我们慢慢的往下看他源码

2、生产者的源码

2.1 add()源码实现

public boolean add(E e) {
    return super.add(e);
}

// 走到这里会发现,我们的add方法就是调用了offer方法
// offer: 添加数据到队列,如果队列满了,返回false
// 所以这里offer满了,就会抛出异常:"Queue full"
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
复制代码

2.2 offer()源码实现

public boolean offer(E e) {
    // 如果是空值,直接抛出异常
    if (e == null) throw new NullPointerException();
    // 引用,上篇我们分析过
    final AtomicInteger count = this.count;
    // 判断当前数据量是否和我们总容量一样
    if (count.get() == capacity){
        return false;
    }
    // 标记位
    int c = -1;
    // 创建节点
    Node<E> node = new Node<E>(e);
    // 引用写锁
    final ReentrantLock putLock = this.putLock;
    // 上锁
    putLock.lock();
    try {
        // 如果当前数据量小于总容量
        // 这里我们上面也检查过,相当于DCL的意思
        if (count.get() < capacity) {
            // 插入队列
            enqueue(node);
            // 得到当前数据量
            // 这里需要注意:getAndIncrement先返回数据,再加一
            c = count.getAndIncrement();
            // 如果我们发现当前数据量还小于总容量
            // 也就是我们可以继续放数据
            if (c + 1 < capacity)
                // 唤醒其他的生产者线程扔数据
                // 当然这里稍微多说一点,这里的唤醒指的是将生产者从Condition队列放到AQS队列中
                // 具体什么时候执行还需要看AQS的调度
                notFull.signal();
        }
    } finally {
        // 解锁
        putLock.unlock();
    }
    // 如果我们当前数据量为0,代表队列中原来无数据
    // 但上面现在扔进去了一个
    if (c == 0)
        // 需要唤醒所有的消费者消费数据
        signalNotEmpty();
    return c >= 0;
}

private void enqueue(Node<E> node) {
    // 将当面节点挂在last节点后
    // 将last节点指向当前节点
    last = last.next = node;
}


// 这里我们的Condition聊过
// 必须持有当前锁资源才可以使用Condition的方法
private void signalNotEmpty() {
    // 拿到读锁
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lock();
    try {
        // 唤醒消费者线程
        notEmpty.signal();
    } finally {
        // 解锁
        takeLock.unlock();
    }
}
复制代码

2.3 offer(time)源码实现

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    // 如果是空值,直接抛出异常
    if (e == null) throw new NullPointerException();
    // 转成统一的单位
    long nanos = unit.toNanos(timeout);
    int c = -1;
    // 写锁
    final ReentrantLock putLock = this.putLock;
    // 当前容量
    final AtomicInteger count = this.count;
    // 加锁
    putLock.lockInterruptibly();
    try {
        // 如果当前数据量小于总容量
        // 这里我们上面也检查过,相当于DCL的意思
        while (count.get() == capacity) {
            // 如果我们剩余时间小于0,直接失败即可
            if (nanos <= 0)
                return false;
            // 反之生产者线程写入挂起nanos时间
            nanos = notFull.awaitNanos(nanos);
        }
        // 添加至队列
        enqueue(new Node<E>(e));
        // 得到当前数据量
        // 这里需要注意:getAndIncrement先返回数据,再加一
        c = count.getAndIncrement();
        // 如果我们发现当前数据量还小于总容量
        // 也就是我们可以继续放数据
        if (c + 1 < capacity)
            // 唤醒其他的生产者线程扔数据
            // 当然这里稍微多说一点,这里的唤醒指的是将生产者从Condition队列放到AQS队列中
            // 具体什么时候执行还需要看AQS的调度
            notFull.signal();
    } finally {
        // 解锁
        putLock.unlock();
    }
    // 如果我们当前数据量为0,代表队列中原来无数据
    // 但现在扔进去了一个,唤醒消费者线程
    if (c == 0)
        signalNotEmpty();
    return true;
}
复制代码

2.4 put()源码实现

  • 这里就不写了,其实和我们的 offer 一样,大家自己看看就好
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
复制代码

3、消费者的源码

3.1 remove()源码实现

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
复制代码

3.2 poll()源码实现

public E poll() {
    // 获取当前链表的数据量
    final AtomicInteger count = this.count;
    // 如果数据量为0,说明无数据
    // 消费者无法消费,直接返回null即可
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    // 拿到读锁
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lock();
    try {
        // 如果数据量大于0,说明有数据
        // 这里我们上面也检查过,相当于DCL的意思
        if (count.get() > 0) {
            // 取数
            x = dequeue();
            // 得到当前数据量
            // 这里需要注意:getAndIncrement先返回数据,再减一
            c = count.getAndDecrement();
            // 如果我们的数据量大于1,则唤醒消费者来消费
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        // 解锁
        takeLock.unlock();
    }
    // 如果数据量等于当前的总容量
    // 说明当前的链表已经有空余了,唤醒生产者生产
    if (c == capacity)
        signalNotFull();
    return x;
}

// 这个取数据和我们的AQS有点像
// 去除当前数据并且将当前节点作为头结点
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

private void signalNotFull() {
    // 拿到写锁
    final ReentrantLock putLock = this.putLock;
    // 上锁
    putLock.lock();
    try {
        // 唤醒生产者
        notFull.signal();
    } finally {
        // 解锁
        putLock.unlock();
    }
}
复制代码

3.3 poll(time)源码实现

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    // 统一时间单位
    long nanos = unit.toNanos(timeout);
    // 拿到当前数据量 + 读锁
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 加可中断锁
    takeLock.lockInterruptibly();
    try {
        // 如果当前的数据量为0
        while (count.get() == 0) {
            // 如果时间没有剩余,直接返回null即可
            if (nanos <= 0)
                return null;
            // 让消费者线程等待nanos时间
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 取数据
        x = dequeue();
        // 后面都是一样的
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
复制代码

3.4 take()源码实现

  • 这个大家可以自己看一下补充,也算一个小测试
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
复制代码

4、疑惑

看到这里,我想大家可能有和我一样的疑惑?

之前我们聊 ArrayBlockingQueue 的时候,他只用了一把锁(互斥锁),但 LinkedBlockingQueue 却使用了两把锁(读锁、写锁)

这时候你脑子会不会有一种疑问,我 ArrayBlockingQueue 能不能使用两把锁(读锁、写锁)来进行访问

如果你有这种想法,说明你确实思考了,哈哈哈

没错,博主我查阅了相关的资料,ArrayBlockingQueue 确实可以使用两把锁进行逻辑的更改

整体的逻辑基本上是仿造 LinkedBlockingQueue 的业务逻辑改造的,经测试这种性能要比原始的 ArrayBlockingQueue 要快 20%~30% 左右,感兴趣的也可以自己去测试一下。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
 
public class ArrayBlockingQueueUsingTwoLockApproach {
    
     /** The queued items */
    final Object[] items;
 
    /** items index for next take, poll, peek or remove */
    int takeIndex;
 
    /** items index for next put, offer, or add */
    int putIndex;
 
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
 
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
 
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
 
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
 
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
 
    public ArrayBlockingQueueUsingTwoLockApproach(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
    }
    
    public void put(Object e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == items.length) {
                notFull.await();
            }
            enqueue(e);
            c = count.getAndIncrement();
            if (c + 1 < items.length)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    
    public Object take() throws InterruptedException {
        Object x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == items.length)
            signalNotFull();
        return x;
    }
    
    
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
 
    
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(Object x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count.incrementAndGet();
    }
 
    
    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private Object dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        Object x = (Object) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count.decrementAndGet();
        return x;
    }
    
    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
}
复制代码

四、流程图

其实,我们 LinkedBlockingQueue 整体的代码逻辑和 ArrayBlockingQueue 类似,只不过底层数据结构不同罢了

我们这里简单的画一下,有兴趣的同学也可以自己画吆

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/15342.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python | 基于LendingClub数据的分类预测研究Part01——问题重述+特征选择+算法对比

欢迎交流学习~~ 专栏&#xff1a; 机器学习&深度学习 本文利用Python对数据集进行数据分析&#xff0c;并用多种机器学习算法进行分类预测。 具体文章和数据集可以见我所发布的资源&#xff1a;发布的资源 Python | 基于LendingClub数据的分类预测研究Part01——问题重述特…

在.NET Core中正确使用HttpClient的方式

HttpClient 是 .NET Framework、.NET Core 或 .NET 5以上版本中的一个类&#xff0c;用于向 Web API 发送 HTTP 请求并接收响应。它提供了一些简单易用的方法&#xff0c;如 GET、POST、PUT 和 DELETE&#xff0c;可以很容易地构造和发送 HTTP 请求&#xff0c;并处理响应数据。…

【Excel统计分析插件】上海道宁为您提供统计分析、数据可视化和建模软件——Analyse-it

Analyse-it是Microsoft Excel中的 统计分析插件 它为Microsoft Excel带来了 易于使用的统计软件 Analyse-it在软件中 引入了一些新的创新统计分析 Analyse-it与 许多Excel加载项开发人员不同 使用完善的软件开发和QA实践 包括单元/集成/系统测试 敏捷开发、代码审查 …

虹科案例|虹科Micronor光纤传感器,实现核磁共振新应用!

PART 1 背景介绍 光纤传感器已成为推动MRI最新功能套件升级和新MRI设备设计背后的关键技术。将患者的某些活动与MRI成像系统同步是越来越受重视的需求。磁场强度随着每一代的发展而增大&#xff0c;因此&#xff0c;组件的电磁透明度在每一代和新应用中变得更加重要。 光学传…

《Netty》从零开始学netty源码(四十六)之PooledByteBuf

PooledByteBuf Netty中一大块内存块PoolChunk默认大小为4MB&#xff0c;为了尽可能充分利用内存会将它切成很多块PooledByteBuf&#xff0c;PooledByteBuf的类关系图如下&#xff1a; PooledUnsafeDirectByteBuf与PooledUnsafeHeapByteBuf直接暴露对象的底层地址。 PooledByt…

【英语】100个句子记完7000个托福单词

其实主要的7000词其实是在主题归纳里面&#xff0c;不过过一遍100个句子也挺好的&#xff0c;反正也不多。 文章目录 Sentence 01Sentence 02Sentence 03Sentence 04Sentence 05Sentence 06Sentence 07Sentence 08Sentence 09Sentence 10Sentence 11Sentence 12Sentence 13Sent…

数据分析中常见标准的参考文献

做数据分析过程中&#xff0c;有些分析法方法的标准随便一搜就能找到&#xff0c;不管是口口相传还是默认&#xff0c;大家都按那样的标准做了。日常分析不细究出处还可以&#xff0c;但是正式的学术论文你需要为你写下的每一句话负责&#xff0c;每一个判断标准都应该有参考文…

Docker | 解决docker 容器中csv文件乱码的情况

问题描述&#xff1a;在Ubuntu docker容器中&#xff0c;打开.csv文件时显示乱码 问题如图 错误分析&#xff1a; 用locale查看所用容器支持的字符集 从输出可以看到&#xff0c;系统使用的是POSIX字符集&#xff0c;POSIX字符集是不支持中韩文的&#xff0c;而UTF-8是支持中…

刷题4.28

1、 开闭原则软件实体&#xff08;模块&#xff0c;类&#xff0c;方法等&#xff09;应该对扩展开放&#xff0c;对修改关闭&#xff0c;即在设计一个软件系统模块&#xff08;类&#xff0c;方法&#xff09;的时候&#xff0c;应该可以在不修改原有的模块&#xff08;修改关…

vue之--使用TypeScript

搭配 TypeScript 使用 Vue​ 像 TypeScript 这样的类型系统可以在编译时通过静态分析检测出很多常见错误。这减少了生产环境中的运行时错误&#xff0c;也让我们在重构大型项目的时候更有信心。通过 IDE 中基于类型的自动补全&#xff0c;TypeScript 还改善了开发体验和效率。…

【Android Framework (八) 】- Service

文章目录 知识回顾启动第一个流程initZygote的流程system_serverServiceManagerBinderLauncher的启动AMS 前言源码分析1.startService2.bindService 拓展知识1:Service的两种启动方式对Service生命周期有什么影响&#xff1f;2:Service的启动流程3:Service的onStartCommand返回…

nginx(七十二)nginx中与cookie相关的细节探讨

背景知识铺垫 一 nginx中与cookie相关 ① Cookie请求头内容回顾 cookie的形式和属性 ② nginx获取cookie值的两种方法 1) $http_cookie -->获取Cookie请求头"所有值"2) $COOKIE_flag -->获取Cookie请求头的"某个key"[1]、脱敏场景在日志中只…

榜上有名 | 创宇盾荣登“2023 IT市场权威榜单”!

4月20日&#xff0c;已连续成功举办23届的IT市场年会在北京举行&#xff0c;作为权威咨询机构赛迪主办&#xff0c;中国IT业界延续时间最长的年度盛会之一&#xff0c;“2023 IT市场年会”隆重发布重磅权威榜单。 创宇盾作为云防护领域专业防护产品&#xff0c;在国家经济产业…

零成本教你部署一个ChatGPT网站

&#x1f4cb; 个人简介 &#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是阿牛&#xff0c;全栈领域优质创作者。&#x1f61c;&#x1f4dd; 个人主页&#xff1a;馆主阿牛&#x1f525;&#x1f389; 支持我&#xff1a;点赞&#x1f44d;收藏⭐️留言&#x1f4d…

为何电商这么难做…...你是否忽略了这个问题?

物流时效是影响买家体验的重要环节&#xff0c;物流服务优劣也是买家网上购物时的重要参考依据。但电商企业对于快递公司的时效承诺、服务质量基本处于被动接受的状况&#xff0c;直到买家投诉才知道快递公司服务缺失&#xff0c;若买家不投诉也没法主动知道大量的订单是否按约…

Excel技能之实用技巧,高手私藏

今天来讲一下Excel技巧&#xff0c;工作常用&#xff0c;高手私藏。能帮到你是我最大的荣幸。 与其加班熬夜赶进度&#xff0c;不如下班学习提效率。能力有成长&#xff0c;效率提上去&#xff0c;自然不用加班。 消化吸收&#xff0c;工作中立马使用&#xff0c;感觉真不错。…

随手记录:Livox 时间戳修改为ROS时间戳

参考与前言 传感器类型&#xff1a;Livox-Mid70 参考链接&#xff1a;Ubuntu20.04系统安装Livox ROS Driver 官方驱动&#xff1a;https://github.com/Livox-SDK/livox_ros_driver 碎碎念&#xff1a;之所以要改成rostime主要是 提取pcd的时候发现这个timestamp 300.xxx 打…

史上最全Maven教程(三)

文章目录 &#x1f525;Maven工程测试_Junit使用步骤&#x1f525;Maven工程测试_Junit结果判定&#x1f525;Maven工程测试_Before、After&#x1f525;依赖冲突调解_最短路径优先原则&#x1f525;依赖冲突调解_最先声明原则&#x1f525;依赖冲突调解_排除依赖、锁定版本 &a…

ByteHouse云数仓版查询性能优化和MySQL生态完善

ByteHouse云数仓版是字节跳动数据平台团队在复用开源 ClickHouse runtime 的基础上&#xff0c;基于云原生架构重构设计&#xff0c;并新增和优化了大量功能。在字节内部&#xff0c;ByteHouse被广泛用于各类实时分析领域&#xff0c;最大的一个集群规模大于2400节点&#xff0…

2023年6月CDGP数据治理专家认证报名及费用

目前6月DAMA-CDGP数据治理认证考试开放报名地区有&#xff1a;北京、上海、广州、深圳、长沙、呼和浩特。 目前南京、济南、西安、杭州等地区还在接近开考人数中&#xff0c;打算参加6月考试的朋友们可以抓紧时间报名啦&#xff01;&#xff01;&#xff01; DAMA认证为数据管…