文章目录
- 阻塞队列
- 自己实现一个阻塞队列(三步)
- 标准库中的阻塞队列
- 使用阻塞队列的优势
- 生产者消费者模型
阻塞队列
队列(Queue)是我们熟悉的一个数据结构,它是“先进先出”的。但是并不是所有的队列都是“先进先出”的,比如:
优先级队列(PriorityQueue):基于自己的比较规则,拿出相应的值。
消息队列(MQ):在队列中引入一个“类型”,出队列的时候会指定某个类型的元素先出。
而我们今天学习的阻塞队列,它是一个“先进先出”的队列,但又有一些其他特点:
- 线程安全的。
- 带有阻塞功能:
如果队列满,继续入队列,入队列操作就会阻塞,直到队列不满,入队列操作才能完成。
如果队列空,继续出队列,出队列操作就会阻塞,直到队列不空,出队列操作才能完成。
自己实现一个阻塞队列(三步)
- 实现基本队列
class MyBlockingQueue{
public int[] items = new int[100];
public int head = 0;
public int tail = 0;
public int size = 0;
//入队列
public void put ( int key) {
// 插入操作
items[tail] = key;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
}
//出队列
public Integer take() {
// 删除操作
int ret = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
return ret;
}
}
- 保证线程安全
class MyBlockingQueue{
public int[] items = new int[100];
public int head = 0;
public int tail = 0;
public int size = 0;
//入队列
public void put ( int key) {
synchronized (this) {
// 插入操作
items[tail] = key;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
}
}
//出队列
public Integer take() {
synchronized (this) {
// 删除操作
int ret = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
return ret;
}
}
}
- 实现阻塞功能
class MyBlockingQueue{
public int[] items = new int[100];
public int head = 0;
public int tail = 0;
public int size = 0;
//入队列
public void put ( int key) throws InterruptedException {
synchronized (this) {
//当wait被唤醒之后 size还有可能为满 所以不能一唤醒了就直接去使用 得再次判断条件是否满足
// 比如:1.抛出异常 2. 三个线程同时执行 其中两个线程竞争锁 没竞争到锁的线程被唤醒之后size还为空
while(size >= items.length){
this.wait();
}
//判断队列是否为满 满了不能插入 就阻塞
// if (size >= items.length){
// this.wait();
// }
// 插入操作
items[tail] = key;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
this.notify();
}
}
//出队列
public Integer take() throws InterruptedException {
synchronized (this) {
//当wait被唤醒之后 size还有可能为空 所以不能一唤醒了就直接去使用 得再次判断条件是否满足
while(size <= 0){
this.wait();
}
//判断队列是否为空 空了不能删除 就阻塞
// if (size <= 0){
// this.wait();
// }
// 删除操作
int ret = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
this.notify();
return ret;
}
}
}
标准库中的阻塞队列
//标准库的阻塞队列
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
//带有阻塞功能的入队列
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.take();
blockingQueue.put(3);
//带有阻塞功能的出队列
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
blockingQueue.put(4);
System.out.println(blockingQueue.take());
blockingQueue.put(4);
}
使用阻塞队列的优势
- 有利于代码解耦合
- 削峰填谷
在服务器A流量激增的情况下,通过阻塞队列能够进行限流。使服务器B、C、D能够不受干扰、平稳运行。
生产者消费者模型
生产者消费者模型:描述的是多线程协同工作的一种方式。借助阻塞队列实现。
//生产者-消费者模型
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
Thread thread = new Thread(() -> {
int n = 1;
while (true){
try {
myBlockingQueue.put(n);
} catch (InterruptedException e) {
e.printStackTrace();
}
n++;
}
});
Thread thread1 = new Thread(() -> {
while (true){
try {
System.out.println(myBlockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
thread1.start();
}