文章目录
- 一、什么是阻塞队列?
- 二、阻塞队列的功能
- 2.1 线程安全
- 2.2 具有阻塞功能
- 三、阻塞队列的作用(生产者——消费者模型的作用)
- 3.1 生产者-消费者模型
- 3.2 解耦合
- 3.3 削锋填谷
- 什么是消息队列
- 什么是中间件?
- 四、阻塞队列的具体使用
- 4.1 使用标准库提供的类
- 4.2 自我实现一个阻塞队列
一、什么是阻塞队列?
学习了数据结构,我们会接触到 队列 此数据结构。
队列的特点是 先进先出 。那么如何理解“先进先出”这个概念呢?
下图中,右边是一组数:0 1 2 3 4 5 6 7 8,从左至右依次将数字放入队列queue中,再依次将队列中的数字取出来,此时由于0先进队列,因此0也是先出队列的。
再举个例子。排队做核酸。先排队的人,就能够先做完核酸,后排队的人,就只能晚点做核酸。
队列有许多种类,譬如 优先级队列、消息队列、阻塞队列。此处我们重点介绍 阻塞队列。阻塞队列具有队列的一切特点,也是具有先进先出的特点。但由于阻塞队列是队列的一种特例,因此阻塞队列有自己的功能特点。
二、阻塞队列的功能
2.1 线程安全
队列是线程不安全的,但阻塞队列是线程安全的。
2.2 具有阻塞功能
阻塞队列除了线程安全,还具有阻塞功能。其阻塞功能具体表现在:
1、元素入队列时,如果当前队列已经满了,就会队列阻塞,直到队列不满(队列有空闲大小时),该元素才能入队列成功。
2、元素出队列时,如果队列为空,队列就会阻塞,直到队列不空,元素才能出队。
三、阻塞队列的作用(生产者——消费者模型的作用)
阻塞队列的应用场景一般是在:生产者——消费者模型 中使用。
生产者——消费者问题,描述的是多线程协同工作的一种方式。 而阻塞队列又是生产者——消费者模型的一个应用场景。
3.1 生产者-消费者模型
那我们怎么理解 生产者——消费者模型是描述多线程协同工作的一种方式 这句话呢??
举个例子。教师节,A、B、C三个学生打算折1000只千纸鹤当作礼物送给老师,他们准备了很多种颜色的彩纸,因为只买了一把剪刀,所以A同学负责把彩纸剪切成小纸张,B同学、C同学就负责使用小纸张折成千纸鹤。
如果A同学剪切彩纸的速度很快,B、C同学折叠千纸鹤的动作稍慢,导致桌子上剩余了很多彩纸,此时A同学就可以稍作休息,等待B、C同学把桌上剩余的小纸张消耗得差不多了,再开始继续剪切彩纸。
相反的,如果B、C同学折叠千纸鹤的速度飞快,不到一会儿就将桌子上的小纸张消耗得一干二净,此时B、C就可以稍作休息,等待A同学剪切更多的彩纸放到桌子上之后,再继续折叠千纸鹤。
A、B、C同学3个人虽然各司其职,但是却又是为了1000只千纸鹤这同一个目标而做事(协同工作)。在协同工作的过程中,会遇到各种场景:桌子上的彩纸过多时(阻塞队列中资源过多,队列满了),A同学稍作休息(阻塞等待);桌子上的彩纸没有时(阻塞队列中没有资源,队列空了),B、C同学稍作休息(阻塞等待)。
3.2 解耦合
系统中含有A、B两个服务器,A、B服务器之间进行通信。A服务器发送请求,B服务器响应后返回数据给A服务器。随着业务场景的变化,新增了C服务器,C服务器也希望收到来自A服务器的请求,此时A服务器就需要修改自身代码为C服务器提供接口,以保证C服务器也能收到请求。
此时A服务器需要修改的代码还不复杂,但是如果随着业务复杂度的提高,有成百上千个服务器也希望A服务器能够为他们提供接口以收到来自A服务器的请求,此时,A服务器需要修改的代码量就很庞大,同时 A服务器与这些服务器之间的耦合过高,如果后续其他服务器出现了什么问题,直接影响到A服务器的使用,同样,如果A服务器出现了什么问题,也直接影响到其他服务器的使用。
为了进行解耦合,在A服务器与其他服务器之间加入一个 阻塞队列。其他服务器通过这个阻塞队列与A服务器进行间接通信,如果还有其他服务器想要收到来自A服务器的请求,只需要修改阻塞队列这个中间件中的代码即可,此时无论哪一个服务器出了什么问题,都影响不到其他服务器,此时,服务器之间的耦合大大降低。
3.3 削锋填谷
众所周知,河流是人类的发源地。人类一般以部落为群,居住在河的下流。假如说,有一天由于雨季频繁,上流发大水,湍急的水流冲刷到下流,此时居住在下流的人类就会遭遇洪灾。因此为了避免此类自然灾难,人类就想到了在河流的中部修建一座水库,当上流发大水时,湍急的大水先来到水库,水库储存这些湍急的水流,然后再缓慢的将这些湍急的水流排至下流,此时,该水库作为下流的一个缓冲,下流就能避免再遭受湍急的洪灾了,哪怕上流的水量巨大且湍急,水库也无法储存这么多水流,水库的存在,也可以为下流的疏散争取时间。如果遇上旱期,下流没有水流了,也可以打开水库,将水库中储存的水流排至下流,以供人类日常生活。
阻塞队列就是这么一个存在。
什么是消息队列
所谓的消息队列(Message Queue),就是将阻塞队列这样的数据结构,单独提取出来作为一个程序,部署在一组服务器上。
消息队列服务器,也就是我们平常所说的MQ,也是一种常见的“中间件”。
什么是中间件?
中间件是一类“通用的”服务器的统称。当我们进行书写代码的时候,一部分是业务代码,一部分是通用代码(与业务无关的代码)。那怎么理解“通用代码”这个词呢?
举个例子:像百度公司其涉及的业务一般是搜索、腾讯公司一般涉及的业务是游戏、社交,阿里公司一般涉及的业务是电商,他们的业务各不相同,因此开发时所写的业务代码也不同,但是他们都需要进行数据的存储,因此此时进行数据存储的代码就是通用代码。
因此就有人想出可以专门弄一些程序来负责数据存储,这个程序做好了之后,任何有数据存储需求的开发者、公司都可以使用该程序进行数据存储。
一般我们选择使用数据库进行数据的持久化存储,目前主流的数据库有MySql、redis、sql server…像数据库这样的程序就可以视为一个“中间件”。
目前主流的消息队列有RabbitMQ、RocketMq、kafka、ActiveMQ。
四、阻塞队列的具体使用
4.1 使用标准库提供的类
标准库为我们提供了一个类:BlockingQueue ,由于这是一个接口,不能直接进行 new 实例,因此我们可以基于链表/数组实现阻塞队列。
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingDeque
= new LinkedBlockingDeque<>(100);
// 往队列中插入3个元素
blockingDeque.put(1);
blockingDeque.put(2);
blockingDeque.put(3);
//将队列里的3个元素全取出来并打印在控制台上
Integer ret1 = blockingDeque.take();
System.out.println(ret1);
Integer ret2 = blockingDeque.take();
System.out.println(ret2);
Integer ret3 = blockingDeque.take();
System.out.println(ret3);
//将队列里的3个元素全取出来了,此时队列为空
//尝试在为空的队列中继续取元素,此时会发生什么?
Integer ret4 = blockingDeque.take();
System.out.println(ret4);
}
我们已经知道了阻塞队列的特点了,所以在为空的阻塞队列里继续取元素,线程会阻塞等待,因此此时控制台并不会结束进程,光标一闪一闪的,表示此时被阻塞。
阻塞队列中,一般使用put() 方法进行入队列操作,offer() 方法也是用于入队列操作,但是一般不推荐使用该方法进行入队列操作。使用 take() 方法 进行出队列操作。
put()方法是往队列尾插入一个元素,take()方法是从队列首取出一个元素。
如果有想要深入了解 阻塞队列 的同学,可以对阻塞队列的源码进行一番研究,多专注、琢磨源码。可能有同学不知道这个阻塞队列的源码在哪里找,其实只要我们在IDEA中书写 阻塞队列 BlockingQueue这个接口后,将鼠标移动到BlockingQueue这个接口上,然后按住ctrl键不松开,同时鼠标点击BlockingQueue这个接口,IDEA会自动跳转到BlockingQueue这个接口这个源码里,此时我们就可以查看一些接口、类、方法的源码了。
4.2 自我实现一个阻塞队列
public class testBlockingQueue2 {
// 自定义数组大小
private static final int CAPACITY_COUNT = 100;
// 基于数组实现队列
private volatile int[] arr = new int[CAPACITY_COUNT];
// 指向队列头
private volatile int head = 0;
// 指向队列尾
private volatile int tail = 0;
// 队列中的有效元素个数
private volatile int size = 0;
// 该方法用于实现插入元素至队列
public void put(int elem){
synchronized (this){
// 判断当前队列是否满
if (size >= arr.length){
try {
// 队列满,则阻塞等待
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将元素插入队列尾
arr[tail] = elem;
// 插入新元素后,指向队列尾的指针 + 1
tail++;
// 判断指向队列尾部的指针是否大于或等于队列长度,大于则让tail指向队列首
while (tail >= arr.length){
tail = 0;
}
// 队列插入新元素后,队列有效元素个数 + 1
size++;
// 当插入元素后,队列不为空,此时可以take()取元素,因此此处使用notify()进行唤醒
this.notify();
}
}
// 使用该方法取出队列中的元素(取的是队列的首元素)
public Integer take(){
synchronized (this){
// 判断队列是否为空,队列为空则阻塞等待
while (size == 0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 队列不为空,将队列首元素取出来
int ret = arr[head];
// 指向队列首的指针 + 1
head++;
// 判断 head 是否走到队列末尾或者超过队列长度,如果是,让 head 重新指向队列头部
if (head >= arr.length){
head = 0;
}
// 将队列首元素取出队列成功后,队列有效元素个数 - 1
size--;
// 队列取出元素后,说明队列此时不是满的,可以进行put()操作,使用notify()唤醒put()方法的阻塞
this.notify();
// 将取出来的队列首元素返回
return ret;
}
}
}
class test{
public static void main(String[] args) {
testBlockingQueue2 queue2 = new testBlockingQueue2();
// 通过匿名内部类来创建两个线程进行演示生产者-消费者模型
Thread producer = new Thread(() -> {
int n = 0;
while (true){
try {
queue2.put(n);
System.out.println("producer 生产元素 = " + n);
n++;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumer = new Thread(() -> {
while (true){
try {
int n = queue2.take();
System.out.println("consumer 消费元素 = " + n);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 启动线程
producer.start();
consumer.start();
}
}
自我实现一个阻塞队列,主要的难点是思考如何实现put()、take()的逻辑,顺利使用put()进行元素入队列,take()进行元素出队列操作。