AbstractQueuedSynchronizer源码

介绍

基于队列的抽象同步器,它是jdk中所有显示的线程同步工具的基础,像ReentrantLock/DelayQueue/CountdownLatch等等,都是借助AQS实现的。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

image-20230620194452634

内部类

Node

AQS中的Node内部类的作用就是以队列的方式来存放线程节点

    /**
     * Wait queue node class.
     *
     * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue. CLH locks are normally used for
     * spinlocks.  We instead use them for blocking synchronizers, but
     * use the same basic tactic of holding some of the control
     * information about a thread in the predecessor of its node.  A
     * "status" field in each node keeps track of whether a thread
     * should block.  A node is signalled when its predecessor
     * releases.  Each node of the queue otherwise serves as a
     * specific-notification-style monitor holding a single waiting
     * thread. The status field does NOT control whether threads are
     * granted locks etc though.  A thread may try to acquire if it is
     * first in the queue. But being first does not guarantee success;
     * it only gives the right to contend.  So the currently released
     * contender thread may need to rewait.
     *
     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     *
     * <p>Insertion into a CLH queue requires only a single atomic
     * operation on "tail", so there is a simple atomic point of
     * demarcation from unqueued to queued. Similarly, dequeuing
     * involves only updating the "head". However, it takes a bit
     * more work for nodes to determine who their successors are,
     * in part to deal with possible cancellation due to timeouts
     * and interrupts.
     *
     * <p>The "prev" links (not used in original CLH locks), are mainly
     * needed to handle cancellation. If a node is cancelled, its
     * successor is (normally) relinked to a non-cancelled
     * predecessor. For explanation of similar mechanics in the case
     * of spin locks, see the papers by Scott and Scherer at
     * http://www.cs.rochester.edu/u/scott/synchronization/
     *
     * <p>We also use "next" links to implement blocking mechanics.
     * The thread id for each node is kept in its own node, so a
     * predecessor signals the next node to wake up by traversing
     * next link to determine which thread it is.  Determination of
     * successor must avoid races with newly queued nodes to set
     * the "next" fields of their predecessors.  This is solved
     * when necessary by checking backwards from the atomically
     * updated "tail" when a node's successor appears to be null.
     * (Or, said differently, the next-links are an optimization
     * so that we don't usually need a backward scan.)
     *
     * <p>Cancellation introduces some conservatism to the basic
     * algorithms.  Since we must poll for cancellation of other
     * nodes, we can miss noticing whether a cancelled node is
     * ahead or behind us. This is dealt with by always unparking
     * successors upon cancellation, allowing them to stabilize on
     * a new predecessor, unless we can identify an uncancelled
     * predecessor who will carry this responsibility.
     *
     * <p>CLH queues need a dummy header node to get started. But
     * we don't create them on construction, because it would be wasted
     * effort if there is never contention. Instead, the node
     * is constructed and head and tail pointers are set upon first
     * contention.
     *
     * <p>Threads waiting on Conditions use the same nodes, but
     * use an additional link. Conditions only need to link nodes
     * in simple (non-concurrent) linked queues because they are
     * only accessed when exclusively held.  Upon await, a node is
     * inserted into a condition queue.  Upon signal, the node is
     * transferred to the main queue.  A special value of status
     * field is used to mark which queue a node is on.
     *
     * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
     * Scherer and Michael Scott, along with members of JSR-166
     * expert group, for helpful ideas, discussions, and critiques
     * on the design of this class.
     */
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        //用来表示在共享模式下等待的标记
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        //用来表示在独占模式下等待的标记
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        //waitStatus 的值,用来表示线程已经被取消
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        //waitStatus的值,用来表示后继线程都需要被挂起
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        //waitStatus 的值,表示线程在一个条件上等待
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         * waitStatus 的值,表示下一个acquireShared应该无条件传播
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         * 等待状态值 仅接收以上4个值和默认的0
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         * 前驱结点
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         * 后继节点
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         * 当前线程
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         * 下一个条件等待节点
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         * 获取当前节点的前驱节点
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        //用于addWaiter方法
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        //用于Condition中使用
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

同步队列中Node应该长下面这个样子:

image-20230620194533854

ConditionObject

ConditionObject实现了在基于AQS锁的情况下对获取到锁的线程进行有条件的等待和唤醒,其主要的方法是await和signal以及它们的变种。

