数据结构与算法-10_阻塞队列

文章目录

  • 1.单锁实现
  • 2.双锁实现

1.单锁实现

Java 中防止代码段交错执行,有两种锁选择

  • synchronized 代码块,属于关键字级别提供锁保护,功能少
  • ReentrantLock 类,功能丰富

以 ReentrantLock 为例

ReentrantLock lock = new ReentrantLock();

public void offer(String e) {
    lock.lockInterruptibly();
    try {
        array[tail] = e;
        tail++;
    } finally {
        lock.unlock();
    }
}

只要两个线程执行上段代码时,锁对象是同一个,就能保证 try 块内的代码的执行不会出现指令交错现象,即执行顺序只可能是下面两种情况之一

线程1线程2说明
lock.lockInterruptibly()t1对锁对象上锁
array[tail]=e1
lock.lockInterruptibly()即使 CPU 切换到线程2,但由于t1已经对该对象上锁,因此线程2卡在这儿进不去
tail++切换回线程1 执行后续代码
lock.unlock()线程1 解锁
array[tail]=e2线程2 此时才能获得锁,执行它的代码
tail++
  • 另一种情况是线程2 先获得锁,线程1 被挡在外面
  • 保护的是 tail 位置读写的安全

锁同时保护了 tail 和 size 的读写安全

ReentrantLock lock = new ReentrantLock();
int size = 0;

public void offer(String e) {
    lock.lockInterruptibly();
    try {
        if(isFull()) {
            // 满了怎么办?
        }
        array[tail] = e;
        tail++;
        
        size++;
    } finally {
        lock.unlock();
    }
}

private boolean isFull() {
    return size == array.length;
}

之前是返回 false 表示添加失败,前面分析过想达到这么一种效果:

  • 在队列满时,不是立刻返回,而是当前线程进入等待
  • 什么时候队列不满了,再唤醒这个等待的线程,从上次的代码处继续向下运行

ReentrantLock 可以配合条件变量来实现

ReentrantLock lock = new ReentrantLock();
Condition tailWaits = lock.newCondition(); // 条件变量
int size = 0;

public void offer(String e) {
    lock.lockInterruptibly();
    try {
        while (isFull()) {
            tailWaits.await();	// 当队列满时, 当前线程进入 tailWaits 等待
        }
        array[tail] = e;
        tail++;
        
        size++;
    } finally {
        lock.unlock();
    }
}

private boolean isFull() {
    return size == array.length;
}
  • 条件变量底层也是个队列,用来存储这些需要等待的线程,当队列满了,就会将 offer 线程加入条件队列,并暂时释放锁
  • 将来队列如果不满了(由 poll 线程那边得知)可以调用 tailWaits.signal() 来唤醒 tailWaits 中首个等待的线程,被唤醒的线程会再次抢到锁,从上次 await 处继续向下运行
操作前offer(4)offer(5)poll()操作后
[1 2 3]队列满,进入tailWaits 等待[1 2 3]
[1 2 3]取走 1,队列不满,唤醒线程[2 3]
[2 3]抢先获得锁,发现不满,放入 5[2 3 5]
[2 3 5]从上次等待处直接向下执行[2 3 5 ?]

关键点:为何要用 while 而不是 if

  • 从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
  • 这种情况称之为虚假唤醒,唤醒后应该重新检查条件,是不是需要重新进入等待

代码

/**
 * 单锁实现
 * @param <E> 元素类型
 */
public class BlockingQueue1<E> implements BlockingQueue<E> {
    private final E[] array;
    private int head = 0;
    private int tail = 0;
    private int size = 0; // 元素个数

    @SuppressWarnings("all")
    public BlockingQueue1(int capacity) {
        array = (E[]) new Object[capacity];
    }

    ReentrantLock lock = new ReentrantLock();
    Condition tailWaits = lock.newCondition();
    Condition headWaits = lock.newCondition();

    @Override
    public void offer(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (isFull()) {
                tailWaits.await();
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }
            size++;
            headWaits.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void offer(E e, long timeout) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long t = TimeUnit.MILLISECONDS.toNanos(timeout);
            while (isFull()) {
                if (t <= 0) {
                    return;
                }
                t = tailWaits.awaitNanos(t);
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }
            size++;
            headWaits.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public E poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (isEmpty()) {
                headWaits.await();
            }
            E e = array[head];
            array[head] = null; // help GC
            if (++head == array.length) {
                head = 0;
            }
            size--;
            tailWaits.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }

    private boolean isEmpty() {
        return size == 0;
    }

    private boolean isFull() {
        return size == array.length;
    }
}

注意

  • JDK 中 BlockingQueue 接口的方法命名
    • 方法 offer(E e) 是非阻塞的实现,阻塞实现方法为 put(E e)
    • 方法 poll() 是非阻塞的实现,阻塞实现方法为 take()

