关于对ArrayBlockingQueue 的AQS探究

1、介绍

条件队列是 AQS 中最容易被忽视的一个细节。大部分时候,我们都用不上条件队列,但是这并不说明条件队列就没有用处了,它反而是我们学习生产者-消费者模式的最佳教材。条件队列是指一个阻塞队列,其中的元素是等待某个条件成立的线程。在 Java 的同步机制中,条件队列通常用于实现线程等待和唤醒的操作。当条件不满足时,线程会被加入到条件队列中等待;当条件满足时,线程会被从条件队列中移除并继续执行

2、AQS 中的条件队列

AbstractQueuedSynchronizer 中的条件队列是一个单向链表,每个节点代表一个等待线程。条件队列的头节点是一个特殊的节点,表示等待队列的头部。当条件不满足时,线程会被加入到条件队列的尾部等待;当条件满足时,线程会从条件队列中移除并加入到同步队列中等待获取锁。
AbstractQueuedSynchronizer 中的条件队列是通过内部维护的等待队列和同步队列实现的。当线程调用 await() 方法时,它会被加入到等待队列中等待条件满足。
AQS模型当有其他线程调用了条件队列的 signal() 方法,线程则会从条件队列中移除,并加入到同步队列中等待获取锁,示例如下:
在这里插入图片描述

3、方法调用

(1)await

该方法的作用是将当前线程加入条件队列并阻塞,直到被唤醒或中断。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

如果线程被中断过,那么直接抛出中断异常
创建一个 Condition 类型的节点,并插入到等待队列中。

private Node addConditionWaiter() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    Node node = new Node(Node.CONDITION);

    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

我们看到 isHeldExclusively(),这个方法主要就是判断当前线程是否是持有独占锁的线程,如果不是,则直接抛出异常。所以,我们可以知道条件队列只适用于独占锁的情况。并且调用 await() 方法前,必须先获取锁。
然后,如果最后一个节点被取消的话,则会调用 unlinkCancelledWaiters() 方法移除掉所有被取消的节点。这个方法就是从等待的第一个元素开始,依次向后查找被取消的节点,然后将这些被取消的节点移除出等待队列中。
最后创建一个 Condition 节点,并且加入到条件队列中。

注意:操作等待队列的过程中,因为依然持有独占锁,所以是线程安全的,并不需要额外的同步操作。
释放锁
fullyRelease 是一个释放当前持有锁的方法,这个方法是调用 tryRelease 释放所持有的锁。
这里释放锁的原因是接下来将阻塞线程,如果不释放锁,那么这个锁资源将无法再被使用,直到这个持有锁的线程被唤醒,会造成资源的浪费
循环判断当前节点是否在同步队列中

while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

如果不在同步队列中,则直接阻塞线程,此时线程就停留在LockSupport.park(this); 处。直到其他线程将他唤醒或者中断,才会继续执行
如何判断节点是否在同步队列中呢?

我们可以先思考一下,等待队列中的节点状态都是 Condition 的,而同步队列中的状态则是 0、Signal等等,因此我们通过判断节点的状态。其次,同步队列中使用 pred、next 指针来组织同步队列的结构,我们可以通过判断节点的 pred 和 next 指针是否有值来判断。

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}

第一种情况:通过判断节点的状态和 pred 指针来确定节点是否已经在同步队列中。

第二种情况:通过判断节点的 next 指针来确定节点是否已经在同步队列中。

而第三种情况其实是第一种情况的延伸,主要跟 enq() 插入同步队列有关。主要考虑的是当 pred 指针不为 null 时,说明节点可能已经为同步队列中,为什么说可能,是因为 CAS 设置 next 指针可能会失败。所以需要从同步队列的后面从前面开始在同步队列寻找此节点。

