ArrayBlockingQueue与LinkedBlockingQueue底层原理
在线程池中,等待队列采用ArrayBlockingQueue或者LinkedBlockingDeque,那他们是怎么实现存放线程、阻塞、取出的呢?
一、ArrayBlockingQueue底层原理
1.1 简介
ArrayBlockingQueue是一个阻塞的队列,继承了AbstractBlockingQueue,间接的实现了Queue接口和Collection接口。 底层以数组的形式保存数据,所以它是基于数组的阻塞队列。ArrayBlockingQueue是有边界值的,在创建ArrayBlockingQueue时就要确定好该队列的大小,一旦创建,该队列大小不可更改。
内部的全局锁是使用的ReentrantLock
。
1.2 关系图谱
1.3 父类BlockingQueue的方法梳理
public interface BlockingQueue<E> extends Queue<E> {
//将对象塞入队列,如果塞入成功返回true, 否则返回false。
boolean add(E e);
//将对象塞入到队列中,如果设置成功返回true, 否则返回false
boolean offer(E e);
//将元素塞入到队列中,如果队列中已经满了,
//则该方法会一直阻塞,直到队列中有多余的空间。
void put(E e) throws InterruptedException;
//将对象塞入队列并设置时间
//如果塞入成功返回 true, 否则返回 false.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//从队列中取对象,如果队列中没有对象,
//线程会一直阻塞,直到队列中有对象,并且该方法取得了该对象。
E take() throws InterruptedException;
//在给定的时间里,从队列中获取对象,
//时间到了直接调用普通的poll方法,为null则直接返回null。
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//获取队列中剩余长度。
int remainingCapacity();
//从队列中移除指定的值。
boolean remove(Object o);
//判断队列中包含该对象。
public boolean contains(Object o);
//将队列中对象,全部移除,并加到传入集合中。
int drainTo(Collection<? super E> c);
//指定最多数量限制将队列中对,全部移除,并家到传入的集合中。
int drainTo(Collection<? super E> c, int maxElements);
}
1.4 ArrayBlockingQueue源码解析
1.4.1 参数解释
/** 队列中存放的值 */
final Object[] items;
/** 值的索引,这是取出位置的索引*/
int takeIndex;
/** 值的索引,这是插入位置的索引*/
int putIndex;
/** 队列中有多少个元素 */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes 取出时枷锁 */
private final Condition notEmpty;
/** Condition for waiting puts 存入时枷锁*/
private final Condition notFull;
1.4.2 构造方法
/**
* capacity 表示数组中最大容量,默认使用非公平锁
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* capacity 表示数组中最大容量
* fair 为 false 时使用非公平锁,true 时使用公平锁
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* capacity 表示数组中最大容量
* fair 为 false 时使用非公平锁,true 时使用公平锁
* c 初始化时,可以加入将我们有的集合加入该队列中
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e); //判空
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
1.4.3 存入数据的put方法
public void put(E e) throws InterruptedException {
//判空
checkNotNull(e);
//显示锁
final ReentrantLock lock = this.lock;
//可中断锁
lock.lockInterruptibly();
try {
//判断队列元素是否以及满了,满了就阻塞,如果队列满了await 是阻塞队列
while (count == items.length)
notFull.await();
//队列未满,入队方法
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
数据入队方法
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
//判断队列是否以及满了
if (++putIndex == items.length)
//满了就将下一个入队索引设置为 0
putIndex = 0;
count++;
//唤醒 其他阻塞的出队操作
notEmpty.signal();
}
1.4.4 取出数据的take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//可中断锁
lock.lockInterruptibly();
try {
while (count == 0)
//如果队列数量为0,则阻塞取数据的锁
notEmpty.await();
//队列长度不为0,开始取数据
return dequeue();
} finally {
lock.unlock();
}
}
从队列取出数据
private E dequeue() {
//取得当前items对象
final Object[] items = this.items;
//获取数组最后一个数据
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//取走后置空数组最后一个元素
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒 存入数据的锁
notFull.signal();
return x;
}
二、LinkedBlockingQueue底层原理
2.1 主要参数 解释
//队列中元素个数
private final AtomicInteger count = new AtomicInteger();
//头节点
transient Node<E> head;
//尾节点
private transient Node<E> last;
//出队锁
private final ReentrantLock takeLock = new ReentrantLock();
//如果队列为空,出队就会陷入等待
private final Condition notEmpty = takeLock.newCondition();
//入队锁
private final ReentrantLock putLock = new ReentrantLock();
//如果队列满了,入队就陷入等待
private final Condition notFull = putLock.newCondition();
2.2 存入元素put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
//利用ReentrantLock独占锁来加锁,保证同时只有一个线程来put
final ReentrantLock putLock = this.putLock;
//利用AtomicInteger来表示queue中的元素个数
final AtomicInteger count = this.count;
//可打断的加锁
putLock.lockInterruptibly();
try {
// private final Condition notFull = putLock.newCondition();
//如果队列满了,就调用notFull。await()。notFull是putLock的条件变量,当调用notFull.await()会将putLock释放,阻塞在等待队列notFull上
while (count.get() == capacity) {
notFull.await();
}
//入队,不用获得takeLock,因为与出队操作不涉及共享变量
//从入队代码可以看出head是一个哨兵节点,不存放任何实际数据
//last = last.next = node;
enqueue(node);
//count++
c = count.getAndIncrement();
//如果队列未满,唤醒被阻塞的入队线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//如果c == 0,说明入队之前队列为空,唤醒出队的等待线程
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
//获取出队锁
takeLock.lock();
try {
//唤醒出队等待线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
取出元素take
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果队列为空,放弃takeLock,阻塞在等待队列notEmpty上
while (count.get() == 0) {
notEmpty.await();
}
//出队
x = dequeue();
//count--;
c = count.getAndDecrement();
//如果队列不为空,唤醒出队等待线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果队列不为空,唤醒入队等待线程
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
//head是哨兵节点,不存放数据,实际的头节点是head.next
Node<E> h = head;
//head的next
Node<E> first = h.next;
h.next = h;
//将head踢出
head = first;
//first的item才是第一个元素,head是哨兵节点
E x = first.item;
first.item = null;
//从dequeue方法可以看出,queue中始终有一个哨兵head节点,不存储任何数据,queue中第一个元素是head.next
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//唤醒入队等待线程
notFull.signal();
} finally {
putLock.unlock();
}
}
三、入队出队总结
3.1 入队
方法 | 做法 |
---|---|
put | 如果队列满了,就阻塞,当队列不满的时候,会再执行入队操作 |
offer | 如果队列满了,返回false。未满就返回true |
add | 如果队列满了,抛出异常,未满就返回true |
3.2 出队
方法 | 做法 |
---|---|
take | 如果队列为空,就阻塞,当队列不空的时候,会再执行出队操作 |
poll | 如果队列空了,返回null |
peek | 返回队列首元素,不会出队 |