2.双锁实现

单锁的缺点在于:

  • 生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改 size
  • 冲突的主要是生产者之间:多个 offer 线程修改 tail
  • 冲突的还有消费者之间:多个 poll 线程修改 head

进一步提高性能,可以用两把锁

  • 一把锁保护 tail
  • 另一把锁保护 head
ReentrantLock headLock = new ReentrantLock();  // 保护 head 的锁
Condition headWaits = headLock.newCondition(); // 队列空时,需要等待的线程集合

ReentrantLock tailLock = new ReentrantLock();  // 保护 tail 的锁
Condition tailWaits = tailLock.newCondition(); // 队列满时,需要等待的线程集合

offer 方法
缺点是 size 并不受 tailLock 保护,tailLock 与 headLock 是两把不同的锁,并不能实现互斥的效果

@Override
public void offer(E e) throws InterruptedException {
    tailLock.lockInterruptibly();
    try {
        // 队列满等待
        while (isFull()) {
            tailWaits.await();
        }
        
        // 不满则入队
        array[tail] = e;
        if (++tail == array.length) {
            tail = 0;
        }
        
        // 修改 size (有问题)
        size++;
        
    } finally {
        tailLock.unlock();
    }
}

size 需要用下面的代码保证原子性

AtomicInteger size = new AtomicInteger(0);	   // 保护 size 的原子变量

size.getAndIncrement(); // 自增
size.getAndDecrement(); // 自减

代码修改为

@Override
public void offer(E e) throws InterruptedException {
    tailLock.lockInterruptibly();
    try {
        // 队列满等待
        while (isFull()) {
            tailWaits.await();
        }
        
        // 不满则入队
        array[tail] = e;
        if (++tail == array.length) {
            tail = 0;
        }
        
        // 修改 size
        size.getAndIncrement();
        
    } finally {
        tailLock.unlock();
    }
}

poll 方法

@Override
public E poll() throws InterruptedException {
    E e;
    headLock.lockInterruptibly();
    try {
        // 队列空等待
        while (isEmpty()) {
            headWaits.await();
        }
        
        // 不空则出队
        e = array[head];
        if (++head == array.length) {
            head = 0;
        }
        
        // 修改 size
        size.getAndDecrement();
        
    } finally {
        headLock.unlock();
    }
    return e;
}

如何通知 headWaits 和 tailWaits 中等待的线程,比如 poll 方法拿走一个元素,通知 tailWaits。

代码

@Override
public E poll() throws InterruptedException {
    E e;
    headLock.lockInterruptibly();
    try {
        // 队列空等待
        while (isEmpty()) {
            headWaits.await();
        }
        
        // 不空则出队
        e = array[head];
        if (++head == array.length) {
            head = 0;
        }
        
        // 修改 size
        size.getAndDecrement();
        
        // 通知 tailWaits 不满(有问题)
        tailWaits.signal();
        
    } finally {
        headLock.unlock();
    }
    return e;
}

要使用这些条件变量的 await(), signal() 等方法需要先获得与之关联的锁,上面代码运行出现错误

java.lang.IllegalMonitorStateException

解决:避免嵌套,两段加锁的代码平级
在这里插入图片描述

性能提升

  1. 代码调整后 offer 并没有同时获取 tailLock 和 headLock 两把锁,因此两次加锁之间会有空隙,这个空隙内可能有其它的 offer 线程添加了更多的元素,那么这些线程都要执行 signal(),通知 poll 线程队列非空吗?

    • 每次调用 signal() 都需要这些 offer 线程先获得 headLock 锁,成本较高,要想法减少 offer 线程获得 headLock 锁的次数
    • 可以加一个条件:当 offer 增加前队列为空,即从 0 变化到不空,才由此 offer 线程来通知 headWaits,其它情况不归它管
  2. 队列从 0 变化到不空,会唤醒一个等待的 poll 线程,这个线程被唤醒后,肯定能拿到 headLock 锁,因此它具备了唤醒 headWaits 上其它 poll 线程的先决条件。如果检查出此时有其它 offer 线程新增了元素(不空,但不是从0变化而来),那么不妨由此 poll 线程来唤醒其它 poll 线程,称之为级联通知(cascading notifies)

  3. 在 poll 时队列从满变化到不满,才由此 poll 线程来唤醒一个等待的 offer 线程,目的也是为了减少 poll 线程对 tailLock 上锁次数,剩下等待的 offer 线程由这个 offer 线程间接唤醒

代码

public class BlockingQueue2<E> implements BlockingQueue<E> {

    private final E[] array;
    private int head = 0;
    private int tail = 0;
    private final AtomicInteger size = new AtomicInteger(0);
    ReentrantLock headLock = new ReentrantLock();
    Condition headWaits = headLock.newCondition();
    ReentrantLock tailLock = new ReentrantLock();
    Condition tailWaits = tailLock.newCondition();

