实现一个线程安全的队列有两 种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方 式则可以使用循环CAS的方式来实现。
1. 简介
ConcurrentLinkedQueue 是一个 非阻塞(lock-free) 的线程安全队列,它适合在并发场景下使用。(并发不是特别剧烈)
- 数据结构:基于 单向链表。
- 核心原理:通过 CAS(Compare-And-Swap) 操作确保线程安全,避免锁的性能开销。
- 应用场景:
- 适合多线程环境中频繁进行 入队(offer) 和 出队(poll) 操作,但并发又不是特别激烈。
- 在不需要阻塞等待的场景下,优于阻塞式队列(如 BlockingQueue)。
- 不需要强一致性。
特点:
1. ConcurrentLinkedQueue 是 无界队列,并且遵循 FIFO(先进先出) 原则,如果没有适当的控制机制,可能会导致内存溢出
2. size()
和 isEmpty()
方法可能不准确:在并发环境下使用 size()
和 isEmpty()
方法时需要特别小心,因为它们的结果可能并不准确。如果需要精确的元素数量或空队列检测,建议使用额外的同步机制或原子变量来实现
2. 核心原理:非阻塞实现的关键 - CAS 算法
2.1. 什么是 CAS?
CAS(Compare-And-Swap) 是一种常用的 无锁同步机制,主要用于实现原子操作。CAS 的核心思想是:
- 比较当前变量的值是否等于预期值。
- 如果相等,则将变量更新为新值。
- 如果不相等,操作失败,重新尝试。
优点:避免加锁带来的线程阻塞和上下文切换,性能更高。
缺点:CAS 可能导致 ABA 问题(即值从 A 变成 B,又变回 A),但 ConcurrentLinkedQueue 通过引用地址判断有效地解决了这个问题。
2.2. Wait-Free 算法
ConcurrentLinkedQueue 基于 Michael & Scott 算法 实现了一个 非阻塞式队列,并在此基础上进行了一些优化。
- Wait-Free 的含义是:操作一定会在有限步内完成,不会出现无限等待的情况。
- 关键技术:通过 CAS 操作 来确保对队列的入队和出队操作是线程安全的。
3. ConcurrentLinkedQueue 源码分析
下面看看大师Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的,核心是采用了"wait-free"算法(即CAS算法)来实现,该算法在 Michael&Scott算法上进行了一些修改。
3.1. 数据结构:单向链表
ConcurrentLinkedQueue 使用 单向链表 作为底层数据结构:每个节点使用内部类 Node<E> 表示,链表中维护两个重要的指针:head:指向链表的头节点,tail:指向链表的尾节点。
private transient volatile Node<E> head; // 头节点
private transient volatile Node<E> tail; // 尾节点
static class Node<E> {
volatile E item; // 节点存储的数据
volatile Node<E> next; // 指向下一个节点的引用
Node(E item) {
this.item = item;
}
}
3.2. 入队操作:offer(E e)
入队操作的目标是将新节点插入到链表的尾部,核心是使用 CAS 来更新尾节点的引用。
/**
* 将指定元素插入队列的尾部。
* 该方法采用非阻塞的CAS算法,保证线程安全且高效。
* 由于队列是无界的,因此该方法总是返回 true。
*
* @param e 要插入的元素,不能为 null
* @return {@code true} (队列是无界的,插入一定成功)
* @throws NullPointerException 如果插入的元素为 null
*/
public boolean offer(E e) {
// 1. 将元素 e 封装成一个新节点,确保元素不为 null
final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
// 2. 开始自旋,尝试将新节点插入到队列的尾部
for (Node<E> t = tail, p = t;;) { // t 为尾节点,p 用于遍历节点
Node<E> q = p.next; // q 指向当前节点 p 的下一个节点
// 3. 如果当前节点 p 的 next 为 null,说明 p 是尾节点
if (q == null) {
// 尝试将 p 的 next 指向新节点,使用 CAS 操作确保线程安全
if (NEXT.compareAndSet(p, null, newNode)) {
// CAS 成功,说明新节点已被成功插入队列
// 如果 p 不是当前的尾节点 t,尝试更新尾节点为新节点
if (p != t) {
TAIL.weakCompareAndSet(this, t, newNode); // 尾节点更新不强制成功
}
return true; // 插入成功,返回 true
}
// 如果 CAS 失败,说明有其他线程插入了新节点,继续循环重试
}
// 4. 如果 p == q,说明链表结构发生了异常(例如节点被标记为已删除)
else if (p == q) {
// 此时需要重新定位到队列的尾部,确保链表结构完整
p = (t != (t = tail)) ? t : head; // 尝试从尾部 t 或头部 head 开始重新遍历
}
// 5. 否则,说明 p 不是尾节点,继续向后遍历链表
else {
// 如果 p 不等于 t 并且尾节点 t 发生了更新,则更新 t 并继续遍历
// 否则,p 指向当前节点的下一个节点 q,继续循环
p = (p != t && t != (t = tail)) ? t : q;
}
}
}
3.3. 出队操作:poll()
出队操作的目标是移除并返回头节点的数据。
public E poll() {
// 标记重新从头部开始循环的标签
restartFromHead: for (;;) {
// 从头节点开始遍历链表
for (Node<E> h = head, p = h, q;; p = q) {
final E item;
// 如果当前节点的 item 不为 null,并且 CAS 成功(移除该 item)
if ((item = p.item) != null && p.casItem(item, null)) {
// 成功的 CAS 操作是该元素被移除的线性化点
// 如果 p 不是头节点 h,跳过两个节点进行操作
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p); // 更新头节点
return item; // 返回已移除的元素
}
// 如果 p 的 next 为 null,说明队列为空,更新头节点并返回 null
else if ((q = p.next) == null) {
updateHead(h, p); // 更新头节点为当前节点
return null; // 返回 null 表示队列为空
}
// 如果 p == q,表示发生了循环,重新从头开始处理
else if (p == q)
continue restartFromHead; // 重新从头开始遍历
}
}
}
步骤解析:
- 通过 CAS 更新 head 指针,使其指向下一个节点。
- 将头节点中的数据置空(通过 casItem 操作)。
- 返回头节点的数据。