hi,大家好,今天为大家带来多线程案例--阻塞队列
这块知识点也很重要,要好好掌握呀~~~
🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸
目录
💚1.什么是阻塞队列
💚2.生产者消费者模型
💚3标准库实现阻塞队列
💚4.自己实现一个阻塞队列
1.阻塞队列
我们之前在数据结构已经学了队列,什么是队列,我们来回忆一下,队列,是一种数据结构,先进先出.
阻塞队列也如此,先进先出,但是相比队列,它带有阻塞功能,当队列为满,要阻塞等待,当队列为空的时候,要阻塞等待.
因此,阻塞队列是线程安全的数据结构
当队列满的时候,会进入阻塞等待,直到有线程从队列取出元素
当队列空的时候,会进入阻塞等待,直到有线程在队列添加元素
此时这个案板就是相当于一个阻塞队列
当饺子皮放满了,妈妈和小女儿就会告诉爸爸和大女儿先别擀面皮了,那么生产者就可以阻塞等待一会,当案板皮没了,爸爸和大女儿告诉消费者阻塞等待一会
那么这样做提高了线程执行效率
下面来说一说阻塞队列的好处
1.让代码块之间解耦合
啥是耦合呢,就是代码块和代码块之间的关联性的强弱
举个例子,当自己的好朋友生病了,作为好友,我需要去探望,那么如果是不相关的ABCD,那么我可以直接不管,我和我好友的耦合性就很高,相反,我和ABCD耦合性很低
说到这里,顺便说一下啥是内聚
内聚就是功能一样的代码放在一起,再举个例子,衣服要分门别类的放,就是相同的一类的要放在一起
我们写代码,要遵守"高内聚,低耦合"
我们再来举一个计算机的例子
A服务器给B服务器发送请求,B服务器给A响应
服务器处理请求的时候,很耗费硬件资源,包括不限于(CPU,内存,硬盘,带宽)......
所以当某个硬件资源达到瓶颈,服务器就有挂的风险
假设请求量非常多,那么B回应不过来,B服务器很有可能就挂掉了,它俩耦合性就很高,那么A也就挂掉了
所以我们可以采用增加一个阻塞队列的方式
这样增加一个阻塞队列,降低了耦合性,B挂,不影响A
2.削峰填谷
1.削峰
还用这个例子,当A的请求很多的时候,会影响B的响应速率吗,答案是不会!
阻塞队列帮A承担了很多请求,让B保证平稳的速率响应请求,这就是削峰的意思
2.填谷:
当A需求猛增以后,迅速进入猛减期,此时会影响B响应速率吗,还是不会!
阻塞队列会自动调节,当 请求量突然骤减,阻塞队列会拿出之前积压的请求分配给B,这就是填谷
说完作用,差不多介绍完了,现在来看一看实现吧
3.标准库实现阻塞队列
采用BlockingQueue
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
//生产者消费者模型
public class ThreadDemo3 {
public static void main(String[] args) {
BlockingQueue<Integer> queue=new LinkedBlockingQueue();
//消费者
Thread t1=new Thread(()->{
while(true){
try {
int value=queue.take();
System.out.println("消费"+value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
//生产者
Thread t2=new Thread(()->{
int value=0;
while(true){
try {
queue.put(value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("生产"+value);
value++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t2.start();
}
}
标准库实现比较简单,我们自己实现就有难度了
4.自己实现阻塞队列
需要3步
1.实现一个普通队列
2.加上线程安全(加锁)
3.增加阻塞等待功能
废话不多说,上代码
//自己实现阻塞队列
class MyBlockingQueue{
volatile private int[] items=new int[1000];
volatile private int head=0;
volatile private int tail=0;
volatile private int size=0;
//入队列
synchronized public void put(int elem) throws InterruptedException {
if(size==items.length){
//return;
this.wait();
}
items[tail]=elem;
tail++;
if(tail==items.length){
tail=0;
}
size++;
this.notify();
}
synchronized public Integer take() throws InterruptedException {
if(size==0){
// return null;
this.wait();
}
int value=items[head];
head++;
if(head==items.length){
head=0;
}
size--;
this.notify();
return value;
}
}
public class ThreadDemo2 {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
//消费者
Thread t1 = new Thread(()->{
while (true) {
int value = 0;
try {
value = queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("消费" + value);
}
});
t1.start();
Thread t2=new Thread(()->{
int value=0;
while(true){
try {
queue.put(value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("生产"+value);
value++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t2.start();
System.out.println("hello");
}
}
解释:
当队列满就阻塞等待,取出元素再唤醒,继续执行增加操作(如蓝色线)
当队列空就阻塞等待,放进元素再唤醒,继续执行取出操作(如红色线)
在put和take方法上加上锁,保证原子性,线程安全
在这个操作中涉及到多次读以及修改操作,为了保证读取到的数据是正确的,也就会为了保证内存可见性,我们采用增加关键字volatile的操作
并且上述不可能同时wait,不可能有一个队列又空又满!!!
这里还有最后一个小问题
为啥要用while,不用if,?
看看wait的源码
wait可能会在线程执行过程中被提前唤醒,条件没成熟就醒了,这不符合代码逻辑,所以我们把if改为while,wait操作之前,发现条件不满足wait,等到wait被唤醒以后,再次判断wait唤醒是不是因为满足条件,不满足,就继续等待!!!
以上就是今天的所有内容了,我们下期再见啦!!!