目录
1.生产者消费者模型
2.使用标准库中的阻塞队列
3.模拟实现阻塞队列
在介绍阻塞队列之前,会先介绍一些前置知识,像队列:有普通队列、优先级队列、阻塞队列、和消息队列。前面两个是线程不安全的,而后面两个是线程安全的。本文重点介绍阻塞队列。
1.生产者消费者模型
1.1队列功能介绍
(1)阻塞队列
1)当队列为空时,尝试出队列;此时,队列会阻塞等待,直到队列不为空才继续执行出队列操作。
2)当队列为满时,尝试入队列;此时,队列就会阻塞等待,直到队列不为满为止才能继续执行入队操作。
(2)消息队列
并非遵循常规的先进先出,而是带有一个topic关键字。当出队时指定某个topic,就会先出topic下的元素(topic内部就遵循先进先出)
举例:例如到医院窗口排队,有很多种类型的窗口,如:妇科、儿科、骨科等等(这些称为topic),不是说,你先来了就一定可以就诊,而是等待你所在的topic是否呼唤你。
像上面的阻塞队列和消息队列,起到的作用就是可以实现:生产者消费者模型。
1.2.生产者消费者模型介绍
(1)什么是生产者消费者模型
1)A线程进行入队操作,B线程进行出队操作。当队列为空时,B线程需要等待A线程入队,才能从队列中取出元素;当队列满时,A线程需要等待B线程取出元素后,才能继续入队。这里的A线程就相当于生产者,B线程相当于消费者。
2)有一个自动售卖机(相当于阻塞队列/消息队列),商人负责填货(生产者),用户负责买东西(消费者)。
(2)模型的作用
1)可以让程序机械能解耦合操作
2)可以程序“削峰填谷”
解耦合作用举例:
1.如果A和B是互相调用的关系,那么如果A中需要修改,那么B中的大部分都需要同步修改,否则无法互相调用。
2.当A和B中间加了一个队列,那么A与B的交互只需要通过操作队列即可。即使其中一个出现了问题,也不会影响到另外一个。
削峰填谷举例:
1.当服务器直接和客户端交互时,当请求过多时,就会直接导致服务器崩溃。
2.如果在客户端和服务器中间加上一个队列,让他们通过队列进行交互。即使请求再多,也不会影响到服务器,最坏的情况也就是队列崩溃。
所以说,阻塞队列/消息队列,就是可以实现生产者消费者模型的效果。
2.使用标准库中的阻塞队列
现在,我们介绍如何调用标准库中的阻塞队列。
2.1.创建阻塞队列
(1)选择正确的接口
(2)实例化的对象
可以选择的有下面这三个,很明显,它们之间只是基于不同的数据结构进行实现。
这三个,我们都是可以选择的。
(3)队列的操作
因为是队列,我们只需要考虑入队和出队操作即可。
在阻塞队列中,只有这两个是带有阻塞功能的,所以我们只需要使用这两个即可。
因为这两个操作,是带有阻塞功能的,也就是wait,所以使用时需要声明异常。
(4)普通的操作
这里普通的操作指的是在一个线程中进行操作。
程序运行起来,发现没有任何的报错,只是程序仍然不会结束,这就是阻塞功能。
2.2.使用阻塞队列
(1)消费者消费的很慢
当消费者消费慢时,也就是让消费者每次sleep,此时,就会产生生产者在等消费者的过程
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
Thread t1 = new Thread(()->{//负责入队操作
for (int i = 0; i < 5000; i++) {
try {
queue.put(i);
System.out.println("入队:"+i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(()->{//负责出队操作
for (int i = 0; i < 5000; i++) {
try {
int tmp = queue.take();
System.out.println("出队:"+tmp);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
(2)生成者生成的很慢
以上就是对阻塞队列的使用,下面我们自己实现一个阻塞队列
3.模拟实现阻塞队列
要模拟阻塞队列,就需要有入队/出队操作,并且可以进行阻塞等待,并且是线程安全的!
我们先从普通队列开始,然后加上线程安全和阻塞操作,最后进行优化和线程安全Pro版
3.1.实现普通的队列
下面按照循环队列的形式进行创建队列,只提供了入队和出队操作
class MyBlockQueue {
int head = 0;
int tail = 0;
int size = 0;
public String[] elem;
public MyBlockQueue(int capacity) {
elem = new String[capacity];
}
//入队
public void put(String s) throws InterruptedException {
if(size == elem.length) {
return;
}
elem[tail] = s;//队尾入
size++;
if(tail >= elem.length-1) tail=0;
else tail++;
}
}
//出队
public String take() throws InterruptedException {
if(size == 0) {
return null;
}
String tmp = elem[head];
size--;
if(head == elem.length-1) head = 0;
else head++;
return tmp;
}
}
3.2.加上阻塞功能
这里我们使用的是wait而不是sleep。
class MyBlockQueue {
int head = 0;
int tail = 0;
int size = 0;
public String[] elem;
public MyBlockQueue(int capacity) {
elem = new String[capacity];
}
//入队
public void put(String s) throws InterruptedException {
synchronized (this) {
if(size == elem.length) {
System.out.println("队列满,阻塞等待");
this.wait();
}
elem[tail] = s;//队尾入
size++;
if(tail >= elem.length-1) tail=0;
else tail++;
this.notify();//入队一个,唤醒一次
}
}
//出队
public String take() throws InterruptedException {
synchronized (this) {
if(size == 0) {
System.out.println("队列空");
this.wait();
}
String tmp = elem[head];
size--;
if(head == elem.length-1) head = 0;
else head++;
this.notify();//出队一个,唤醒一次
return tmp;
}
}
}
(1)改进1:对入队、出队操作,都进行了加锁操作
(2)改进2:在队列满/空时,不进行return,而是进行阻塞等待;当有一个元素入队/出队之后,就进行唤醒一次。
上述不使用sleep的原因是:sleep是抱着锁使线程进入休眠状态,当此时有其他的操作(入队/出队)时,无法拿到锁,从而无法执行(发生了死锁)
上述还有一个缺点:就是wait不仅仅可以被notify唤醒,还可以被interrupt唤醒,所以要循环进行判断。
class MyBlockQueue {
int head = 0;
int tail = 0;
int size = 0;
public String[] elem;
public MyBlockQueue(int capacity) {
elem = new String[capacity];
}
//入队
public void put(String s) throws InterruptedException {
synchronized (this) {
while (size >=elem.length) {
//循环等待确认,防止不是被notify唤醒
try {
this.wait();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
elem[tail] = s;//队尾入
size++;
if(tail >= elem.length-1) tail=0;
else tail++;
this.notify();
}
}
//出队
public String take() throws InterruptedException {
synchronized (this) {
while (size ==0) {
try {
this.wait();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
String tmp = elem[head];
size--;
if(head == elem.length-1) head = 0;
else head++;
this.notify();
return tmp;
}
}
}
(3)改进3:使用while+wait的方式反复确认是否要被唤醒
3.3.确保线程一定安全
上述线程其实已经很安全了,但是还需要再进一步优化,达到更安全的效果。对于线程安全还有两个:内存可见性问题和指令重排序,所以我们只需要对变量加上volatile关键字即可。
volatile int head = 0;
volatile int tail = 0;
volatile int size = 0;
上述就是一个完整的阻塞队列模拟实现的代码,下面展示完整代码:
class MyBlockQueue {
/* int head = 0;
int tail = 0;
int size = 0;*/
volatile int head = 0;
volatile int tail = 0;
volatile int size = 0;
public String[] elem;
public MyBlockQueue(int capacity) {
elem = new String[capacity];
}
//入队
public void put(String s) throws InterruptedException {
synchronized (this) {
/* if(size == elem.length) {
System.out.println("队列满,阻塞等待");
this.wait();
}*/
while (size >=elem.length) {
//循环等待确认,防止不是被notify唤醒
try {
this.wait();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
elem[tail] = s;//队尾入
size++;
if(tail >= elem.length-1) tail=0;
else tail++;
this.notify();
}
}
//出队
public String take() throws InterruptedException {
synchronized (this) {
/* if(size == 0) {
System.out.println("队列空");
this.wait();
}*/
while (size ==0) {
try {
this.wait();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
String tmp = elem[head];
size--;
if(head == elem.length-1) head = 0;
else head++;
this.notify();
return tmp;
}
}
}
测试代码:
public static void main(String[] args) throws InterruptedException {
MyBlockQueue myBlockQueue = new MyBlockQueue(10);
Thread t1 = new Thread(()->{
for (int i = 0; i < 20; i++) {
try {
System.out.println("生成1:");
myBlockQueue.put("1");
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(()->{
for (int i = 0; i < 30; i++) {
String tmp = null;
try {
tmp = myBlockQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("消费:"+tmp);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
测试结果:
结果是可以的,和标准库中的阻塞队列基本一致。
几个注意事项: