一、什么是AQS
AQS的全称为(AbstractQueuedSynchronizer)抽象的队列式的同步器,用来构建锁或者其他同步组件的基础框架类。使⽤AQS能简单且⾼效地构造出应⽤⼴泛的⼤量的同步器,如:基于AQS实现的lock、CountDownLatch、CyclicBarrier、Semaphore需解决的问题。
AQS核⼼思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的⼯作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占⽤,那么就需要⼀套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是⽤CLH(虚拟的双向队列)队列锁实现的,即将暂时获取不到锁的线程加⼊到队列中。
AQS 的源码中 方法很多,但主要做了三件事情:
1、管理 同步状态;
2、维护 同步队列;
3、阻塞和唤醒 线程。
另外,从行为上来区分就是 获取锁 和 释放锁,从模式上来区分就是 独占锁 和 共享锁。
二、AQS的实现原理
1、同步状态(State)
volatile int state;
用于存储同步状态,通过getState()
、setState(int newState)
、compareAndSetState(int expect, int update)
方法进行访问和原子操作
/**
* 同步状态
*/
private volatile int state;
/**
* 返回同步状态的当前值。此操作具有读取的 volatile 内存语义
* 返回值: 当前状态值
*/
protected final int getState() {
return state;
}
/**
* 设置同步状态的值。此操作具有写入的 volatile 内存语义
* 形参: newState – 新的状态值
*/
protected final void setState(int newState) {
state = newState;
}
/**
* 如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。此操作具有读取和写入的volatile 内存语义。
* 形参: expect – 期望值 update – 新价值
* 返回值: true 如果成功。错误返回表示实际值不等于预期值。
*/
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}
2、线程排队与阻塞
队列采用CLH变体的FIFO双向链表实现,每个节点仅需持有前驱节点的引用,由尾部节点维护整个队列。
使用内部类Node
表示等待队列中的节点,包含线程引用、等待状态等信息。
3、同步模式
独占模式(Exclusive):只有一个线程能持有同步状态,如ReentrantLock
。
共享模式(Shared):多个线程可以同时持有同步状态,如Semaphore
。
4、阻塞与唤醒机制
使用java.util.concurrent.locks.LockSupport.park()
和unpark(Thread thread)
方法实现线程阻塞与唤醒。
5、公平与非公平策略
子类通过不同的tryAcquire
实现逻辑控制是否遵循FIFO原则。
6、关键方法(acquire)
(1)tryAcquire(int arg)
和 tryRelease(int arg)
:子类实现,分别用于独占模式下尝试获取和释放同步状态。
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release:
if (tryRelease(arg))
unblock the first queued thread;
/**
尝试以独占模式获取。此方法应查询对象的状态是否允许在独占模式下获取它,如果允许,则查询获取它。
此方法始终由执行 acquire 的线程调用。如果此方法报告失败,则 acquire 方法可能会对线程进行排队(如果尚未排队),直到其他线程发出释放信号。
这可以用于实现方法 Lock.tryLock()。默认实现抛出 UnsupportedOperationException.
形参: arg – 获取参数。此值始终是传递给 acquire 方法的值,或者是在进入条件等待时保存的值。该值在其他方面是未解释的,可以表示您喜欢的任何内容。
返回值: true 如果成功。成功后,该对象已被收购。
抛出: IllegalMonitorStateException – 如果获取会使此同步器处于非法状态。必须以一致的方式引发此异常,同步才能正常工作。UnsupportedOperationException – 如果不支持独占模式
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
尝试设置状态以反映独占模式下的发布。
此方法始终由执行释放的线程调用。
默认实现抛出 UnsupportedOperationException.
形参: arg – 释放参数。此值始终是传递给发布方法的值,或者是进入条件等待时的当前状态值。该值在其他方面是未解释的,可以表示您喜欢的任何内容。
返回值: true 如果此对象现在处于完全释放状态,则任何等待的线程都可以尝试获取;否则 false 。
抛出: IllegalMonitorStateException – 如果释放会使此同步器处于非法状态。必须以一致的方式引发此异常,同步才能正常工作。UnsupportedOperationException – 如果不支持独占模式
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
(2)tryAcquireShared(int arg)
和 tryReleaseShared(int arg)
:子类实现,分别用于共享模式下尝试获取和释放同步状态。
/**
尝试在共享模式下获取。此方法应查询对象的状态是否允许在共享模式下获取它,如果允许,则查询获取它。
此方法始终由执行 acquire 的线程调用。如果此方法报告失败,则 acquire 方法可能会对线程进行排队(如果尚未排队),直到其他线程发出释放信号。
默认实现抛出 UnsupportedOperationException.
形参: arg – 获取参数。此值始终是传递给 acquire 方法的值,或者是在进入条件等待时保存的值。该值在其他方面是未解释的,可以表示您喜欢的任何内容。
返回值: 失败时为负值;如果共享模式下的采集成功,但后续的共享模式采集无法成功,则为零;如果共享模式下的采集成功,并且后续共享模式采集也可能成功,则为正值,在这种情况下,后续等待线程必须检查可用性。(支持三种不同的返回值,使此方法能够在获取有时仅以独占方式执行操作的上下文中使用。成功后,该对象已被收购。
抛出: IllegalMonitorStateException – 如果获取会使此同步器处于非法状态。必须以一致的方式引发此异常,同步才能正常工作。UnsupportedOperationException – 如果不支持共享模式
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
尝试设置状态以反映共享模式下的发布。
此方法始终由执行释放的线程调用。
默认实现抛出 UnsupportedOperationException.
形参: arg – 释放参数。此值始终是传递给发布方法的值,或者是进入条件等待时的当前状态值。该值在其他方面是未解释的,可以表示您喜欢的任何内容。
返回值: true 如果此共享模式版本可能允许等待获取(共享或独占)成功;以及 false 其他
抛出: IllegalMonitorStateException – 如果释放会使此同步器处于非法状态。必须以一致的方式引发此异常,同步才能正常工作。UnsupportedOperationException – 如果不支持共享模式
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
(3)acquire(int arg)
、acquireInterruptibly(int arg)
、tryAcquireNanos(int arg, long nanosTimeout)
:独占模式下阻塞获取同步状态,支持中断和超时。
release(int arg)
:独占模式下释放同步状态。
/**
主 acquire 方法,由所有导出的 acquire 方法调用。
形参:
node – 除非重新获取条件,否则为 null arg – 获取参数 shared – 如果共享模式为真,否则为独占 interruptible – 如果中止并在中断时返回负数 timed – 如果为 true,则使用定时等待 time – 如果定时,则超时的 System.nanoTime 值
返回值:
如果获取,则为正,如果超时,则为 0,如果中断,则为负
*/
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
/**
以独占模式获取,中断时中止。通过首先检查中断状态,然后至少调用一次 tryAcquire,成功返回来实现。否则,线程将排队,可能会反复阻塞和取消阻塞,直到 tryAcquire 成功或线程中断。此方法可用于实现方法 Lock.lockInterruptibly。
形参: arg – 获取参数。此值被传达给 tryAcquire ,但在其他方面不会被解释,并且可以表示您喜欢的任何内容。
抛出: InterruptedException – 如果当前线程中断
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
throw new InterruptedException();
}
/**
尝试在独占模式下获取,如果中断则中止,如果给定的超时时间过长,则失败。通过首先检查中断状态,然后至少调用一次 tryAcquire,成功返回来实现。否则,线程将排队,可能会重复阻塞和取消阻塞,直到tryAcquire 成功或线程中断或超时结束。此方法可用于实现方法 Lock.tryLock(long, TimeUnit)。
形参: arg – 获取参数。此值被传达给 tryAcquire ,但在其他方面不会被解释,并且可以表示您喜欢的任何内容。 nanosTimeout – 等待的最大纳秒数
返回值: true 如果获得; false 如果超时
抛出: InterruptedException –如果当前线程中断
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (!Thread.interrupted()) {
if (tryAcquire(arg))
return true;
if (nanosTimeout <= 0L)
return false;
int stat = acquire(null, arg, false, true, true,
System.nanoTime() + nanosTimeout);
if (stat > 0)
return true;
if (stat == 0)
return false;
}
throw new InterruptedException();
}
/**
以独占模式发布。通过在返回 true 时 tryRelease 取消阻止一个或多个线程来实现。此方法可用于实现方法 Lock.unlock。
形参: arg – 释放参数。此值被传达给 tryRelease ,但在其他方面不会被解释,并且可以表示您喜欢的任何内容。
返回值: 返回的值 tryRelease
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
(4)acquireShared(int arg)
、acquireSharedInterruptibly(int arg)
、tryAcquireSharedNanos(int arg, long nanosTimeout)
:共享模式下阻塞获取同步状态,支持中断和超时。
releaseShared(int arg)
:共享模式下释放同步状态。
/**
在共享模式下获取,忽略中断。通过至少一次首次调用 tryAcquireShared来实现,成功返回。否则,线程将排队,可能会重复阻塞和取消阻塞,直到 tryAcquireShared 调用成功。
形参: arg – 获取参数。此值被传达给 tryAcquireShared ,但在其他方面不会被解释,并且可以表示您喜欢的任何内容
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
acquire(null, arg, true, false, false, 0L);
}
/**
在共享模式下获取,如果中断则中止。通过首先检查中断状态,然后至少调用一次 tryAcquireShared,成功返回来实现。否则,线程将排队,可能会反复阻塞和取消阻塞,直到 tryAcquireShared 成功或线程中断。
形参: arg – 获取参数。此值被传达给 tryAcquireShared ,但在其他方面不会被解释,并且可以表示您喜欢的任何内容。
抛出: InterruptedException – 如果当前线程中断
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}
/**
尝试在共享模式下获取,如果中断则中止,如果给定超时过,则失败。通过首先检查中断状态,然后至少调用一次 tryAcquireShared,成功返回来实现。否则,线程将排队,可能会重复阻塞和取消阻塞,直到 tryAcquireShared 成功或线程中断或超时结束。
形参: arg – 获取参数。此值被传达给 tryAcquireShared ,但在其他方面不会被解释,并且可以表示您喜欢的任何内容。 nanosTimeout – 等待的最大纳秒数
返回值: true 如果获得; false 如果超时
抛出: InterruptedException – 如果当前线程中断
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (!Thread.interrupted()) {
if (tryAcquireShared(arg) >= 0)
return true;
if (nanosTimeout <= 0L)
return false;
int stat = acquire(null, arg, true, true, true,
System.nanoTime() + nanosTimeout);
if (stat > 0)
return true;
if (stat == 0)
return false;
}
throw new InterruptedException();
}
/**
在共享模式下发布。通过在返回 true 时 tryReleaseShared 取消阻止一个或多个线程来实现。
形参: arg – 释放参数。此值被传达给 tryReleaseShared ,但在其他方面不会被解释,并且可以表示您喜欢的任何内容。
返回值: 返回的值 tryReleaseShared
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}
三、CountDownLatch、CompletableFuture、CyclicBarrier、Semaphore
CountDownLatch:
通过计数法(倒计时器),让一些线程堵塞直到另一个线程完成一系列操作后才被唤醒;该⼯具通常⽤来控制线程等待,它可以让某⼀个线程等待直到倒计时结束,再开始执⾏。具体可以使用countDownLatch.await()来等待结果。多用于多线程信息汇总。
CompletableFuture:
通过设置参数,可以完成CountDownLatch同样的多平台响应问题,但是可以针对其中部分返回结果做更加灵活的展示。
CyclicBarrier:
字面意思是可循环(Cyclic)使用的屏障(Barrier)。他要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法。可以用于批量发送消息队列信息、异步限流。
Semaphore:
信号量主要用于两个目的,一个是用于多个共享资源的互斥作用,另一个用于并发线程数的控制。SpringHystrix限流的思想
具体使用案例:保证多个线程的执行顺序,参考我的另一篇文章:北京--面试1(设计模式、反射、队列、线程、锁、Linux命令、JVM调优参数)-CSDN博客