DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每个元素都有个过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最快要过期的元素。
DelayQueue类图结构
由该图可知,DelayQueue内部使用PriorityQueue存放数据,使用ReentrantLock实现线程同步。
另外,队列里面的元素要实现Delayed接口,由于每个元素都有一个过期时间,所以要实现获知当前元素还剩下多少时间就过期了的接口,由于内部使用优先级队列来实现,所以要实现元素之间相互比较的接口。
在如下代码中,条件变量available与lock锁是对应的,其目的是为了实现线程间同步。
private final Condition available =lock.newCondition();
其中leader变量的使用基于Leader-Follower模式的变体,用于尽量减少不必要的线程等待。
当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.awaitNanos(delay)等待delay时间,但是其他线程(follwer线程)则会调用available.await)进行无限等待。
leader线程延迟时间过期后,会退出take方法,并通过调用available.signal)方法唤醒一个follwer线程,被唤醒的follwer线程被选举为新的leader线程。
主要函数原理讲解
offer操作
插入元素到队列,如果插入元素为null则抛出NullPointerException异常,否则由于是无界队列,所以一直返回true。
插入元素要实现Delayed接口。
如上代码首先获取独占锁,然后添加元素到优先级队列,由于q是优先级队列,所以添加元素后,调用q.peek()方法返回的并不一定是当前添加的元素。
如果代码(2)判断结果为true,则说明当前元素e是最先将过期的,那么重置leader线程为null,这时候激活avaliable变量条件队列里面的一个线程,告诉它队列里面有元素了。
take操作
获取并移除队列里面延迟时间过期的元素,如果队列里面没有过期元素则等待。
如上代码首先获取独占锁lock。假设线程A第一次调用队列的take()方法时队列为空,则执行代码(1)后first==null,所以会执行代码(2)把当前线程放入available的条件队列里阻塞等待。
当有另外一个线程B执行offer(item)方法并且添加元素到队列时,假设此时没有其他线程执行入队操作,则线程B添加的元素是队首元素,那么执行q.peek()。
e这时候就会重置leader线程为null,并且激活条件变量的条件队列里面的一个线程。
此时线程A就会被激活。
线程A被激活并循环后重新获取队首元素,这时候first就是线程B新增的元素,可知这时候first不为null,则调用first.getDelay(TimeUnit.NANOSECONDS)方法查看该元素还剩余多少时间就要过期,如果delay<=0则说明已经过期,那么直接出队返回。
否则查看leader是否为null,不为null则说明其他线程也在执行take,则把该线程放入条件队列。
如果这时候leader为null,则选取当前线程A为leader线程,然后执行代码(5)等待delay时间(这期间该线程会释放锁,所以其他线程可以offer添加元素,也可以take阻塞自己),剩余过期时间到后,线程A会重新竞争得到锁,然后重置leader线程为null,重新进入循环,这时候就会发现队头的元素已经过期了,则会直接返回队头元素。
在返回前会执行finally块里面的代码(7),代码(7)执行结果为true则说明当前线程从队列移除过期元素后,又有其他线程执行了入队操作,那么这时候调用条件变量的singal方法,激活条件队列里面的等待线程。
poll操作
获取并移除队头过期元素,如果没有过期元素则返回null。
size操作
计算队列元素个数,包含过期的和没有过期的。