本文将介绍AQS的简单原理。
首先有个整体认识,全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架。常用的ReentrantLock、Semaphore、CountDownLatch等都有实现它。
本文参考:
深入理解AbstractQueuedSynchronizer只需15张图_abstractqueuedsynchronizer流程图-CSDN博客
黑马程序员深入学习Java并发编程,JUC并发编程全套教程_哔哩哔哩_bilibili
从ReentrantLock的实现看AQS的原理及应用 - 美团技术团队 (meituan.com)
Java AQS 核心数据结构-CLH 锁 (qq.com)
AQS关键组成部分
state
state用来表示资源的状态(独占模式和共享模式),子类需要控制这个变量,就能够获取锁或者释放锁。
独占模式:只允许一个线程访问资源。
共享模式:允许多个线程访问资源。
CLH队列
这个队列保存的是没有获得锁资源进入阻塞的线程。使用的是头节点和尾结点来创建出这个双向队列。这个队列是先进先出的,不支持优先级。这里的CLH队列是AQS对于CLH锁的升级改后的应用。
该队列是对自旋锁的改进。那么自旋锁有上面缺点呢?
自旋锁
public class SpinLock {
private AtomicReference<Thread> owner = new AtomicReference<Thread>();
public void lock() {
Thread currentThread = Thread.currentThread();
// 如果锁未被占用,则设置当前线程为锁的拥有者
// 这里就是自旋锁的核心部分
while (!owner.compareAndSet(null, currentThread)) {
}
}
public void unlock() {
Thread currentThread = Thread.currentThread();
// 只有锁的拥有者才能释放锁
owner.compareAndSet(currentThread, null);
}
}
第一个是锁饥饿问题。在锁竞争激烈的情况下,可能存在一个线程一直被其他线程”插队“而一直获取不到锁的情况。
第二是性能问题。在实际的多处理上运行的自旋锁在锁竞争激烈时性能较差。
CLH队列就把上述的两个问题解决了。
CLH队列过程(加解锁)
- 初始化一个 Tail 指向一个状态为false的空节点。
- 当t1线程获取锁,tail就指向t1线程,同时修改状态为true
- 当t2也想获得锁,此时t1还没释放,tail就指向t2,同时t1对应的节点。此时t2检查到t1节点为false,就开始轮询上一个节点的状态
- 当t1结束后,就把值修改为false
- 当t2轮询到值为false后,就是获取锁成功。
AQS升级CLH锁
- 扩展了每个节点的状态(waitStatus),从简单的true和false,扩展到如下部分。
- 维护前驱后继节点
原始版本的 CLH 锁中,节点间甚至都没有互相链接。但是,通过在节点中显式地维护前驱节点,CLH 锁就可以处理“超时”和各种形式的“取消”:如果一个节点的前驱节点取消了,这个节点就可以滑动去使用前面一个节点的状态字段。 - 用阻塞等待代替自选操作
当前一个节点释放锁之后,它会通知后一个节点获取锁。
条件变量
如果线程不满足条件变量后,那么它就要进入另外一种队列进行等待。每一个条件变量都会创建出一个队列。其中这个队列是单向的。
锁资源获取与释放
- 独占式
- acquire获取资源
- release释放资源
- 共享式
- acquireShared获取资源
- releaseShared释放资源
获得独占锁
acquire是个模板函数,模板流程就是线程获取共享资源,如果获取资源成功,线程直接返回,否则进入CLH队列,直到获取资源成功为止,且整个过程忽略中断的影响。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
释放独占锁
AQS中提供了release模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CLH队列的第二个线程节点。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
获得共享锁
acquireShared是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入CLH队列,直到获取到资源为止,且整个过程忽略中断的影响。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
释放共享锁
AQS中提供了releaseShared模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CHL队列的第二个线程节点(首节点的下个节点)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
自定义锁
下面我们自定义一个锁,该锁内部的同步器是不可重入的独占锁。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
// 自定义锁
public class MyLock implements Lock {
// 独占锁 同步器类
public class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
// 加锁,并设置成当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int acquires) {
setExclusiveOwnerThread(null);
// state是被volatile修饰的,exclusiveOwnerThread 没有被volatile修饰
// 所以设置exclusiveOwnerThread成null要放在前面
setState(0);
return true;
}
protected Condition newCondition() {
return new ConditionObject();
}
// 是否持有独占锁
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
private MySync sync = new MySync();
// 加锁,不成功进入等待队列
@Override
public void lock() {
sync.acquire(1);
}
// 加锁,不成功进入等待队列,可被中断
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 尝试加锁一次,不成功直接返回
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
// 尝试加锁(带超时的),不成功进入等待队列,可被中断
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
// 解锁
@Override
public void unlock() {
// 这里release本质上调用的是tryRelease方法 解锁并唤醒等待队列中的线程
// 如果调用的是release方法,就不会唤醒等待的线程
sync.release(1);
}
// 创建一个Condition
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
class Test {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
long now = System.currentTimeMillis();
try {
System.out.println("t1 获取锁");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("t1 释放锁");
System.out.println("t1 耗时:" + (System.currentTimeMillis() - now));
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
System.out.println("t2 获取锁");
} finally {
System.out.println("t2 释放锁");
lock.unlock();
}
}, "t2").start();
}
}