一、阻塞队列介绍
1、队列
队列入队从队首开始添加,直至队尾;出队从队首出队,直至队尾,所以入队和出队的顺序是一样的
Queue接口
- add(E) :在指定队列容量条件下添加元素,若成功返回true,若当前队列没有可用空间抛出IllegalStateException异常
- offer(E):在指定队列容量条件下添加元素,若成功返回true,若当前队列没有可用空间返回false
- remove():返回并删除此队列的头部元素,若队列为空会抛出异常
- poll():返回并删除此队列的头部元素,若队列为空会返回null
- element():返回头部元素,但不删除,队列为空会抛出异常
- peek():返回头部元素,但不删除,队列为空返回null
2、阻塞队列
BlockingQueue规范定义了添加和删除阻塞队列的方法,很多阻塞队列都是基于BlockingQueue实现的,具体原理:当阻塞队列插入数据时,如果队列已满,线程会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程会阻塞等待直到队列非空
1)BlockingQueue接口
- put():将指定元素插入队列,如果必要等待队列空间变为可用
- take():返回并删除队列中的头部元素,如果必要直到等待某个元素可用
- offer(E, long, TimeUnit):将指定的元素插入此队列,指定的等待时间等待必要的可用空间
- poll(long, TimeUnit):返回并删除此队列的头部元素,指定的等待时间,直到等待某个元素可用
2)应用场景
- 线程池:线程池中线程创建的个数超过核心线程数,会放入到等待队列中,如果队列空了,核心线程又没有要处理的任务,会进入等待,直到队列中有新的任务
- 生产者-消费者模式:当生产者线程发现队列满了会陷入等待,直到有消费者线程进行消费并唤醒生产者线程;当消费者线程发现队列中没有可处理消息会陷入等待,直到生产者线程进行生产并唤醒消费者线程,阻塞队列可以避免线程间的竞争
- 消息队列:可以把消息放到队列中,进行消息的异步处理
- 缓存系统:使用contains()方法判断是否包含某个元素,利用阻塞队列来缓存数据,避免多线程更新缓存的竞争
- 并发任务处理:将任务提交到队列中,消费之后出队,避免重复消费
3、JUC包下的阻塞队列
二、ArrayBlockingQueue
ArrayBlockingQueue采用Object数组方式存储数据,创建ArrayBlockingQueue必须指定容量大小,属于有界队列,采用ReentrantLock保证线程安全,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择
1、使用
public class ArrayBlockingQueueTest {
private static final int QUEUE_CAPACITY = 5;
private static final int PRODUCER_DELAY_MS = 1000;
private static final int CONSUMER_DELAY_MS = 2000;
public static void main(String[] args) throws InterruptedException {
// 创建一个容量为QUEUE_CAPACITY的阻塞队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
// 创建一个生产者线程
Runnable producer = () -> {
while (true) {
try {
// 在队列满时阻塞
queue.put("producer");
System.out.println("生产了一个元素,队列中元素个数:" + queue.size());
Thread.sleep(PRODUCER_DELAY_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(producer).start();
// 创建一个消费者线程
Runnable consumer = () -> {
while (true) {
try {
// 在队列为空时阻塞
String element = queue.take();
System.out.println("消费了一个元素,队列中元素个数:" + queue.size());
Thread.sleep(CONSUMER_DELAY_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(consumer).start();
}
}
生产者少休眠1s,生产的快,当生产者添加第六个元素时会陷入等待
2、源码分析
- items:数组元素数组
- takeIndex:下一个待取出元素索引
- putIndex:下一个待添加元素索引
- count:元素个数
- lock:内置锁
- notEmpty:消费者
- notFull:生产者
入队详解
https://www.processon.com/view/link/64c8c537b9f7806c73dadbb4
出队详解
https://www.processon.com/view/link/64c8c92fb9f7806c73daea85
为什么ArrayBlockingQueue对数组操作要设计成双指针?
如果用一个指针,对数组的删除或者添加操作,数组中的元素都要往前或者往后移动,这样导致时间复杂度为O(n),而使用双指针可以前移后移,可以提升操作的性能,时间复杂度为O(1)
三、LinkedBlockingQueue
LinkedBlockingQueue是基于链表实现的阻塞队列,队列默认大小为Integer.MAX_VALUE,由于这个数值比较大,LinkedBlockingQueue也被称为无界队列,LinkedBlockingQueue每个元素都会占用内存,为防止OOM还是设置一个队列大小
1、使用
和ArrayBlockingQueue使用基本差不多
- LinkedBlockingQueue():队列大小为2的32次方减1
- LinkedBlockingQueue(Collection<? extends E>):队列大小为2的32次方减1,按照传入集合初始化队列数据
- LinkedBlockingQueue(int):传入参数指定队列大小
2、源码分析
相比ArrayBlockingQueue读写只一把独占锁的实现,LinkedBlockingQueue读写分了两把锁
- item:元素存储的数据
- next:下一个节点,单项链表结构
- capacity:队列容量
- count:元素数量
- head:链表表头
- last链表表尾
- takeLock:出队操作竞争的锁对象
- notEmpty:当队列无元素时,会让进行takeLock的线程陷入等待,直到有线程唤醒
- putLock:入队操作竞争的锁对象
- notFull:当队列满了,会让进行putLock的线程陷入等待,直到有线程唤醒
初始化LinkedBlockingQueue对象时,会创建一个属性item为null的Node对象
入队详解
https://www.processon.com/view/link/64c8f6d5b9f7806c73db4fc9
出队详解
https://www.processon.com/view/link/64c8ff0e7807695f1493090f
3、LinkedBlockingQueue和ArrayBlockingQueue对比
- 队列大小:ArrayBlockingQueue必须指定容量大小,LinkedBlockingQueue可以不指定,LinkedBlockingQueue如果添加比删除快会导致OOM
- 数组存储容器不同:ArrayBlockingQueue采用数组存储数据,LinkedBlockingQueue采用对象链表方式存储数据;就因为会产生Node对象,并发量大时会对gc产生较大的影响
- ArrayBlockingQueue添加和删除都是争抢同一个锁资源,LinkedBlockingQueue添加和删除进行了锁分离,LinkedBlockingQueue高并发场景下可以并行的进行入队和出队操作
四、DelayQueue
可以使用队列消息延迟消费,实现接口回调通知、token超时失效、订单超时失效
1、使用
public class DelayQueueTest {
public static void main(String[] args) throws InterruptedException {
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(new Order("order1", System.currentTimeMillis(), 5000));
delayQueue.put(new Order("order2", System.currentTimeMillis(), 2000));
delayQueue.put(new Order("order3", System.currentTimeMillis(), 3000));
while (!delayQueue.isEmpty()) {
Order take = delayQueue.take();
System.out.println("处理订单:"+take.getOrderId());
}
}
static class Order implements Delayed {
private String orderId;
private long createTime;
private long delayTime;
public Order(String orderId, long createTime, long delayTime) {
this.orderId = orderId;
this.createTime = createTime;
this.delayTime = delayTime;
}
public String getOrderId() {
return orderId;
}
@Override
public long getDelay(TimeUnit unit) {
// 订单创建时间+延迟时间-当前时间=剩余延迟时间
long diff = createTime + delayTime - System.currentTimeMillis();
return unit.convert(diff, unit);
}
@Override
public int compareTo(Delayed o) {
// 比较两个订单之间差多长时间
long diff = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return Long.compare(diff, 0);
}
}
}
2、源码分析
- lock:用于保证线程安全
- q: 优先级队列,存储元素,用于保证延迟低的优先执行
- leader:用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
- available:条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知
入队详解
https://www.processon.com/view/link/64c91879470d721c4e3be985
出队详解
https://www.processon.com/view/link/64c9185fc1af4746895281e7
五、如何选择适合的阻塞队列
1、选择策略
通常我们可以从以下 5 个角度考虑,来选择合适的阻塞队列:
功能
第 1 个需要考虑的就是功能层面,比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。
容量
第 2 个需要考虑的是容量,或者说是否有存储的要求,还是只需要“直接传递”。在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的,如 ArrayBlockingQueue;有的默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。
能否扩容
第 3 个需要考虑的是能否扩容。因为有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。
内存结构
第 4 个需要考虑的点就是内存结构。我们分析过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
性能
第 5 点就是从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue。
2、线程池对于阻塞队列的选择
线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。
Executors类下的线程池类型:
- FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
- CachedThreadPool 选取的是 SynchronousQueue
- ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列