尚硅谷JUC极速版笔记
- 1、JUC概述
- 1.1 进程和线程
- 1.2 线程的状态(6个)
- 1.3 wait和sleep
- 1.4 并发与并行
- 1.5 管程(锁)
- 1.6 用户线程和守护线程
- 2、Lock接口
- 2.1 复习synchronized(java内置同步锁)
- 2.2 什么是Lock接口
- 2.3 创建线程的多种方式(4种)
- 2.4 使用Lock实现卖票例子
- 2.5 synchronized和Lock两者差异
- 3、线程间通信
- 3.1 synchronized 实现案例(Object中的 wait() 和 notifyAll() 方法)
- 3.2 虚假唤醒问题(if改while)
- 3.3 Lock 实现案例(Condition 类中有 await() 和 signalAll() )
- 4、线程间定制化通信(Condition 类可以进行选择性通知)
- 进程/线程同步四个原则(单标志法存在的问题)
- 5、集合的线程安全
- 5.1 集合线程不安全演示,以 ArrayList 为例
- 5.2 解决方案-Vector增加synchronized关键字
- 5.3 解决方案-Collections接口中的 synchronizedList(List list) 方法
- 5.4 解决方案-CopyOnWriteArrayList(Redis持久化也使用了CopyOnWrite思想)
- 5.5 HashSet的线程不安全(解决方案-CopyOnWriteArraySet )
- 5.6 HashMap的线程不安全(解决方案-ConcurrentHashMap)
- 6、多线程锁 (公平锁和非公平锁,死锁,可重锁)
- 6.1 synchronized 锁的八种情况总结
- 6.2 公平锁和非公平锁
- 6.3 可重入锁
- 6.4 死锁(定义,死锁产生的四个必要条件)
- 7、Callable接口
- 7.1 Callable接口创建线程
- 7.2 FutureTask(适配器模式)
- 8、JUC强大辅助类
- 8.1 减少计数CountDownLatch
- 8.2 循环栅栏CyclicBarrier
- 8.3 信号灯 Semaphore
- 9、读写锁
- 9.1 悲观锁和乐观锁
- 9.2 表锁|行锁|读锁|写锁
- 9.3 读写锁概述
- 9.4 读写锁的演变历程
- 9.5 锁降级的过程和必要性
- 10、阻塞队列
- 10.1 阻塞队列概述
- 10.2 阻塞队列架构
- 10.3 阻塞队列分类
- 10.4 阻塞队列核心方法
- 11、线程池
- 11.1 线程池概述
- 11.2 线程池类图架构
- 11.3 线程池使用方式
- 11.4 线程池底层原则
- 11.5 线程池的七个参数
- 11.6 线程池底层工作流程
- 11.7 自定义线程池
- 12、Fork与Join分支
- 13、异步回调
- 13.1 CompletableFuture
- 13.2 Future 与 CompletableFuture
1、JUC概述
1.1 进程和线程
进程:指在系统中正在运行的一个应用程序,进程是资源分配的最小单位
线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程是程序执行的最小单位
1.2 线程的状态(6个)
在java.lang.Thread 类,该类的内部类 State中共定义了六个状态,分别是:NEW(新建)、RUNNABLE(准备就绪)、BLOCKED(阻塞)、WAITING(等待-不见不散)、TIMED_WAITING(等待-过时不候)、TERMINATED(终结)
1.3 wait和sleep
- sleep是Thread的静态方法;wait是Object的方法,任何对象实例都能调用。
- sleep不会释放锁,它也不需要占用锁;wait会释放锁,但调用它的前提是当前线程占有锁(即代码要在synchronized中)
- 它们都可以被interrupt方法中断
1.4 并发与并行
并发:同一时间间隔内多个线程交替执行,实际上是宏观上并行,微观上串行
并行:同一时刻多个线程正在执行,多核并行
1.5 管程(锁)
保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现)。
1.6 用户线程和守护线程
用户线程:自定义线程,不随主线程结束而结束。主线程结束了,用户线程还在运行,jvm还存活。
守护线程:随主线程结束而结束,比如说垃圾回收线程。只有守护线程时,主线程结束,jvm结束。
2、Lock接口
2.1 复习synchronized(java内置同步锁)
synchronized是Java的关键字,是一种同步锁,能够修饰 一个代码块对象,变量,方法,来控制这个所修饰的,被顺序的访问。
多线程的编程步骤:
第一:创建一个资源类,属性和操作方法
第二:创建多线程,调用类里面的操作方法
使用synchronized实现卖票例子(自动上锁,解锁)
class Ticket{
private int rest = 300;
public synchronized void sale() {
if (rest > 0)
System.out.println(Thread.currentThread().getName() + "卖出一张票,还剩:" + --rest + "张;");
}
}
public class SyncKey {
public static void main(String[] args) {
//创建资源
Ticket ticket = new Ticket();
//抽取的 Runnable 接口
Runnable Ir = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
ticket.sale();
}
}
};
//通过实现Runnable接口创建线程
new Thread(Ir, "A").start();
new Thread(Ir, "B").start();
new Thread(Ir, "C").start();
}
}
2.2 什么是Lock接口
Lock 实现提供比使用 synchronized 方法和语句可以获得的更广泛的锁定操作。 它们允许更灵活的结构化,可能具有完全不同的属性,并且可以支持多个相关联的对象 Condition。当在不同范围内发生锁定和解锁时,必须注意确保在锁定时执行的所有代码由 try-finally 或 try-catch 保护,以确保在必要时释放锁定。
Lock 实现提供了使用 synchronized 方法和语句的附加功能,通过提供非阻塞尝试来获取锁 tryLock(),尝试获取可被中断的锁 lockInterruptibly() ,以及尝试获取可以超时 tryLock(long, TimeUnit)。
2.3 创建线程的多种方式(4种)
- 继承Thread类。由于 Java 是单继承编程语言,继承是十分宝贵的,所以一般不使用这种方法
- 实现Runnable接口。实现Runnable接口是主要创建线程的方法之一。
- 使用Callable接口
- 使用线程池
2.4 使用Lock实现卖票例子
使用 ReentrantLock(Lock的实现类之一)实现卖票例子(手动上锁,解锁)
class LTicket{
private int number = 300;
//创建可重入锁ReentrantLock
private final ReentrantLock lock = new ReentrantLock();
public void sale() {
lock.lock();//上锁
try {
if (number > 0){
System.out.println(Thread.currentThread().getName() + "卖出一张票,还剩:" + --number + "张;");
}
} finally {//由于要防止上锁后出现异常导致无法解锁,必须释放锁,因此放在finally中
lock.unlock();//解锁
}
}
}
public class LSaleTicket {
public static void main(String[] args) {
//创建资源
LTicket ticket = new LTicket();
//抽取的 Runnable 接口
Runnable Ir = () -> {
for (int i = 0; i < 1000; i++) {
ticket.sale();
}
};
//通过实现Runnable接口创建线程
new Thread(Ir, "A").start();
new Thread(Ir, "B").start();
new Thread(Ir, "C").start();
}
}
这里涉及到可重入锁的概念:可重入锁,就是可以重复获取相同的锁,synchronized和ReentrantLock都是可重入的。
2.5 synchronized和Lock两者差异
- synchronized是java内置关键字。Lock不是内置,是一个类,可以实现同步访问且比 synchronized中的方法更加丰富
- synchronized自动释放锁,而lock需手动释放锁(不解锁会出现死锁,需要在 finally 块中释放锁)
- Lock 可以让等待锁的线程响应中断,而等待synchronized锁的线程不能响应中断,只会一直等待
- 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到
- Lock 可以提高多个线程进行读操作的效率(当多个线程竞争的时候,Lock 性能远远好于synchronized)
3、线程间通信
线程间通信有两种实现方法:
- 关键字
synchronized
与wait()/notify()
这两个方法一起使用可以实现等待/通知模式。 Lock
接口中的 newContition() 方法返回 Condition 对象,Condition 类的await()/signalAll()
也可以实现等待/通知模式。
多线程编程步骤:
1、创建一个资源类,属性和操作方法
2、在资源类操作方法(判断,执行,通知)
3、创建多线程,调用类里面的操作方法
4、防止虚假唤醒
3.1 synchronized 实现案例(Object中的 wait() 和 notifyAll() 方法)
class Share {
int number = 1;
public synchronized void incr() throws InterruptedException {
//判断
if(number!=0){
this.wait();//这里会释放锁,把当前的线程放入对象的等待集合
}
//执行
number++;
System.out.println(Thread.currentThread().getName()+" : "+number);
// 通知
this.notifyAll();//唤醒在等待集合中的线程
}
public synchronized void decr() throws InterruptedException {
//判断
if (number!=1){
this.wait();
}
//执行
number--;
System.out.println(Thread.currentThread().getName()+" : "+number);
//通知
this.notifyAll();
}
}
public class ThreadDemo1 {
public static void main(String[] args) {
Share share = new Share();
new Thread(()->{
for (int i=1;i<=10;i++){
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"ThreadA").start();
new Thread(()->{
for (int i=1;i<=10;i++){
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"ThreadB").start();
}
}
3.2 虚假唤醒问题(if改while)
当多个线程都处于等待集合中,一旦收到通知,可以直接操作而不再判断,这叫做【虚假唤醒】问题。 将this.wait();
放在while循环中可以解决该问题。
具体地,同样使用上述3.1案例,现在有四个线程,分别为A,B,C,D,其中A,C线程做+1操作,B,D线程做-1操作,想要的结尾应该是A,C线程输出值为1,B,D线程输出值为0。
class Share {
int number = 0;
public synchronized void incr() throws InterruptedException {
//判断
if(number!=0){
this.wait();//这里会释放锁
}
//执行
number++;
System.out.print(Thread.currentThread().getName()+" : "+number+"-->");
// 通知
this.notifyAll();
}
public synchronized void decr() throws InterruptedException {
//判断
if (number!=1){
this.wait();
}
//执行
number--;
System.out.println(Thread.currentThread().getName()+" : "+number);
//通知
this.notifyAll();
}
}
public class ThreadDemo1 {
public static void main(String[] args) {
Share share = new Share();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"A").start();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"B").start();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"C").start();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"D").start();
}
}
但是根据输出可以发现,存在虚假唤醒的问题:
出现负数的原因是,在上一阶段结束后,B和D线程都处于waiting状态,当线程C执行完发通知后,B先抢到锁,执行-1操作,然后发出通知,接着D抢到锁,再执行-1操作得到-1。
可以发现,这是由于 wait()
方法使线程在哪里睡就在哪里醒,所以接下来B和D执行操作时不会再通过 if 判断,从而导致最后输出的结果和我们预想的不一致。
为了保证线程“醒”了之后再次判断,需要将wait()
方法放入while
循环中。
class Share {
int number = 0;
public synchronized void incr() throws InterruptedException {
//判断
while(number!=0){
this.wait();//这里会释放锁
}
//执行
number++;
System.out.print(Thread.currentThread().getName()+" : "+number+"-->");
// 通知
this.notifyAll();
}
public synchronized void decr() throws InterruptedException {
//判断
while (number!=1){
this.wait();
}
//执行
number--;
System.out.println(Thread.currentThread().getName()+" : "+number);
//通知
this.notifyAll();
}
}
public class ThreadDemo1 {
public static void main(String[] args) {
Share share = new Share();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"A").start();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"B").start();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"C").start();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"D").start();
}
}
3.3 Lock 实现案例(Condition 类中有 await() 和 signalAll() )
在 Lock 接口中,有一个 newCondition() 方法,该方法返回一个新 Condition 绑定到该实例 Lock 实例。
Condition 类中有 await() 和 signalAll() 等方法,和 synchronized 实现案例中的 wait() 和 notifyAll() 方法相同。所以通过 Lock 接口创建一个 Condition 对象,由该对象的方法进行等待和唤醒操作
class Share{
private int number = 0;
//创建Lock
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void incr() throws InterruptedException {
lock.lock();
try {
while(number!=0){
condition.await();
}
number++;
System.out.print(Thread.currentThread().getName()+" : "+number+"-->");
condition.signalAll();
} finally {
lock.unlock();
}
}
public void decr() throws InterruptedException {
lock.lock();
try {
//判断
while (number!=1){
condition.await();
}
//执行
number--;
System.out.println(Thread.currentThread().getName()+" : "+number);
//通知
condition.signalAll();
} finally {
lock.unlock();
}
}
}
public class ThreadDemo2 {
public static void main(String[] args) {
Share share = new Share();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"A").start();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"B").start();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.incr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"C").start();
new Thread(()->{
for (int i=1;i<=100;i++){
try {
share.decr();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"D").start();
}
}
4、线程间定制化通信(Condition 类可以进行选择性通知)
案例: 启动三个线程,按照如下要求执行,AA打印5此,BB打印10次,CC打印15次,一共进行10轮
具体思路: 每个线程添加一个标志位,是该标志位则执行操作,并且修改为下一个标志位,通知下一个标志位的线程。
考虑到使用 Condition 类可以进行选择性通知,为提高性能可以直接通知下一个要执行操作的线程。即分别创建三个Condition 对象来发送开锁通知(他们能实现指定唤醒)
class ShareResource {
private int flag = 1;//标志位 1:AA 2:BB 3:CC
private ReentrantLock lock = new ReentrantLock();
// 创建三个Comdition对象,为了定向唤醒相乘
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();
public void print5(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 1) {
c1.await();
}
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + " :: " + i + ", loop=" + loop);
}
flag=2;//改标志位
c2.signal();//通知
} finally {
lock.unlock();
}
}
public void print10(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 2) {
c2.await();
}
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " :: " + i + ", loop=" + loop);
}
flag=3;//改标志位
c3.signal();//通知
} finally {
lock.unlock();
}
}
public void print15(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 3) {
c3.await();
}
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + " :: " + i + ", loop=" + loop);
}
flag=1;//改标志位
c1.signal();//通知
} finally {
lock.unlock();
}
}
}
public class ThreadDemo3 {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareResource.print5(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"AA").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareResource.print10(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"BB").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareResource.print15(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"CC").start();
}
}
进程/线程同步四个原则(单标志法存在的问题)
我们在学习操作系统中的同步可以知道,进程/线程同步有四个原则,都是为了禁止两个进程同时进入临界区。同步机制应该遵循以下原则
- 空闲让进:临界区空闲时,可以允许一个请求进入临界区的进程立即进入临界区
- 忙则等待:当已经有进程进入临界区的时候,其他试图进入临界区的进程必须等待
- 有限等待:对请求访问的进程,应保证能在有限时间内进入临界区
- 让权等待:当进程不能进入临界区的时候,应立即释放处理机,防止进程忙等待
上述案例被采用单标志法。因为该案例设置一个公用整型变量flag,用于指示被允许进入临界区的进程编号。若 flag =1,则允许 AA 进程进入临界区;若 flag =2,则允许 BB 进程进入临界区;若 flag =3,则允许 CC 进程进入临界区。
该算法可确保每次只允许一个进程进入临界区。但两个进程必须交替进入临界区,若某个进程不再进入临界区,则另一个进程也无法进入临界区。比如在线程的run()
方法调用中设置不同的loop次数,在后期会有部分线程不能访问 Share 资源了,违背了"空闲让进"原则,让资源利用不充分。
5、集合的线程安全
5.1 集合线程不安全演示,以 ArrayList 为例
为什么在多线程中会出现不安全?
以 ArrayList 为例,其源码的add方法没有synchronized
关键字,没有使用同步锁互斥,所以在多线程并发时,会出现线程异常。
代码演示:
public class ThreadDemo4 {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
// 向集合中添加内容
list.add(UUID.randomUUID().toString().substring(0,8));
// 从集合中取出内容
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
报异常:
解决该方法主要有三种,即使用这三个类:Vector、Collections、CopyOnWriteArrayList(常用)
5.2 解决方案-Vector增加synchronized关键字
Vector类中的add方法加了synchronized
关键字,因此可以保证线程安全。将List<String> list = new ArrayList<>();
替换为List<String> list1 = new Vector<>();
即可解决线程安全问题。
但是 Vector 用的不多,因为每次对添加的元素上锁,而且使用的是重量级锁synchronized是十分占用资源的,效率是十分低下的。
5.3 解决方案-Collections接口中的 synchronizedList(List list) 方法
Collections 接口中的 synchronizedList(List list)
方法,可以将传入的 List列表对象 转为 支持的同步(线程安全的)列表并返回。将List<String> list = new ArrayList<>();
替换为List<String> list = Collections.synchronizedList(new ArrayList<>());
即可解决线程安全问题。
5.4 解决方案-CopyOnWriteArrayList(Redis持久化也使用了CopyOnWrite思想)
将List<String> list = new ArrayList<>();
替换为List<String> list = new CopyOnWriteArrayList<>();
即可解决线程安全问题。
CopyOnWriteArrayList 的 add() 方法源代码如下:(JDK14)
public boolean add(E e) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
es = Arrays.copyOf(es, len + 1);
es[len] = e;
setArray(es);
return true;
}
}
CopyOnWriteArrayList 采用读写分离的思想,读操作不加锁,写操作加锁。
写时复制技术
- 读的时候并发读取旧数据(多个线程操作)
- 写的时候独立,先复制一份比旧数据长 1 的数据出来,在最后添加数据,旧新合并,完成写操作,之后就可以读所有数据(每次加新内容都写到新区域,合并之前旧区域,读取新区域添加的内容)
5.5 HashSet的线程不安全(解决方案-CopyOnWriteArraySet )
HashSet 同时读写时也会出现 ConcurrentModificationException
异常,他的问题和 ArrayList 一样,没有对 add(E e)
方法做同步处理。其解决方法与 CopyOnWriteArrayList 类似,JUC提供了CopyOnWriteArraySet 类,将Set<String> set = new HashSet<>();
替换为Set<String> set = new CopyOnWriteArraySet<>();
即可解决线程安全问题。
演示代码:
public class ThreadDemo4 {
public static void main(String[] args) {
// Set<String> set = new HashSet<>();
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
// 向集合中添加内容
set.add(UUID.randomUUID().toString().substring(0,8));
// 从集合中取出内容
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
使用Set set = new HashSet<>();会报错:
5.6 HashMap的线程不安全(解决方案-ConcurrentHashMap)
HashMap 同时读写时一样会出现 ConcurrentModificationException
异常。JUC提供了一个叫做 ConcurrentHashMap
的类,来实现 HashMap 的同步。
ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,允许多个修改操作并发进行,其关键在于使用了锁分段技术。JDK1.8锁的粒度就是HashEntry(首节点)。
6、多线程锁 (公平锁和非公平锁,死锁,可重锁)
6.1 synchronized 锁的八种情况总结
synchronized 锁的是方法,则是对象锁,同个对象锁的机制要等待,不同对象锁的机制调用同一个不用等待。
synchronized 锁的是static方法,则为class锁(类锁)而不是对象锁。
对于同步方法块,锁是 synchronized 括号里配置对象。
视频中案例分析:
class Phone {
public synchronized void sendSMS() throws Exception {
//停留4秒
TimeUnit.SECONDS.sleep(4);
System.out.println("------sendSMS");
}
public synchronized void sendEmail() throws Exception {
System.out.println("------sendEmail");
}
public void getHello() {
System.out.println("------getHello");
}
}
public class SynchronizedLockTest {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
Thread.sleep(100);
new Thread(() -> {
try {
// phone.sendEmail();
// phone.getHello();
phone2.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();
}
}
不同案例输出的不同结果和分析
1 标准访问,先打印短信还是邮件?
------sendSMS
------sendEmail
按线程启动顺序获取锁释放锁
2 停4秒在短信方法内,先打印短信还是邮件
------sendSMS
------sendEmail
此时获取同一个对象锁,第一次获取锁后等待4秒,执行完释放锁之后,第二个线程才能获取同一把对象锁。
3 新增普通的hello方法,是先打短信还是hello
------getHello
------sendSMS
线程A获取到锁,开始等待,因为线程B执行不许要获取同步锁,所以先输出getHello,然后经过四秒,线程A等待结束,执行输出操作并释放锁。
4 现在有两部手机,先打印短信还是邮件
------sendEmail
------sendSMS
线程A和B获取到不同的对象锁,之间没有竞争关系,因此B线程先输出,A现存等待结束之后输出。
5 两个静态同步方法,1部手机,先打印短信还是邮件
------sendSMS
------sendEmail
线程A和B争夺同一把类锁,线程A 先获取锁,因此A等待4秒执行输出释放锁之后,B才获取到锁并执行输出。
6 两个静态同步方法,2部手机,先打印短信还是邮件
------sendSMS
------sendEmail
线程A和B通过不同对象争夺同一把类锁,线程A 先获取锁,因此A等待4秒执行输出释放锁之后,B才获取到锁并执行输出。
7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件
------sendEmail
------sendSMS
线程A获取类锁,执行等待,期间线程B获取对象锁,执行输出并释放对象锁,线程A等待结束执行输出释放类锁。(因为获取的锁对象不同,不存在竞争,按照时间顺序输出)
8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件
------sendEmail
------sendSMS
这里类似第7种情况,只是通过不同对象获取不同的锁,不存在竞争。
6.2 公平锁和非公平锁
公平锁 :效率相对低 ,但是cpu 的利用高了
非公平锁:效率高,但是线程容易饿死(所有的工作,由一个线程完成)
用法: 在创建可重入锁ReentrantLock时,调用有参构造器,传入参数true设置为公平锁
private final ReentrantLock lock = new ReentrantLock(true);
6.3 可重入锁
可重入锁就是某个线程已经获得某个锁,可以重复获取同一个锁而不死锁。可重入锁也叫递归锁。
synchronized和lock都是可重入锁,sychronized是隐式锁,不用手工上锁与解锁,而lock为显式锁,需要手工上锁与解锁。
synchronized的示例代码
public class WhatReentrantSynchronized {
// 创建一个锁对象
static Object mylock = new Object();
public static void main(String[] args) {
new Thread(()->{
// 创建第一个锁
synchronized (mylock){
System.out.println("这是第一层锁");
synchronized (mylock){
System.out.println("这是第二层锁");
}
}
}).start();
}
}
ReentrantLock的示例代码
/**
* lock和unlock的数量必须一致,否则会出现死锁
* */
public class WhatReentrantLock {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
new Thread(()->{
// 上锁
lock.lock();
try {
System.out.println("这是第一层锁");
// 再次上锁
lock.lock();
try{
System.out.println("这是第二层锁");
}finally {
lock.unlock();
}
}finally {
lock.unlock();
}
}).start();
}
}
6.4 死锁(定义,死锁产生的四个必要条件)
1、什么是死锁:两个或以上的进程因为争夺资源而造成互相等待资源的现象称为死锁。如果没有外力干涉,他们无法继续执行。
2、产生死锁的原因:
系统资源不足
系统资源分配不当
进程运行顺序不当
3、死锁产生的四个必要条件:线程互斥,不可抢占,请求保持,循环等待
互斥使用:当资源被一个线程使用或者占用时,别的线程不能使用该资源
不可抢占:获取资源的一方,不能从正在使用资源的一方抢占掠夺资源,资源只能被使用者主动释放
请求保持:资源请求者在请求别的资源时,同时保持对已有资源的占有
循环等待:即p1占有p2的资源,p2占有p3的资源,p3占有p1的资源,这样形成了一个等待环路
4、死锁代码案例
public class DeadLock {
static Object a = new Object();
static Object b = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (a) {
System.out.println("外层,已经获取a,试图获取b");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (b) {
System.out.println("内层");
}
}
}, "A").start();
new Thread(() -> {
synchronized (b) {
System.out.println("外层,已经获取a,试图获取b");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (a) {
System.out.println("内层");
}
}
}, "B").start();
}
}
5、如何验证死锁?
jps -l
类似于linux中的 ps -ef查看进程号和状态
jstack
自带的堆栈跟踪工具确认死锁。
7、Callable接口
创建线程的多种方式:
继承Thread类
实现Runnable接口
Callable接口
线程池
7.1 Callable接口创建线程
使用 Runnable 创建的线程缺少的一项功能,当线程终止时(即 run()
完成时),我们无法使线程返回结果。为了支持此功能,Java 中提供了 Callable 接口,即线程终止(call()
执行完成时)后返回结果。
两个接口对比 | Runnable接口 | Callable 接口 |
---|---|---|
返回值 | 没有 | 有 |
抛出异常 | 没有 | 有 |
实现方法名称 | run() | call() |
因为Thread的构造函数中没有Callable接口的参数设置,不可以直接替换,只能用FutureTask类来实现线程创建(FutureTask类既能传入Callable构造,又是Runnable接口的实现类)
class MyThread1 implements Runnable{
@Override
public void run() {
}
}
class MyThread2 implements Callable{
@Override
public Integer call() throws Exception {
return 200;
}
}
public class Demo1 {
public static void main(String[] args) {
// Runnable接口创建线程
new Thread(new MyThread1(),"AA").start();
// Callable接口创建线程
new Thread(new FutureTask<>(new MyThread2()),"BB").start();
}
}
7.2 FutureTask(适配器模式)
代码演示:
class MyThread2 implements Callable{
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+" come in callable");
return 200;
}
}
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask1 = new FutureTask<>(new MyThread2());
// lambda表达式简化Callable实现类的call()方法
FutureTask<Integer> futureTask2 = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName()+" come in callable");
return 1024;
});
//创建一个线程
new Thread(futureTask2,"lucy").start();
new Thread(futureTask1,"mary").start();
// while(!futureTask2.isDone()){
// System.out.println("waiting...");
// }
//调用FutureTask的get方法获取线程运行结果
System.out.println(futureTask2.get());
System.out.println(futureTask2.get());
System.out.println(futureTask1.get());
System.out.println(Thread.currentThread().getName()+" is over");
}
}
8、JUC强大辅助类
8.1 减少计数CountDownLatch
CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法之后的语句。具体步骤可以演化为定义一个类,减1操作,并等待到0,为0执行结果。
CountDownLatch 常用方法说明
1、CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。
2、await();//阻塞当前线程,将当前线程加入阻塞队列。
3、countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。
【具体的案例加深理解】
6个同学陆续离开教室之后,班长才能锁门。
如果不加 CountDownLatch类,会出现线程混乱执行,同学还未离开教室班长就已经锁门了
不使用CountDownLatch,导致线程混乱:
public class CountDownLatchDemo {
public static void main(String[] args) {
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"同学离开教室");
},String.valueOf(i)).start();
}
System.out.println(Thread.currentThread().getName()+"班长锁门");
}
}
/*
输出为
main班长锁门
0同学离开教室
4同学离开教室
2同学离开教室
5同学离开教室
1同学离开教室
3同学离开教室
*/
通过CountDownLatch计数,保证主线程输出语句最后执行:
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"同学离开教室");
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"班长锁门");
}
}
/*
输出结果
2同学离开教室
1同学离开教室
5同学离开教室
4同学离开教室
3同学离开教室
0同学离开教室
main班长锁门
*/
8.2 循环栅栏CyclicBarrier
CyclicBarrier是 允许一组线程互相 等待,直到到达某个公共屏障点,在设计一组固定大小的线程的程序中,这些线程必须互相等待,因为barrier在释放等待线程后可以重用,所以称为循环barrier
CyclicBarrier常用方法说明
1、CyclicBarrier(int parties,Runnable barrierAction) //构造方法,创建一个值为parties的屏障。
2、await();//当一个线程到了栅栏这里了,那么就将计数器减 1
public class CyclicBarrierDemo {
public static final int NUMBER=7;
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER,()->{
System.out.println("集齐七颗龙珠就可以召唤神龙");
});
for (int i = 0; i <7; i++) {
new Thread(()->{
try {
System.out.println("第 "+Thread.currentThread().getName()+" 颗龙珠被收集到");
//等待
cyclicBarrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
},String.valueOf(i)).start();
}
}
}
/*
输出结果:
第 1 颗龙珠被收集到
第 6 颗龙珠被收集到
第 4 颗龙珠被收集到
第 0 颗龙珠被收集到
第 2 颗龙珠被收集到
第 3 颗龙珠被收集到
第 5 颗龙珠被收集到
集齐七颗龙珠就可以召唤神龙
*/
8.3 信号灯 Semaphore
一个计数信号量,从概念上将,信号量维护了一个许可集,如有必要,在许可可用前会阻塞每一个acquire(),然后在获取该许可。每个release()添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore只对可用许可的号码进行计数,并采取相应的行动
Semaphore常用方法说明
1、Semaphore(int permits); // 创建具有给定的许可数和非公平的公平设置的Semapore
2、acquire()从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断
3、release()释放一个许可,将其返回给信号量
【具体案例】
6辆汽车,停3个车位
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"车抢到车位");
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println("--"+Thread.currentThread().getName()+"车离开车位");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
/*
输出结果:
0车抢到车位
2车抢到车位
1车抢到车位
--2车离开车位
3车抢到车位
--3车离开车位
4车抢到车位
--0车离开车位
5车抢到车位
--5车离开车位
--1车离开车位
--4车离开车位
*/
9、读写锁
9.1 悲观锁和乐观锁
悲观锁:顾名思义,它是干什么都很悲观,所以在操作的时候,每次都先上锁,使用时解锁
乐观锁:它很乐观,多线程,并不上锁,但是会发生线程安全问题,通过比较版本号来同步
9.2 表锁|行锁|读锁|写锁
表锁:整个表操作,不会发生死锁
行锁:每个表中的单独一行进行加锁,会发生死锁
读锁:共享锁(可以有多个人读),会发生死锁
写锁:独占锁(只能有一个人写),会发生死锁
9.3 读写锁概述
读写锁:一个资源可以被多个读线程访问,也可以被一个写线程访问,但不能同时存在读写线程,读写互斥,读读共享
读写锁 ReentrantReadWriteLock
读锁为 ReentrantReadWriteLock.ReadLock,readLock()
方法
写锁为 ReentrantReadWriteLock.WriteLock,writeLock()
方法
创建读写锁对象 private ReadWriteLock rwLock = new ReentrantReadWriteLock();
写锁 加锁 rwLock.writeLock().lock();
,解锁为rwLock.writeLock().unlock();
读锁 加锁 rwLock.readLock().lock();
,解锁为rwLock.readLock().unlock();
【模仿数据读写案例】
在不加读写锁的情况下:
class MyCache{
// 需要模仿从Map中取对象,所以创建一个map对象
private volatile Map<String, Object> map = new HashMap<>();
// 放数据
public void put(String key, Object value) {
try {
System.out.println(Thread.currentThread().getName()+"正在写操作"+key);
// 暂停一会
TimeUnit.MICROSECONDS.sleep(300);
// 放数据
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"写完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取数据
public void get(String key) {
try {
System.out.println(Thread.currentThread().getName()+"正在读操作"+key);
// 暂停一会
TimeUnit.MICROSECONDS.sleep(300);
// 放数据
map.get(key);
System.out.println(Thread.currentThread().getName()+"取完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(()->{
myCache.put(num+"",num+"");
},String.valueOf(i)).start();
}
for (int i = 1; i <= 6; i++) {
final int num = i;
new Thread(()->{
myCache.get(num+"");
},String.valueOf(i)).start();
}
}
}
//输出有问题,还没写完就已经在读了
很显然,线程在写操作的时候,有线程在读操作,这可能会出现脏数据
加上读写锁:
class MyCache{
// 需要模仿从Map中取对象,所以创建一个map对象
private volatile Map<String, Object> map = new HashMap<>();
//创建读写锁对象
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 放数据
public void put(String key, Object value) {
//添加写锁
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"正在写操作"+key);
// 暂停一会
TimeUnit.MICROSECONDS.sleep(300);
// 放数据
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"写完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放写锁
rwLock.writeLock().unlock();
}
}
// 取数据
public void get(String key) {
//添加读锁
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"正在读操作"+key);
// 暂停一会
TimeUnit.MICROSECONDS.sleep(300);
// 放数据
map.get(key);
System.out.println(Thread.currentThread().getName()+"读完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放读锁
rwLock.readLock().unlock();
}
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(()->{
myCache.put(num+"",num+"");
},String.valueOf(i)).start();
}
for (int i = 1; i <= 6; i++) {
final int num = i;
new Thread(()->{
myCache.get(num+"");
},String.valueOf(i)).start();
}
}
}
9.4 读写锁的演变历程
无锁 | 独占锁 | 读写锁 |
---|---|---|
多线程抢夺资源,乱 | synchronized和ReentranrLock,只能由一个线程操作 | ReentrantReadWriteLock,读读共享,读写互斥,写写互斥。存在锁饥饿问题。 |
9.5 锁降级的过程和必要性
写锁可以降级为读锁,读锁不能升级为写锁。
锁降级的过程:获取写锁->获取读锁->释放写锁->释放读锁
//演示读写锁降级
public class Demo1 {
public static void main(String[] args) {
//可重入读写锁对象
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();//读锁
ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();//写锁
//锁降级
//1 获取写锁
writeLock.lock();
System.out.println("---write");
//2 获取读锁
readLock.lock();
System.out.println("---read");
//3 释放写锁
writeLock.unlock();
//4 释放读锁
readLock.unlock();
}
}
我们在使用读写锁时遵守下面的获取规则(即不同线程间读读共享,读写互斥,写写互斥)
1.如果有一个线程已经占用了读锁,则此时其他线程如果要申请读锁,可以申请成功。
2.如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁,因为读写不能同时操作。
3.如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,都必须等待之前的线程释放写锁,同样也因为读写不能同时,并且两个线程不应该同时写
主要是为了保证数据的可见性,如果当前线程写完数据直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法读取线程T修改前的数据。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程读取数据并释放读锁之后,线程T才能获取写锁进行数据更新。
这时因为可能存在一个事务线程不希望自己的操作被别的线程中断,而这个事务操作可能分成多部分操作更新不同的数据(或表)甚至非常耗时。如果长时间用写锁独占,显然对于某些高响应的应用是不允许的,所以在完成部分写操作后,退而使用读锁降级,来允许响应其他进程的读操作。只有当全部事务完成后才真正释放锁。但是由于不能锁升级,读之后再写需要读完之后释放读锁,重新获取写锁。
10、阻塞队列
10.1 阻塞队列概述
阻塞队列是共享队列(多线程操作),一端输入,一端输出,不能无限放队列,满了之后就会进入阻塞,取出也同理。
- 当队列是空的,从队列中获取元素的操作将会被阻塞
- 当队列是满的,从队列中添加元素的操作将会被阻塞
- 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
- 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
10.2 阻塞队列架构
BlockingQueue是一个接口,它的父接口有: Collection, Iterable, Queue;
它的子接口有:BlockingDeque, TransferQueue;
它的实现类有:ArrayBlockingQueue, DelayQueue , LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue
10.3 阻塞队列分类
- ArrayBlockingQueue(常用):由数组结构组成的有界阻塞队列。ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,无法并行。
- LinkedBlockingQueue(常用):由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列。LinkedBlockingQueue对于生产者端和消费者端分别采用了独立的锁来控制数据同步,能够高效的处理并发数据。这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列。 DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
- PriorityBlockingQueue:基于优先级的阻塞队列,支持优先级排序的无界阻塞队列。不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者
- **SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。**一种无缓冲的等待队列,相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区)。默认非公平模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
- LinkedTransferQueue:由链表结构组成的无界阻塞 TransferQueue 队列。
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列。
10.4 阻塞队列核心方法
11、线程池
11.1 线程池概述
连接池是创建和管理一个连接的缓冲池的技术,这些连接准备好被任何需要它们的线程使用
线程池(英语:thread pool)一种线程使用模式。 线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度
线程池的特点: 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超过数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
线程池的优势:
1、降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
2、提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
3、提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
11.2 线程池类图架构
Java 中的线程池是通过 Executor
框架实现的,该框架中用到了 Executor,ExecutorService,ThreadPoolExecutor 这几个类, Executors 工具类可以协助创建线程池。
11.3 线程池使用方式
Executors.newFixedThreadPool(int)
一池N线程:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待,在显示关闭之前线程一致存在。
Executors.newSingleThreadExecutor()
一池一线程:创建单个线程数的线程池,它可以保证先进先出的执行顺序;
Executors.newCachedThreadPool()
一池可扩容根据需求创建线程:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程;
执行线程:execute()
参数为Runnable接口类,可以通过设置lambda
关闭线程:shutdown()
【具体案例代码案例】Executors工具类创建,阿里巴巴规范不推荐使用
public class ThreadPoolDemo {
public static void main(String[] args) {
// 一池五线程
// ExecutorService ThreadPool = Executors.newFixedThreadPool(5);
// 一池单线程
// ExecutorService ThreadPool = Executors.newSingleThreadExecutor();
//可扩容线程池
ExecutorService ThreadPool = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 20; i++) {
// 执行
ThreadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"正在办理业务");
});
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 关闭线程池
ThreadPool.shutdown();
}
}
}
11.4 线程池底层原则
上面三种方式创建线程池的类源代码都创建了 ThreadPoolExecutor
对象,该类构造方法涉及七个参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
11.5 线程池的七个参数
int corePoolSize:常驻线程数量(核心)
int maximumPoolSize:最大线程数量
long keepAliveTime:当线程数大于corePoolSize核心时,线程池中空闲线程等待时间
TimeUnit unit:线程存活时间单位
BlockingQueue workQueue:在执行任务之前用于保存任务的阻塞队列(排队的任务放入)
ThreadFactory threadFactory:线程工厂,用于创建线程
RejectedExecutionHandler handler:拒绝测试(线程满了)
11.6 线程池底层工作流程
具体工作流程是:
在创建ThreadPoolExecutor
对象的时候不会创建线程,执行execute()
才会创建线程
请求先到常驻线程(核心),满了之后再到阻塞队列进行等待,阻塞队列满了之后,在往外扩容线程,扩容线程不能大于最大线程数。大于最大线程数和阻塞队列之和后,会执行拒绝策略。
四种基本拒绝策略:
- 抛异常-AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
- 谁调用找谁-CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
- 抛弃最久执行当前-DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交当前任务
- 不理不问-DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抱出异常。如果允许任务丢失,这是最好的一种策略
11.7 自定义线程池
Executors 返回线程池对象的弊端:
FixedThreadPool
和SingleThreadExecutor
:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。CachedThreadPool
:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。ScheduledThreadPool
和SingleThreadScheduledExecutor
: 使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
调用ThreadPoolExecutor类,自定义参数:
public class ThreadPoolDemo2{
public static void main(String[] args) {
// 组定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
// 常驻线程数量(核心)2个
2,
// 最大线程数量5个
5,
// 线程存活时间:2秒
2L,
TimeUnit.SECONDS,
// 阻塞队列
new ArrayBlockingQueue<>(3),
// 默认线程工厂
Executors.defaultThreadFactory(),
// 拒绝策略。抛出异常
new ThreadPoolExecutor.AbortPolicy()
);
try{
for (int i = 1; i <= 8; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
// 关闭线程池
threadPool.shutdown();
}
}
}
12、Fork与Join分支
Fork/Join框架需要了解两个概念:分而治之,工作窃取算法。
1、分而治之
Fork就是把一个大任务切分为若干子任务并行的执行。
Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。
2、工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。于是干完活的线程就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
【案例】使用 Fork/Join 框架计算0~100的和
- 设置任务分割的大小
- 继承RecursiveAction或者RecursiveTask 重写其compute方法
- 在compute方法里首先判断当前任务是否足够小, 如果如果小就执行, 如果大了就再次初始化任务去执行fork去分割
- 累加返回结果,注意这一步骤是你继承了RecursiveTask才有的, 如果你继承的是RecursiveAction是没有返回值的
- 在主线程里使用Future对象去具体的运行ForkJoin的
submit()
方法, 得到的结果使用Future对象的get就可以获得
class MyTask extends RecursiveTask<Integer>{
//拆分时差值不超过10
private static final Integer VALUE=10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
//拆分和合并过程
@Override
protected Integer compute() {
// 判断
if((end-begin)<=VALUE){
for (int i = begin; i <= end; i++) {
result+=i;
}
}else{
//进一步拆分
int mid=(begin+end)/2;
//拆分
MyTask myTask1 = new MyTask(begin, mid);
MyTask myTask2 = new MyTask(mid+1, end);
myTask1.fork();
myTask2.fork();
//合并
result = myTask1.join()+myTask2.join();
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask = new MyTask(0,100);
//创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
//获取最终合并结果
Integer res = forkJoinTask.get();
System.out.println(res);
//关闭分支合并池对象
forkJoinPool.shutdown();
}
}
13、异步回调
同步:指等待资源(阻塞)
异步:指设立哨兵,资源空闲通知线程,否则该线程去做其他事情(非阻塞)
13.1 CompletableFuture
CompletableFuture
在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture 实现了 Future, CompletionStage
接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,其中 异步调用没有返回值方法runAsync
,异步调用有返回值方法supplyAsync
。
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 异步调用,没返回值
CompletableFuture<Void> future1 = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName()+"future1");
});
future1.get();
// 异步调用,有返回值
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"future2");
//模拟异常
int i=1/0;
return 1024;
});
future2.whenComplete((t,u)->{
System.out.println("t="+t);//t是返回值
System.out.println("u="+u);//u是异常
}).get();
}
}
输出为:
ForkJoinPool.commonPool-worker-3future1
ForkJoinPool.commonPool-worker-3future2
t=null
u=java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
13.2 Future 与 CompletableFuture
Futrue
在 Java 里面只是一个泛型接口,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue。主要包括下面这 4 个功能:取消任务;判断任务是否被取消;判断任务是否已经执行完成;获取任务执行结果。Future
在实际使用过程中存在一些局限性比如不支持异步任务的编排组合、获取计算结果的 get() 方法为阻塞调用(同步)。
Java 8 引入CompletableFuture
类可以解决Future 的这些缺陷。CompletableFuture 除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。