ConditionObject是专门为AQS服务的,它的节点的构造,状态的标志等都与AQS有关,在wait操作和signal操作时都需要去操作AQS的同步队列。

    /**
     * Condition implementation for a {@link
     * AbstractQueuedSynchronizer} serving as the basis of a {@link
     * Lock} implementation.
     *
     * <p>Method documentation for this class describes mechanics,
     * not behavioral specifications from the point of view of Lock
     * and Condition users. Exported versions of this class will in
     * general need to be accompanied by documentation describing
     * condition semantics that rely on those of the associated
     * {@code AbstractQueuedSynchronizer}.
     *
     * <p>This class is Serializable, but all fields are transient,
     * so deserialized conditions have no waiters.
     */
    public class ConditionObject implements Condition, java.io.Serializable {
        //序列化版本号
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        //等待队列中第一个节点
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        //等待队列中最后一个节点
        private transient Node lastWaiter;

        /**
         * Creates a new {@code ConditionObject} instance.
         */
        public ConditionObject() { }

        // Internal methods

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         * 向Condition队列添加等待者
         */
        //将当前线程封装为节点并设置为CONDITION加入到Condition队列中,这里如果lastWaiter不为CONDITION状态,那么会把它踢出Condition队列;
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            //检查节点的有效性,遍历队列,将状态不为CONDITION的节点剔除出队列
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //尾节点为空则表明队列为空,将新节点设置为头节点
            if (t == null)
                firstWaiter = node;
            //尾节点不为空则表明队列不为空,将新节点设置为尾节点的后续节点
            else
                t.nextWaiter = node;
            //将新节点设置为尾节点
            lastWaiter = node;
            return node;
        }

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         *              唤醒等待队列中的第一个线程节点,
         *              成功则返回
         *              失败则尝试唤醒下一个线程节点
         */
        private void doSignal(Node first) {
            do {
                //新的头节点为空则,队列为空
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                //断开头节点与队列的连接
                first.nextWaiter = null;
             //将头节点从等待队列中移除
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        /**
         * Unlinks cancelled waiter nodes from condition queue.
         * Called only while holding lock. This is called when
         * cancellation occurred during condition wait, and upon
         * insertion of a new waiter when lastWaiter is seen to have
         * been cancelled. This method is needed to avoid garbage
         * retention in the absence of signals. So even though it may
         * require a full traversal, it comes into play only when
         * timeouts or cancellations occur in the absence of
         * signals. It traverses all nodes rather than stopping at a
         * particular target to unlink all pointers to garbage nodes
         * without requiring many re-traversals during cancellation
         * storms.
         * 检查逻辑
         * 从等待队列的头节点开始遍历,如果头节点的waitStatus != Node.CONDITION则将firstWaiter的引用指向头结点的下一个节点,
         * 则该下一个节点就变成了新的头节点,如此往复进行
         */
        private void unlinkCancelledWaiters() {
            //等待队列的头节点
            Node t = firstWaiter;
            //trail用于记录上一个有效的节点
            Node trail = null;
            //从头节点开始遍历
            while (t != null) {
                Node next = t.nextWaiter;
                //当前线程的等待状态不是 Node.CONDITION等待状态
                if (t.waitStatus != Node.CONDITION) {
                    //断开与下一个节点的连接 t独立出来便于GC回收
                    t.nextWaiter = null;
                    if (trail == null)
                        //将头节点的下一个节点设置为头节点
                        firstWaiter = next;
                    else
                        //通过nextWaiter连接到队列中
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    //有效节点
                    trail = t;
                //遍历下一个节点
                t = next;
            }
        }

        // public methods

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         * signal的作用就是将await中Condition队列的第一个节点唤醒;
         */
        public final void signal() {
            //isHeldExclusively是需要子类继承的,在lock中判断当前线程是否是获得锁的线程,是则返回true,如果当前线程不是获取锁的线程则抛出异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //获取Condition队列中第一个Node
            Node first = firstWaiter;
            //队首的节点不为null
            if (first != null)
                //唤醒队首的节点
                doSignal(first);
        }

        /**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        /**
         * Implements uninterruptible condition wait.
         * <ol>
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * </ol>
         */
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

        /*
         * For interruptible waits, we need to track whether to throw
         * InterruptedException, if interrupted while blocked on
         * condition, versus reinterrupt current thread, if
         * interrupted while blocked waiting to re-acquire.
         */

        /** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT =  1;
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE    = -1;

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         *  检查Condition队列中节点在等待过程中的中断状态
         *  THROW_IE:表示在signal之前被中断唤醒
         *  REINTERRUPT:表示在signal之后有中断,在singnal之后被通断,需要保证singnal的行为最终完成,所以中断只用延续状态状态REINTERRUPT,不用抛出异常。
         */
        private int checkInterruptWhileWaiting(Node node) {
                     //中断状态
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

        /**
         * Throws InterruptedException, reinterrupts current thread, or
         * does nothing, depending on mode.
         */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         *等候机制: 持有锁的线程调用await方法将会主动放弃锁,将其加入等待队列中
         *  等候机制的操作分为如下几步:
         *  1.调用addConditionWaiter方法,将节点添加到等候队列
         *  2.调用fullyRelease方法,释放锁机制
         *  3.while循环,判断当前节点是否在同步队列中,如果不在则挂起当前线程
         *  4.获取锁,并判断线程是否中断
         */
        public final void await() throws InterruptedException {
            //await()方法对中断敏感,线程中断为true时调用await()就会抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
            //将当前线程封装成节点并且设置为CONDITION加入到Condition队列中去,这里如果lastWaiter不为CONDITION状态,那么会把它踢出Condition队列。
            Node node = addConditionWaiter();
            //释放node节点线程的锁, 锁可能有重入,所以这里不是直接调用AQS的release方法,详情见fullyRelease说明
            int savedState = fullyRelease(node);
            //标记线程在await过程中的中断转态,0表示未中断
            int interruptMode = 0;
            //判断节点是否在同步队列中,只有node节点进入了同步队列循环才会结束(即,被signal了)
            while (!isOnSyncQueue(node)) {
                //不再同步队列中,则使用LockSupport.park将其挂起
                LockSupport.park(this);
                //线程被唤醒或者中断后,判断线程的中断状态
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    //中断则推出循环
                    break;
            }
            //线程被唤醒后重新获取锁,锁状态恢复到savedState
            //不管线程是:未中断,还是signal中断,singnal后中断,前面的代码 都会保证node节点进入同步队列。
            //acquireQueued 方法获取到锁,并且在获取锁park的过程中有被中断,并且之前在await过程中,不是被signal之前就中断的情况,则标记后续处理中断的情况为interruptMode。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //重新获取到锁,把节点从condition队列中去除,同时也会清除被取消的节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            //线程被中断,根据中断条件选择抛出异常或者重新中断传递状态
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }

        /**
         * Implements absolute timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * <li> If timed out while blocked in step 4, return false, else true.
         * </ol>
         */
        public final boolean awaitUntil(Date deadline)
                throws InterruptedException {
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() > abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * <li> If timed out while blocked in step 4, return false, else true.
         * </ol>
         */
        public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        //  support for instrumentation

        /**
         * Returns true if this condition was created by the given
         * synchronization object.
         *
         * @return {@code true} if owned
         */
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        /**
         * Queries whether any threads are waiting on this condition.
         * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
         *
         * @return {@code true} if there are any waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        /**
         * Returns an estimate of the number of threads waiting on
         * this condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
         *
         * @return the estimated number of waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        /**
         * Returns a collection containing those threads that may be
         * waiting on this Condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
         *
         * @return the collection of threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }

节点与节点之间通过前一个节点的nextWaiter指向下一个节点。所以不同于AbstractQueuedSynchronizer中的等待队列(同步队列)是双向的,ConditionObject中的等待队列是单向链表。

Condition等待队列中应该长如下这个样子:

image-20230620194603391

常用方法

acquire

获取锁

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        //尝试获取锁
        if (!tryAcquire(arg) &&
            //addWaiter:将线程封装到 Node 节点并添加到队列尾部
            //acquireQueued查看当前排队的 Node 是否在队列的前面,如果在前面,尝试获取锁资源。如果没在前面,线程进入到阻塞状态。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //中断当前线程,等待唤醒
            selfInterrupt();
    }

①尝试获取锁

②如果获取锁没有成功,则构造节点加入等待队列中

③如果节点入队列成功,则线程自我中断让出资源。

tryAcquire

尝试获取锁

    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

我们找到tryAcquire方法发现,直接抛出了一个异常。将tryAcquire的具体实现预留给各种锁来实现

addWaiter

当获取锁失败时,就开始构造节点入队列了。

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        //创建节点 将当前线程封装为Node对象,当mode为null,代表互斥锁
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //快速入队,入队不成功则交由enq来实现
        //当前尾节点
        Node pred = tail;
        //如果队列没有节点 创建一个哨兵节点
        if (pred != null) {
            // 当前线程Node节点的prev指向pred节点
            node.prev = pred;
            // 以CAS方式,尝试将tail指向node节点
            if (compareAndSetTail(pred, node)) {
                // 将pred的next指向node
                pred.next = node;
                return node;
            }
        }
        // 如果上述方式,CAS操作失败,导致加入到AQS末尾失败,就基于enq的方式添加到AQS队列
        enq(node);
        return node;
    }

enq

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        // 死循环,直到插入成功
        for (;;) {
            Node t = tail;
            //尾节点为null 即等待队列为空
            if (t == null) { // Must initialize
                //创建哨兵节点  如果尾节点为null,说明同步队列还未初始化,则CAS操作新建头节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
              // 将node的prev指向当前的tail节点
                node.prev = t;
                // CAS尝试将node变成tail节点
                if (compareAndSetTail(t, node)) {
                    // 将之前尾节点的next指向要插入的节点
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     *线程被构造为节点进入队列后,接下来就是对队列中的节点进行获取锁的处理
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //中断标识
            boolean interrupted = false;
            for (;;) {
                // 获取node的前驱节点
                final Node p = node.predecessor();
                //如果前一个节点为head头节点,说明当前节点应该获取锁,尝试获取锁资源
                if (p == head && tryAcquire(arg)) {
                   // 尝试获取锁资源成功,将头节点指向获取锁成功的节点,清空节点的thread和prev了,该节点成为新的哨兵节点
                    setHead(node);
                    // 将之前的头节点的next指向null,帮助快速GC
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果前一个节点不是head头节点或者获取锁资源失败
                if (shouldParkAfterFailedAcquire(p, node) &&
                        //尝试将线程park
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                //获取锁失败,取消竞争锁
                cancelAcquire(node);
        }
    }

cancelAcquire

取消获取锁

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        //如果待取消节点(node)为null,则直接返回。
        if (node == null)
            return;
        // 将node的thread置为null;取消节点对线程的引用
        node.thread = null;

        // Skip cancelled predecessors
        //将node节点往前驱节点方向所有连续的取消状态的节点出队
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        //1只有这里才设置为已取消状态
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        //2如果当前节点为尾节点,当前节点出队,
        if (node == tail && compareAndSetTail(node, pred)) {
            //将前节点(这个时候已经是队尾节点)的next指向null,这里不保证它一定会成功,因为可能有其它新节点加入,用CAS方式避免将覆盖其它线程的操作。
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            //3如果当前节点不是尾节点,也不是头节点
            if (pred != head &&
                 //前节点状态已经为Node.SIGNAL  或者 将前节点状态更改为Node.SIGNAL成功
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                //当前节点的next节点存在并且没有取消
                if (next != null && next.waitStatus <= 0)
                    //则将前节点的next指向node节点的next
                    compareAndSetNext(pred, predNext, next);
            } else {
                //唤醒后继节点
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
unparkSuccessor

唤醒后继节点

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
         //获取node节点的状态,在唤醒后继之前,先将node节点状态改为0
        int ws = node.waitStatus;
        if (ws < 0)
            //如果node节点状态小于0,cas修改为0
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //当前节点的next节点
        Node s = node.next;
        //如果node的后继节点被取消,则从队尾向node节点遍历,找到距离node节点最近的waitStatus<=0的节点,然后唤醒s节点
        if (s == null || s.waitStatus > 0) {
        // next节点不需要唤醒,需要唤醒next的next
            s = null;
            // 从尾部往前找,找到状态正常的节点。(小于等于0代表正常状态)
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //经过循环的获取,如果拿到状态正常的节点,并且不为null
        if (s != null)
            //线程唤醒
            LockSupport.unpark(s.thread);
    }

release

释放锁

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     * release利用tryRelease先进行释放锁,tryRealse是由子类实现的方法,可以确保线程是获取到锁的,并进行释放锁,
     * unparkSuccessor主要是利用LockSupport.unpark唤醒线程;
     */
    public final boolean release(int arg) {
        //尝试释放锁,这个方法是由子类实现的方法
        if (tryRelease(arg)) {
            Node h = head;
            //头节点不为如果节点状态不是CANCELLED,也就是线程没有被取消,也就是不为0的,就进行唤醒
            if (h != null && h.waitStatus != 0)
                //唤醒线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease

尝试释放锁

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

交由子类实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/30461.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用omp并行技术加速最短路径算法-迪杰斯特拉(Dijkstra)算法(记录最短路径和距离)

原理&#xff1a; Dijkstra算法是解决**单源最短路径**问题的**贪心算法** 它先求出长度最短的一条路径&#xff0c;再参照该最短路径求出长度次短的一条路径 直到求出从源点到其他各个顶点的最短路径。 首先假定源点为u&#xff0c;顶点集合V被划分为两部分&#xff1a;集合…

【玩转Linux操作】Linux服务管理

&#x1f38a;专栏【玩转Linux操作】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【如愿】 大一同学小吉&#xff0c;欢迎并且感谢大家指出我的问题&#x1f970; 文章目录 &#x1f354;服务(service)管理⭐service管理指令 &…

chatgpt赋能python:Python如何快速提取指定行和列的数据?

Python如何快速提取指定行和列的数据&#xff1f; 在进行数据分析和处理时&#xff0c;常常需要从海量数据中筛选出所需的数据。这时&#xff0c;Python是一款非常强大的工具&#xff0c;可以方便地进行大规模数据清洗和筛选。本文将介绍如何使用Python快速提取指定行和列的数…

chatgpt赋能python:Python提取指定位置字符

Python 提取指定位置字符 Python 是一种高级程序语言&#xff0c;其易读性、简单易学性和易维护性使其成为最受欢迎的编程语言之一。它可以用于各种数据分析和科学计算&#xff0c;包括搜索引擎优化&#xff08;SEO&#xff09;。 在SEO中&#xff0c;提取和处理数据是一个重…

监听关闭浏览器触发事件

关闭和刷新页面都会触发&#xff0c;一般都不用来做弹窗提示&#xff0c;一般用来做数据操作 // 监听页面关闭 清除本地缓存 window.onbeforeunload function (e) { localStorage.removeItem("statement"); }; // 监听页面关闭 提醒是否关闭 现在不允许自定义内容了…

【深度学习】5-1 与学习相关的技巧 - 参数的更新(Momentum,AdaGrad, Adam )

神经网络的学习的目的是找到使损失函数的值尽可能小的参数。这是寻找最优参数的问题&#xff0c;解决这个问题的过程称为最优化。 但是神经网络的最优化问题非常难。这是因为参数空间非常复杂&#xff0c;无法轻易找到最优解。而且&#xff0c;在深度神经网络中&#xff0c;参…

selenium.chrome怎么写扩展拦截或转发请求?

Selenium WebDriver 是一组开源 API&#xff0c;用于自动测试 Web 应用程序&#xff0c;利用它可以通过代码来控制chrome浏览器&#xff01; 有时候我们需要mock接口的返回&#xff0c;或者拦截和转发请求&#xff0c;今天就来实现这个功能。 代码已开源&#xff1a; https:/…

9k字长文理解Transformer: Attention Is All You Need

作者&#xff1a;猛码Memmat 目录 Abstract1 Introduction2 Background3 Model Architecture3.1 Encoder and Decoder Stacks3.2 Attention3.2.1 Scaled Dot-Product Attention3.2.2 Multi-Head Attention3.2.3 Applications of Attention in our Model 3.3 Position-wise Feed…

C++基础(6)——类和对象(运算符重载)

前言 本文主要介绍了C中运算符重载的基本知识。 4.5.1&#xff1a;加号运算符重载&#xff08;成员函数和全局函数都可实现&#xff09; 运算符重载&#xff1a;对已有的运算符重新进行定义&#xff0c;赋予其另一种功能&#xff0c;以适应不同的数据类型 1&#xff1a;成员…

防火墙日志记录和监控在网络安全中的重要性

防火墙监视进出网络的流量&#xff0c;并保护部署网络的网络免受恶意流量的侵害。它是一个网络安全系统&#xff0c;根据一些预定义的规则监控传入和传出的流量。它以日志的形式记录有关如何管理流量的信息。日志数据包含流量的源和目标 IP 地址、端口号、协议等。为了有效地保…

Git系列:运用Git创建空白分支进行项目相关文档管理

文章目录 起因一、为什么会选择Git分支二、Git分支的简单介绍和好处三、本次的具体操作1.$git checkout --orphan XXX2.删除当前分支里的内容3.提交新的分支 总结 起因 项目管理过程中没有做好相关文档管理&#xff0c;比如需求&#xff0c;开发&#xff0c;测试等文档&#x…

科技云报道:大模型时代,AI基础软件机会何在?

科技云报道原创。 大模型时代&#xff0c;离不开算力&#xff0c;算法、数据的喂养。如果将视角放至整个产业链上&#xff0c;算法背后&#xff0c;还有一个关键要素值得被关注&#xff0c;那就是AI基础软件。 算法是实现AI功能的关键&#xff0c;而基础软件则为算法提供运行…

React项目引入Arco Design,以及Arco Design Pro 架构

创建项目 创建 react-arco 项目 pnpm create vite my-vue-app --template react安装 arco-design/web-react 安装 react 版的 arco-design 基础使用 添加一个按钮&#xff0c;App.tsx import "./App.css"; import { Button } from "arco-design/web-react…

CH2023、Adobe Character Animator 2023(动画角色制作软件)下载教程、安装教程

最后附下载地址 Adobe CH简介&#xff1a; Adobe Character Animator是一款基于动画制作的软件&#xff0c;它可以将手绘的角色通过摄像头或麦克风捕捉到的实时动作转化为动画效果。该软件结合了人工智能和动画技术&#xff0c;可以快速创建高质量的角色动画&#xff0c;并且…

2023年的深度学习入门指南(17) - 深度学习的硬件加速技术

2023年的深度学习入门指南(17) - 深度学习的硬件加速技术 有了前面的知识之后&#xff0c;想必大家对于算力需求的理解已经越来越深刻了。 除了使用CPU&#xff0c;GPU这样的通用器件之外&#xff0c;采用专用的硬件来进行加速是一个大家都能想到的选择。 其中的代表器件就是…

杂记 | 使用Docker和Nginx为网站添加HTTPS访问功能

文章目录 01 准备工作1.1 HTTPS介绍1.2 准备工作 02 编写nginx.conf03 使用docker启动nginx 01 准备工作 1.1 HTTPS介绍 HTTPS&#xff08;Hypertext Transfer Protocol Secure&#xff09;是一种通过加密通信保护网站数据传输的协议。它是 HTTP 协议的安全版本&#xff0c;通…

1.4 掌握Scala运算符

一、运算符等价于方法 &#xff08;一&#xff09;运算符即方法 op运算符与.op方法调用是等价的&#xff0c;op表示运算符&#xff1a;、-、*、/…… 演示x y与x.(y)的等价 &#xff08;二&#xff09;方法即运算符 1、单参方法 str.indexOf(‘a’) 与 str indexOf ‘a’…

stable-diffusion-webui的介绍与使用——Controlnet1.1

源码地址&#xff1a;https://github.com/lllyasviel/ControlNet | 最新版本 controlnet-v1.1 论文地址&#xff1a;2302.Adding Conditional Control to Text-to-Image Diffusion Models 扩展UI地址&#xff08;需先安装sd-webui&#xff09;&#xff1a;https://github.com/M…

【gcc, cmake, eigen, opencv,ubuntu】四.opencv安装和使用,获取opencv matiax 的指针

文章目录 ubuntu系统安装opencv1.下载opencv和opencv_contrib2.安装指导3.Linux 下 fatal error: opencv2/opencv.hpp: 没有那个文件或目录4.g 和cmake 编译使用opencv的程序5.opencv,eigen速度比较6.opencv常用类型符号7.获取opencv matiax 的指针 ubuntu系统安装opencv 1.下…

设计模式大全

使用设计模式的目的&#xff1a; 程序猿在编码的过程中面临着来自耦合性、内聚性、可维护性、可扩展性、重用性、灵活性等多方面的挑战。设计模式是为了让程序具有更好的&#xff1a; 1&#xff09;重用性&#xff0c;即相同功能的代码编写一次即可&#xff0c;不用重复编写 …