【JavaEE】阻塞队列

【JavaEE】阻塞队列

  • 一、什么是阻塞队列?
  • 二、阻塞队列的特点
  • 三、阻塞队列的常用方法
      • 3.1 抛出异常
      • 3.2 有返回结果,不会抛出异常
      • 3.3 阻塞
  • 四、常见的阻塞队列
      • 4.1 ArrayBlockingQueue
      • 4.2 LinkedBlockingQueue
      • 4.3 SynchronousQueue
      • 4.4 PriorityBlockingQueue
      • 4.5 DelayQueue
  • 五、自己实现阻塞队列
      • 5.1 成员变量
      • 5.2 构造方法
      • 5.3 put()方法
      • 5.4 take()方法
      • 5.5 全部代码
  • 六、阻塞队列的优点

博客结尾有此篇博客的全部的代码!!!

一、什么是阻塞队列?

前面我们学习过队列(Queue)这种数据结构—“先进先出”。
今天学的阻塞队列(BlockingQueue),它继承于Queue接口,是队列的一种。

public interface BlockingQueue<E> extends Queue<E>

Queue和BlockingQueue都是在Java5中加入的,BlockingQueue是线程安全的的队列,它在队列为空时,获取元素的操作将会被阻塞;在队列满时,存储元素的操作也会被阻塞。

举例说明一下:
盖房子!之前盖房子是请大工和小工,雇人盖房子!
大工:负责刷墙(将打好的灰刷在墙上)。
小工:负责打灰(将石灰和沙子搅拌在一起)。
灰盆:负责放灰的(容量大小有限)。
小工负责将打好的灰放到灰盆中,大工负责将灰盆中的灰刷到墙上。
在这里插入图片描述

假如小工打灰很快,他一次性打了很多灰,但是灰盆的容量是有限的,他就要等大工将灰盆中的灰用完,他才能往灰盆中再次加灰;假设大工是个老手,他粉刷很快,一下就将灰盆中的灰用完了,但是小工的下一盆灰没有及时供上,此时大工是不是就需要等待小工打好灰之后,他才能继续工作!这两种情况都是发生了阻塞等待!而灰盆则相当于阻塞队列!
这个例子就应该能帮助大家对阻塞对列有了一定的理解了吧!

二、阻塞队列的特点

  • 线程安全:内部通过锁机制保证线程安全。
  • 阻塞操作:当队列为空时,尝试从队列中获取元素的操作会被阻塞;当队列满时,尝试向队列中添加元素的操作会被阻塞。
  • 容量限制:阻塞队列可以有容量限制,也可以是无界队列。

这里的无界队列并不是意味可以放无限个元素,无界也是有上限的!例如LinkedBlcokingQueue的上限是Integer.MAX_VALUE(-2,147,483,648到 2,147,483,647)。而有界队列就算队列中元素已满,也是不会扩容的。

三、阻塞队列的常用方法

  • 抛出异常:add、remove、element。
  • 返回结果但是不抛出异常:offer、poll、peek。
  • 阻塞:take、put。

3.1 抛出异常

这段代码可以发现,我们将容量设置为10,t1负责add元素,t2负责remove元素,但是t2在最开始会休眠1秒钟,在休眠的时间中,t1肯定add超过10个元素放入queue中。所以这里报错。

add()抛出异常(队列中元素已满,再添加元素,就会报错):

public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
        Thread t1 = new Thread(()->{
            while(true){
                queue.add(1);
            }

        });
        Thread t2 = new Thread(()->{
            while(true){
                try {
                    Thread.sleep(1000);
                    queue.remove();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

            }
        });
        t1.start();
        t2.start();

    }

在这里插入图片描述
remove()抛出异常(对列中没有元素,抛出异常):