    public BlockingQueue2(int capacity) {
        this.array = (E[]) new Object[capacity];
    }

    @Override
    public void offer(E e) throws InterruptedException {
        int c;
        tailLock.lockInterruptibly();
        try {
            while (isFull()) {
                tailWaits.await();
            }
            array[tail] = e;
            if (++tail == array.length) {
                tail = 0;
            }            
            c = size.getAndIncrement();
            // a. 队列不满, 但不是从满->不满, 由此offer线程唤醒其它offer线程
            if (c + 1 < array.length) {
                tailWaits.signal();
            }
        } finally {
            tailLock.unlock();
        }
        // b. 从0->不空, 由此offer线程唤醒等待的poll线程
        if (c == 0) {
            headLock.lock();
            try {
                headWaits.signal();
            } finally {
                headLock.unlock();
            }
        }
    }

    @Override
    public E poll() throws InterruptedException {
        E e;
        int c;
        headLock.lockInterruptibly();
        try {
            while (isEmpty()) {
                headWaits.await(); 
            }
            e = array[head]; 
            if (++head == array.length) {
                head = 0;
            }
            c = size.getAndDecrement();
            // b. 队列不空, 但不是从0变化到不空,由此poll线程通知其它poll线程
            if (c > 1) {
                headWaits.signal();
            }
        } finally {
            headLock.unlock();
        }
        // a. 从满->不满, 由此poll线程唤醒等待的offer线程
        if (c == array.length) {
            tailLock.lock();
            try {
                tailWaits.signal();
            } finally {
                tailLock.unlock();
            }
        }
        return e;
    }

    private boolean isEmpty() {
        return size.get() == 0;
    }

