文章目录
- 1.单锁实现
- 2.双锁实现
1.单锁实现
Java 中防止代码段交错执行,有两种锁选择
- synchronized 代码块,属于关键字级别提供锁保护,功能少
- ReentrantLock 类,功能丰富
以 ReentrantLock 为例
ReentrantLock lock = new ReentrantLock();
public void offer(String e) {
lock.lockInterruptibly();
try {
array[tail] = e;
tail++;
} finally {
lock.unlock();
}
}
只要两个线程执行上段代码时,锁对象是同一个,就能保证 try 块内的代码的执行不会出现指令交错现象,即执行顺序只可能是下面两种情况之一
线程1 | 线程2 | 说明 |
---|---|---|
lock.lockInterruptibly() | t1对锁对象上锁 | |
array[tail]=e1 | ||
lock.lockInterruptibly() | 即使 CPU 切换到线程2,但由于t1已经对该对象上锁,因此线程2卡在这儿进不去 | |
tail++ | 切换回线程1 执行后续代码 | |
lock.unlock() | 线程1 解锁 | |
array[tail]=e2 | 线程2 此时才能获得锁,执行它的代码 | |
tail++ |
- 另一种情况是线程2 先获得锁,线程1 被挡在外面
- 保护的是 tail 位置读写的安全
锁同时保护了 tail 和 size 的读写安全
ReentrantLock lock = new ReentrantLock();
int size = 0;
public void offer(String e) {
lock.lockInterruptibly();
try {
if(isFull()) {
// 满了怎么办?
}
array[tail] = e;
tail++;
size++;
} finally {
lock.unlock();
}
}
private boolean isFull() {
return size == array.length;
}
之前是返回 false 表示添加失败,前面分析过想达到这么一种效果:
- 在队列满时,不是立刻返回,而是当前线程进入等待
- 什么时候队列不满了,再唤醒这个等待的线程,从上次的代码处继续向下运行
ReentrantLock 可以配合条件变量来实现
ReentrantLock lock = new ReentrantLock();
Condition tailWaits = lock.newCondition(); // 条件变量
int size = 0;
public void offer(String e) {
lock.lockInterruptibly();
try {
while (isFull()) {
tailWaits.await(); // 当队列满时, 当前线程进入 tailWaits 等待
}
array[tail] = e;
tail++;
size++;
} finally {
lock.unlock();
}
}
private boolean isFull() {
return size == array.length;
}
- 条件变量底层也是个队列,用来存储这些需要等待的线程,当队列满了,就会将 offer 线程加入条件队列,并暂时释放锁
- 将来队列如果不满了(由 poll 线程那边得知)可以调用 tailWaits.signal() 来唤醒 tailWaits 中首个等待的线程,被唤醒的线程会再次抢到锁,从上次 await 处继续向下运行
操作前 | offer(4) | offer(5) | poll() | 操作后 |
---|---|---|---|---|
[1 2 3] | 队列满,进入tailWaits 等待 | [1 2 3] | ||
[1 2 3] | 取走 1,队列不满,唤醒线程 | [2 3] | ||
[2 3] | 抢先获得锁,发现不满,放入 5 | [2 3 5] | ||
[2 3 5] | 从上次等待处直接向下执行 | [2 3 5 ?] |
关键点:为何要用 while 而不是 if
- 从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
- 这种情况称之为虚假唤醒,唤醒后应该重新检查条件,是不是需要重新进入等待
代码
/**
* 单锁实现
* @param <E> 元素类型
*/
public class BlockingQueue1<E> implements BlockingQueue<E> {
private final E[] array;
private int head = 0;
private int tail = 0;
private int size = 0; // 元素个数
@SuppressWarnings("all")
public BlockingQueue1(int capacity) {
array = (E[]) new Object[capacity];
}
ReentrantLock lock = new ReentrantLock();
Condition tailWaits = lock.newCondition();
Condition headWaits = lock.newCondition();
@Override
public void offer(E e) throws InterruptedException {
lock.lockInterruptibly();
try {
while (isFull()) {
tailWaits.await();
}
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
size++;
headWaits.signal();
} finally {
lock.unlock();
}
}
@Override
public void offer(E e, long timeout) throws InterruptedException {
lock.lockInterruptibly();
try {
long t = TimeUnit.MILLISECONDS.toNanos(timeout);
while (isFull()) {
if (t <= 0) {
return;
}
t = tailWaits.awaitNanos(t);
}
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
size++;
headWaits.signal();
} finally {
lock.unlock();
}
}
@Override
public E poll() throws InterruptedException {
lock.lockInterruptibly();
try {
while (isEmpty()) {
headWaits.await();
}
E e = array[head];
array[head] = null; // help GC
if (++head == array.length) {
head = 0;
}
size--;
tailWaits.signal();
return e;
} finally {
lock.unlock();
}
}
private boolean isEmpty() {
return size == 0;
}
private boolean isFull() {
return size == array.length;
}
}
注意
- JDK 中 BlockingQueue 接口的方法命名
- 方法 offer(E e) 是非阻塞的实现,阻塞实现方法为 put(E e)
- 方法 poll() 是非阻塞的实现,阻塞实现方法为 take()
2.双锁实现
单锁的缺点在于:
- 生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改 size
- 冲突的主要是生产者之间:多个 offer 线程修改 tail
- 冲突的还有消费者之间:多个 poll 线程修改 head
进一步提高性能,可以用两把锁
- 一把锁保护 tail
- 另一把锁保护 head
ReentrantLock headLock = new ReentrantLock(); // 保护 head 的锁
Condition headWaits = headLock.newCondition(); // 队列空时,需要等待的线程集合
ReentrantLock tailLock = new ReentrantLock(); // 保护 tail 的锁
Condition tailWaits = tailLock.newCondition(); // 队列满时,需要等待的线程集合
offer 方法
缺点是 size 并不受 tailLock 保护,tailLock 与 headLock 是两把不同的锁,并不能实现互斥的效果
@Override
public void offer(E e) throws InterruptedException {
tailLock.lockInterruptibly();
try {
// 队列满等待
while (isFull()) {
tailWaits.await();
}
// 不满则入队
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
// 修改 size (有问题)
size++;
} finally {
tailLock.unlock();
}
}
size 需要用下面的代码保证原子性
AtomicInteger size = new AtomicInteger(0); // 保护 size 的原子变量
size.getAndIncrement(); // 自增
size.getAndDecrement(); // 自减
代码修改为
@Override
public void offer(E e) throws InterruptedException {
tailLock.lockInterruptibly();
try {
// 队列满等待
while (isFull()) {
tailWaits.await();
}
// 不满则入队
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
// 修改 size
size.getAndIncrement();
} finally {
tailLock.unlock();
}
}
poll 方法
@Override
public E poll() throws InterruptedException {
E e;
headLock.lockInterruptibly();
try {
// 队列空等待
while (isEmpty()) {
headWaits.await();
}
// 不空则出队
e = array[head];
if (++head == array.length) {
head = 0;
}
// 修改 size
size.getAndDecrement();
} finally {
headLock.unlock();
}
return e;
}
如何通知 headWaits 和 tailWaits 中等待的线程,比如 poll 方法拿走一个元素,通知 tailWaits。
代码
@Override
public E poll() throws InterruptedException {
E e;
headLock.lockInterruptibly();
try {
// 队列空等待
while (isEmpty()) {
headWaits.await();
}
// 不空则出队
e = array[head];
if (++head == array.length) {
head = 0;
}
// 修改 size
size.getAndDecrement();
// 通知 tailWaits 不满(有问题)
tailWaits.signal();
} finally {
headLock.unlock();
}
return e;
}
要使用这些条件变量的 await(), signal() 等方法需要先获得与之关联的锁,上面代码运行出现错误
java.lang.IllegalMonitorStateException
解决:避免嵌套,两段加锁的代码平级
性能提升
-
代码调整后 offer 并没有同时获取 tailLock 和 headLock 两把锁,因此两次加锁之间会有空隙,这个空隙内可能有其它的 offer 线程添加了更多的元素,那么这些线程都要执行 signal(),通知 poll 线程队列非空吗?
- 每次调用 signal() 都需要这些 offer 线程先获得 headLock 锁,成本较高,要想法减少 offer 线程获得 headLock 锁的次数
- 可以加一个条件:当 offer 增加前队列为空,即从 0 变化到不空,才由此 offer 线程来通知 headWaits,其它情况不归它管
-
队列从 0 变化到不空,会唤醒一个等待的 poll 线程,这个线程被唤醒后,肯定能拿到 headLock 锁,因此它具备了唤醒 headWaits 上其它 poll 线程的先决条件。如果检查出此时有其它 offer 线程新增了元素(不空,但不是从0变化而来),那么不妨由此 poll 线程来唤醒其它 poll 线程,称之为级联通知(cascading notifies)
-
在 poll 时队列从满变化到不满,才由此 poll 线程来唤醒一个等待的 offer 线程,目的也是为了减少 poll 线程对 tailLock 上锁次数,剩下等待的 offer 线程由这个 offer 线程间接唤醒
代码
public class BlockingQueue2<E> implements BlockingQueue<E> {
private final E[] array;
private int head = 0;
private int tail = 0;
private final AtomicInteger size = new AtomicInteger(0);
ReentrantLock headLock = new ReentrantLock();
Condition headWaits = headLock.newCondition();
ReentrantLock tailLock = new ReentrantLock();
Condition tailWaits = tailLock.newCondition();
public BlockingQueue2(int capacity) {
this.array = (E[]) new Object[capacity];
}
@Override
public void offer(E e) throws InterruptedException {
int c;
tailLock.lockInterruptibly();
try {
while (isFull()) {
tailWaits.await();
}
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
c = size.getAndIncrement();
// a. 队列不满, 但不是从满->不满, 由此offer线程唤醒其它offer线程
if (c + 1 < array.length) {
tailWaits.signal();
}
} finally {
tailLock.unlock();
}
// b. 从0->不空, 由此offer线程唤醒等待的poll线程
if (c == 0) {
headLock.lock();
try {
headWaits.signal();
} finally {
headLock.unlock();
}
}
}
@Override
public E poll() throws InterruptedException {
E e;
int c;
headLock.lockInterruptibly();
try {
while (isEmpty()) {
headWaits.await();
}
e = array[head];
if (++head == array.length) {
head = 0;
}
c = size.getAndDecrement();
// b. 队列不空, 但不是从0变化到不空,由此poll线程通知其它poll线程
if (c > 1) {
headWaits.signal();
}
} finally {
headLock.unlock();
}
// a. 从满->不满, 由此poll线程唤醒等待的offer线程
if (c == array.length) {
tailLock.lock();
try {
tailWaits.signal();
} finally {
tailLock.unlock();
}
}
return e;
}
private boolean isEmpty() {
return size.get() == 0;
}
private boolean isFull() {
return size.get() == array.length;
}
}