【阻塞队列】阻塞队列的模拟实现及在生产者和消费者模型上的应用

文章目录

  • 📄前言
  • 一. 阻塞队列初了解
    • 🍆1. 什么是阻塞队列?
    • 🍅2. 为什么使用阻塞队列?
    • 🥦3. Java标准库中阻塞队列的实现
  • 二. 阻塞队列的模拟实现
    • 🍚1. 实现普通队列
    • 🍥2. 实现队列的阻塞功能
    • 🧊3. 解除阻塞状态
  • 三. 使用模拟的阻塞队列验证生产者和消费者模型

📄前言

本文是对阻塞队列的应用场景的介绍,对阻塞队列的作用以及具体实现的讨论。


一. 阻塞队列初了解

🍆1. 什么是阻塞队列?

阻塞队列是一种带有阻塞功能的“先进先出”线性表。即在一个带有最大容量的队列中,在某时刻队列容量已满时继续入队 或 队列为空时继续出队,就会进入阻塞等待状态,直到队列变为 未满或非空 便解除阻塞状态,继续入队或出队。

🍅2. 为什么使用阻塞队列?

若存在以下简易的分布式系统:
在这里插入图片描述
上述分布式系统虽然能完成客户端与服务器端的交互需求,但可能存在以下问题:

  1. 在正常情况下,用户可以通过客户端想服务器发起请求并获取相应的服务,但假如在某刻服务器A突然出现了故障,与服务器A直接通信的服务器B也可能因此出现故障,导致整个服务瘫痪。
  2. 若未来想增加 更多的服务器 来处理服务器A发起的请求,则需求对 服务器A 的接口 进行一定的改动,付出一定的时间和人力成本。
  3. 当某个时刻,很多的客户端同时向 服务器A 发起请求,作为与用户直接交互的服务器,服务器A具备承载这些并发量的能力,但服务器集群中负责其他功能的服务器接收请求的承载能力可能较弱,此时可能造成其他服务器的崩溃。

造成上述现象的原因可以归结为以下两点:

  1. 模块间的耦合性较高(例如问题1和2)
  2. 承载能力较弱的模块不具备抗冲击能力。(例如问题3)

上述的解决方法是在服务器之间加入一个阻塞队列,利用生产者和消费者模型解决以上问题。
什么是生产者消费者模型呢?(如下图)
在这里插入图片描述

当服务器A接收来自客户端的请求时,不把请求直接发给服务器B,而是将请求数据加入到队列中,服务器B通过队列接收请求并把请求除了的结果返回给A。


当上述分布式系统引入阻塞队列后工作模式如下图所示:
在这里插入图片描述

引入阻塞队列的好处:

  1. 解耦合。当服务器A或服务器B出现问题时,就不会对其他服务器造成直接的影响;当需要添加新的服务器来处理这些请求时,新的服务器也同样只需从队列中取数据,无需对原有服务器的接口(代码)进行任何的改动。
  2. 削峰填谷”。当服务器A 瞬间接收客户端发来的大量请求时,由于服务器B处理请求的速度较慢,剩余的请求会在阻塞队列里面堆积,虽然客户端获取服务的时间相对增加了,但一定程度上缓解了其他承受并发量能力较弱的服务器的压力。

🥦3. Java标准库中阻塞队列的实现

在这里插入图片描述

BlockingQueue的主要方法:
在这里插入图片描述
方法演示如下:(使用普通入队方法入队4次,再使用带有阻塞的出队方法出队4次)

public static void main(String[] args) throws InterruptedException {

    BlockingQueue<Integer> q = new ArrayBlockingQueue<>(3);
    System.out.println("数据 5 入队状态: " + q.offer(5));
    System.out.println("数据 6 入队状态: " + q.offer(6));
    System.out.println("数据 7 入队状态: " + q.offer(7));
    System.out.println("数据 8 入队状态: " + q.offer(8));
    System.out.print("队列中的数据: ");
    System.out.println(q);

    
    System.out.println("数据出队: ");
    for (int i = 0; i < 4; i++) {
        System.out.print(q.take() + " ");
    }
    System.out.println("程序结束 !");
}

