- 何为阻塞队列
- 当阻塞队列是空时,从队列中获取元素的操作将被阻塞
- 当阻塞队列是满时,往队列中添加元素将会被阻塞
- 试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他线程往空的队列中插入新的元素
- 试图往满的队列中,添加新的元素的线程也会被阻塞,直到其他线程从队列中移除一个或多个元素或完全清空队列后使队列重新变得空闲起来后并后续新增
-
为什么用?有什么好处?
在多线程领域:所谓阻塞,在某些情况下会刮起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。阻塞队列的好处是,我们不需要关心什么时候需要阻塞线程,什么时候唤醒线程。在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们程序带来不小的复杂度。 -
队列的实现
-
核心方法
- 抛出异常组
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// Exception in thread "main" java.lang.IllegalStateException: Queue full
// 超过容量,抛出异常
// System.out.println(blockingQueue.add("d"));
// 检查元素 顶端元素
System.out.println(blockingQueue.element());
// 按顺序删除
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
- 返回特殊值
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d")); // false 不会抛出异常
// 检查队列顶部元素
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll()); // 没有第四个 获取null
- 一直阻塞
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println("=======================");
// blockingQueue.put("d"); // 放第四个的时候,没有位置了,线程阻塞,等待空位置
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take(); // 取第四个的时候,队列里还没有,线程阻塞
- 超时阻塞
// 立即插入,不用等待2秒
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L,TimeUnit.SECONDS)); // 阻塞两秒,返回null
- SynchronousQueue
// 放一个,取了再放一个,如果不取则不放,队列里面只能有一个
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() +"\t put 1 ");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() +"\t put 2 ");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() +"\t put 3 ");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"aaa").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
} catch (InterruptedException e) {
}
},"bbb").start();