AQS
但凡了解多线程的对于AQS应该都有所耳闻吧(我第一次知道AQS还是在一次面试中,那次被虐的老惨了),AQS即AbstractQueuedSynchronizer队列同步器,是一个抽象类,它是从java5开始的同步组件的基础框架,它仅仅只是定义了FIFO同步队列、同步状态的获取和释放方法,很多同步类都继承该类来实现同步逻辑,子类通过继承AQS并实现它的抽象方法来管理同步状态
是构建其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch等)
源码解析
主要维护了一个volatile修饰的state同步状态,当线程调用lock方法时,如果state=0,说明该资源没有被占有,可以获得锁;如果state=1,说明该资源已被占用,需要加入同步队列等待。一个FIFO双向链表的同步队列,来完成线程获取锁的排队工作,线程获取锁失败后,会被添加至队尾。,FIFO双向链表,通过head节点和tail节点来记录队首和队尾元素,主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,可重写的方法tryAcquire(int arg)、tryRelease(int arg)、tryAcquireShared(int arg)、tryReleaseShared(int arg)、isHeldExclusively()
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态
// 头结点
private transient volatile Node head;
// 尾结点
private transient volatile Node tail;
// 线程同步的关键
// 同步状态信息state,可以通过getState、setState、compareAndSetState来获取或者修改值
// 对于不同的实现类,其用法不同
// 当state>0时表示已经获取了锁,当state = 0时表示释放了锁
private volatile int state;
// 获取当前同步状态
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// 使用CAS设置当前状态,可以保证原子性
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
static final long spinForTimeoutThreshold = 1000L;
// 获取锁 独占锁
public final void acquire(int arg) {
// tryAcquire需要具体子类去实现,保证线程安全的获取同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 同步状态获取失败,通过addWaiter将节点加入到同步队列尾部,acquireQueued以无限循环的方式获取同步状态
selfInterrupt();
}
// 以不可中断方式获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取给定节点的前驱节点,这里要么是头部节点head,要么是其他排队的节点
final Node p = node.predecessor();
// 如果node的前驱结点p是head,表示node是第二个节点,可以尝试去获取资源
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该节点
setHead(node);
p.next = null; // help GC
// 设置获取成功状态
failed = false;
return interrupted;
}
// 走到这里说明锁的状态不可以获取,判断是否可以挂起当前线程,如果判断为真,则可以挂起当前线程,否则继续循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 阻塞等待被唤醒
interrupted = true;
}
} finally {
// 如果获取失败则取消获取
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 获取当前尾结点
Node pred = tail;
if (pred != null) { // 尾结点不为空
// 当前节点的前驱节点指向尾结点
node.prev = pred;
// 设置当前节点为尾结点
if (compareAndSetTail(pred, node)) {
// 旧的尾结点的后继节点指向当前节点
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS操作失败,则自旋CAS插入
//(可能有多个线程并发加入队尾产生竞争)
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 第一次循环,尾结点为null,需要进行初始化
if (t == null) { // Must initialize
// 构建一个哨兵节点作为头节点
if (compareAndSetHead(new Node()))
// 尾结点同样指向哨兵节点
tail = head;
} else { // 尾结点不为空
// 将该节点的前驱节点指向当前的尾结点
node.prev = t;
// 设置该节点为尾结点
if (compareAndSetTail(t, node)) {
// 原本的尾结点的后继节点设置为当前节点
t.next = node;
return t;
}
}
}
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
// 释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 获取head节点
Node h = head;
// 如果head节点不为空并且等待状态不为0则唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 唤醒后继节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 获取给定节点的等待状态
int ws = node.waitStatus;
// 将等待状态更新为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取给定节点的后继节点
Node s = node.next;
// 后继节点为空或者等待状态为CANCELLED
if (s == null || s.waitStatus > 0) {
s = null;
// 从后向前遍历找到第一个不是CANCELLED状态的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
// 判断是否可以将当前节点挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点的等待状态
int ws = pred.waitStatus;
// 如果等待状态为SIGNAL,表明前驱结点会唤醒当前节点,可以挂起
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 大于0的状态目前是CANCELLED
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 清理队列中已取消的前驱节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 走到这里可能是CONDITION和PROPAGATE或者是0
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 尝试将前驱结点的等待状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
}
Node节点
队列中的元素存储的是Node节点
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
// 标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
// 标记线程是获取独占锁资源时被挂起后放入AQS队列的
static final Node EXCLUSIVE = null;
// 当前线程的等待状态
// SIGNAL 线程需要被唤醒 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
// CANCELLED 线程被取消了 由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
// CONDITION 线程在条件队列里面等待 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取
// PROPAGATE 释放共享资源时需要通知其他节点 下一次共享式同步状态获取将会无条件的被传播下去
// 0
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 记录当前节点的前驱节点
volatile Node prev;
// 记录当前节点的后继节点
volatile Node next;
// 当前节点持有的线程
volatile Thread thread;
// 等待队列中的后继节点,如果当前节点是共享的,那么这个字段将是一个SHARED常量
Node nextWaiter;
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
ConditionObject
看到在AQS中还有一个ConditionObject类,其实现了Condition接口
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
// Internal methods
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
// 向后移一位
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// public methods
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 唤醒条件队列中的下一个节点
public final void signal() {
// 判断当前线程是否持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* Implements uninterruptible condition wait.
* <ol>
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
/*
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire.
*/
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程添加到条件队列尾部
Node node = addConditionWaiter();
// 在进入条件等待之前先完全释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 线程一直进行条件等待
while (!isOnSyncQueue(node)) {
// 进行条件等待的都在这里挂起,等待被唤醒
LockSupport.park(this);
// 当前线程被唤醒后检查是否被中断,如果被中断表示节点取消条件等待,移出条件队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* Implements absolute timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// support for instrumentation
/**
* Returns true if this condition was created by the given
* synchronization object.
*
* @return {@code true} if owned
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
/**
* Queries whether any threads are waiting on this condition.
* Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
/**
* Returns an estimate of the number of threads waiting on
* this condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
/**
* Returns a collection containing those threads that may be
* waiting on this Condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
*
* @return the collection of threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
LockSupport工具类
在AQS中多次使用到了LockSupport来进行操作,LockSupport是用来干什么的呢?LockSupport定义了一组公共静态方法,提供了最基本的线程阻塞和唤醒功能,也成为了构建同步组件的基础工具,主要作用是来挂起和唤醒线程,LockSupport的底层是使用Unsafe实现的
public class LockSupport {
private LockSupport() {} // Cannot be instantiated.
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}
// 唤醒线程,如果thread之前因调用park()方法而被挂起,则调用unpark后,该线程会被唤醒;如果Thread之前没有调用park,则调用unpark后,再次调用park方法,会立即返回
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
// 当线程在没有持有许可的情况下调用park方法而被阻塞挂起,blocker对象会被记录到当前线程内部,此时可以通过getBlocker方法来获取blocker对象,推荐blocker对象设置为this,这样可以在打印线程堆栈时知道哪个类被阻塞了
// volatile Object parkBlocker; Thread类中有一个变量来专门存储该值
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
// 获取blocker对象
public static Object getBlocker(Thread t) {
if (t == null)
throw new NullPointerException();
return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}
// 有超时时间的阻塞
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}
// deadline的单位是ms,是一个时间戳,表示阻塞到什么时候
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null);
}
// 调用park方法来获取许可,调用该方法的线程会被挂起阻塞,等待其他线程调用unpark(Thread thread)方法来唤醒该线程
public static void park() {
UNSAFE.park(false, 0L);
}
// 挂起指定时间
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}
public static void parkUntil(long deadline) {
UNSAFE.park(true, deadline);
}
static final int nextSecondarySeed() {
int r;
Thread t = Thread.currentThread();
if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
r ^= r << 13; // xorshift
r ^= r >>> 17;
r ^= r << 5;
}
else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
r = 1; // avoid zero
UNSAFE.putInt(t, SECONDARY, r);
return r;
}
// Hotspot implementation via intrinsics API
private static final sun.misc.Unsafe UNSAFE;
private static final long parkBlockerOffset;
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
parkBlockerOffset = UNSAFE.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
SEED = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSeed"));
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
SECONDARY = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception ex) { throw new Error(ex); }
}
}
https://zhhll.icu/2020/多线程/基础/6.AQS/
本文由 mdnice 多平台发布