文章目录
- AQS
- 一、AQS 概述
- 1、什么是 AQS ?
- 2、AQS 架构图
- 3、AQS 原理概述
- 4、同步状态state
- 5、FIFO等待队列
- 6、AQS 中的 Node
- 7、AQS 的特点
- 二、AQS 源码(以 ReentrantLock 为例)
- 1、基本实现
- 2、加锁
- 1)lock
- 2)addWaiter
- 【1】compareAndSetTail
- 【2】enq
- 3)tryAcquire
- 【1】非公平的 tryAcquire
- 【2】公平的 tryAcquire
- 【3】hasQueuedPredecessors
- 4)acquireQueued
- 【1】shouldParkAfterFailedAcquire
- 【2】parkAndCheckInterrupt
- 【3】cancelAcquire
- 5)小结
- 3、解锁
- 1)unlock
- 2)tryRelease
- 3)unparkSuccessor
- 4)小结
- 4、可重入
- 5、使用案例
- 三、AQS 应用
- 1、Semaphore(信号量)
- 1)基本实现
- 2)原理
- 3)使用案例
- 2、CountDownLatch(计数器闭锁)
- 1)基本实现
- 2)原理
- 3)使用案例
- 3、CyclicBarrier(循环栅栏)
- 1)基本实现
- 2)原理
- 3)使用案例
- 4、ReentrantReadWriteLock
- 1)基本实现
- 2)原理
- 3)使用案例
- 5、ThreadPoolExecutor
- 6、CountDownLatch vs CyclicBarrier
- 四、自定义同步工具
AQS
一、AQS 概述
1、什么是 AQS ?
AQS(AbstractQueuedSynchronizer)是 Java 中用于构建锁和其他同步器的框架,基于模板方法设计模式,提供了一个抽象类,提供了一些用于自定义同步器实现的protected方法。
package java.util.concurrent.locks;
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 尝试获取独占锁(arg为获取锁的次数)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放独占锁(arg为释放锁的次数)
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 尝试获取共享锁(arg为获取锁的次数)
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放共享锁(arg为释放锁的次数)
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 该线程是否正在独占资源。只有用到Condition才需要去实现它。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
}
可以看出 AQS 主要包含以下两种同步器:
- 独占式:一把锁只能被一个线程持有。需要实现
tryAcquire()
和tryRelease()
方法。 - 共享式:多个线程可以共享一把锁。需要实现
tryAcquireShared()
和tryReleaseShared()
方法。
AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
。
2、AQS 架构图
参考文章:https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
上图中有颜色的为Method,无颜色的为Attribution。
总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。
- 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。
- 当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取。
- 接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理。
- 而这些处理方式,均依赖于第五层的基础数据提供层。
3、AQS 原理概述
AQS 核心思想是:
- 如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;
- 如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。
AQS 的原理基于以下几个核心概念:
-
同步状态state:
AQS中维护了一个同步状态(
volatile int state
),用于展示当前临界资源的获锁情况。 -
CAS操作:
AQS通过CAS操作来尝试修改共享资源的状态。CAS是一种原子操作,用来实现多线程并发的同步。
-
FIFO等待队列:
AQS中维护了一个FIFO(先进先出)的等待队列,主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
AQS 的工作流程大致如下:
- 当一个线程尝试获取资源时,首先通过CAS操作来尝试修改同步状态state,如果成功获取资源,线程可以继续执行。
- 如果CAS操作失败(即资源被其他线程占用),当前线程会被阻塞,并加入到等待队列的尾部。
- 当持有资源的线程释放资源时,AQS会唤醒等待队列中的一个线程(通常是队首线程),让其尝试重新获取资源。
- 被唤醒的线程再次尝试获取资源,成功后可以继续执行,否则会继续被阻塞。
通过这种方式,AQS可以实现各种同步器,支持不同的并发场景,例如独占锁、共享锁、信号量等。
4、同步状态state
AQS中维护了一个同步状态(volatile int state
),用于展示当前临界资源的获锁情况。
-
state=0
,代表没有线程占有锁,可以去争抢这个锁,用 CAS 将 state 设为 1。 -
state=1
,说明 CAS 成功,抢到了锁,这样其他线程就抢不到了,进入等待队列尾部。 -
如果锁重入的话,
state+1
;解锁state-1
;直到state=0
,代表释放锁(所以 lock 和 unlock 必须要配对) -
释放锁后,唤醒等待队列中的首个线程,让其来占有锁。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;
可以通过getState()
,setState()
,compareAndSetState()
进行操作
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 获取state的值
protected final int getState() {
return state;
}
// 设置state的值
protected final void setState(int newState) {
state = newState;
}
// 使用CAS方式更新state
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
这几个方法都是final
修饰的,说明子类中无法重写它们。
我们可以通过修改state
字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。
5、FIFO等待队列
AQS中维护了一个FIFO(先进先出)的等待队列,主要用的是CLH队列的变体实现的,将获取不到锁的线程加入到队列尾部。
CLH(Craig、Landin and Hagersten)队列是单向链表,而AQS中的队列是CLH变体的虚拟双向队列(FIFO)
- AQS 将 每条请求共享资源的线程 封装成一个 Node节点 来实现等待队列。
6、AQS 中的 Node
AQS 的 等待队列 是用 Node 实现的,线程也是封装在 Node 的属性中的。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 队列的首尾节点
private transient volatile Node head;
private transient volatile Node tail;
// 队列节点
static final class Node {
// 线程两种锁的模式
static final Node SHARED = new Node(); // 共享模式
static final Node EXCLUSIVE = null; // 独占模式
// waitStatus的枚举值(默认为0)
static final int CANCELLED = 1; // 表示当前节点被取消
static final int SIGNAL = -1; // 表示当前节点需要去唤醒下一个节点
static final int CONDITION = -2; // 表示节点处于条件队列中等待唤醒
static final int PROPAGATE = -3; // SHARED模式中,前置节点不仅会唤醒后继节点,还可能唤醒后继的所有节点
// 属性
volatile int waitStatus; // 当前节点在队列中的状态
volatile Node prev; // 上一个节点
volatile Node next; // 下一个节点
volatile Thread thread; // 表示处于该节点的线程
// 指向下一个处于CONDITION状态的节点
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取上一个节点,没有抛出NPE
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
7、AQS 的特点
AQS 的主要特点包括:
- 高效性: AQS 使用了 CAS 操作和自旋等待,避免了线程阻塞和唤醒的开销,提高了并发性能。
- 灵活性: AQS 提供了丰富的模板方法和状态管理机制,使得可以根据实际需求实现各种不同类型的同步器。
- 可扩展性: AQS 提供了钩子方法,使得可以对同步器的行为进行扩展和定制化,例如增加超时机制、中断支持等。
总的来说,AQS 是 Java 并发编程中一个重要的基础框架,它为开发者提供了实现自定义同步器的便利和灵活性,同时也在实现上提供了高效性和可扩展性。
二、AQS 源码(以 ReentrantLock 为例)
本章我们以ReentrantLock
的实现来分析 AQS 的源码。ReentrantLock
是独占锁,实现了tryAcquire
和 tryRelease
1、基本实现
public class ReentrantLock implements Lock, java.io.Serializable {
// ------ 属性 ------
private final Sync sync; // 同步器,继承自AQS抽象类
// ------ 内部类 ------
abstract static class Sync extends AbstractQueuedSynchronizer {...}
static final class NonfairSync extends Sync {...}
static final class FairSync extends Sync {...}
// ------ 构造方法 ------
public ReentrantLock() {
sync = new NonfairSync(); // 默认是 非公平锁
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync(); // 参数指定 公平锁 或 非公平锁
}
// ------ 成员方法 ------
public void lock() {
sync.lock(); // 最终都是调用具体 sync 的实现
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
// ...
}
2、加锁
1)lock
public class ReentrantLock implements Lock, java.io.Serializable {
public void lock() {
sync.lock(); // 最终都是调用具体 sync 的实现
}
static final class NonfairSync extends Sync {
@ReservedStackAccess
final void lock() {
if (compareAndSetState(0, 1))
// 获取锁成功,则将当前线程设置为独占线程。(和 FairSync 的区别,体现了非公平性,不管队列)
setExclusiveOwnerThread(Thread.currentThread());
else
// 获取锁失败,则进入acquire方法进行后续处理。
acquire(1);
}
}
static final class FairSync extends Sync {
final void lock() {
// 进入acquire方法进行后续处理。
acquire(1);
}
}
}
可以看到,ReentrantLock
的lock
方法无论是NonfairSync
还是FairSync
,最后都会调用 AQS抽象类
中的 acquire
// java.util.concurrent.locks.AbstractQueuedSynchronizer
@ReservedStackAccess
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。
// 也就是说,在整个流程中,并不响应中断,只是记录中断记录。
// 最后抢到锁返回了,那么如果被中断过的话(acquireQueued返回true),就需要补充一次中断。
selfInterrupt();
}
static void selfInterrupt() {
// 重新产生一次中断
Thread.currentThread().interrupt();
}
acquire
的整体流程:
-
调用
tryAcquire(arg)
方法,尝试获取锁(protected
修饰,由NonfairSync
和FairSync
实现)获取锁成功,直接返回
-
获取锁失败,调用
addWaiter
方法,创建Node放入等待队列末尾,返回创建的Node -
调用
acquireQueued
方法, 让 加入等待队列的线程 自旋获取锁,直到获取成功 或者 中断。 -
selfInterrupt()
补充一次中断(因为acquireQueued
中并不响应中断,只是记录中断记录)
2)addWaiter
tryAcquire(arg)
获取锁失败,就会执行addWaiter(Node.EXCLUSIVE)
,把当前线程加入等待队列末尾。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
// 为当前线程创建一个Node
Node node = new Node(Thread.currentThread(), mode);
// pred 指向 tail
Node pred = tail;
// 等待队列中有元素 -> 将当前节点放到等待队列的末尾
if (pred != null) {
node.prev = pred;
// pred 和 tailOffset 地址一致,则将 tailOffset 更新为 node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 等待队列中没有元素 或 compareAndSetTail执行失败 -> 执行enq
enq(node);
return node;
}
【1】compareAndSetTail
compareAndSetTail
方法通过比较 tailOffset
和 tail节点地址
是否一致来 设置tail
(更新 tailOffset
)
// java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* tailOffset 和 expect 地址一致,则更新 tailOffset 为 update 的地址
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
// 静态代码块,初始化一些属性在内存当中的偏移量,这样就可以根据这个偏移量在对象内存当中找到对应的属性。
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
【2】enq
以下两种情况,需要继续向后执行 enq(node)
方法
- 情况1:
tail==null
,说明等待队列中没有元素 - 情况2:
compareAndSetTail
返回false
(tailOffset
和tail节点
地址不同),说明已经被别的线程修改。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node enq(final Node node) {
// 自旋,将 当前节点 加到等待队列末尾
for (;;) {
Node t = tail;
if (t == null) {
// 队列中没有Node:设置一个虚节点作为head(虚节点是指:仅占位,没有任何信息的节点)
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 队列中有Node:将当前节点放到等待队列的末尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这里注意:
-
等待队列中,
head
是一个虚节点(new Node()
,仅占位,不存储任何信息)真正的第一个有效节点,是
head
的next
。 -
如果等待队列中只有一个节点,那么
tail = head
3)tryAcquire
ReentrantLock
的lock
方法首先会通过tryAcquire
尝试获取锁(NonfairSync
和 FairSync
的实现稍有不同)
【1】非公平的 tryAcquire
public class ReentrantLock implements Lock, java.io.Serializable {
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
// 调用Sync的nonfairTryAcquire
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 没有线程持有锁,可以获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 其他线程持有锁,tryAcquire失败
return false;
}
}
}
【2】公平的 tryAcquire
和NonfairSync
几乎一样,区别是多了一个!hasQueuedPredecessors()
的判断
!hasQueuedPredecessors()
主要是确保等待队列中没有排在当前节点前面的节点,当前线程才可以直接获取锁
public class ReentrantLock implements Lock, java.io.Serializable {
static final class FairSync extends Sync {
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 和 NonfairSync 唯一的区别:多了一个 !hasQueuedPredecessors() 的判断
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
【3】hasQueuedPredecessors
上面说到, FairSync
和 NonfairSync
的 tryAcquire
方法基本一样,唯一的区别在于:
FairSync
比NonfairSync
多了一个!hasQueuedPredecessors()
的判断
// java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* 公平锁加锁时,需要判断:等待队列中是否存在排在当前节点前面的节点
*/
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 首尾节点相同,说明队列中只有一个占位的「虚节点」,没有等待的「有效节点」,返回false
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
-
(s = h.next) == null
,说明有线程正在放到等待队列中,但是只放了一半只将
Node
的上个节点指向了tail
,还没有将tail
的下个节点指向Node
(放了一半也是排在前面,返回true) -
(s = h.next) != null
,说明等待队列中至少有一个有效节点s.thread == Thread.currentThread()
,说明 等待队列中首个有效节点就是当前线程,可以直接获取锁。s.thread != Thread.currentThread()
,说明 等待队列中首个有效节点不是当前线程,需要排队。
为什么有
(s = h.next) == null
这个情况呢?我们回顾一下addWaiter
方法中的enq
方法
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node enq(final Node node) {
// 自旋设置尾节点
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 这里可以看到,节点放到等待队列的2步操作,并不是原子的
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以看到,节点放到等待队列的操作并不是原子的,所以, (s = h.next) == null
是为了解决极端情况下的并发问题。
4)acquireQueued
addWaiter
方法将当前线程加到等待队列末尾后,会返回封装了当前线程的Node
,并作为参数传给acquireQueued
方法
acquireQueued
方法会让 加入等待队列的线程 自旋获取锁,直到获取成功 或者 中断。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 是否成功获取锁
try {
boolean interrupted = false; // 是否发生中断
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 前驱节点是head,说明当前节点是等待队列中首个有效节点,当前线程可以直接获取锁(head是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,当前节点成为head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断获取锁失败后,是否需要park阻塞;如果需要,则调用 parkAndCheckInterrupt 进行park
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 走到if内部,说明 parkAndCheckInterrupt 返回的中断标记是true
// 这里是不可中断的`lock`,所以只会记录中断记录,在获取锁之后返回给acquire方法处理
// 如果是`lockInterruptibly`,会直接在这里抛出 InterruptedException
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 把传入的节点设置为head(虚节点),仅占位
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
【1】shouldParkAfterFailedAcquire
如果首次调用时,前驱节点的状态不为 SIGNAL,则CAS设置前驱节点的状态为 SIGNAL,返回false --> 继续循环…
- 再次调用时,前驱节点已经更新为 SIGNAL 了,返回true --> 调用
parkAndCheckInterrupt
阻塞
// java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* 获取锁失败后,判断线程是否要阻塞(条件:前驱节点 waitStatus 是 SIGNAL)
*
* 如果 前驱节点 waitStatus 是 SIGNAL,说明 当前节点 可以尝试获取锁了。
* 因此返回true,表示当前节点应该park,以便后续被唤醒尝试获取锁。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的 waitStatus
int ws = pred.waitStatus;
// 前驱节点 waitStatus 是 SIGNAL
if (ws == Node.SIGNAL)
return true;
// > 0 是 取消状态
if (ws > 0) {
// 前驱节点是取消状态 -> 向前找 不是取消状态的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// next连接到当前节点
pred.next = node;
} else {
// 前驱节点不是取消状态 -> 设置 前驱节点 的 waitStatus 为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
【2】parkAndCheckInterrupt
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程(注意:中断标记为true时,park失效,调用park线程不会阻塞)
LockSupport.park(this);
// 返回线程是否中断(注意:interrupted会清除中断标记,即置为false)
return Thread.interrupted();
}
【3】cancelAcquire
acquireQueued
方法的finally
代码块中,调用了cancelAcquire
方法,这个方法会将Node
的状态标记为CANCELLED
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 当前节点往前遍历,找到未被取消的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// pred的下个节点
Node predNext = pred.next;
// 把当前节点的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将pred设置为tail节点
if (node == tail && compareAndSetTail(node, pred)) {
// 更新成功,将predNext设置为null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
// 1. 判断pred是否为SIGNAL
((ws = pred.waitStatus) == Node.SIGNAL ||
// 2. 如果不是,CAS把pred设置为SINGAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 把pred的next更新为当前节点的next
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 唤醒当前节点的下一个有效节点(不为null && 不是取消状态) 解锁的时候还会提到这个方法
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
整体流程
- 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。
- 根据当前节点的位置,考虑以下三种情况:
情况1:当前节点是尾节点。
情况2:当前节点是Head的后继节点。
情况3:当前节点不是Head的后继节点,也不是尾节点。
为什么 CANCELLED节点状态的产生和变化 都是对 Next指针 进行了操作,而没有对Prev指针进行操作呢?
执行cancelAcquire
的时候,当前节点的前驱节点可能已经从队列中出去了,如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。
为什么
shouldParkAfterFailedAcquire
方法中是对Prev指针进行操作的?
shouldParkAfterFailedAcquire
方法中,会执行下面的代码,其实就是在处理Prev指针。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// ...
// 找到上一个非 CANCELLED 状态的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// ...
}
shouldParkAfterFailedAcquire
是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。
5)小结
ReentrantLock
的lock
方法小结:
-
无论是
NonfairSync
还是FairSync
,最后都会调用AQS抽象类
中的acquire
-
先调用
tryAcquire(arg)
方法,尝试获取锁- 没有线程持有锁,可以直接获取(如果是公平锁,还要通过
hasQueuedPredecessors
判断前面还有没有在排队的)- 先CAS更新
state
,再setExclusiveOwnerThread(current)
- 先CAS更新
- 当前线程持有锁,获取锁成功(锁重入,更新state - 重入次数)
- 其他线程持有锁,获取锁失败
- 没有线程持有锁,可以直接获取(如果是公平锁,还要通过
-
获取锁成功,直接返回;
-
获取锁失败,调用
addWaiter
方法:- 创建Node放入等待队列末尾,返回创建的Node。(这里会初始化head - 仅占位的虚节点)
-
调用
acquireQueued
方法,让 加入等待队列的线程 自旋获取锁,直到获取成功 或者 中断。-
参数是
addWaiter
方法返回的 Node -
如果前驱节点是head,可以直接获取锁
- 获取成功,当前节点成为head
- 获取失败,通过
shouldParkAfterFailedAcquire
方法判断是否需要阻塞- 需要阻塞,调用
parkAndCheckInterrupt
方法 park 阻塞
- 需要阻塞,调用
-
-
acquireQueued
方法返回中断记录,如果为true,需要调用selfInterrupt()
补充一次中断
3、解锁
1)unlock
public class ReentrantLock implements Lock, java.io.Serializable {
public void unlock() {
sync.release(1);
}
}
ReentrantLock
中的unlock
方法,本质调用的是AQS抽象类
中的release
方法
// java.util.concurrent.locks.AbstractQueuedSynchronizer
@ReservedStackAccess
public final boolean release(int arg) {
// tryRelease返回true,说明锁没有被任何线程持有
if (tryRelease(arg)) {
Node h = head;
// head不为空 并且 head的waitStatus不为0
if (h != null && h.waitStatus != 0)
// 唤醒 head 的下一个有效节点(不为null && 不是取消状态)
unparkSuccessor(h);
return true;
}
return false;
}
这里的判断条件为什么是h != null && h.waitStatus != 0
?
-
h == null
,表明Head还没初始化。第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现 head == null 的情况。
-
h != null && waitStatus == 0
,表明后继节点对应的线程仍在运行中,不需要唤醒。 -
h != null && waitStatus < 0
,表明后继节点可能被阻塞了,需要唤醒。
2)tryRelease
AQS抽象类
的release
方法,先调用了ReentrantLock
中Sync
实现的tryRelease
方法
public class ReentrantLock implements Lock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
// 减少重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// state==0,则将当前独占锁持有线程设置为null
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新state
setState(c);
return free;
}
}
}
3)unparkSuccessor
调用unparkSuccessor
方法唤醒当前节点的下一个有效节点(不为null && 不是取消状态)
// java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
* 唤醒当前节点的下一个有效节点(不为null && 不是取消状态)
*/
private void unparkSuccessor(Node node) {
// 获取当前节点的waitStatus
int ws = node.waitStatus;
// 没有取消,则设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取后继节点
Node s = node.next;
// 后继节点为null 或 后继节点是取消状态
if (s == null || s.waitStatus > 0) {
s = null;
// 不考虑已取消的节点, 从等待队列的tail向前找到最前面需要 unpark 的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
为什么要从后往前找需要 unpark 的节点呢?原因如下。
-
之前讲
tryAcquire
方法时提到过,addWaiter
方法中节点入队并不是原子的如果在 执行
node.prev = t
和t.next = node
之间执行了unparkSuccessor
方法,此时只有prev
还没有next
-
在产生
CANCELLED
状态节点的时候,先断开的是next
指针,prev
指针并未断开
综上所述,如果是从前往后找,可能会导致无法遍历所有的节点。
4)小结
ReentrantLock
的unlock
方法小结:
ReentrantLock
中的unlock
方法,本质调用的是AQS抽象类
中的release
方法- 首先调用
tryRelease
方法:- 当前线程不是持有锁的线程,抛出异常
- 当前线程是持有锁的线程
state==0
,则将当前独占锁持有线程设置为null,解锁成功state!=0
,还有重入,解锁失败
- 解锁成功 —> 调用
unparkSuccessor
唤醒head的后继节点(不为null 且 不是取消状态) - 后继节点被唤醒后,重新在
acquireQueued
的死循环中获取锁,这时就可以成功获取锁了。
4、可重入
不管是公平锁还是非公平锁,tryAcquire
方法中都有这么一段逻辑。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
可以看到,有一个同步状态state
来控制整体可重入的情况。state
是volatile
修饰的,用于保证可见性和有序性。
state
初始化的时候为0,表示没有任何线程持有锁。- 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁时,就会多次+1,这里就是可重入的概念。
- 解锁也是对这个字段-1,一直到0,此线程对锁释放。
5、使用案例
public class ReentrantLockDemo {
private static int number = 30;
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) {
Runnable task = () -> {
String currentThread = Thread.currentThread().getName();
for (int i = 0; i < 10; i++) {
try {
lock.lock();
if (number > 0) {
number--;
System.out.println(currentThread + "卖了一张票,剩余:" + number);
}
} catch (Exception e) {
// TODO: handle exception
} finally {
lock.unlock();
}
}
};
// 多线程操作
new Thread(task, "窗口1").start();
new Thread(task, "窗口2").start();
new Thread(task, "窗口3").start();
}
}
三、AQS 应用
除了上边的ReentrantLock的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:
1、Semaphore(信号量)
Semaphore
用于控制同时访问特定资源的线程数量。
1)基本实现
public class Semaphore implements java.io.Serializable {
// ------ 属性 ------
private final Sync sync;
// ------ 内部类 ------
abstract static class Sync extends AbstractQueuedSynchronizer {...}
static final class NonfairSync extends Sync {...}
static final class FairSync extends Sync {...}
// ------ 构造方法 ------
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// ------ 成员方法 ------
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public void release() {
sync.releaseShared(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
// ...
}
2)原理
Semaphore
的基本实现和 ReentrantLock
差不多
- 通过
Sync抽象类
继承AQS抽象类
。 - 通过
Sync抽象类
的子类NonfairSync
和FairSync
来实现 非公平锁 和 公平锁 - 主要的方法最终调用的都是具体
Sync
的实现
public class Semaphore implements java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...
Sync(int permits) {
setState(permits);
}
// ...
}
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
}
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
}
}
从构造方法可以看出,Semaphore
使用 AQS的同步状态 state
来保存信号量的许可证计数permits
。
-
执行任务的线程先通过
acquire()
方法获取许可证(减少计数)。 -
当执行任务的线程数量超出
permits
,那么多余的线程将会被放入阻塞队列park
,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程才能继续执行。
-
线程调用
release()
方法会释放许可证(增加计数),那么阻塞的线程就可以获取许可证。
如此,每次只有最多不超过 permits
数量的线程可以获取许可证执行,便限制了执行任务线程的数量。
3)使用案例
/**
* 停车位
*/
public class SemaphoreDemo {
public static void main(String[] args) {
// 计数信号量 - 3个停车位!限流!
Semaphore semaphore = new Semaphore(3);
// 6辆车抢车位
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
// acquire() 获取许可证
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException ignored) {
// Todo Handle Exception
} finally {
// release() 释放许可证
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
/**
* 停车位 - 多个共享资源互斥使用 & 并发限流,控制最大的线程数
*/
public class SemaphoreDemo {
public static void main(String[] args) {
// 计数信号量 - 3个停车位!限流!
Semaphore semaphore = new Semaphore(3);
// 6辆车抢车位
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
// acquire() 获取许可证
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// release() 添加许可证
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
2、CountDownLatch(计数器闭锁)
CountDownLatch
用于等待一组操作完成后再执行某个操作。
1)基本实现
public class CountDownLatch {
// ------ 属性 ------
private final Sync sync;
// ------ 内部类 ------
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
// ------ 构造方法 ------
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// ------ 成员方法 ------
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
// ...
}
2)原理
CountDownLatch
的基本实现和 ReentrantLock
差不多,但是没有 公平 的概念
- 通过
Sync抽象类
继承AQS抽象类
。 - 主要的方法最终调用的都是具体
Sync
的实现。
CountDownLatch
,使用 AQS的同步状态 state
来保存 计数 count
。
-
任务分为
count
个线程去执行,AQS 的 state 也初始化为count
(注意count
要与线程个数一致)。 -
在线程中调用
CountDownLatch.await()
方法,会使当前线程阻塞,直到state
的值为0。 -
线程执行完后调用
countDown()
,state 会 CAS 减 1 -
所有线程执行完毕并调用
countDown()
之后,state
的值也会减到0(因此执行任务的线程数必须和count
对应)此时之前因调用
await()
方法而阻塞的线程都会被唤醒,继续执行后续任务。
3)使用案例
/**
* 阻塞直到所有线程执行结束
*/
public class CountDownLatchDemo {
public static void main(String[] args) {
// 计数器
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 1; i <= 5; i++) {
final int num = i;
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "开始");
TimeUnit.SECONDS.sleep(num);
System.out.println(Thread.currentThread().getName() + "结束");
} catch (InterruptedException ignored) {
// Todo Handle Exception
} finally {
// 计数器-1
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
try {
// 等待计数器归零,然后再向下操作
countDownLatch.await();
} catch (InterruptedException e) {
// Todo Handle Exception
}
System.out.println("所有线程执行结束");
}
}
3、CyclicBarrier(循环栅栏)
CyclicBarrier
用于等待一组线程达到一个共同的栅栏点后再继续执行,可以指定任务。
1)基本实现
public class CyclicBarrier {
// ------ 属性 ------
/** The number of parties */
private final int parties;
/** Number of parties still waiting */
private int count;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The current generation */
private Generation generation = new Generation();
// ------ 内部类 ------
/** The current generation */
private static class Generation {
boolean broken = false;
}
// ------ 构造方法 ------
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
// ------ 核心成员方法 ------
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
// ------ 相关成员方法 ------
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
}
核心方法 await
主要是由dowait
方法实现的
public class CyclicBarrier {
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
// count减到0,说明最后一个线程已经到达栅栏,也就是可以执行 await 方法之后的任务了
if (index == 0) {
boolean ranAction = false;
try {
// 有任务执行任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒所有等待的线程
// 重置 count(parties的初始值)
// 重置 generation(新new一个)
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
// 调用condition的await等待
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 解锁
lock.unlock();
}
}
}
2)原理
CycliBarrier
的实现是基于 ReentrantLock
和 Condition
的。
CyclicBarrier
内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值。- 每当一个线程到了栅栏这里了,那么就将计数器减一。
- 如果 count 值为 0 了,表示这是这一批最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
3)使用案例
/**
* 集齐七颗龙珠,召唤神龙
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 加法计数器 - 线程数达到指定数量7,执行指定的Runnable任务
CyclicBarrier cyclicBarrier = new CyclicBarrier(
7, () -> System.out.println("集齐七颗龙珠,召唤神龙"));
for (int i = 1; i <= 7; i++) {
final int num = i;
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(num);
System.out.println(Thread.currentThread().getName() + "收集第" + num + "颗龙珠");
cyclicBarrier.await(); // 等待
System.out.println(Thread.currentThread().getName() + "执行结束");
} catch (InterruptedException | BrokenBarrierException e) {
// Todo Handle Exception
}
}, String.valueOf(i)).start();
}
}
}
4、ReentrantReadWriteLock
ReentrantReadWriteLock
可以保证多个线程可以同时读,所以在读操作远大于写操作的时候,读写锁就非常有用了。
1)基本实现
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// ------ 属性 ------
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
// ------ 构造方法 ------
public ReentrantReadWriteLock() {
this(false); // 默认是非公平的
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
// ------ 内部类 ------
abstract static class Sync extends AbstractQueuedSynchronizer {...}
static final class NonfairSync extends Sync {...}
static final class FairSync extends Sync {...}
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public void unlock() {
sync.releaseShared(1);
}
public Condition newCondition() {
throw new UnsupportedOperationException();
}
// ...
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public boolean tryLock( ) {
return sync.tryWriteLock();
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
// ...
}
// ...
}
2)原理
ReentrantReadWriteLock
的基本实现和 ReentrantLock
也有很多相似之处
- 通过
Sync抽象类
继承AQS抽象类
。 - 通过
Sync抽象类
的子类NonfairSync
和FairSync
来实现 非公平锁 和 公平锁 - 主要的方法最终调用的都是具体
Sync
的实现
区别在于:
-
ReentrantReadWriteLock
通过ReadLock
和WriteLock
内部类 来实现 读锁 和 写锁允许多个线程同时获取读锁,但是只允许一个线程获取写锁,而且在写锁被获取时,所有的读锁都会被阻塞。
-
ReentrantReadWriteLock
中的Sync抽象类
将读锁和写锁的持有数分别保存在不同的位段中这样可以避免在更新同步状态时出现竞争,很容易地进行读锁和写锁的控制。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 高16位:用于保存共享锁(读锁)的持有数。
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 低16位:用于保存独占锁(写锁)的持有数。
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
}
3)使用案例
public class ReentrantReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 5; i++) {
final int tmp = i;
new Thread(() -> myCache.write(tmp, tmp), String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int tmp = i;
new Thread(() -> myCache.read(tmp), String.valueOf(i)).start();
}
}
}
class MyCache {
// 读写锁
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Map<Integer, Object> cache = new HashMap<>();
public void read(Integer key) {
try {
// 加读锁
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "读取" + key);
cache.get(key);
TimeUnit.MILLISECONDS.sleep(500);
System.out.println(Thread.currentThread().getName() + "读取OK");
} catch (Exception e) {
// TODO: handle exception
} finally {
// 释放读锁
readWriteLock.readLock().unlock();
}
}
public void write(Integer key, Object value) {
try {
// 加写锁
readWriteLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "写入" + key);
cache.put(key, value);
TimeUnit.MILLISECONDS.sleep(500);
System.out.println(Thread.currentThread().getName() + "写入OK");
} catch (Exception e) {
// TODO: handle exception
} finally {
// 释放写锁
readWriteLock.writeLock().unlock();
}
}
}
5、ThreadPoolExecutor
ThreadPoolExecutor
中的 Worker
也是基于 AbstractQueuedSynchronizer
实现的
public class ThreadPoolExecutor extends AbstractExecutorService {
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
}
6、CountDownLatch vs CyclicBarrier
javadoc 是这么描述 CountDownLatch
和 CyclicBarrier
的
CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(一种同步辅助,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成)
CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(一种同步辅助,它允许一组线程都等待彼此到达一个公共屏障点)
CountDownLatch 对比 CyclicBarrier
- 使用场景上:
- CountDownLatch:相当于一个计数器,一个或者多个线程,等待其他多个线程执行完毕后再进行某些操作。
- CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再被唤醒继续执行。
- 实现上:
- CountDownLatch:通过继承
AbstractQueuedSynchronizer
实现 - CyclicBarrier:没有直接实现AQS框架,而是借助
ReentrantLock
和Condition
来实现的同步机制。
- CountDownLatch:通过继承
- 使用上:
- Countdownlatch:一次性的。计数器是递减的,一旦计数器减到0,等待的线程会被唤醒,而且计数器不能重置。
- CyclicBarrier:可以循环使用。提供
reset
功能。
- 返回值:
- CyclicBarrier:
await
方法有返回值,可以通过返回值计算出它是第几个到达的线程。 - Countdownlatch:没有返回值
- CyclicBarrier:
- 指定任务:
- CyclicBarrier:创建CyclicBarrier对象的时候,能够传入一个Runnable参数,达到屏障后,执行该任务。
- Countdownlatch:不具备这个功能
四、自定义同步工具
public class CustomLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}
private Sync sync = new Sync();
public void lock () {
sync.acquire(1);
}
public void unlock () {
sync.release(1);
}
}
public class CustomLockTest {
static int count = 0;
static CustomLock lock = new CustomLock();
public static void main (String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
try {
lock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
上述代码每次运行结果都会是20000。通过简单的几行代码就能实现同步功能,这就是AQS的强大之处。