单锁实现
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueue1<E> implements BlockingQueue<E> {
private final E[] array; // 内部存储元素的数组
private int head = 0; // 队列头部索引
private int tail = 0; // 队列尾部索引
private int size = 0; // 队列中元素的数量
/**
* 构造函数,初始化队列容量
*
* @param capacity 队列的最大容量
*/
@SuppressWarnings("all")
public BlockingQueue1(int capacity) {
array = (E[]) new Object[capacity]; // 创建一个固定大小的数组
}
private final ReentrantLock lock = new ReentrantLock(); // 互斥锁,用于控制并发访问
private final Condition tailWaits = lock.newCondition(); // 等待队列满的条件变量
private final Condition headWaits = lock.newCondition(); // 等待队列空的条件变量
/**
* 将元素插入队列末尾,如果队列已满则等待直到有空间可用
*
* @param e 要插入的元素
* @throws InterruptedException 如果当前线程在等待时被中断
*/
@Override
public void offer(E e) throws InterruptedException {
lock.lockInterruptibly(); // 获取锁,并允许响应中断
try {
while (isFull()) { // 如果队列已满,则等待
tailWaits.await();
}
array[tail] = e; // 插入元素到队列尾部
if (++tail == array.length) { // 更新尾部指针,循环队列处理
tail = 0;
}
size++; // 增加元素计数
headWaits.signal(); // 通知可能等待的消费者线程
} finally {
lock.unlock(); // 确保无论是否发生异常都释放锁
}
}
/**
* 将元素插入队列末尾,如果队列已满且超过指定超时时间则放弃插入
*
* @param e 要插入的元素
* @param timeout 超时时间,单位为毫秒
* @throws InterruptedException 如果当前线程在等待时被中断
*/
@Override
public void offer(E e, long timeout) throws InterruptedException {
lock.lockInterruptibly(); // 获取锁,并允许响应中断
try {
long t = TimeUnit.MILLISECONDS.toNanos(timeout); // 将毫秒转换为纳秒
while (isFull()) { // 如果队列已满,则等待
if (t <= 0) { // 如果超时时间已到,返回
return;
}
t = tailWaits.awaitNanos(t); // 等待并更新剩余时间
}
array[tail] = e; // 插入元素到队列尾部
if (++tail == array.length) { // 更新尾部指针,循环队列处理
tail = 0;
}
size++; // 增加元素计数
headWaits.signal(); // 通知可能等待的消费者线程
} finally {
lock.unlock(); // 确保无论是否发生异常都释放锁
}
}
/**
* 从队列头部移除并返回元素,如果队列为空则等待直到有元素可用
*
* @return 队列头部的元素
* @throws InterruptedException 如果当前线程在等待时被中断
*/
@Override
public E poll() throws InterruptedException {
lock.lockInterruptibly(); // 获取锁,并允许响应中断
try {
while (isEmpty()) { // 如果队列为空,则等待
headWaits.await();
}
E e = array[head]; // 获取队列头部的元素
array[head] = null; // 清除引用以帮助垃圾回收
if (++head == array.length) { // 更新头部指针,循环队列处理
head = 0;
}
size--; // 减少元素计数
tailWaits.signal(); // 通知可能等待的生产者线程
return e;
} finally {
lock.unlock(); // 确保无论是否发生异常都释放锁
}
}
/**
* 检查队列是否为空
*
* @return 如果队列为空则返回 true,否则返回 false
*/
private boolean isEmpty() {
return size == 0;
}
/**
* 检查队列是否已满
*
* @return 如果队列已满则返回 true,否则返回 false
*/
private boolean isFull() {
return size == array.length;
}
}
双锁实现
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockingQueue2<E> implements BlockingQueue<E> {
private final E[] array; // 内部存储元素的数组
private int head = 0; // 队列头部索引
private int tail = 0; // 队列尾部索引
private final AtomicInteger size = new AtomicInteger(0); // 使用原子整数来保证线程安全的计数器
// 控制对队列尾部的访问
private final ReentrantLock tailLock = new ReentrantLock();
private final Condition tailWaits = tailLock.newCondition(); // 等待队列满的条件变量
// 控制对队列头部的访问
private final ReentrantLock headLock = new ReentrantLock();
private final Condition headWaits = headLock.newCondition(); // 等待队列空的条件变量
/**
* 构造函数,初始化队列容量
*
* @param capacity 队列的最大容量
*/
public BlockingQueue2(int capacity) {
this.array = (E[]) new Object[capacity]; // 创建一个固定大小的数组
}
/**
* 将元素插入队列末尾,如果队列已满则等待直到有空间可用
*
* @param e 要插入的元素
* @throws InterruptedException 如果当前线程在等待时被中断
*/
@Override
public void offer(E e) throws InterruptedException {
int c; // 用于保存当前队列中的元素数量
tailLock.lockInterruptibly(); // 获取尾部锁,并允许响应中断
try {
while (isFull()) { // 如果队列已满,则等待
tailWaits.await();
}
array[tail] = e; // 插入元素到队列尾部
if (++tail == array.length) { // 更新尾部指针,循环队列处理
tail = 0;
}
c = size.getAndIncrement(); // 增加元素计数,并获取之前的值
// a. 队列不满, 但不是从满->不满, 由此offer线程唤醒其它offer线程
if (c + 1 < array.length) {
tailWaits.signal(); // 唤醒其他可能等待的生产者线程
}
} finally {
tailLock.unlock(); // 确保无论是否发生异常都释放锁
}
// b. 从0->不空, 由此offer线程唤醒等待的poll线程
if (c == 0) {
headLock.lock(); // 获取头部锁
try {
headWaits.signal(); // 唤醒可能等待的消费者线程
} finally {
headLock.unlock(); // 确保无论是否发生异常都释放锁
}
}
}
/**
* 从队列头部移除并返回元素,如果队列为空则等待直到有元素可用
*
* @return 队列头部的元素
* @throws InterruptedException 如果当前线程在等待时被中断
*/
@Override
public E poll() throws InterruptedException {
E e; // 用于保存出队的元素
int c; // 用于保存当前队列中的元素数量
headLock.lockInterruptibly(); // 获取头部锁,并允许响应中断
try {
while (isEmpty()) { // 如果队列为空,则等待
headWaits.await();
}
e = array[head]; // 获取队列头部的元素
array[head] = null; // 清除引用以帮助垃圾回收
if (++head == array.length) { // 更新头部指针,循环队列处理
head = 0;
}
c = size.getAndDecrement(); // 减少元素计数,并获取之前的值
// b. 队列不空, 但不是从0变化到不空,由此poll线程通知其它poll线程
if (c > 1) {
headWaits.signal(); // 唤醒其他可能等待的消费者线程
}
} finally {
headLock.unlock(); // 确保无论是否发生异常都释放锁
}
// a. 从满->不满, 由此poll线程唤醒等待的offer线程
if (c == array.length) {
tailLock.lock(); // 获取尾部锁
try {
tailWaits.signal(); // 唤醒可能等待的生产者线程
} finally {
tailLock.unlock(); // 确保无论是否发生异常都释放锁
}
}
return e;
}
/**
* 检查队列是否为空
*
* @return 如果队列为空则返回 true,否则返回 false
*/
private boolean isEmpty() {
return size.get() == 0;
}
/**
* 检查队列是否已满
*
* @return 如果队列已满则返回 true,否则返回 false
*/
private boolean isFull() {
return size.get() == array.length;
}
}