在这里插入图片描述

可以发现,当调用 take()方法取出队列元素时,因为队列最终为空,程序进入了阻塞状态,没有打印“程序结束”。


二. 阻塞队列的模拟实现

🍚1. 实现普通队列

阻塞队列的关键方法是两个带有阻塞功能的 put() 和 take()方法,而这两个方法是在原有出入队方法上使用 Object类 带有wait()方法 和 notify() 方法让线程进入等待状态 或 唤醒线程。
因此,我们可以先把基础的队列进行实现,随后在原有基础上进行修改。队列可以使用数组(环形队列)或链表两种方式实现,这里我采用数组的方式实现队列。(由于队列的实现方法较为常见,这里直接给出实现代码)

class MyBlockingQueue<E> {
    private Object[] elem;
    private int defaultCapacity = 11;	// 阻塞队列默认容量
    private int front;	// 记录队头元素位置
    private int rear;	// 记录队尾元素位置
    private int size;   // 用于记录当前队列元素的实际个数

    public MyBlockingQueue(){
        this.elem = new Object[defaultCapacity + 1];
    }
    public MyBlockingQueue(int capacity) {
        defaultCapacity = capacity;
        this.elem = new Object[defaultCapacity + 1];
    }

    public boolean offer(E val) {
        // 判断队列是否已满
        if (size == defaultCapacity) {
            return false;
        }
        
        elem[rear] = val;
        size++;
        // 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部
        rear = (rear + 1) % (defaultCapacity + 1);
        return true;
    }
    
    public E poll() {
        // 判断队列是否为空
        if (front == rear) {
            return null;
        }
        
        Object ret = elem[front];
        size--;
        // 如果 front 自增 到达数组末尾,使 front 重新到数组的头部
        front = (front + 1) % (defaultCapacity + 1);
        return (E)ret;
    }
}

🍥2. 实现队列的阻塞功能

当阻塞队列容量已满时,调用 put() 方法会进入阻塞状态,因此在原先 offer()方法判断的基础上,我们需要使用 wait()方法 让线程进入阻塞等待状态,考虑到可能有多个线程同时调用 put()方法,可能会引起线程安全问题,因此我们应在 if()判断条件和整个修改操作上 加锁(或者直接在方法上加锁)。(代码如下)

public void put (E value) throws InterruptedException {
    // 判断队列是否已满
    synchronized (this) {
        if (size == defaultCapacity) {
        // 队列进入阻塞状态, 直到有元素出队时 解除阻塞
            this.wait();
        }
        
        queue[rear] = value;
        size++;
        // 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部
        rear = (rear + 1) % (defaultCapacity + 1);
    }
}

当队列为空时,调用 take() 方法会使线程进入阻塞状态,同理若判空条件成立,我们需要调用 wait() 方法使线程进入阻塞,为防止多个线程在队列即将为空时同时调用 take() 方法引发线程安全问题,我们需要在 if()判断语句 和 整个修改操作 进行加锁操作(或者直接在方法上加锁)。(代码如下)

public E take() throws InterruptedException {
    // 判断队列是否为空
    synchronized (this) {
        if (rear == front) {
        // 队列进入阻塞状态,直到有新的元素入队时 解除阻塞
            this.wait();
        }
        
        Object ret = queue[front];
        // 如果 front 自增 到达数组末尾,使 front 重新到数组的头部
        front = (front + 1) % (defaultCapacity + 1);
        size--;
        return (E)ret;
    }
}

🧊3. 解除阻塞状态

什么情况下队列会接触阻塞状态呢?

  1. 当队满时,某个线程从阻塞队列取出一个元素,即执行完出队操作后,需要使用 notify()方法 唤醒因执行 put()方法而阻塞的线程。
  2. 当队空时,某个线程向队列新增一个元素,即执行完入队操作后,需要使用 notify()方法唤醒因执行 take()方法而阻塞的线程。