    private boolean isFull() {
        return size.get() == array.length;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/678024.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

tomcat-memcached会话共享配置

目录 1、安装memcache服务 2、把依赖的jar包移至tomcat/lib目录下 3、配置tomcat/conf/context.xml 4、重启tomcat服务 1、安装memcache服务 具体安装步骤此处不详细说明&#xff0c;自行根据实际情况安装即可 2、把依赖的jar包移至tomcat/lib目录下 3、配置tomcat/conf/c…

自定义类型:联合体和枚举

1. 联合体类型的声明 2. 联合体的特点 3. 联合体大小的计算 4. 枚举类型的声明 5. 枚举类型的优点 6. 枚举类型的使用 欢迎关注 熬夜学编程 创作不易&#xff0c;请多多支持 感谢大家的阅读、点赞、收藏和关注 如有问题&#xff0c;欢迎指正 1. 联合体 1.1 联合体类型的声…

java自学阶段一:基础知识学习

《项目案例—黑马tlias智能学习辅助系统》 目录&#xff1a; 异常 一&#xff1a;学习目标&#xff1a; 异常&#xff1a;能够看懂异常信息&#xff0c;了解异常体系结构和分类&#xff0c;掌握异常的两种处理方式&#xff0c;自定义异常。 二、异常&#xff1a; 1.异常的概…

yolo-v8window环境运行

源码https://github.com/ultralytics/ultralytics 1.用pycharm打开YOLOv8文件夹&#xff0c;下载依赖项&#xff0c;依赖项已经以作者的名字来封装好&#xff0c;所以直接在终端输入&#xff1a;pip install ultralytics&#xff0c;安装好之后会默认安装的cpu版本的torch&am…

WannaMine4.0病毒应急处置

一、前言 某日&#xff0c;通过流量监测设备和EDR发现挖矿请求告警&#xff0c;并存在长期445端口扫描。 二、病毒排查 上机排查&#xff0c;发现该服务器存在WannaMine4.0病毒&#xff0c;通过网上文章了解&#xff0c;如果请求挖矿域名遭安全设备拦截&#xff0c;会导致挖矿…

AI大模型页面

自己做的AI&#xff0c;模仿GPT。 访问地址&#xff1a;欢迎 请大家给点意见&#xff0c;需要追加哪些功能。

《企业应用架构模式》学习指南

导读&#xff1a;企业应用包括哪些&#xff1f;它们又分别有哪些架构模式&#xff1f; 世界著名软件开发大师Martin Fowler给你答案 01什么是企业应用 我的职业生涯专注于企业应用&#xff0c;因此&#xff0c;这里所谈及的模式也都是关于企业应用的。&#xff08;企业应用还有…

做视频号小店什么类目最容易爆单?其实,弄懂这三点就会选品了

大家好&#xff0c;我是电商花花。 我们做视频号小店做什么类目最容易爆单&#xff1f; 其实任何类目都有属于自己的受众人群和客户&#xff0c;都非常容易爆单&#xff0c;我们想要爆单&#xff0c;就要选对类目&#xff0c;选对产品。 视频号上所有的类目基本上可以分为标…

塑料焊接机熔深对激光焊接质量有什么影响

塑料焊接机的熔深对焊接质量具有直接且显著的影响。以下是熔深对焊接质量影响的详细解释&#xff1a; 1. 焊接强度&#xff1a;熔深直接决定了焊缝的截面积&#xff0c;从而影响焊接接头的强度。较深的熔深意味着焊缝的截面积更大&#xff0c;可以提供更强的结合力&#xff0c;…

Apache DolphinScheduler 社区5月月报更新!

各位热爱 DolphinScheduler 的小伙伴们&#xff0c;社区5月份月报更新啦&#xff01;这里将记录 DolphinScheduler 社区每月的重要更新&#xff0c;欢迎关注&#xff0c;期待下个月你也登上Merge Star月度榜单哦~ 月度Merge Star 感谢以下小伙伴5月份为 Apache DolphinSchedu…

SpringBoot发送Gmail邮件

1. 登录Gmail Gmail网址 点击右上角“小齿轮”&#xff0c;然后点击"查看所有设置" 点击“转发和 POP/IMAP”&#xff0c;按图中设置&#xff0c;然后点击保存&#xff1a; 2. 启用两步验证(https://myaccount.google.com/security) 登录上述网址&#xff0c;找…

【MyBatis-plus】saveBatch 性能调优和【MyBatis】的数据批量入库

总结最优的两种方法&#xff1a; 方法1&#xff1a; 使用了【MyBatis-plus】saveBatch 但是数据入库效率依旧很慢&#xff0c;那可能是是因为JDBC没有配置&#xff0c;saveBatch 批量写入并没有生效哦&#xff01;&#xff01;&#xff01; 详细配置如下&#xff1a;批量数据入…

用 HTML+CSS 实现全屏爱心滴落的动画效果,中间可显示名字

需求 在页面上显示一行白色文字,同时有爱心滴落的动画效果。 效果 HTML 和 CSS 代码 <!DOCTYPE html> <html lang="en"> <head

Kerberoasting攻击

一. Kerberoasting攻击原理 1. 原理 Kerberoasting 是域渗透中经常使用的一项技术&#xff0c;是Tim Medin 在 DerbyCon 2014 上发布的一种域口令攻击方法&#xff0c;Tim Medin 同时发布了配套的攻击工具 kerberoast。此后&#xff0c;不少研究人员对 Kerberoasting 进行了改…

揭秘Facebook:数字时代的社交奥秘

前言 在当今的数字时代&#xff0c;社交网络已经深刻改变了人们的沟通方式、信息获取方式和社交方式。其中&#xff0c;Facebook作为全球最大的社交网络平台之一&#xff0c;扮演了至关重要的角色。从一个大学生项目发展成覆盖全球的社交巨头&#xff0c;Facebook不仅见证了互…

大尺寸图像分类检测分割统一模型:Resource Efficient Perception for Vision Systems

论文题目&#xff1a;Resource Efficient Perception for Vision Systems 论文链接&#xff1a;http://arxiv.org/abs/2405.07166 代码链接&#xff1a;https://github.com/Visual-Conception-Group/Localized-Perception-Constrained-Vision-Systems 作者设计了一个统一的模…

2024年想转行WebGIS前端开发还有就业前景吗?

当然有。 无论是测绘外业、数据处理&#xff0c;还是城乡规划、遥感等等专业&#xff0c;只要你的行业就业水平一直停留在“工资低、工作条件差、对身体消耗大、没发展”的现状&#xff0c;我都劝你果断转GIS开发。 在新中地学习的学生无非就是因为上述原因&#xff0c;选择放…

Springboot 在线学习交流平台-计算机毕业设计源码46186

摘 要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;在线学习交流平台当然也不能排除在外。在线学习交流平台是以实际运用为开发背景&#xff0c;运用软件工程原理和开发方法…

乡村振兴的乡村环境综合整治:加强农村环境综合整治,改善农村人居环境,打造干净整洁的美丽乡村

目录 一、引言 二、农村环境问题的现状与挑战 &#xff08;一&#xff09;农村环境问题的现状 &#xff08;二&#xff09;农村环境问题的挑战 三、加强农村环境综合整治的必要性 &#xff08;一&#xff09;提升农民生活质量 &#xff08;二&#xff09;促进农村经济发…

AI工具:如何通过智能助手简化工作流程?

工欲善其事&#xff0c;必先利其器。 随着AI技术与各个行业或细分场景的深度融合&#xff0c;日常工作可使用的AI工具呈现出井喷式发展的趋势&#xff0c;AI工具的类别也从最初的AI文本生成、AI绘画工具&#xff0c;逐渐扩展到AI思维导图工具、AI流程图工具、AI生成PPT工具、AI…