文章目录
- Pre
- 概述
- Condition 主要方法
- Condition案例
- Condition的源码解析
- 1. 等待:condition. await
- 2. 唤醒Condition. signal
- Condition总结
Pre
J.U.C - 深入解析ReentrantLock原理&源码
概述
配合synchronized同步锁在同步代码块中调用加锁对象notify和wait方法,可以实现线程间同步,在JUC锁里面也有相似的实现:Condition类,利用条件变量Condition的signal和await方法用来配合JUC锁也可以实现多线程之间的同步。
和synchronized不同的是,一个synchronized锁只能用一个共享变量(锁对象)的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量,对线程的等待、唤醒操作更加详细和灵活。
在调用加锁对象的notify和wait方法前必须先获取该共享变量的内置锁。同样,在调用条件变量的signal和await方法前也必须先获取条件变量对应的锁,因此Condition必须要配合锁一起使用,一个Condition的实例必须与一个Lock绑定,Condition一般都是作为Lock的内部实现。
Condition 主要方法
Condition提供了一系列的方法来阻塞和唤醒线程:
- 1)
await()
:造成当前线程在接到信号或被中断之前一直处于等待状态。 - 2)
await(long time,TimeUnit unit)
:当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 - 3)
awaitNanos(long nanosTimeout)
:当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值<= 0,则可以认定它已经超时。 - 4)
awaitUninterruptibly()
:当前线程在接到信号之前一直处于等待状态。注意:该方法对中断不敏感。 - 5)
awaitUntil(Date deadline)
:当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回false。 - 6)
signal()
:唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。 - 7)
signalAll()
:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
Condition案例
如何使用 ReentrantLock
和 Condition
实现“等待”和“唤醒”的功能。创建了两个线程:Thread-await
和 Thread-signal
,分别实现等待和唤醒的功能。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionExample {
public static void main(String[] args) {
// 创建一个重入独占锁
Lock lock = new ReentrantLock();
// 使用锁对象创建一个条件变量
Condition condition = lock.newCondition();
// 创建等待线程
Thread awaitThread = new Thread(() -> {
lock.lock();
try {
System.out.println("Thread-await: 已获得锁,开始等待...");
// 调用await方法,当前线程会释放锁并被阻塞等待
condition.await();
System.out.println("Thread-await: 被唤醒,继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread-await: 释放锁");
}
}, "Thread-await");
// 创建唤醒线程
Thread signalThread = new Thread(() -> {
lock.lock();
try {
// 模拟一些处理时间
Thread.sleep(2000);
System.out.println("Thread-signal: 已获得锁,唤醒等待线程...");
// 调用signal方法,唤醒一个被阻塞的线程
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread-signal: 释放锁");
}
}, "Thread-signal");
// 启动等待线程
awaitThread.start();
// 让等待线程先执行一段时间
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 启动唤醒线程
signalThread.start();
}
}
-
创建锁和条件变量:
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition();
-
创建等待线程
Thread-await
:- 获取锁。
- 调用
condition.await()
方法,当前线程会释放锁并被阻塞等待。 - 被唤醒后,继续执行并释放锁。
-
创建唤醒线程
Thread-signal
:- 获取锁。
- 模拟一些处理时间(2秒)。
- 调用
condition.signal()
方法,唤醒一个被阻塞的线程。 - 释放锁。
-
启动线程:
- 先启动等待线程
Thread-await
。 - 让等待线程先执行一段时间(1秒)。
- 再启动唤醒线程
Thread-signal
。
- 先启动等待线程
运行过程
-
线程
Thread-await
获得锁并调用await
方法:- 输出:“Thread-await: 已获得锁,开始等待…”
- 当前线程释放锁并被阻塞等待。
-
线程
Thread-signal
获得锁并调用signal
方法:- 模拟处理时间(2秒)。
- 输出:“Thread-signal: 已获得锁,唤醒等待线程…”
- 调用
condition.signal()
方法,唤醒被阻塞的Thread-await
线程。 - 释放锁。
-
线程
Thread-await
被唤醒并重新获得锁:- 输出:“Thread-await: 被唤醒,继续执行…”
- 继续执行并释放锁。
-
线程
Thread-await
释放锁:- 输出:“Thread-await: 释放锁”
Condition的源码解析
在案例中,lock. newCondition()
的作用其实是新建了 一 个ConditionObject
对象,ConditionObject
是AQS的内部类,可以访问AQS内部的变量(例如状态变量state)和方法。
在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。
/**
* 返回一个 {@link Condition} 实例,用于与这个 {@link Lock} 实例一起使用。
*
* <p>返回的 {@link Condition} 实例支持与内置监视器锁一起使用的 {@link Object} 监视器方法({@link Object#wait() wait},
* {@link Object#notify notify} 和 {@link Object#notifyAll notifyAll})相同的功能。
*
* <ul>
*
* <li>如果在调用 {@link Condition} 的任何 {@linkplain Condition#await() 等待} 或 {@linkplain Condition#signal 通知} 方法时未持有此锁,
* 则会抛出 {@link IllegalMonitorStateException}。
*
* <li>当调用条件的 {@linkplain Condition#await() 等待} 方法时,锁会被释放,并且在这些方法返回之前,锁会被重新获取,锁的持有计数恢复到调用方法时的状态。
*
* <li>如果线程在等待时被 {@linkplain Thread#interrupt 中断},则等待将终止,抛出 {@link InterruptedException},并且线程的中断状态将被清除。
*
* <li>等待的线程按先进先出(FIFO)顺序被通知。
*
* <li>从等待方法返回的线程重新获取锁的顺序与最初获取锁的线程相同,默认情况下未指定,但对于 <em>公平</em> 锁,优先考虑等待时间最长的线程。
*
* </ul>
*
* @return 条件对象
*/
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
1. 等待:condition. await
调用Condition的await()系列方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
注意这个Condition队列和AQS队列有所区别,Condition的等待队列是一个单向队列
/**
* 实现可中断的条件等待。
* <ol>
* <li>如果当前线程被中断,抛出 InterruptedException。
* <li>保存由 {@link #getState} 返回的锁状态。
* <li>使用保存的锁状态作为参数调用 {@link #release},如果失败则抛出 IllegalMonitorStateException。
* <li>阻塞直到被信号唤醒或被中断。
* <li>通过调用带有保存的锁状态作为参数的 {@link #acquire} 的专门版本来重新获取锁。
* <li>如果在第 4 步被阻塞时被中断,抛出 InterruptedException。
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 如果当前线程被中断,抛出 InterruptedException
ConditionNode node = new ConditionNode(); // 创建一个新的条件节点
int savedState = enableWait(node); // 保存锁状态
LockSupport.setCurrentBlocker(this); // 设置当前阻塞者,用于向后兼容
boolean interrupted = false, cancelled = false, rejected = false; // 初始化标志变量
while (!canReacquire(node)) { // 循环直到可以重新获取锁
if (interrupted |= Thread.interrupted()) { // 检查当前线程是否被中断
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // 如果节点状态为 COND,则中断后退出循环
} else if ((node.status & COND) != 0) { // 检查节点状态是否为 COND
try {
if (rejected)
node.block(); // 如果被拒绝,调用 block 方法
else
ForkJoinPool.managedBlock(node); // 否则调用 managedBlock 方法
} catch (RejectedExecutionException ex) {
rejected = true; // 标记为被拒绝
} catch (InterruptedException ie) {
interrupted = true; // 标记为中断
}
} else
Thread.onSpinWait(); // 如果在入队过程中被唤醒,进行自旋等待
}
LockSupport.setCurrentBlocker(null); // 清除当前阻塞者
node.clearStatus(); // 清除节点状态
acquire(node, savedState, false, false, false, 0L); // 重新获取锁
if (interrupted) { // 如果线程被中断
if (cancelled) {
unlinkCancelledWaiters(node); // 移除已取消的等待者
throw new InterruptedException(); // 抛出 InterruptedException
}
Thread.currentThread().interrupt(); // 重新设置线程的中断状态
}
}
2. 唤醒Condition. signal
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中
/**
* 将等待时间最长的线程(如果存在)从这个条件的等待队列移动到拥有锁的等待队列。
*
* @throws IllegalMonitorStateException 如果 {@link #isHeldExclusively} 返回 {@code false}
*/
public final void signal() {
ConditionNode first = firstWaiter; // 获取等待队列中的第一个节点
if (!isHeldExclusively()) // 检查当前线程是否独占持有锁
throw new IllegalMonitorStateException(); // 如果没有持有锁,抛出异常
if (first != null) // 如果等待队列不为空
doSignal(first, false); // 调用 doSignal 方法,将第一个节点从条件等待队列移动到锁的等待队列
}
/**
* 移除并转移一个或所有等待者到同步队列。
*/
private void doSignal(ConditionNode first, boolean all) {
while (first != null) { // 遍历等待队列
ConditionNode next = first.nextWaiter; // 获取下一个等待节点
if ((firstWaiter = next) == null) // 更新 firstWaiter,如果 next 为 null,则 lastWaiter 也设为 null
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) { // 检查节点状态是否为 COND
enqueue(first); // 将节点从条件等待队列移动到同步队列
if (!all) // 如果只需要唤醒一个等待者,则退出循环
break;
}
first = next; // 继续遍历下一个节点
}
}
Condition总结
阻塞:await()
方法中,在线程释放锁资源之后,如果节点不在AQS等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁。
释放:signal()
后,节点会从condition队列移动到AQS等待队列,则进入正常锁的获取流程