对 put()方法和take()方法 修改后代码如下:

public void put (E value) throws InterruptedException {
    // 判断队列是否已满
    synchronized (this) {
        if (size == defaultCapacity) {
        // 队列进入阻塞状态, 直到有元素出队时 解除阻塞
            this.wait();
        }
        
        queue[rear] = value;
        size++;
        // 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部
        rear = (rear + 1) % (defaultCapacity + 1);
        // 此处的 notify 用来唤醒 队列为空时的 wait
        this.notify();
    }
}

public E take() throws InterruptedException {
    // 判断队列是否为空
    synchronized (this) {
        if (rear == front) {
        // 队列进入阻塞状态,直到有新的元素入队时 解除阻塞
            this.wait();
        }
        
        Object ret = queue[front];
        // 如果 front 自增 到达数组末尾,使 front 重新到数组的头部
        front = (front + 1) % (defaultCapacity + 1);
        size--;
        // 此处的 notify 用来唤醒 队列为满时的 wait
        this.notify();
        return (E)ret;
    }
}

三. 使用模拟的阻塞队列验证生产者和消费者模型

为了方便看到效果,我们假设阻塞队列的容量为2,并将生产与消费的数据进行打印。
当生产者与消费者处理数据的频率一样,且生产速率为 次/1s、消费速率为 次/1s 时,程序的生产与消费数据应轮流打印:(模拟代码和程序运行结果如下)

public static void main(String[] args) {
    MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);
    // 生产者
    Thread producer = new Thread(() -> {
        for (int i = 0; i < 5; i++) {
            try {
                myBlockingQueue.put(i);
                System.out.println("生产了: " + i);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });
	
	// 消费者
    Thread consumer = new Thread(() -> {
        for (int i = 0; i < 5; i++) {
            try {
                int ret = myBlockingQueue.take();
                System.out.println("消费了: " + ret);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });
    
    producer.start();
    consumer.start();
}

在这里插入图片描述

当生产速率 > 消费速率,且生产速率为 次/1s、消费速率为 次/2s 时:可以预估到,当经过5s后程序会因队满进入阻塞状态,且后续每消费一次伴随着一次生产,为方便观察阻塞情况,我们可以在方法实现的地方加上阻塞日志的提示(模拟代码和程序运行结果如下)

public static void main(String[] args) {
    MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);
    Thread producer = new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            try {
                myBlockingQueue.put(i);
                System.out.println("生产了: " + i);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });

    Thread consumer = new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            try {
                int ret = myBlockingQueue.take();
                System.out.println("消费了: " + ret);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });

    producer.start();
    consumer.start();
}

在这里插入图片描述

当生产速率 < 消费速率,且生产速率为 次/2s、消费速率为 次/1s 时:可以预估到,当经过2s后程序会因队满进入阻塞状态,且后续每生产一次伴随着一次消费,为方便观察阻塞情况,我们可以在方法实现的地方加上阻塞日志的提示(模拟代码和程序运行结果如下)

public static void main(String[] args) {
    MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);
    Thread producer = new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            try {
                myBlockingQueue.put(i);
                System.out.println("生产了: " + i);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });

    Thread consumer = new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            try {
                int ret = myBlockingQueue.take();
                System.out.println("消费了: " + ret);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });

    producer.start();
    consumer.start();
}

在这里插入图片描述


以上就是本篇文章的全部内容了,如果这篇文章对你有些许帮助,你的点赞、收藏和评论就是对我最大的支持。
另外,文章可能存在许多不足之处,也希望你可以给我一点小小的建议,我会努力检查并改进。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/352932.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

美赛注意事项

