一、什么是堵塞队列
堵塞队列(Blocking Queue)是一种特殊类型的队列,它具有一些特定的行为和限制。在堵塞队列中,当队列为空时,尝试从队列中取出元素的操作将会被阻塞,直到队列中有可用元素;当队列已满时,尝试向队列中添加元素的操作也会被阻塞,直到队列有空闲位置。
堵塞队列常用于多线程编程场景中,用于实现线程间的安全通信和协调。它提供了一种简单而有效的方式来实现线程的等待和唤醒机制,使得线程能够在合适的时机进行阻塞或继续执行。
常见的堵塞队列实现包括:
- ArrayBlockingQueue:基于数组的有界堵塞队列。
- LinkedBlockingQueue:基于链表的可选有界(默认无界)堵塞队列。
- PriorityBlockingQueue:支持优先级排序的堵塞队列。
- SynchronousQueue:一个不存储元素的特殊堵塞队列,用于实现线程间的直接传输。
使用堵塞队列可以有效地解决线程间的同步和通信问题,简化了多线程编程的复杂性。
阻塞队列的典型应用场景:生产者消费者模型
生产者消费者模型是一种常见的多线程协调工作的模式:生产者和消费者之间通过阻塞队列进行通讯,生产者生产出数据不用等待消费者来处理,而是会直接放入到阻塞队列中;消费者也不找生产者索要数据,而是直接从阻塞队列中取。
生产者消费者模型的作用:
- 有利于代码“解耦合”
生产者不用关心消费者的存在,消费者也不需关心生产者的存在,通过阻塞队列可以降低二者的关联关系。 - “削峰填谷”
阻塞队列相当于一个“缓冲区”,平衡了生产者和消费者的处理能力。
二、 代码实例
public static void main(String[] args) {
BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();
Thread t1 = new Thread(() -> {
while (true) {
try {
Integer take = blockingDeque.take();
System.out.println("消费: " + take);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread t2 = new Thread(() -> {
int value = 0;
while (true) {
try {
System.out.println("生产: " + value);
blockingDeque.put(value);
value++;
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t2.start();
}
结果:
三、模拟实现堵塞队列
注意:
1.我们刚才实现的put和take方法的几乎每一行代码都涉及到读写操作,所以在多线程环境下,这种非原子操作一定是不安全的,我们需要对这两个方法进行加锁,并用volatile修饰相应的变量,来保证线程安全;
2.我们需要实现两个阻塞功能,可以使用wait和notify来进行线程阻塞和唤醒线程的操作;
3.wait方法是可以被其他方法(比如interrupt方法)提前唤醒的,此时队列还是空或满的状态,那么继续执行代码仍然会出错,所以当线程被提前唤醒时,我们需要再次判断队列是否为空或满的状态,如果是,则继续等待,如果不是,则执行后面的逻辑,因此,使用while循环判断就可以解决上述问题~
public class MyBlockingQueue {
private int[] array = new int[100];
private volatile int head;//队首元素下标
private volatile int tail;//队尾元素下标
private volatile int size;//有效元素个数
synchronized public void put(int elem) throws InterruptedException {
//判断队列是否为满
while (size == array.length){
this.wait();
}
//插入元素
array[tail] = elem;
tail++;
//循环队列
//队尾元素下标如果超出数组长度,则从头开始存储
if(tail == array.length){
tail = 0;
}
size++;
this.notify();
}
synchronized public int take() throws InterruptedException {
//判断队列是非为空
while (size == 0){
this.wait();
}
//取出元素
int value = array[head];
head++;
//队首元素下标如果超出数组长度,则从头开始
if(head == array.length){
head = 0;
}
size--;
this.notify();
return value;
}
}