Semaphore(信号量)是操作系统中PV操作的原语在java中的实现,它也是基于AQS实现的。其中PV操作是操作系统中一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理有关,P表示通过,V表示释放。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。
P操作的主要动作如下:
- S减1;
- 若S减1后仍大于等于0,则进程继续执行;
- 若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,转进程调度。
V操作的主要动作如下:
- S加1;
- 若相加后结果大于0,则进程继续执行;
- 若相加后结果小于等于0,则从该信号的等待队列中释放一个等待进程,再返回原进程继续执行或转进程调度。
ReentrantLock是AQS的独占锁实现,Semaphore是AQS的共享锁实现。Semaphore通过设置资源数量可以实现限流的功能,即控制同时只能有n个线程获取信号量。AQS的state对Semaphore来说可以是共享资源的数量,也可以是许可证的数量。当state>0时线程可以获得许可证继续执行,state-1;当state=0时线程不能获得许可证进入同步等待队列,阻塞直到被唤醒。
Semaphore有两个构造函数,如下:
//传入许可证数量
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//传入共享资源数量和是否公平锁,默认是非公平锁
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
其常用方法和方法作用如下:
- public void acquire() throws InterruptedException:尝试获取锁,如果获取失败则阻塞;
- public boolean tryAcquire():尝试获取锁,如果获取失败则直接返回false,不会阻塞;
- public void release():释放许可;
- public int availablePermits():获取可用的许可证数;
- public final int getQueueLength():返回正在等待获取许可证的线程数;
- public final boolean hasQueuedThreads():是否有线程正在等待许可证;
- protected void reducePermits(int reduction):减少reduction个许可证;
- public final Collection<Thread> getQueuedThreads():获取等待许可证的线程集合。
Semaphore使用示例如下,初始化一个许可证数量为3的信号量,doSomething()方法每次都需要获取一个信号量才能执行,执行时间为2S,因此即便是main方法中的线程池每秒执行5次doSomething()方法,最终的效果仍然是每两秒执行三次doSomething()方法,这就达到了限流的目的。
@Slf4j
public class SemaphoreExample {
//定义一个许可证数量为3的信号量
private static Semaphore semaphore = new Semaphore(3);
//线程池
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,30, TimeUnit.SECONDS,new ArrayBlockingQueue<>(50));
public static void main(String[] args) throws InterruptedException {
while (true) {
executor.execute(() -> doSomething());
//每秒执行5次
Thread.sleep(200);
}
}
private static void doSomething() {
try {
//尝试获取一个许可证
semaphore.acquire(1);
log.info("正在执行...");
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error("操作失败",e);
} finally {
//释放许可
semaphore.release();
}
}
}
控制台打印如下,可以看到大概每两秒才会执行三次doSomething()方法,这就是由信号量Semaphore来控制的。
17:42:54.199 [pool-1-thread-1] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:54.409 [pool-1-thread-2] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:54.609 [pool-1-thread-3] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:56.203 [pool-1-thread-4] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:56.421 [pool-1-thread-2] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:56.622 [pool-1-thread-5] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:58.204 [pool-1-thread-1] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:58.421 [pool-1-thread-3] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:58.623 [pool-1-thread-5] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:43:00.214 [pool-1-thread-1] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:43:00.427 [pool-1-thread-4] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:43:00.626 [pool-1-thread-5] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
源码解析
Semaphore的AQS实现也区分公平和非公平,由于这两种锁的区别很小,此处只介绍较常用的也是默认的非公平锁的实现。
acquire(permits)获取许可证
- Semaphore获取许可证的方法是acquire(permits),实现如下:
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
- acquireSharedInterruptibly(permits)方法是AQS中定义的共享锁获取锁的通用方法,实现如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- tryAcquireShared(arg)是提供给子类实现的模版方法,该方法在Semaphore中的实现如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取state许可证数量
int available = getState();
int remaining = available - acquires;
//remaining小于0,表示获取许可证失败,返回一个负值
//remaining大于等于0,表示剩余许可证是足够的,使用CAS尝试修改state许可证数量,如果获取失败则重复获取直到获取成功,返回一个大于等于0的值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
- 在第2步中,如果tryAcquireShared(arg)方法返回值不小于0,则表示当前线程使用CAS获取许可证成功,否则获取失败调用doAcquireSharedInterruptibly(arg)方法进入同步等待队列阻塞,doAcquireSharedInterruptibly(arg)方法实现如下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将当前线程封装成一个Node对象
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//如果当前节点的的前一个节点是head,则尝试获取许可证
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取许可证成功则设置当前节点为head节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//如果当前节点不是head的下一个节点,则直接阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquire(permits)获取许可证方法的流程总结如下:
- 调用Semaphore实现的tryAcquireShared(arg)方法尝试获取许可证;
- 如果当前许可证数量足够(即state-要获取的许可证数量>=0),则循环调用CAS尝试修改state获取许可证,直到获取成功直接返回或者许可证被其他线程获取导致数量不够;
- 如果当前许可证数量不够(即state-要获取的许可证数量<0),则将当前线程封装到Node对象并添加到同步队列中;
- 判断当前节点是否是head节点的下一个节点,如果是则尝试获取许可证、出队head节点、设置当前Node为head节点并返回;如果不是,则调用LockSupport.park()方法阻塞当前线程。
release(permits)释放许可证
- release(permits)方法的实现如下:
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
- releaseShared(permits)是AQS中实现的共享锁释放锁的通用方法,实现如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
- tryReleaseShared(arg)是AQS定义的给子类实现的模版方法,该方法在Semaphore的实现如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取许可证数量
int current = getState();
int next = current + releases;
//next < current表示要释放的许可证releases<0,抛出异常
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//使用CAS修改许可证数量,如果失败则重复调用,直到修改成功返回true
if (compareAndSetState(current, next))
return true;
}
}
- tryReleaseShared(arg)返回true后,调用doReleaseShared()方法,该方法在AQS中实现,具体实现如下:
private void doReleaseShared() {
//遍历唤醒同步等待队列节点,释放许可证
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//前面如果没有节点被唤醒,则h仍然指向head,表示许可证已经释放完成或者许可证数量已经不够了,直接返回
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
release(permits)方法释放许可证的流程总结如下:
- 调用tryReleaseShared(arg)方法尝试释放许可证,如果释放失败则一直调用CAS修改state许可证数量,直到成功返回true;
- 释放成功后,唤醒同步等待队列中的head的下一个节点;
- head的下一个节点被唤醒后,会继续执行doAcquireSharedInterruptibly()方法中的循环语句尝试获取许可证;
- 另外,此处会一直唤醒同步等待队列中的节点,直到同步等待队列节点为空或者许可证数量不够。
需要注意的是,tryReleaseShared(arg)方法释放许可证方法并没有判断许可证上限,例如定义了信号量的许可证数量为1,直接调用release()方法,在tryReleaseShared()方法中调用CAS是可以修改成功的,这里直接修改的是AQS中的state,因此先调用release()方法会影响限流效果。
例如下面初始化Semaphore的许可证数量为1,调用10次release()方法后,许可证数量变成了11。
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(1);
for (int i=0;i<10;i++) {
semaphore.release(1);
}
System.out.println(semaphore.availablePermits());
}