2024年1月27日 &#xff1a; 赖维杰 同学分享 1、最后的展现必须要漂亮&#xff08;绘图、呈现&#xff09; 李维情 西北建模王 论文位&#xff08;核心&#xff09;必须清楚建模位、编程位知道做了些什么 常见模型&#xff1a; 1、看真题&#xff0c;读往年论文&#xff0c;选…

计算机找不到ucrtbased.dll无法运行程序,分享5种有效的解决方法

当计算机系统在运行过程中无法找到ucrtbased.dll这个特定的动态链接库文件时&#xff0c;可能会引发一系列的问题和故障现象。ucrtbased.dll是Windows操作系统中一个至关重要的组件&#xff0c;它包含了C运行时库的核心函数&#xff0c;对于许多应用程序特别是基于Microsoft Vi…

vue中的computed

目录 一&#xff1a;介绍 二&#xff1a;例子演示 一&#xff1a;介绍 在 Vue.js 中&#xff0c;computed 属性是一种特殊类型的属性&#xff0c;它允许你声明依赖于其他数据属性的值。computed 属性的值是通过一个函数计算得出的&#xff0c;这个函数可以在其依赖的数据发生…

【misc | CTF】攻防世界 适合作为桌面

天命&#xff1a;这题还挺繁琐的&#xff0c;知识点还不少 目录 步骤1&#xff1a;图片隐写 步骤2&#xff1a;Winhex查看ascii码 步骤1&#xff1a;图片隐写 拿到这张图片&#xff0c;不可能扔进ps会有多图层&#xff0c;普通图片也就一个图层而已 但居然可以有隐写图片这…

I/O多路复用

简介&#xff1a; I/O 多路复用(I/O 多路转接)使得程序能同时监听多个文件描述符&#xff0c;能够提高程序的性能&#xff0c;Linux 下实现 I/O 多路复用的系统调用主要有 select 、 poll 和 epoll 。 select &#xff1a; 主旨思想&#xff1a; 1. 首先要构造一个关于文…

查询排序(2)

Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 1.选择部门 30 中的所有员工 SQL> select *2 from emp3 where deptno 30;EMPNO ENAME JOB MGR HIREDATE SAL COMM …

《动手学深度学习(PyTorch版)》笔记2

Chapter2 Preliminaries 2.1 Automatic Differentiation 让计算机实现微分功能&#xff0c; 有以下四种方式&#xff1a; - 手工计算出微分&#xff0c; 然后编码进代码 - 数值微分 (numerical differentiation) - 符号微分 (symbolic differentiation) - 自动微分&#xff0…

搜维尔科技:【简报】元宇宙数字人赛道,《莉思菱娜》

个性有些古灵精怪时儿安静时而吵闹&#xff0c;虽然以人类寿命来算已经200多岁但在 吸血鬼中还只是个小毛头&#xff0c;从中学开始喜欢打扮偏爱黑白灰色系的服装喜欢时 尚圈&#xff0c;立志想成为美妆或时尚网红不过目前还是学生&#xff0c;脸上的浅色血迹是纹身 贴纸&#…

Javat集合之Lis---(ArrayList和LinkedList)

文章目录 一、 List概述1.1概念1.2list体系结构图1.3 通用方法测试代码 二、List的特点三、遍历方式foreachfor循环迭代器 四、ArrayListArrayList概述概念数据结构 ArrayList的特点 ArrayList去重字符串去重对象去重 五、LinkedListLinkedList概述概念数据结构LinkedList的特点…

一键轻松,免费创造:QuickQR带你体验AI二维码的轻松生成!

当今时代&#xff0c;将信息快速转变为可扫描图案&#xff0c;以简化人们的生活和工作方式&#xff0c;二维码技术展现了它强大的功能。特别是在分享链接、联系信息或进行支付时&#xff0c;二维码已成为现代社会一个不可或缺的部分。本文将探讨生成AI二维码的一种工具&#xf…

线性表--队列