private Node enq(Node node) {
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

在插入同步队列方法中,我们可以看到第 5 行,它是先设置节点的 pred 指针。然后再通过 CAS 设置节点的 next 指针,如果此时 CAS 失败了,就会出现 pred 指针有值,但是 next 是找不到节点的。
当线程被唤醒之后(signal 或者中断),检查中断。

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

如果线程在 signal 之前就已经被唤醒(中断方式),则返回 THROW_IE;如果在 signal 唤醒之后才被中断,则返回REINTERRUPT。
怎么判断线程的唤醒方式呢
我们可以来思考一下:signal 的时候,会将节点的 Condition 状态修改为 0,也就是说如果是使用 signal 的方式唤醒线程,那么节点在被唤醒之前,它的节点状态已经被修改为 0 了。而使用中断的方式唤醒线程,则不会修改节点的状态。因此我们只需要判断节点的状态就可以知道线程是被哪种方式唤醒的。

我们通过源码来揭晓我们的猜想是否正确

final boolean transferAfterCancelledWait(Node node) {
    if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

通过 CAS 修改节点的状态,如果节点的状态是 Condition 的话,则可以成功修改为 0。说明节点就是被中断唤醒。如果节点状态无法修改成功,则说明节点的状态已经不是 Condition 了,那么则说明在唤醒之前已经被修改了,那么只可能是 signal 方法唤醒的。

如果节点是通过 signal 方法唤醒的,这里会循环判断节点是否已经在同步队列中了,只有节点已经在同步队列,才会结束执行。
为什么这里需要循环判断节点是否已经在同步队列呢?
如果这里不执行这一段的话,那么直接返回 false。我们此时回到 await 方法,会进入到下一次循环,继续判断节点是否在同步队列中,如果节点此时还没有加入到同步队列的话,就会继续被阻塞。那么这个节点就会出现在同步队列,却被阻塞的情况。如果此时没有其他线程唤醒的话,该线程就成为了一个死线程。

被唤醒之后,尝试获取锁
这里获取锁的方法跟独占锁获取锁的方法是同一个。获取锁成功之后,将返回,并重新记录中断。

(2) signal

该方法的作用是将条件队列中的第一个线程唤醒。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

如果线程没有获取锁,那么则直接抛出异常。
唤醒条件队列中的第一个节点。

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

将节点从条件队列中移除,并加入到同步队列中,同时唤醒线程。

这里为什么需要使用一个 while 循环? 因为有可能当前需要唤醒节点已经被取消了,那么就需要继续唤醒下一个节点,直到有一个节点被成功唤醒或者条件队列为空才结束,这样做是保证可以正常唤醒一个线程。

我们线程来看它是如何唤醒一个线程的:

final boolean transferForSignal(Node node) {
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

从第一个条件判断,我们就可以知道当节点已经不是 Condition 状态时,说明它可能已经被取消了。因此直接返回 false,让外层的 while 循环去唤醒下一个节点。

当已经加入同步队列后,发现前驱节点已经被取消,或者设置前驱节点的状态为 Signal 失败了,那么则直接唤醒当前节点的线程。
为什么要唤醒线程呢?而不是根据同步队列的规则去唤醒?
这里是为了让节点在同步队列中,可以自己完成修正,主要细节在 acquireQueued 这个方法中。我们这里只做简单的介绍。acquireQueued 会去尝试获取锁,如果获取锁失败,那么则会根据前驱节点的状态,来做出调整。如果前驱节点已经是 Signal,那么则直接进入阻塞,等待前驱节点唤醒。如果前驱节点是取消状态,即 status > 0,那么则往前寻找一个 status <= 0 的节点。

如果当前节点的前驱节点的状态可以正常设置为 Signal,那么则不会进行唤醒,而是按照同步队列的规则去进行后续的唤醒操作。

当线程被唤醒之后,就会继续执行 await 方法的后续代码。

小结

  1. 条件队列只能在独占锁情况下使用
  2. 在调用 await 和 signal 方法前,必须先获取锁,否则会抛出异常
  3. 线程可以被中断和 signal 两种方式唤醒。
  4. 线程被唤醒之后,并不会马上获得锁,而是加入同步队列,跟同步队列中的其他节点一起竞争锁。
  5. 使用 await 方法时,如果中断发生调用 signal 方法之前,那么会直接抛出中断异常。如果不想处理中断,可以使用
    awaitUninterruptibly(),该方法不会抛出中断异常,只会记录中断标记。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/219107.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

每日一题:LeetCode-75. 颜色分类

每日一题系列&#xff08;day 12&#xff09; 前言&#xff1a; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f50e…

ROS 元功能包

ROS元功能包&#xff08;Metapackage&#xff09;是一种特殊的软件包&#xff0c;它本身并不包含任何可执行代码或数据文件。在ROS 1中&#xff0c;可以通过catkin_create_pkg命令创建元功能包。 相反&#xff0c;它的主要目的是作为一组相关功能包的集合或者依赖关系列表。使…

蓝桥杯每日一题2023.12.5

题目描述 1.一步之遥 - 蓝桥云课 (lanqiao.cn) 题目分析 对于本题遵循多了就减少了就加的原则&#xff0c;用while进行计算即可 #include<bits/stdc.h> using namespace std; int x, ans; int main() {while(x ! 1){if(x < 1)x 97;else x - 127;ans ;}cout <&…

vue-cli创建项目运行报错this[kHandle] = new _Hash(algorithm, xofLen);(完美解决)

1&#xff1a;问题出现的原因 出现这个问题是node.js 的版本问题&#xff0c;因为 node.js V17开始版本中发布的是OpenSSL3.0, 而OpenSSL3.0对允许算法和密钥大小增加了严格的限制&#xff0c;可能会对生态系统造成一些影响。故此以前的项目在使用 nodejs V17以上版本后会报错。…

使用VBA快速统计词组(单词组合)词频

实例需求&#xff1a;产品清单如A列所示&#xff0c;现在如下统计词组词频。想必各位小伙伴都指定如何使用字典对象实现去重&#xff0c;进而实现单个单词的词频统计。 但是统计词组词频就没有那么简单了&#xff0c;为了便于演示&#xff0c;此处的词组只限于两个单词的组合。…

阿里云Arthas使用——在日志没有输出异常情况下,如何进行线上bug定位 stack命令 和 trace命令

前言 Arthas 是一款线上监控诊断产品&#xff0c;通过全局视角实时查看应用 load、内存、gc、线程的状态信息&#xff0c;并能在不修改应用代码的情况下&#xff0c;对业务问题进行诊断&#xff0c;包括查看方法调用的出入参、异常&#xff0c;监测方法执行耗时&#xff0c;类…

深入理解:指针变量的解引用 与 加法运算

前言 指针变量的解引用和加法运算是非常高频的考点&#xff0c;也是难点&#xff0c;因为对初学者的不友好&#xff0c;这就导致了各大考试都很喜欢在这里出题&#xff0c;通常会伴随着强制类型转换、二维数组、数组指针等一起考查大家对指针的理解。但是不要怕&#xff0c;也许…

托盘四向穿梭车自动化密集库供应|单机智能向系统智能跨越的HEGERLS托盘四向车系统

随着物流产业的迅猛发展&#xff0c;托盘四向穿梭式自动化密集仓储系统可认为是在穿梭车货架系统基础上提出的一种新仓储概念。托盘四向穿梭式立体库因其在流通仓储体系中所具有的高效密集存储功能优势、运作成本优势与系统化智能化管理优势&#xff0c;已发展为仓储物流的主流…

契约锁2023年伙伴大会连下58城,顺利收官!

10月以来&#xff0c;携手全国58城的IT伙伴&#xff0c;共同探讨电子签章海量市场下的发展机遇以及合作模式、交流分享电子签章海量市场机遇、体验电子签章产品在组织数字化建设中的应用价值。 以简单易用、方便实施的产品&#xff0c;和开放共享政策&#xff0c;广结伙伴、共建…

常用汇编指令集

寄存器 如上是OD展示的寄存器&#xff0c;逐条说明常用的寄存器和标志位含义&#xff1a; EIP&#xff1a;寄存器指向即将要执行的指令的地址&#xff08;EIP中的地址&#xff0c;就是下一步要执行指令的地址&#xff09; ESP&#xff1a;里面的内容永远指向堆栈的最顶端 EAX&…

浪涌保护器参数指南:浪涌保护器行业选型方案

浪涌保护器&#xff08;SPD&#xff09;是一种用于限制瞬态过电压和泄放浪涌电流的器件&#xff0c;可有效降低电子设备在雷击、电源故障等情况下受到的损害。其主要作用是当系统发生浪涌时&#xff0c;将过电压、过电流泄放到大地&#xff0c;从而保护设备和人身安全。然而浪涌…

微表情检测(一)----LGAttNet论文总结

LGAttNet: Automatic microexpression detection using dualstream local and global attentions Abstract 微表情识别之前需要先进行微表情的检测。我们提出了一种基于双重注意力网络的微表情检测架构&#xff0c;称为LGAttNet。LGAttNet是第一个利用与二维卷积神经网络组合的…

虚拟机-桥接模式连接

文章目录 1.查看宿主机再用的IP信息2.桥接模式-虚拟机设置VMware设置虚拟机设置重启网络服务 1.查看宿主机再用的IP信息 ipconfig /all 注&#xff1a; 在虚拟机中要设置同网段的ip设置同一个子网掩码设置同一个网关设置同一个DNS服务器 2.桥接模式-虚拟机设置 VMware设置 虚…

从零开始学习 JS APL(五):完整指南和实例解析

目录 学习目标&#xff1a; 学习内容&#xff1a; 学习时间&#xff1a; 学习内容&#xff1a; Window对象&#xff1a; 定时器-延时函数&#xff1a; JS 执行机制&#xff1a; location对象&#xff1a; 本地存储&#xff1a; 本地存储分类- localStorage&#xff1a…

代码签名的工作原理

代码签名的基础是PKI安全体系。代码签名证书由签名证书私钥和公钥证书两部分组成。私钥用于代码的签名&#xff0c;公钥用于私钥签名的验证和证书持有者的身份识别。 1. 发布者从CA机构&#xff08;如JoySSL&#xff09;申请数字证书&#xff1b; 2. 发布者开发出代码&#x…

物联网主机E6000:工业领域的数据融合与5G未来

一、物联网的崛起 在科技日新月异的今天&#xff0c;物联网已经成为了我们生活中不可或缺的一部分。从智能家居到工业自动化&#xff0c;物联网的应用已经深入到我们生活的各个角落。而在这个大背景下&#xff0c;物联网主机的出现&#xff0c;更是为我们的生活带来了前所未有的…

3、RocketMQ源码分析(三)

RocketMQ源码-NameServer架构设计及启动流程 本文我们来分析NameServer相关代码&#xff0c;在正式分析源码前&#xff0c;我们先来回忆下NameServer的功能&#xff1a; NameServer是一个非常简单的Topic路由注册中心&#xff0c;其角色类似Dubbo中的zookeeper&#xff0c;支…

一文详解集合竞价,建议收藏!

集合竞价是指对在规定的一段时间内接受买卖申报一次性集中撮合的竞价方式。沪深市场9:15-9:25及14:57-15:00为集合竞价的时间段。集合竞价的所有交易以同一个价格成交。集合竞价的成交价确定原则是&#xff1a; 1、可实现最大成交量的价格&#xff1b; 2、高于该价格的买入申…

图表控件LightningChart .NET中文教程 - 如何创建WPF 2D热图?(二)

LightningChart.NET完全由GPU加速&#xff0c;并且性能经过优化&#xff0c;可用于实时显示海量数据-超过10亿个数据点。 LightningChart包括广泛的2D&#xff0c;高级3D&#xff0c;Polar&#xff0c;Smith&#xff0c;3D饼/甜甜圈&#xff0c;地理地图和GIS图表以及适用于科学…

评论功能实现方案

构建高效且安全的评论功能&#xff1a;实现方案探讨。 1、分析 我们以b站的评论为例&#xff0c;用下图来解释我们评论的分级。 我们可以抽出存储评论的数据表属性 评论id父级id评论作者id被回复用户ID评论帖子ID评论内容创建时间 可以设计如下的数据表 其中pid表示父id。 …