AQS之Semaphore详解
- 一、Semaphore类的继承关系
- 1. AbstractQueuedSynchronizer:提供了一个同步器的框架。
- 2. Sync:Semaphore的内部类,提供了锁的具体实现。
- 3. FairSync:Sync的子类,实现公平锁。
- 4. NonfairSync:Sync的子类,实现非公平锁。
- 二、Semaphore的基本使用
- 1. 使用场景:
- 2. 代码实现:
- 3. 运行结果:
- 4. 案例分析:
- 三、Semaphore的优缺点
- 1. 简介:
- 2. 优点:
- 2.1、灵活性:
- 2.2、可重入性:
- 2.3、公平性:
- 2.4、可以用于多种场景:
- 3. 优点:
- 3.1、使用复杂:
- 3.2、容易造成死锁:
- 3.3、不支持条件等待:
- 3.4、可能出现饥饿:
- 3.5 小结:
- 四、源码分析
- 1. 构造方法
- 1.1 Semaphore(int permits)
- 1.2 Semaphore(int permits, boolean fair)
- 1.3 构造方法小结
- 2. acquire方法
- 2.1 AQS#acquireSharedInterruptibly
- 2.2 Semaphore#tryAcquireShared
- 2.3 AQS#doAcquireSharedInterruptibly
- 2.4 AQS#doAcquireSharedInterruptibly#addWaiter
- 2.5 AQS#addWaiter#enq
- 2.6 AQS#shouldParkAfterFailedAcquire
- 2.7 AQS#parkAndCheckInterrupt
- 2.8 AQS#LockSupport.park
- 2.9 流程图
- 3. release方法
- 3.1 AQS#releaseShared
- 3.2 Semaphore#tryReleaseShared
- 3.3 AQS#doReleaseShared
- 3.4 AQS#doReleaseShared#unparkSuccessor
- 3.5 AQS#LockSupport.unpark
- 3.6 AQS#doReleaseShared#setHeadAndPropagate
- 3.7 AQS#doReleaseShared#setHeadAndPropagate#isShared
- 3.8 流程图
一、Semaphore类的继承关系
1. AbstractQueuedSynchronizer:提供了一个同步器的框架。
- AQS为Semaphore提供了基本的针对共享资源的获取失败入队出队阻塞唤醒的逻辑。
- Semaphore通过AQS的同步状态来表示可用的许可数,并通过AQS的等待队列来管理等待获取许可的线程。
- 当一个线程请求获取许可时,如果许可数不足,则该线程会被阻塞并加入到AQS的等待队列中。
- 当有其他线程释放许可时,AQS会从等待队列中选择一个线程唤醒,使其重新尝试获取许可。
- 这样就实现了对共享资源的获取失败入队出队阻塞唤醒的逻辑。
2. Sync:Semaphore的内部类,提供了锁的具体实现。
- Sync是Semaphore的内部类,它继承自AQS并重写了其中的方法,用于实现Semaphore的同步逻辑。
3. FairSync:Sync的子类,实现公平锁。
- FairSync是Sync的子类,它实现了公平的获取许可的机制。当线程请求许可时,FairSync会按照FIFO的顺序选择等待的线程获取许可。
4. NonfairSync:Sync的子类,实现非公平锁。
- NoFairSync是Sync的子类,它实现了非公平的获取许可的机制。当线程请求许可时,NoFairSync会直接尝试获取许可,而不管是否有其他线程在等待。
二、Semaphore的基本使用
1. 使用场景:
假设有一个公共游泳池,最多允许同时有5个人在游泳,其他人需要等待。这就可以使用Semaphore来控制并发访问的线程数。
2. 代码实现:
import java.util.concurrent.Semaphore;
public class SwimmingPool {
private static final int MAX_SWIMMERS = 5;
private static final Semaphore semaphore = new Semaphore(MAX_SWIMMERS);
public static void main(String[] args) {
for (int i = 1; i <= 10; i++) {
Thread swimmer = new Thread(new Swimmer(i));
swimmer.start();
}
}
static class Swimmer implements Runnable {
private int id;
public Swimmer(int id) {
this.id = id;
}
@Override
public void run() {
try {
semaphore.acquire(); // 请求获取许可,如果没有可用许可则阻塞等待
System.out.println("Swimmer " + id + " starts swimming.");
Thread.sleep(10000); // 模拟游泳过程
semaphore.release(); // 释放许可
System.out.println("Swimmer " + id + " finishes swimming.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
3. 运行结果:
4. 案例分析:
- 在这个案例中,有10个游泳者想要在游泳池中游泳,但是游泳池最多允许同时有5个人在游泳。
- Semaphore的初始许可数为5,当有游泳者调用acquire()方法请求获取许可时,如果许可数不足,则该游泳者会被阻塞等待。
- 当有其他游泳者完成游泳并调用release()方法释放许可时,Semaphore会选择一个等待的游泳者唤醒,使其开始游泳。
- 通过Semaphore的控制,保证了同时游泳的人数不超过5个。
三、Semaphore的优缺点
1. 简介:
Semaphore是一种并发控制工具,用于限制同时访问某个资源或执行某个任务的线程数量。它在多线程环境中起到了线程同步和互斥的作用。
2. 优点:
2.1、灵活性:
Semaphore可以根据需要设置初始许可数,允许多个线程同时访问共享资源,或者限制同时执行的任务数量。
2.2、可重入性:
Semaphore是可重入的,同一个线程可以多次获取和释放许可。
2.3、公平性:
Semaphore可以选择是否公平地分配许可。如果设置为公平模式,那么等待时间最长的线程将优先获取许可。
2.4、可以用于多种场景:
Semaphore可以用于解决生产者-消费者问题、连接池管理、并发线程数控制等多种并发场景。
3. 优点:
3.1、使用复杂:
相对于其他线程同步和互斥的工具,Semaphore的使用相对复杂,需要手动调用acquire()和release()方法进行许可的获取和释放,容易出现逻辑错误。
3.2、容易造成死锁:
如果在使用Semaphore时没有正确地释放许可,可能会导致线程间的死锁情况,造成程序无法继续执行。
3.3、不支持条件等待:
与ReentrantLock相比,Semaphore不支持线程的条件等待和通知,无法使用wait()和notify()方法进行线程间的通信。
3.4、可能出现饥饿:
在公平模式下,许可的获取是按照线程等待的先后顺序进行的,这可能导致某些线程一直无法获取到许可,出现饥饿现象。
3.5 小结:
综上所述,Semaphore是一种功能强大的并发控制工具,能够灵活地管理共享资源的访问和任务的执行。然而,由于其使用复杂、可能导致死锁、不支持条件等待以及可能出现饥饿等缺点,开发人员在使用Semaphore时需要仔细考虑和处理这些问题。
四、源码分析
1. 构造方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
1.1 Semaphore(int permits)
- 参数permits表示同时可以访问的线程数目。如果permits的值为1,表示Semaphore对象可以用作互斥锁。
- 当permits的值大于1时,Semaphore对象可以用作资源池的控制器,限制可以访问资源的线程的数量。
1.2 Semaphore(int permits, boolean fair)
- 参数fair表示是否采用公平锁策略。如果为true,表示Semaphore对象采用公平锁策略,即先进入等待队列的线程将先获得许可;如果为false,表示Semaphore对象采用非公平锁策略,线程获取许可的顺序是不确定的。
1.3 构造方法小结
- 使用Semaphore的构造方法,可以创建一个具有指定许可数目和锁策略的Semaphore对象。
- 根据不同的应用场景,可以选择适合的构造方法来创建Semaphore对象,然后使用acquire()和release()方法来获取和释放许可。
2. acquire方法
外部调用加锁方法acquire(),(支持可中断)
中断概念如果不清楚的话,可以参考Java之线程中断
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
2.1 AQS#acquireSharedInterruptibly
acquire方法内部会调用acquireSharedInterruptibly方法,可以看到
AQS给我们提供了模板方法: tryAcquireShared
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
注意,这里没有实现加锁的逻辑(模板方法留给子类实现)
2.2 Semaphore#tryAcquireShared
调用子类Semaphore的tryAcquireShared方法(这里以非公平为例)
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
//非公平
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//公平
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
- 获取剩余资源许可int available = getState();
- 得到剩余许可remaining
- 小于0直接返回,否则,说明有许可,CAS操作保证线程安全获取锁
- CAS成功则获取锁,执行业务代码。
- 如果没有获取锁tryAcquireShared(arg) < 0,回到acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
2.3 AQS#doAcquireSharedInterruptibly
没有可用资源,只能调用doAcquireSharedInterruptibly(走入队逻辑)
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.4 AQS#doAcquireSharedInterruptibly#addWaiter
addWaiter方法: 添加到等待队列
- 构造Node结点
- 第一个入队的线程需要调用enq(node);
- 创建队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
创建一个需要入队的结点,队列不为空的情况下,CAS操作将其设置为tail尾结点。
2.5 AQS#addWaiter#enq
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 队列没有一个结点,那么第一个线程需要构建队列。
- 开始入队,尾插法
2.1 node.prev = t; 修改当前结点的前驱指针
2.2 compareAndSetTail(t, node) 将当前结点CAS置为tail尾结点
2.3 t.next = node;修改当前结点的后继指针 - addWaiter方法结束,入队完成。
- final Node p = node.predecessor();获取当前结点的前驱结点
- 如果前驱结点是head,又会调用tryAcquireShared(子类),尝试获取资源
- 如果没有可用许可,调用shouldParkAfterFailedAcquire准备阻塞
2.6 AQS#shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
准备阻塞:将当前结点的前驱结点pred.waitStatus(CAS)置为-1 SIGNAL 可唤醒
static final int SIGNAL = -1;
-1 表示后继的线程结点需要被唤醒
2.7 AQS#parkAndCheckInterrupt
调用parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2.8 AQS#LockSupport.park
最后一步,LockSupport.park(this),真正阻塞,将当前线程挂起。
2.9 流程图
3. release方法
外部调用释放锁方法:release()方法
public void release() {
sync.releaseShared(1);
}
3.1 AQS#releaseShared
通过内部类sync调用AQS模板方法:releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared方法AQS内部没有实现,留给子类实现。
3.2 Semaphore#tryReleaseShared
来到Semaphore类tryReleaseShared 子类提供具体实现释放锁的逻辑。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
tryReleaseShared大致逻辑:
- 获取当前state变量值,把资源的许可数量+1
- 通过CAS+for循环保证操作更新成功。
- tryReleaseShared结束(释放资源)
3.3 AQS#doReleaseShared
tryReleaseShared结束来到,AQS的doReleaseShared方法。
调用doReleaseShared,唤醒阻塞队列线程。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
如果头结点head等于Node.SIGNAL,CAS置为0,唤醒前的准备工作。
3.4 AQS#doReleaseShared#unparkSuccessor
doReleaseShared内部调用unparkSuccessor方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
获取到Node s = node.next;头结点的下一个结点要唤醒的结点
3.5 AQS#LockSupport.unpark
执行LockSupport.unpark(s.thread)方法
唤醒阻塞线程
3.6 AQS#doReleaseShared#setHeadAndPropagate
紧接着被唤醒的线程会来到doAcquireSharedInterruptibly方法的for循环里
刚好等于头结点,可以尝试获取资源
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
获取head结点,将当前结点设置为head结点,释放之前的头结点,让gc回收。
3.7 AQS#doReleaseShared#setHeadAndPropagate#isShared
到这里还没完,setHeadAndPropagate方法里会调用isShared() 判断当前链表是否是共享模式
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
如果前驱节点正好是head,就可以尝试获取资源,获取成功就可以执行业务逻辑,只要资源数(state>0)充足,可以一直唤醒。