1.什么是队列&#xff1f; 队列是只允许在一端进行插入数据操作&#xff0c;在另一端进行删除数据操作的特殊线性表&#xff0c;队列具有先 进先出FIFO(First In First Out) &#xff1b; 入队列&#xff1a;进行插入操作的一端称为队尾&#xff1b; 出队列&#xff1a;进行…

幻兽帕鲁服务器搭建,包教包会

服务器搭建 幻兽帕鲁服务器搭建&#xff0c;包教包会&#xff0c;不会评论区评论手把手帮忙搭建 一、steamCMD安装 1、安装screen&#xff1a; yum install screen -y 2、切换用户&#xff1a; su -ls /bin/bash steam 3、切换至steam用户目录&#xff1a; cd ~ 4、下载ste…

如何在docker容器中安装Elasticsearch中的IK分词器

目录 &#xff08;1&#xff09;准备IK分词器的压缩包 &#xff08;2&#xff09;进入docker容器 &#xff08;3&#xff09;移动ik分词器到指定文件夹 &#xff08;4&#xff09;解压分词器压缩包 &#xff08;5&#xff09;测试IK分词器是否安装成功 &#xff08;1&#…

Redis核心技术与实战【学习笔记】 - 3.Redis服务高可靠

1.数据同步&#xff1a;主从库如何实现数据一致&#xff1f; 前面我们学习了 AOF 和 RDB&#xff0c;如果 Redis 发生了宕机&#xff0c;它们可以分别通过回放日志和重新读入 RDB 文件的方式恢复数据&#xff0c;从而保证尽量较少丢失数据&#xff0c;提升可靠性。 不过&…

vue3 + antd 封装动态表单组件(二)

传送带&#xff1a; vue3 antd 封装动态表单组件&#xff08;一&#xff09; 前置条件&#xff1a; vue版本 v3.3.11 ant-design-vue版本 v4.1.1 vue3 antd 封装动态表单组件&#xff08;一&#xff09;是基础版本&#xff0c;但是并不好用&#xff0c; 因为需要配置很多表…

VR拍摄+制作

1.VR制作需要的图片宽高是2:1&#xff0c;需要360✖️180的图片&#xff0c;拍摄设备主要有两种&#xff1a; 1&#xff09;通过鱼眼相机拍摄&#xff0c;拍摄一组图片&#xff0c;然后通过PTGui来合成(拍摄复杂) 2&#xff09;全景相机&#xff0c;一键拍摄直接就能合成需要的…

Android颜色选择器

Android颜色选择器&#xff0c;弹框提示选择颜色。效果如图。点击或者滑动圆环和底部横向渐变色调整颜色&#xff0c;中间圆圈的颜色就是最终选中的颜色。点击圆圈确认颜色。 使用 //颜色选择Dialogprivate void showColorPickDialog(int position, int colorInt){ColorPickerD…

数据结构(绪论+算法的基本概念)

文章目录 一、绪论1.1、数据结构的基本概念1.2、数据结构三要素1.2.1、逻辑结构1.2.2、数据的运算1.2.3、物理结构&#xff08;存储结构&#xff09;1.2.4、数据类型和抽象数据类型 二、算法的基本概念2.1、算法的特性2.2、“好”算法的特质2.2.1、算法时间复杂度2.2.2、算法空…

【Linux】:线程安全的单例模式

线程安全的单例模式 一.STL和智能指针的安全二.单例模式1.基本概念2.懒汉和饿汉的实现方式 三.常见的其它锁四.读者写者模型 一.STL和智能指针的安全 1.STL中的容器是否是线程安全的? 不是. 原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性…

[SwiftUI]系统弹窗和自定义弹窗

一、系统弹窗 在 SwiftUI 中&#xff0c;.alert 是一个修饰符&#xff0c;用于在某些条件下显示一个警告对话框。Alert 可以配置标题、消息和一系列的按钮。每个按钮可以是默认样式、取消样式&#xff0c;或者是破坏性的样式&#xff0c;它们分别对应不同的用户操作。 1.Aler…