阻塞队列
字面意思,带有阻塞功能的队列,满足队列先进先出的性质
作用:
1.如果队列为空,此时执行出队列操作,就会阻塞,直到往此队列里添加元素为止(队列不为空)
2.如果队列为满,此时执行入队列操作,也会阻塞,直到往此队列删元素(队列不为满)。
标准库提供的阻塞队列
//标准库提供的阻塞队列 BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(); //带有优先级的阻塞队列 底层实现----堆 PriorityBlockingQueue<Integer> queue1 = new PriorityBlockingQueue<>(); //基于链表实现的阻塞队列 LinkedBlockingDeque<Integer> queue2 = new LinkedBlockingDeque<>(); //基于数组实现的阻塞队列 ArrayBlockingQueue<Integer> queue3 = new ArrayBlockingQueue<>(10); //需要给出数组容量大小
队列空阻塞
//标准库提供的阻塞队列 BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(); //普通队列方法 offer poll peek //阻塞队列方法 put take --->带有阻塞功能 queue.put(3); queue.take(); //此时队列已为空 System.out.println("此时队列为空"); queue.take();
队列满阻塞
//标准库提供的阻塞队列 BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3); //普通队列方法 offer poll peek //阻塞队列方法 put take --->带有阻塞功能 queue.put(3); queue.put(3); queue.put(3); //此时队列已为满 System.out.println("此时队列为满"); //阻塞 queue.put(3);
模拟实现阻塞队列(数组实现)
public class myBlockingQueue { //模拟实现阻塞队列 --- 基于数组实现 private int[] elem; private int size; private int head; private int tail; private Deque<Integer> queue = null; public myBlockingQueue(int length){ queue = new LinkedList<>(); elem = new int[length]; } //入队列 public void put(int value) throws InterruptedException { //wait/notify 必须和synchronized成对使用 synchronized (this){ //判断是否为满 if(elem.length == size){ //阻塞 this.wait(); } elem[tail++] = value; if(tail >= elem.length){ tail = 0; } size++; //这个notify是唤醒take操作的wait this.notify(); } } //出队列 public void take() throws InterruptedException { synchronized (this){ if(size == 0){ //阻塞 this.wait(); } int result = elem[head++]; if(head >= elem.length){ head = 0; } //这个notify是唤醒put操作的wait size--; this.notify(); } } public static void main(String[] args) throws InterruptedException { myBlockingQueue m = new myBlockingQueue(3); m.put(1); m.take(); System.out.println("队列阻塞"); m.take(); } }
需要注意的点:
1.wait/notify操作必须和synchroinzed成对出现,否则会报异常
2.搞清楚wait和notify的位置,wait操作出现在当队列为空还取出队或者队列为满还入元素的时候,take操作的notify操作唤醒的是put操作的wait,put操作的notify唤醒的是take操作的wait操作
生产者消费者模型
public static void main(String[] args) { BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(); Thread producer = new Thread(() -> { for(int i = 0; i < 10; i++){ try { //让控制台打印慢点,结果更明了 Thread.sleep(1000); System.out.print("生产:"+i+" "); blockingQueue.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread customer = new Thread(() -> { for(int i = 0; i < 10; i++){ try { Thread.sleep(1000); System.out.print("消费:"+i+" "); blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }); producer.start(); customer.start(); }