文章目录
- 阻塞队列
- 1.生产者-消费者模式
- 生产者消费者模型的意义:
- 1.解耦合
- 2.削峰填谷:
- 2.阻塞队列的使用
- BlockingQueue
- 3.实现阻塞队列
- 唤醒:
- 使用阻塞队列实现生产者消费者模型
阻塞队列
阻塞队列是一种特殊的队列:
-
1.是线程安全的。
-
2.带有阻塞特性
如果队列为空,继续出队列,就会发生阻塞。直到其他线程往队列中添加队列为止
如果队列为满,继续入队列, 也会发生阻塞,直到其他线程从队列中取走元素为止
阻塞队列可以来实现生产者-消费者模型。
1.生产者-消费者模式
生产者:把生产出来的内容,放到阻塞队列中。
消费者:从阻塞队列中获取内容。
生产者消费者模型的意义:
1.解耦合
两个模块联系越紧密,耦合就越高。对于分布式系统来说,更加有意义。
可以使用生产者-消费者模型,实现解耦合的效果。
2.削峰填谷:
峰:短时间内,请求量比较多时。
谷:请求量比较少时。
在这种情况下:高峰时段,一旦客户端发起的请求量非常多时,每个A收到的请求,都会立即发给B。此时,A和B的访问量是相同的。但是在实际上,由于不同的服务器,上面跑的业务不同。虽然访问量一样,单个访问,消耗的硬件资源是不一样的。可能服务器A可以承担这些并发量,但是服务器B承担不了,就会挂掉。
在引入了生产者-消费者模型之后,就会解决这类问题。
- 当服务器A收到了大量请求之后,A会把对应的请求写入到队列中。B仍然按照之前的节奏来处理请求。(削峰)
- 一般情况下,峰值不会持续存在,峰值过后,A的请求量就会恢复正常、甚至减低。服务器B就可以在此时,逐渐把积压的请求给处理掉。(填谷)。
2.阻塞队列的使用
BlockingQueue
-
BlockingQueue是一个具体的接口,所以需要new一个具体的实现。
-
同时BlockingQueue继承自Queue。也可以使用Queue的方法(没有阻塞属性)
1.基于数组实现
2.基于链表实现
-
BlockingQueue带有阻塞的方法:
put:阻塞式入队列
take:阻塞式出队列
没有提供阻塞式获取队首元素的方法。
public static void main(String[] args) {
// BlockingQueue<String> queue = new ArrayBlockingQueue<>();
BlockingQueue<String>queue = new LinkedBlockingQueue<>();
queue.put("111");
queue.put("222");
queue.put("333");
queue.put("444");
String elem = queue.take();
System.out.println(elem);
elem = queue.take();
System.out.println(elem);
elem = queue.take();
System.out.println(elem);
elem = queue.take();
System.out.println(elem);
}
3.实现阻塞队列
给一个普通的队列加上线性安全和阻塞
对入队和出队的方法进行加锁。对数据的修改实现原子性操作,保证线程安全
- put入队的时候,如果队列满了,就进行阻塞(wait)
在出队的时候,当size–后,队列中有位置了,调用notify()方法,对阻塞的put方法进行唤醒。
- 同样的,如果take出队列时,队列为空的话,也需要进行阻塞。
在入队时,当size++后,队列不为空了,调用notify()方法,对阻塞的take方法进行唤醒。
-
一个队列的阻塞情况,要么为空、要么为满。
put和take只有一边能阻塞。如果put阻塞了,其他线程继续调用put,也会进行阻塞。只有靠take来唤醒。
take阻塞,其他线程继续调用take也会进行阻塞,只能靠put来唤醒。
唤醒:
wait方法除了使用notify()方法进行唤醒,还可以通过interrupt()方法,来中断wait的状态。
使用interrupt方法唤醒的时候,会出现InterruptedException异常
public void put(String elem) throws InterruptedException {
}
因为是throws抛出的异常,执行到interrupt()方法后,整个方法就会结束。
public void put(String elem) {
synchronized (this) {
if (size == data.length) {
try {
this.wait();
}catch (InterruptedException e){
}
}
data[tail] = elem;
tail++;
if (tail == data.length) {
tail = 0;
}
size++;
this.notify();
}
- 如果是try-catch来处理异常。如果出现异常,程序仍会继续执行下去。在满队列的情况下。强行修改,会覆盖掉tail的值,并且size会超出数组长度。
使用wait时,要考虑wait是notify唤醒的,还是通过Interrupt唤醒的。在wait返回时,还要进行判断wait执行的条件符不符合。可以直接将wait写在while循环中。循环的条件就是wait执行的条件。使wait在唤醒之后,再确定一下,条件是否满足。
while (size == data.length) {
//队列满了,就会进行堵塞
this.wait();
}
- 最终再通过volatile修饰要频繁修改的变量,避免出现内存可见性问题。
class MyBlockingQueue {
private String[] data = new String[1000];
private volatile int head = 0;
//队列起始位置
private volatile int tail = 0;
//队列结束位置的下一个元素。
private volatile int size = 0;
//队列中有效元素个数
//入队
public void put(String elem) throws InterruptedException {
synchronized (this) {
while (size == data.length) {
//队列满了,就会进行堵塞
this.wait();
}
//队列没满,向队列添加元素
data[tail] = elem;
tail++;
if (tail == data.length) {
//满了之后,环形队列要回到开头。
tail = 0;
}
size++;
this.notify();
//唤醒take中的wait
}
}
//出队
public String take() throws InterruptedException {
synchronized (this) {
while (size == 0) {
//队列为空时
this.wait();
}
//队列不空时,把队首head位置删除
String ret = data[head];
head++;
if (head == data.length) {
head = 0;
}
size--;
this.notify();
//唤醒put中的wait.
return ret;
}
}
}
使用阻塞队列实现生产者消费者模型
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
//消费者
Thread t1 = new Thread(() -> {
while (true){
try {
String res = queue.take();
System.out.println("消费元素: "+res);
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
//生产者
Thread t2 = new Thread(() -> {
int num = 1;
while (true){
try {
queue.put(num+" ");
System.out.println("生产元素:"+num);
num++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
生产元素:1001
生产元素:1002
消费元素: 2
消费元素: 3
生产元素:1003
消费元素: 4
生产元素:1004
- 生产者快速生产了1000多个,消费者才消耗几个。队列填满之后,生产者进入了阻塞。直到消费者消费了之后,才会进行生产。消费一个生产一个。
点击移步博客主页,欢迎光临~