public class Demo2 {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(10);
        Thread t1 = new Thread(()->{
            while(true){
                try {
                    Thread.sleep(1000);
                    queue2.add(1);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

        });
        Thread t2 = new Thread(()->{
            while(true){
                    queue2.remove();
            }
        });
        t1.start();
        t2.start();
    }
}

在这里插入图片描述
element()抛出异常(返回队列首元素,但不删除,当队列为空,抛出异常):

public static void main(String[] args) {
        BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(10);
        Thread t1 = new Thread(()->{
            while(true){
                try {
                    Thread.sleep(1000);
                    queue2.add(1);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

            }

        });
        Thread t2 = new Thread(()->{
            while(true){
//                    queue2.remove();
                queue2.element();
            }
        });
        t1.start();
        t2.start();
    }

在这里插入图片描述

3.2 有返回结果,不会抛出异常

offer():插入成功true,插入失败false
poll():移除成功,则打印移除元素,没有则null
peek():返回队列元素,但不删除,如果队列为空,则返回null

public class Demo4 {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue4 = new ArrayBlockingQueue<Integer>(1);
        Thread t1 = new Thread(()->{
            for (int i = 0; i < 2; i++) {
                System.out.println(queue4.offer(1));
            }
        });
        Thread t2 = new Thread(()->{
            for (int i = 0; i < 3; i++) {
                try {
                    Thread.sleep(1000);
                    System.out.println(queue4.poll());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

在这里插入图片描述

3.3 阻塞

public class Demo5 {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue5 = new ArrayBlockingQueue<Integer>(10);
        Thread t1 = new Thread(()->{
            while(true){
                try {
                    queue5.put(1);
                    System.out.println("生产者生产元素"+queue5.size());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread t2 = new Thread(()->{
            while(true){
                try {
                    queue5.take();
                    System.out.println("消费者消耗元素"+queue5.size());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

由于容量是10,所以生产者生产的元素就是10 以内的。
在这里插入图片描述

public class Demo6 {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue6 = new ArrayBlockingQueue<Integer>(10);
        Thread t1 = new Thread(()->{
            while(true){
                try {
                    Thread.sleep(1000);
                    queue6.put(1);
                    System.out.println("生产者生产元素"+queue6.size());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread t2 = new Thread(()->{
            while(true){
                try {
                    queue6.take();
                    System.out.println("消费者消耗元素"+queue6.size());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

这里对生产者加了休眠,所以消费者会出现阻塞等待,等待生产者生产元素。
在这里插入图片描述

四、常见的阻塞队列

4.1 ArrayBlockingQueue

  • 基于数组实现的有界阻塞队列。
  • 具有固定的容量,初始化时必须指定容量大小。
  • 适合容量固定且需要高性能的场景。

4.2 LinkedBlockingQueue

  • 基于链表实现的可选界阻塞队列。
  • 默认为无界队列(实际是有限制的,最大容量为Integer.MAX_VALUE),也可以指定容量。
  • 性能通常比ArrayBlockingQueue更好,尤其是在高并发场景下。

4.3 SynchronousQueue

  • 特殊的阻塞队列,不存储元素。
  • 生产者线程必须等待消费者线程取走元素后才能继续生产。
  • 适合直接传递数据的场景,常用于线程池中的任务传递。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class Demo7 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                System.out.println(queue.take()); // 消费者线程
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        Thread.sleep(5000);
        queue.put(1); // 生产者线程
    }
}

4.4 PriorityBlockingQueue

  • 基于优先级的无界阻塞队列。
  • 元素按照自然顺序或指定的比较器排序。
  • 不保证线程安全的公平性,但保证优先级最高的元素总是最先被取出。
public class Demo8 {
    public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
            queue.put(3);
            queue.put(1);
            queue.put(2);
            System.out.println(queue.take()); // 输出1
        }
    }

优先级确定方式:

  1. 自然排序(Comparable 接口):例如这里是从小到大,所以输出是1
  2. 自定义比较器(Comparator)

4.5 DelayQueue

  • 基于优先级的无界阻塞队列。
  • 只有当元素的延迟时间到期后,才能被取出。
  • 元素必须实现Delayed接口。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTask implements Delayed {
    private final long startTime;
    private final String name;

    public DelayedTask(long startTime, String name) {
        this.startTime = startTime;
        this.name = name;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.startTime, ((DelayedTask) o).startTime);
    }

    @Override
    public String toString() {
        return name;
    }
}

public class DelayQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Delayed> queue = new DelayQueue<>();
        queue.put(new DelayedTask(System.currentTimeMillis() + 2000, "Task1"));
        queue.put(new DelayedTask(System.currentTimeMillis() + 1000, "Task2"));

        System.out.println(queue.take()); // 输出Task2
    }
}

五、自己实现阻塞队列

5.1 成员变量

    private int capacity = 0xffff;
    private Integer[] elem ;//存储数组
    private int size;//存储元素个数
    private int head;//队头
    private int tail;//队尾

5.2 构造方法

当你传入值的时候,capacity就是你传入的那个值,就不是默认值了。

 public MyQueue() {
        elem = new Integer[capacity];
    }
    public MyQueue(int capacity) {
        this.capacity = capacity;
        elem = new Integer[capacity];
    }

5.3 put()方法

由于这里的put()方法涉及元素的添加,所以将这段代码放入锁中,避免了原子性带来的线程安全问题。

这里使用while不使用if,为了防止wait()被虚假唤醒,如果被虚假唤醒if就会执行后面的代码,而while还是会再检查一遍

举个例子:

  • 线程A此时元素个数已满
  • 线程B此时移除线程A元素,并且激活wait ()
  • 线程C在此时又往线程A中添加元素,使线程A元素又达到已满状态,但此时代码会继续向下执行
public void put(int key) throws InterruptedException {
        synchronized (this) {
            while (size == elem.length) {
                this.wait();
            }
            elem[tail] = key;
            tail = (tail + 1) % capacity;
            size++;
            this.notify();
        }
    }

5.4 take()方法

        public int take() throws InterruptedException {
            synchronized (this) {
                while(size == 0) {
                    this.wait();
                }
                int ret = elem[head];
                head = (head + 1) % capacity;
                size--;
                this.notify();
                return ret;
            }
    }

5.5 全部代码

class MyQueue {
    private int capacity = 0xffff;
    private Integer[] elem ;//存储数组
    private int size;//存储元素个数
    private int head;//队头
    private int tail;//队尾

    public MyQueue() {
        elem = new Integer[capacity];
    }
    public MyQueue(int capacity) {
        this.capacity = capacity;
        elem = new Integer[capacity];
    }

    public void put(int key) throws InterruptedException {
        synchronized (this) {
            while (size == elem.length) {
                this.wait();
            }
            elem[tail] = key;
            tail = (tail + 1) % capacity;
            size++;
            this.notify();
        }
    }

        public int take() throws InterruptedException {
            synchronized (this) {
                while(size == 0) {
                    this.wait();
                }
                int ret = elem[head];
                head = (head + 1) % capacity;
                size--;
                this.notify();
                return ret;
            }
    }
}


public class Demo9 {
    public static void main(String[] args) throws InterruptedException {
        MyQueue myQueue = new MyQueue(5);
        myQueue.put(1);
        myQueue.put(2);
        myQueue.put(3);
        myQueue.put(4);
        while(true) {
            System.out.println(myQueue.take());
        }
    }
}

六、阻塞队列的优点

  1. 降低解耦合

假设代码1和代码2之间直接进行交互,这个时候修改代码1,代码2大概率就会受到影响,但是中间加个阻塞队列当作交互平台的话,就大大降低了代码1和代码2之间的耦合性。

  1. 削峰削谷

假设两个服务器进行交互请求,服务器1消耗资源少,产生请求量高,服务器2消耗资源大,接受请求量低。当正常进行交互的时候,服务器1的大量请求就会发送给服务器2,此时服务器2由于处理不了大量请求,有可能就会挂掉!
当加入阻塞队列后,服务器1的请求就会先放到阻塞队列中,服务器2就会根据自己的能力,需要接受多少请求就拿多少请求。

此篇博客的全部代码!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/982487.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Python项目】基于深度学习的车辆特征分析系统

【Python项目】基于深度学习的车辆特征分析系统 技术简介&#xff1a;采用Python技术、MySQL数据库、卷积神经网络&#xff08;CNN&#xff09;等实现。 系统简介&#xff1a;该系统基于深度学习技术&#xff0c;特别是卷积神经网络&#xff08;CNN&#xff09;&#xff0c;用…

蓝桥杯备赛:每日一题

只学习不思考不记笔记假把式 这道题目的难度很难&#xff0c;当然主要的原因在于模型的转化&#xff0c;刚看的这道题也是一脸懵&#xff0c;但是转换成覆盖模型后就好了很多&#xff0c;归跟接地就是每块区域的中取最大的最少的牛覆盖天数&#xff0c;然后根据这个天数求每一块…

7. 机器人记录数据集(具身智能机器人套件)

1. 树莓派启动机器人 conda activate lerobotpython lerobot/scripts/control_robot.py \--robot.typelekiwi \--control.typeremote_robot2. huggingface平台配置 huggingface官网 注册登录申请token&#xff08;要有写权限&#xff09;安装客户端 # 安装 pip install -U …

突破极限:高性能ROCK 220A-M 工业级无人机电调深度测评 —— 无人机动力系统的核心守护者

引言 在无人机技术高速发展的今天&#xff0c;动力系统的稳定性与效率成为决定任务成败的关键。作为工业级电调领域的标杆产品&#xff0c;ROCK 220A-M 凭借其卓越的性能与多重安全设计&#xff0c;为专业级无人机应用提供了可靠的动力解决方案。本文将从技术架构、防护…

OceanBase-obcp-v3考试资料梳理

集群架构 基本概念 集群: 集群由一个或多个Region组成,Region 由一个或多个Zone组成,Zone由一个或多个OBServer组成,每个OBServer里有若干个partition的Replica。 Region: 对应物理上的一个城市或地域,当OB集群由多个Region组成时, 数据库的数据和服务能力就具备地域…

深度学习PyTorch之13种模型精度评估公式及调用方法

深度学习pytorch之22种损失函数数学公式和代码定义 深度学习pytorch之19种优化算法&#xff08;optimizer&#xff09;解析 深度学习pytorch之4种归一化方法&#xff08;Normalization&#xff09;原理公式解析和参数使用 深度学习pytorch之简单方法自定义9类卷积即插即用 实时…

Gartner发布2025年网络安全六大预测

文章目录 前言趋势1&#xff1a;生成式AI推动数据安全计划趋势2&#xff1a;管理机器身份趋势3&#xff1a;战术型AI趋势4&#xff1a;优化网络安全技术趋势5&#xff1a;扩大安全行为与文化计划的价值趋势6&#xff1a;应对网络安全倦怠 前言 Gartner发布2025年网络安全六大预…

WPS Word中英文混杂空格和行间距不一致调整方案

文章目录 问题1&#xff1a;在两端对齐的情况下&#xff0c;如何删除参考文献&#xff08;英文&#xff09;的空格问题2&#xff1a;中英文混杂行间距不一致问题问题3&#xff1a;设置中文为固定字体&#xff0c;设置西文为固定字体参考 问题1&#xff1a;在两端对齐的情况下&a…

代码随想录算法训练营第22天 | 组合 组合总和 电话号码的字母组合

77. 组合 77. 组合 - 力扣&#xff08;LeetCode&#xff09; class Solution {List<Integer> path new ArrayList<>();List<List<Integer>> result new ArrayList<>();public void backTracking(int n,int k,int startIndex){if(path.size() …

Hadoop、Hive、Spark的关系

Part1&#xff1a;Hadoop、Hive、Spark关系概览 1、MapReduce on Hadoop 和spark都是数据计算框架&#xff0c;一般认为spark的速度比MR快2-3倍。 2、mapreduce是数据计算的过程&#xff0c;map将一个任务分成多个小任务&#xff0c;reduce的部分将结果汇总之后返回。 3、HIv…

从0开始的操作系统手搓教程21:进程子系统的一个核心功能——简单的进程切换

目录 具体说说我们的简单RR调度 处理时钟中断处理函数 调度器 schedule switch_to 我们下面&#xff0c;就要开始真正的进程切换了。在那之前&#xff0c;笔者想要说的是——我们实现的进程切换简单的无法再简单了——也就是实现一个超级简单的轮询调度器。 每一个进程按照…

EP 架构:未来主流方向还是特定场景最优解?

DeepSeek MoE架构采用跨节点专家并行&#xff08;EP&#xff09;架构&#xff0c;在提升推理系统性能方面展现出巨大潜力。这一架构在发展进程中也面临诸多挑战&#xff0c;其未来究竟是会成为行业的主流方向&#xff0c;还是仅适用于特定场景&#xff0c;成为特定领域的最优解…

win11编译llama_cpp_python cuda128 RTX30/40/50版本

Geforce 50xx系显卡最低支持cuda128&#xff0c;llama_cpp_python官方源只有cpu版本&#xff0c;没有cuda版本&#xff0c;所以自己基于0.3.5版本源码编译一个RTX 30xx/40xx/50xx版本。 1. 前置条件 1. 访问https://developer.download.nvidia.cn/compute/cuda/12.8.0/local_…

前端到AI,LangChain.Js(五)

学习地址&#xff1a; 学习小册 实战 基于前面的RAG模块&#xff0c;可以通过构建本地存储向量数据库&#xff0c;本地存储聊天记录&#xff0c;部署成stream API&#xff0c;做一个chat bot。 Agents模块&#xff0c;可以通过tools进行数据标签和信息提取&#xff0c;通过RU…

使用 Arduino 和 Wi-Fi 控制 RGB LED

通过 WiFi 的 Arduino RGb LED 控制器 &#xff0c;在本文中&#xff0c;我们将介绍下一个基于 IOT 的项目 - 使用 Wi-Fi 的 RGB LED 闪光灯。在这里&#xff0c;我们使用 Arduino 和 ESP8266 Wi-Fi 模块通过 Android 手机通过 Wi-Fi 控制 RGB LED 的颜色。 在这个 RGB Flash…

C# 开发工具Visual Studio下载和安装

开发环境与工具 C#的主要开发环境是Visual Studio&#xff0c;这是一个功能强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;集成了代码编辑、调试、项目管理、版本控制等功能。此外&#xff0c;Visual Studio Code也是一个轻量级的跨平台代码编辑器&#xff0c;支…

HTML + CSS 题目

1.说说你对盒子模型的理解? 一、是什么 对一个文档进行布局的时候&#xff0c;浏览器渲染引擎会根据标准之一的css基础盒模型&#xff0c;将所有元素表示为一个个矩形的盒子。 一个盒子由四个部分组成: content&#xff0c;padding&#xff0c;border&#xff0c;margin 下…

数据结构–栈

数据结构–栈 什么是栈&#xff1f; 首先先给大家讲一下栈是什么&#xff1a;栈是一种特殊的线性表。特殊之处就在于对栈的操作的特殊。对于栈&#xff0c;只允许其在固定的一端进行插入和删除元素操作。不像普通的顺序表&#xff0c;链表&#xff0c;可以在任意位置进行删除插…

通过Docker搭个游戏——疯狂大陆(Pkland)

最近在研究我的服务器&#xff0c;在服务器上搭了很多docker的项目&#xff0c;然后找着找着发现一个能用Docker配置环境的游戏叫Pkland。 项目地址&#xff1a;GitHub - popkarthb/pkland: 疯狂大陆是一款多人在线的战略游戏。 游戏操作简捷,您仅需要使用浏览器就可以在任何时…

03.06 QT

一、使用QSlider设计一个进度条&#xff0c;并让其通过线程自己动起来 程序代码&#xff1a; <1> Widget.h: #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QThread> #include "mythread.h"QT_BEGIN_NAMESPACE namespace Ui {…