前言
相关系列
- 《Java & Lock & 目录》(持续更新)
- 《Java & Lock & Semaphore & 源码》(学习过程/多有漏误/仅作参考/不再更新)
- 《Java & Lock & Semaphore & 总结》(学习总结/最新最准/持续更新)
- 《Java & Lock & Semaphore & 问题》(学习解答/持续更新)
涉及内容
- 《Java & Lock & AQS & 总结》
概述
简介
Semaphore @ 信号量类是俗称“三剑客”的三类常用线程控制工具之一,主要用于对流量进行协调控制以达到通常意义上限流的效果。信号量类通过在内部维护一定数量的许可实现流量限流,许可的本质是数值而非具体的对象,其初始数量会在信号量创建时指定。当线程到达信号量时会尝试获取许可,成功获取许可意味着线程已被允许通过信号量,同时也意味着许可数量的减少。而由于许可的数量终究是有限的,因此当许可耗尽时线程将因为无法获取到许可而被拦截,从而达到限流的作用,由此我们可知信号量的最大线程并发量即为许可数量。
信号量类可以通过释放增加许可数量。释放的本质是单纯的增加,这句看起来完全是废话的理论实际上非常重要,因为其意味着信号量类并不只允许已获取许可的线程释放许可。开发者最容易对信号量类产生的误解是:线程在释放许可之前必须先从信号量中获取许可。产生该误解的原因一方面是因为获取/释放这两种操作名称天然带有因果关系;另一方面也是因为我们的思维深受锁类API“加锁后必须解锁,解锁前必须加锁”的模式固化,因此产生上述误解是非常正常的,但事实是信号量类并不要求释放许可的线程必须先获取许可。也正是因为该特性的原因,许可释放完全可以由任意线程进行,这使得信号量类除了可用于常规限流场景外,还可通过灵活使用达到对线程进行批量拦截及控制任务在不同线程中执行顺序的效果,该知识点会在下文讲解灵活使用时详述。
许可的初始数量并不代表上限。信号量类支持在后续运行中通过释放使得许可数量超过创建时指定的初始数量,该特性使得信号量可以动态地限制流量大小,即动态控制线程的最大并发量,以贴合运行过程中的实际限流所需。
信号量类支持公平/非公平两种访问策略。信号量类支持在创建实例时指定公平策略,从而令线程可以按访问顺序通过信号量以满足现实开发中的公平需求。如果在创建时不指定,那么按照默认规则信号量将是非公平的。非公平信号量虽然无法保证线程“先到先得”,但整体性能相对于公平信号量来说却有质的提升。因此如果没有公平需求,那么非公平信号量应当是实际开发中的首选,该知识点会在下文讲解公平/非公平时详述。
信号量类基于AQS类的共享模式实现。AQS类是Java设计用于在并发环境下保护资源的API,而所谓基于AQS类实现,是指信号量类的相应方法实现本质都是对AQS类内部字段/方法/机制的赋值/重写/调用。但需要注意的是:信号量类并不是AQS类的子类,大多数基于AQS类实现的API都采用在内部定义/实现单/多个AQS类子类并调用实例的方式来实现自身设计,信号量类也不例外,该知识点会在下文讲解AQS类时详述。
使用
创建
-
public Semaphore(int permits) —— 创建指定许可数量的非公平信号量,指定许可数量可为负数。
-
public Semaphore(int permits, boolean fair) —— 创建指定许可数量/公平性的信号量,指定许可数量可为负数。
方法
-
public void acquire() throws InterruptedException —— 获取 —— 从当前信号量中获取许可,如果许可存在则获取;否则无限等待至存在许可为止。线程在进入方法/等待许可期间如果已/被中断会抛出中断异常,但中断状态会被清除。
-
public void acquire(int permits) throws InterruptedException —— 获取 —— 从当前信号量中获取指定数量的许可,如果许可存在且数量足够则获取;否则无限等待至存在足够数量的许可为止。线程在进入方法/等待许可期间如果已/被中断会抛出中断异常,但中断状态会被清除。
-
public void acquireUninterruptibly() —— 获取(不中断) —— 从当前信号量中获取许可,如果许可存在则获取;否则无限等待至存在许可为止。线程在进入方法/等待许可期间如果已/被中断不会抛出中断异常,但中断状态会被保留。
-
public void acquireUninterruptibly(int permits) —— 获取(不中断) —— 从当前信号量中获取指定数量的许可,如果许可存在且数量足够则获取;否则无限等待至存在足够数量的许可为止。线程在进入方法/等待许可期间如果已/被中断不会抛出中断异常,但中断状态会被保留。
-
public boolean tryAcquire() —— 尝试获取 —— 从当前信号量中获取一个许可,如果许可存在则获取并返回true;否则返回false。该方法的公平性不受访问策略的影响。
-
public boolean tryAcquire(int permits) —— 尝试获取 —— 从当前信号量中获取指定数量的许可,如果许可存在且数量足够则获取并返回true;否则返回false。该方法的公平性不受访问策略的影响。
-
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException —— 尝试获取 —— 从当前信号量中获取许可,如果许可存在则获取并返回true;否则有限等待至存在许可为止,超出指定等待时间则返回false。线程在进入方法/等待许可期间如果已/被中断会抛出中断异常,但中断状态会被清除。
-
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException —— 尝试获取 —— 从当前信号量中获取指定数量的许可,如果许可存在且数量足够则获取并返回true;否则有限等待至存在许可为止,超出指定等待时间则返回false。线程在进入方法/等待许可期间如果已/被中断会抛出中断异常,但中断状态会被清除。
-
public void release() —— 释放 —— 释放许可至当前信号量中,该方法不要求当前线程必须先从当前信号量中获取许可。
-
public void release(int permits) —— 释放 —— 释放指定数量的许可至当前信号量中,该方法不要求当前线程必须先从当前信号量中获取许可。
-
public int availablePermits() —— 可用许可 —— 获取当前信号量的许可数量,由于实际许可数量可能在获取期间发生变化,因此该方法的返回值无法作为准确的判断依据。
-
public int drainPermits() —— 流失许可 —— 从当前信号量中获取所有许可,并返回获取的许可数量。
-
protected void reducePermits(int reduction) —— 减少 —— 从当前信号量中减少指定数量的许可,其与获取的区别在于无论当前信号量是否存在许可都可进行减少,因此该方法可令许可数量为负数。*
-
public boolean isFair() —— 是否公平 —— 判断当前信号量是否公平,是则返回true;否则返回false。
-
public final boolean hasQueuedThreads() —— 存在排队线程 —— 判断当前信号量是否存在正在等待获取许可的线程,是则返回true;否则返回false。由于线程可能随时开始/结束等待,因此该方法的返回值无法作为准确的判断依据。
-
public final int getQueueLength() —— 获取队列长度 —— 获取当前信号量中正在等待获取许可的线程数量估计值。由于线程可能随时开始/结束等待,因此该方法的返回值无法作为准确的判断依据。
-
protected Collection getQueuedThreads() —— 获取排队线程集 —— 获取当前信号量中正在等待获取许可的线程估计集。由于线程可能随时开始/结束等待,因此该方法的返回值无法作为准确的判断依据。
模板
/**
* 线程池执行器(注意!!!此处只是为了快速获取线程池执行器,开发环境不推荐使用
* newFixedThreadPool(int nThreads)方法创建线程池执行器,易造成OOM。)
*/
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(4);
private static final Random RANDOM = new Random();
public static void main(String[] args) {
// 创建指定许可数量的非公平信号量。
Semaphore semaphore = new Semaphore(2);
// 创建指定许可数量及指定公平性的信号量。
// Semaphore semaphore = new Semaphore(5, true);
for (int i = 0; i < 4; i++) {
EXECUTOR_SERVICE.submit(() -> {
try {
long threadId = Thread.currentThread().getId();
System.out.println("线程【" + threadId + "】已到达信号量!!!");
semaphore.acquire();
System.out.println("线程【" + threadId + "】已成功获取许可并通过信号量!!!");
// 休眠至少3秒,模拟业务执行。
System.out.println("线程【" + threadId + "】开始执行任务......");
Thread.sleep(RANDOM.nextInt(5000) + 3000);
System.out.println("线程【" + threadId + "】结束执行任务!!!");
System.out.println("线程【" + threadId + "】已释放许可至信号量!!!");
semaphore.release();
} catch (InterruptedException e) {
// 什么也不做。
}
});
}
}
实现
AQS/安全/同步
AQS类是Java设计用于在并发环境下保护资源的API。AQS类全称AbstractQueuedSynchronizer @ 抽象队列同步器类,其“半”实现了以“同步”为核心的线程管理机制,用于对想要/已经访问资源的线程进行等待/保存/唤醒管理以使之达成/解除同步,该线程管理机制被称简称为同步机制。所谓“同步”是由AQS类定义的概念,并且即使在概念中也极为抽象的存在,原因是其并未像“行走/飞翔/游泳”概念一样被明确,而是类似于“行动”概念一样的再抽象体。如果一定要对“同步”概念进行描述,那么将之大致理解为“规则”是比较准确的,因为达成同步的线程将因为“遵守规则”而实现对受保护资源的安全并发访问。“安全”同样是极为抽象的概念,初学者很容易从数据角度切入而将之片面理解为正确,但由于环境/硬件/需求同样也是程序开发/运行的限制/影响因素,因此“安全”概念实际上也可能基于正确/限流/批次等多种维度被明确。
AQS类将“安全/同步”概念交由子类明确/实现。由于各子类对“安全”概念的明确不同,并且不同的“安全”概念明确又需要制定相应的规则,即明确/实现相应的“同步”概念予以保证,因此AQS类仅是定义了“安全/同步”概念,而概念的明确/实现则被交由子类负责,故而上文才会说其“半”实现了同步机制。通过对各子类“同步”概念明确/实现的回调,AQS类可以在达成各类同步的同时确保同步机制线程管理功能的统一性。这种编程方式被称为模板模式,Java中基本所有抽象类形式的API都使用了模板模式。AQS类子类对“安全”概念是只需明确而无需实现的,因为其作用仅是为“同步”概念提供明确/实现依据,即我们必须先知道资源对安全访问的实际要求为何,随后才能为之设计相应的访问规则。
信号量类基于“拦截”规则在内部实现了AQS类子类。需要事先重点说明的是:虽然信号量类基于AQS类实现,但信号量类并不是AQS类的子类。信号量类通过在内部定义/实现AQS类子类并调用其实例的方式实现自身设计。在信号量类的内部AQS类子类中,“安全”概念被明确为“限流”,即不允许超出指定数量的线程对资源进行访问;而“同步”概念则被明确为“拦截”,即通过拦截保证访问资源的线程不超出指定数量。当创建信号量时,构造方法会联动创建内部AQS类子类实例并保存在[sync @ 同步]中,由于同步的达成便意味着线程对规则的遵守,因此信号量拦截/释放线程的本质即为通过[同步]等待/达成同步。
信号量类内部实现了多种AQS类子类。信号量类在设计上存在公平/非公平两种访问策略,用于控制线程是否按访问顺序通过信号量,该知识点会在下文讲解公平/非公平时详述。而出于支持多种访问策略的目的,信号量锁类共实现了三种AQS类子类,这些AQS类子类会在创建信号量时根据规则/入参被相应的创建并保存在[同步]中…其具体名称及作用如下所示:
Sync @ 同步类:同步类是AQS类的直接抽象子类,同时也是后续两个AQS类间接子类的父/超类,其核心作用在于为AQS类“两荤两素,四菜一汤”中的tryReleaseShared(int releases)方法提供通用实现,并为tryAcquireShared(int acquires)方法定义/实现了非公平策略的底层nonfairTryAcquireShared(int acquires)方法。信号量类之所以会将nonfairTryAcquireShared(int acquires)方法从非公平策略的AQS类子类中抽离是为了确保获取方法“特殊值”形式实现的非公平性不会受AQS类子类公平性的影响。
FairSync @ 公平同步类:公平同步类是同步类/AQS类的直接/间接子类,也是公平策略的功能实现类,其为AQS类“两荤两素,四菜一汤”中的tryAcquireShared(int acquires)方法提供了公平实现。被指定为公平策略的信号量会在创建时实例化公平同步存于[同步]中。
NonFairSync @ 非公平同步类:非公平同步类是同步类/AQS类的直接/间接子类,也是非公平策略的功能实现类,其为AQS类“两荤两素,四菜一汤”中的tryAcquireShared(int acquires)方法提供了非公平实现,其本质是对同步类nonfairTryAcquire(int acquires)方法的直接调用。被指定为非公平策略的信号量会在创建时实例化非公平同步存于[同步]中。
比起同步的达成/解除,信号量类更注重对同步机制的灵活使用。如果说基于AQS类实现的锁类API更注重于同步达成/解除的话,那以“三剑客”为首的线程控制工具就更注重于对同步机制的灵活使用。这两者的区别在于前者中的同步机制只被单纯作为令同步达成/解除的辅助手段,即如果不是因为线程为了达成/解除同步而可能需要等待,那么同步机制就完全没有存在的必要;而后者中的同步机制则转而变为了同步达成/解除的辅助对象,即如果不是因为同步机制需要同步的达成/解除作为其等待/唤醒线程的判断条件,则同步的达成/解除也完全没有存在的意义。故而我们可以知道的是:包含信号量类在内的线程控制工具其拦截/释放线程的本质实际上都是通过对同步机制的灵活调用而令线程进入/退出有限/无限等待状态,由于进入/退出等待状态的线程将停止/恢复对任务的执行,因此就变相达到了线程拦截/释放的效果。该知识点会在下文讲解拦截/释放时详述。
状态/获取/释放/模式
AQS类设计子类使用[state @ 状态]作为同步数据的存储介质。虽说“安全/同步”概念的明确/实现被交由子类负责,但AQS类也并非完全没有为之提供实现思路,其推荐子类使用[状态]来记录同步数据。所谓[状态]是指AQS类所组合的int类型字段,虽说各种AQS类子类会根据目标资源的不同而明确/实现不同的“安全/同步”概念,但究其根本就会发现其实现核心大都是对同步“标记”与“计数”的记录,即记录“线程是否已达成同步”及“线程已达成几次同步”。对于前者这是任意数据类型都可以轻易做到的,而后者则通常使用整数类型记录为最佳,因此[状态]便可供子类在实现“同步”概念时统一保存两项关键数据,故而子类对“同步”概念的实现通常无需考虑同步数据的存储介质问题。但需要特别注意的是:AQS类并没有强制子类必须使用[状态]记录同步数据,事实上由于AQS类只在条件机制中绑定了[状态]的读取操作,因此如果子类并无需使用条件机制,则其也完全可以抛弃或设计其它数据存储介质来实现“同步”概念…虽然通常并没有这个必要。
AQS类子类有义务保证[状态]的正确性。无论是[状态]的获取还是释放,其本质都是对[状态]的赋值行为,而又因为线程获取/释放[状态]的过程可能存在竞争,因此AQS类子类在明确/实现时有义务保证[状态]的正确性。为此AQS类子类往往需要使用CAS来完成对[状态]的赋值,而AQS类也提供了相应的CAS方法以供子类赋值[状态]时调用…当然…这并不是必要的,在已保证线程安全的情况下,对[状态]的赋值也可通过常规方式进行,因此除CAS方法外AQS类也提供了常规的赋值方法以供选择。
AQS类子类对“同步”概念的明确/实现实际上就是对[状态]存在/获取/释放的明确/实现。所谓[状态]存在是指[状态]的情况是否支持执行获取操作;而获取/释放则通常是指线程在[状态]中记录/清除同步数据的行为,由此我们可知线程达成/解除同步的本质即为[状态]的获取/释放。需要特别注意的是:这里的[状态]并不单指[状态],而是泛指所有AQS类子类的实际同步数据存储介质。只是由于[状态]是AQS类首推的同步数据存储介质,因此便被简称为[状态]的存在/获取/释放。
AQS类基于独占/共享特性对[状态]的获取/释放进行了两种定义。同步机制存在独占/共享两种模式,即存在独占/共享两套对线程进行管理以使之达成/解除同步的流程,这两种模式的核心差异点具体有三:一是独占模式的[状态]获取/释放必须前后/成对的出现,但共享模式却并无此硬性规定;二是独占模式流程一次只能唤醒一条等待线程,而共享模式流程理论上一次可以唤醒所有等待线程;三是对[状态]的获取必须分别是基于独占/共享特性的实现,即[状态]在独占模式流程中不允许被多线程同时获取,但在共享模式流程中却可以。因此AQS类定义了两类方法用于对[状态]进行独占/共享特性的获取/释放,并分别供以相应的模式流程进行调用。这些方法因为风格被俗称为“两荤两素,四菜一汤”,具体定义/名称/作用/特性如下文所示。需要特别注意的是:AQS类将模式的使用规则全权交给了子类自定义而自身并未进行任何维度的限制,即AQS类子类可根据自身设计自由选择并明确/实现这些方法,因此在子类中两种模式的线程并存或线程兼具两种模式的情况都是可能存在的,也没有以某种模式获取的[状态]就必须以相同的模式释放这种说法…当然目前主流的AQS类子类中似乎还没有这种混合获取/释放的行为…但我们必须明白的是[状态]本身是没有模式概念的,而是[状态]的获取/释放有模式概念。
- protected boolean tryAcquire(int arg) —— 尝试获取 —— 令当前线程以独占模式尝试获取当前AQS指定数量的状态,成功则返回true;否则返回false。
- protected boolean tryRelease(int arg) —— 尝试释放 —— 令当前线程以独占模式尝试释放指定数量的状态至当前AQS中,如果彻底释放则返回true;否则返回false。
- protected int tryAcquireShared(int arg) —— 尝试共享获取 —— 令当前线程以共享模式尝试获取当前AQS指定数量的状态,成功则返回0/正数表示剩余可用状态总数;否则返回负数。
- protected boolean tryReleaseShared(int arg) —— 尝试共享释放 —— 令当前线程以共享模式尝试释放指定数量的状态至当前AQS中,如果彻底释放则返回true;否则返回false。
- protected boolean isHeldExclusively() —— 是否独占持有 —— 判断当前AQS是否被当前线程独占,是则返回true;否则返回false。
AQS类通过循环尝试确保[状态]获取的必然成功。我们可以从“两荤两素,四菜一汤”中发现的是:[状态]的获取尝试并无法保证成功的必然性,对于这种情况AQS类会通过控制线程循环尝试的方式来保证[状态]获取的必然成功,而这也正是同步机制的核心作用。导致[状态]获取尝试失败的原因有很多,或者说是不可数的,但根据实际情况可以将之具体地分为“[状态]存在”及“[状态]不存在”两类。这其中后者并不值得多言,因为在[状态]不支持获取的情况下失败是理所应当的结果。但前者却是值得重点讲述的,因为如果失败不是因为[状态]不存在而导致,则AQS类并不建议子类将该获取尝试直接判定为失败,而是推荐其在尝试方法中嵌套循环尝试至成功或因为[状态]不存在而失败为止…即AQS类不希望[状态]的获取尝试因为除[状态]不存在以外的原因而失败…因为将线程交由同步机制负责循环重试是相当低效的。而也正是因为该原因,获取尝试的失败通常都带有[状态]不存在的隐性含义。
AQS类子类需要确保[状态]释放尝试的必然成功。与[状态]的获取尝试不同,[状态]的释放尝试在定义上是不允许失败的,因为同步的解除理论上不受除达成以外的任何因素影响,甚至在共享模式中也不受同步达成的影响。但由于AQS类子类对“同步”概念的明确/实现以及赋值CAS确实可能导致失败的情况,故而AQS类子类需要人为确保[状态]释放尝试的必然成功,而这通常会在尝试方法中内嵌循环尝试来实现。虽说[状态]的释放尝试没有失败的说法,但其却存在“彻底”的概念,该概念用于表示释放尝试是否可令AQS存在[状态],因为[状态]的释放与存在之间并不是必然关系。这么说确实有些抽象,但我们可以通过以下例子来理解它:如果某AQS类子类规定线程以独占模式获取N次[状态]后也必须释放N次才能解除同步,那么当释放次数少于N次时,虽然其已释放过[状态],但其它线程依然会因为独占特性而无法成功获取,因此此时AQS中依然是不存在[状态]的,这种情况下释放尝试就需要返回false以表示其未彻底释放。而当线程第N次释放尝试解除同步后,由于此时的AQS已支持其它线程达成同步,因此第N次释放尝试就应该返回true以表示其彻底释放了[状态]。由此我们可知[状态]的彻底释放会带有[状态]存在的隐性含义,也因此同步机制会在[状态]彻底释放时唤醒在同步队列中等待的线程。
信号量类的内部AQS类子类采用了共享模式,并将[状态]的获取明确/实现为减少,而将[状态]的释放明确/实现为增加。由于信号量类只基于共享特性实现的原因,信号量类的内部AQS类子类只对[状态]存在/获取/释放的共享定义进行了实现以提供功能支持。我们首先需要知道的是:由于信号量类在许可的设计上存在上限的原因,其内部需要对许可数量进行记录以判断许可是否耗尽,并将之作为是否拦截线程的判断依据。信号量类选择将许可数量记录在[状态]中,即[状态]的值即为许可的数量,因此[状态]的获取/释放被明确/实现为减少/增加便可对应许可数量在获取/释放时的减少/增加,而当[状态]不足以提供线程所要获取的指定数量许可,即[状态] - 指定数量许可 < 0时便意味着许可已耗尽。信号量类通过上述加/减/判断行为来调用AQS类的同步机制以实现对线程的拦截/释放,该知识点会在下文讲解拦截/释放时详述。
许可的耗尽并不表示其数量必然 <= 0。我们已知许可耗尽的概念是[状态] - 指定数量许可 <= 0,而由于信号量类支持一次性获取多个许可,因此虽然[状态]的值可能并不足以供线程减少指定的数量,但这也并不意味着许可数量 <= 0。典型的案例是:当信号量中存在两个许可时,如果线程试图一次性获取三个许可则必然会被拦截,但此时的信号量中依然存在两个许可,因为许可的获取是一次性的,因此所谓许可耗尽的本质是许可的数量不足而非许可数量 <= 0。
同步队列
AQS类使用同步队列保存尝试达成同步失败的线程。同步队列是AQS类用于保存线程的数据结构,当线程尝试同步失败时,AQS类会将线程封装为节点并尾插至同步队列中有限/无限等待。一个值得思考的问题是:既然同步机制会控制线程循环尝试达成同步,那又为什么要将尝试同步失败的线程加入到同步队列中等待呢?实际上该问题的答案在上文中其实已经提及过,即[状态]获取尝试的失败通常都带有[状态]不存在的隐性含义。而在[状态]不存在的情况下,令线程持续不断地进行必然/大概率失败的同步尝试不过只是徒增开销的无意义行为,因此令线程在同步队列中等待实际上是避免无意义开销的有效手段。当[状态]因线程彻底释放而存在,或同步因为中断/超时而取消时,等待中的线程将被信号/中断/超时唤醒并再次/取消尝试同步。
同步队列是逻辑队列。所谓逻辑队列是指同步队列并不是类似LinkedList @ 链接列表的对象,其本质只是单纯的链表,而AQS类则持有其[head/tail @ 头/尾节点]的引用。由于同步队列的节点类在结构设计上支持持有[前驱/后继节点]的引用,因此AQS类只要持有了[头/尾节点]就相当于持有了整个同步队列。
同步队列是AQS类为子类提供的公平策略实现方案。同步队列是标准FIFO @ 先入先出队列,线程会从队列的尾部插入,并在同步达成后从头部移除。由于AQS类规定只有位于同步队列头部的线程才具备同步资格,因此在同步队列中同步的达成必然是公平的,即在同步队列中成功达成同步的线程必然是访问时间最早/等待时间最久的。此外虽然AQS类只会在线程尝试达成同步失败时将之插入同步队列中,但是否失败却是由子类全权负责明确/实现的,因此除[状态]不存在而导致的被动失败外,AQS类子类还可以先通过“故意/计划”性质的主动失败令线程在加入同步队列后再进行真正的尝试同步,从而确保线程同步达成的必然公平,因此AQS类子类可通过同步队列实现自身的公平策略。而事实上,所有基于AQS类的API其公平策略(如果存在的话)也确实都是如此实现的…至少我没有发现例外。
同步队列是低效的。我们其实不难理解这一点,因为无论同步队列中保存了多少线程,按照AQS类的设定也就只有头部线程可以尝试达成同步,因此同步队列中同步实际上就是在单线程环境中达成的,故而性能低下也是可以预见的。而也正是因为该原因,除非[状态]确实不存在,否则正如上文所说AQS类其实并不建议子类将尝试同步失败的线程交由同步机制负责重试,而是推荐其在尝试方法中嵌套循环尝试至成功或因为[状态]不存在而失败为止,因为这将导致线程在[状态]存在的情况下被加入同步队列中有限/无限等待。虽说这并不会对[状态]获取的成功必然性造成影响,但却会对AQS类子类的性能造成严重的损害。毕竟只有在尝试同步必然/大概率失败的情况下,将线程加入同步队列中等待才有益于减少连续尝试造成的性能损失。
拦截/释放
信号量类通过tryAcquireShared(int acquires)方法尝试减少[状态]。AQS类的共享模式流程通过调用tryAcquireShared(int acquires)方法对[状态]进行尝试性的共享获取,而又因为[状态]共享获取的概念被明确/实现为[状态]的减少,因此tryAcquireShared(int acquires)方法在信号量类中的实际作用即为尝试减少[状态]。由于我们已经说过的是信号量类会将许可数量记录在[状态]中,因此[状态]减少的本质即为许可数量的减少,而当[状态]不足以提供足够数量的许可供线程获取时便意味着许可已被耗尽,同时也意味着信号量需要拦截线程。我们已知信号量类拦截线程的本质是通过灵活调用同步机制而令线程进入有限/无限等待状态,并且上文也已说过在同步机制中线程只有在获取尝试失败的情况下才能被加入同步队列中等待,故而在[状态] - 指定数量许可 < 0时的情况下tryAcquireShared(int acquires)方法会返回false来宣告获取尝试的失败。关于tryAcquireShared(int acquires)方法的具体流程此处暂不展示,原因是为了支持多访问策略信号量类实现了多种AQS类子类,因此tryAcquireShared(int acquires)方法的实现也存在多个,故而该知识点会在下文讲解公平/非公平时详述。此外也是因为成功非必然的原因,tryAcquireShared(int acquires)方法方法虽然是AQS类子类实际需要明确/实现的方法,但实际上信号量类除了“特殊值”形式的获取方法会对其进行了直接调用外,其它形式的许可获取方法都是通过包含等待/保存/唤醒/循环等逻辑的包装方法而对其进行间接调用的。
线程对[状态]的获取/减少需要通过CAS来保证正确性。由于可能有其它线程并发增加/减少[状态],因此线程对[状态]的减少赋值必须通过CAS来保证正确。而又因为赋值CAS可能因为竞争而失败,因此信号量类还在tryAcquireShared(int acquires)方法中内嵌了循环来保证赋值CAS会持续执行到成功或许可耗尽为止。这一点恰好对应了上文“AQS类不希望[状态]的获取尝试因为除[状态]不存在以外的原因而失败”的内容,因为除非许可耗尽,否则赋值CAS失败并不代表[状态]不存在。线程在正式递增[状态]前需要先判断递增后的[状态]是否会溢出/超出Integer.MAX_VALUE,是则直接抛出错误而不执行递增。
信号量类通过tryReleaseShared(int releases)方法尝试增加[状态]。AQS类的共享模式流程通过调用tryReleaseShared(int releases)方法对[状态]进行尝试性的共享释放,而又因为[状态]共享释放的概念被明确/实现为[状态]的增加,因此tryReleaseShared(int releases)方法在信号量类中的实际作用即为尝试增加[状态]。由于可能有其它线程并发增加/减少[状态],因此赋值CAS会循环执行至成功为止以保证释放尝试的必然成功,这一点又恰好对应了上文“AQS类子类需要确保[状态]释放尝试的必然成功”的内容。[状态]的成功增加意味着许可数量的增加,这也意味着原本因为许可耗尽而被拦截的线程将可能成功获取到足够的许可,因此这种情况下信号量需要释放所有拦截线程。我们已知信号量类释放线程的本质是通过灵活调用同步机制而令线程退出有限/无限等待状态,即信号唤醒处于有限/无限等待状态中的线程,并且上文也已说过同步机制的共享模式会在[状态]彻底释放时“一次”唤醒同步队列中所有的等待线程,因此当[状态]成功增加时tryReleaseShared(int releases)方法会返回true来宣告[状态]的彻底释放…相关流程如下:
信号量类并不要求释放许可的线程必须先获取许可,并且许可的初始数量也并不代表上限。我们可以从上述流程中发现的是:释放尝试在增加许可时并不会判断当前线程是否获取过许可,而事实上信号量类也确实没有任何数据结构/字段对已获取许可的线程进行记录,由此我们可知信号量类并不要求释放许可的线程必须先获取许可。而也正是因为该特性的原因,许可释放完全可以由任意线程进行,这使得信号量类除了可用于常规限流场景外,还可通过灵活使用达到对线程进行批量拦截及控制任务在不同线程中执行顺序的效果。此外我们还可以发现的是:许可的增加除了会因为超出int类型的物理上限而抛出错误外,不存在包含初始数量在内的任意其它上限判断,由此我们可知许可的初始数量并不代表上限,即信号量类支持在后续运行中通过释放使得许可数量超过创建时指定的初始数量,该特性使得信号量可以动态地限制流量大小,即动态控制线程的最大并发量,以贴合运行过程中的实际限流所需。
公平/非公平
我们已知同步队列是AQS类为子类提供的公平策略实现方案,因为AQS类通过结合“同步队列的FIFO特性”与“只允许头部线程尝试同步的设定”保证了位于同步队列中的线程同步必然是公平的,即在同步队列中达成同步的线程必然是访问时间最早/等待时间最久的。因此信号量类的公平策略自然也毫无意外的基于同步队列实现,故而在这里我们会先对信号量类公平策略的实现进行讲述,随后再讲解非公平策略实现是如何破坏这种公平性的。
公平同步的实现核心在于入队。既然同步队列中的线程同步必然是公平的,那么如何确保线程在正式尝试同步前必然已先加入同步队列就成为信号量类公平策略的实现关键。我们已经知道的是线程只会在同步尝试失败后受同步机制的控制而加入同步队列,但如果说线程在加入同步队列前必须先进行尝试获取,那么公平性似乎就变得无法保证,因为线程完全可能在此次尝试中成功同步。但事实上这一点并非是无法解决的,因为正如上文所说既然“同步”概念由子类负责明确/实现,那么同步的成功/失败概念自然也由子类明确/实现。因此AQS类子类完全可以先通过“故意/计划”性质的主动失败而令线程必然加入同步队列后再进行真正的同步尝试…相关流程如下:
“故意/计划”性质的主动失败在于判断上游线程是否存在。所谓上游线程是指访问顺序在当前线程之前的线程,其存在意味着当前线程并不是访问时间最早的线程,因此为了保证公平当前线程会主动放弃当前同步尝试,从而保证自身必然在加入同步队列后再正式尝试同步。上游线程的存在判断实际上是对hasQueuedPredecessors()方法的直接调用,该方法由AQS类定义/实现,会先判断同步队列中是否存在线程,随后在确定存在的情况下继续判断当前线程是否是同步队列的头部线程。如果当前线程为头部线程,则意味着当前线程同步达成的公平性已被保证,并且还具有尝试同步的权利,因此这种情况下hasQueuedPredecessors()方法会返回false以表示当前线程可以继续执行此次同步尝试。而如果当前线程不是头部线程,则说明当前线程存在上游线程,因此出于公平性考虑hasQueuedPredecessors()方法会返回true以告知线程主动放弃此次同步尝试;而如果是同步队列为空的情况,则说明当前线程是最早访问信号量的线程。由于在公平策略中最早访问的线程本身就具有尝试同步的权利,因此出于性能上的考量信号量类允许该线程在不加入同步队列的情况下直接尝试同步,即这种情况下hasQueuedPredecessors()方法会返回false。
一个值得思考的问题是:在上文提及线程可以不加入同步队列的场景中,如果有多个线程在极短的时间范围内一同访问公平信号量,那么该如何确保同步由最早到达的线程完成呢?这确实是一个非常棘手的问题,因为多核及系统时间片分配等原因,线程先发而后至是极为常见/正常的情况,因此信号量的确无法保证该情况下的同步必然由最早访问的线程成功执行。故而信号量类对公平策略的实现实际上是一种相对公平,即其会将同一批次中成功同步的线程定性为最早访问的线程,并将其它线程的入队顺序定性为它们的访问顺序。这是一种很好的变相处理方案,因为在计算机的领域中绝对公平实际上是不存在的。
非公平同步的实现核心在于插队。我们已知公平同步的实现核心在于将线程加入同步队列后再进行正式同步,因此非公平同步只要避免这种线程入队的“必要性”就能轻松实现。但注意!非公平策略并不会导致线程无法加入同步队列,因为如果线程被动尝试同步失败,则其依然会在同步机制的调度下加入同步队列,毕竟保存线程才是同步队列的核心功能。非公平信号量与公平信号量的差别在于其并不会要求线程只有在无上游线程的情况下才允许进行真正的同步尝试,即线程可以先于上游线程直接进行尝试同步。这种将新线程的同步优先级凌驾于上游线程之上的行为被称为“插队”,即无论同步队列是否存在等待中的上游线程,新线程都具备直接尝试同步的权利。
非公平信号量的性能要远高于公平信号量。由于非公平信号量插队行为的存在,线程在不入队/排队的情况下完成独占的概率要远高于公平信号量。并且由于同步队列中的线程只能按访问顺序进行低效率的公平同步,因此非公平信号量的性能要远高于公平信号量。因此在没有特殊需求的情况下,使用非公平信号量应是开发时的首选。
灵活使用
已知的是:信号量类支持在后续运行中通过释放使得许可数量超过创建时指定的初始数量,此外由于信号量类并不要求释放许可的线程必须先获取许可,因此许可释放完全可以由获取许可线程之外的线程来进行。这使得信号量类除了可用于常规限流场景外,还可通过灵活使用达到对线程进行批量拦截及控制任务在不同线程中执行顺序的效果,下文的两段代码分别展示这两类效果的实现过程。
批量拦截
/**
* 线程池执行器(注意!!!此处只是为了快速获取线程池执行器,开发环境不推荐使用
* newFixedThreadPool(int nThreads)方法创建线程池执行器,易造成OOM。)
*/
private static final int PERMITS = 5;
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(PERMITS);
public static void main(String[] args) throws InterruptedException {
// 创建指定许可数量的非公平信号量。
Semaphore semaphore = new Semaphore(0);
// 创建指定许可数量及指定公平性的信号量。
// Semaphore semaphore = new Semaphore(5, true);
for (int i = 0; i < PERMITS; i++) {
EXECUTOR_SERVICE.submit(() -> {
try {
long threadId = Thread.currentThread().getId();
System.out.println("线程【" + threadId + "】已到达信号量,并因为无许可而被拦截!!!");
semaphore.acquire();
System.out.println("线程【" + threadId + "】已成功获取许可并通过信号量!!!");
System.out.println("线程【" + threadId + "】开始执行任务......");
} catch (InterruptedException e) {
// 什么也不做。
}
});
}
// 休眠5秒,模拟业务执行。
Thread.sleep(5000);
// 通过释放增加5个许可。
System.out.println("主线程已释放" + PERMITS + "个许可至信号量!!!");
System.out.println();
semaphore.release(PERMITS);
}
控制任务在不同线程中执行顺序
public static void main(String[] args) {
// 创建指定许可数量的非公平信号量。
Semaphore semaphore = new Semaphore(0);
// 创建指定许可数量及指定公平性的信号量。
// Semaphore semaphore = new Semaphore(5, true);
Thread thread1 = new Thread(() -> {
System.out.println("任务A准备开始执行......");
System.out.println("任务A开始执行......");
// 休眠3秒,模拟业务执行。
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务A结束执行!!!");
System.out.println("任务A已释放许可至信号量!!!");
semaphore.release();
});
Thread thread2 = new Thread(() -> {
System.out.println("任务B准备开始执行......");
System.out.println("任务B由于信号量不存在许可而被拦截!!!");
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务B成功获取许可并通过信号量!!!");
System.out.println("任务B开始执行......");
// 休眠1秒,模拟业务执行。
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务B结束执行!!!");
});
thread1.start();
thread2.start();
}
BUG
相比三剑客中的其它两个线程控制工具,信号量类有着超乎寻常的知名度。它的著名一方面源于限流场景非常常见,使得其在实际开发中的使用率非常高;另一方面则是因为其存在一个非常闻名遐迩的BUG,或者更准确的说是其设计暴露了底层AQS类的一个BUG。该BUG虽然已在JDK1.6版本中被修复,但直接后果是导致AQS类的学习难度上升了好几个数量级。因此对AQS类有过深入学习的开发者必然对之记忆犹新,顺带着对信号量类也怨念尤深咬牙切齿,以至于那个案例如今依然保存在ORACLE官网上的Java BUG存档中。
BUG的核心问题在于其破坏了“共享节点[线程]状态唤醒的传播性”。“共享节点[线程]状态唤醒的传播性”简单的说就是当等待中的线程在被唤醒并以共享模式成功获取[状态]后,如果当前AQS中还存在[状态],则当前被唤醒的线程还需要负责唤醒后续的等待线程。而所谓“共享节点[线程]状态唤醒的传播性”的破坏则是指在当前AQS中还存在[状态]的情况下,当前被唤醒的线程未唤醒后续等待线程的情况。该情况会导致后续等待线程被延迟唤醒(被非“共享节点[线程]状态唤醒的传播性”的方式唤醒)或永远无法被唤醒,从而造成线程永久等待的情况发生。实际上,在线程“先获取许可再释放许可”常规的使用方式下信号量类是不会触发该BUG的,但如果采用“非对称”的使用方式则存在该可能,关于该知识点的内容会在AQS类的相应文章中详述。