文章目录
- 什么是进程/任务(Process/Task)
- 进程控制块抽象(PCB Process Control Block)
- PID(进程的 id /标识符)
- 内存指针
- 文件描述符表
- 状态
- 优先级
- 上下文
- 记账信息
- 线程(Thread)
- 进程和线程的区别
- 线程的优点:
- 多线程代码
- 代码示例(继承Thread类的方式)
- sleep(休眠当前线程)
- start(启动⼀个线程)
- 创建线程的其他方式
- 通过实现 Runnable 接口创建线程
- 改动前面的写法,使用匿名内部类来实现
- 使用lambda表达式
- Thread提供的一些属性和方法
- 构造方法
- 属性
- 终止一个线程
- 自己实现的控制线程结束的例子
- 使用Thread提供的 interrupt方法和 isInterrupted方法
- (join)线程的等待
- 线程的状态
- NEW
- TERMINATED
- RUNNABLE
- WAITING :死等进入的阻塞
- TIMED_WAITING : 带有超时时间的等进入的阻塞
- BLOCKED : 进行锁竞争的时候产生的阻塞
- 线程安全问题
- 解决上述问题(synchronized 关键字)
- synchronized 的特性
- 互斥
- 可重入
- 解决问题的代码
- 死锁
- 内存可见性引起的线程安全问题
- 线程的等待通知机制(wait 和 notify)
- 单例模式
- 饿汉模式
- 懒汉模式
- 考虑是否线程安全
- 指令重排序
- 阻塞队列
- 线程池
- 标准库线程池(ThreadPoolExecutor)
- 代码案例
- 实现一个简单的线程池
- 定时器
- 实现简单的定时器
什么是进程/任务(Process/Task)
一个程序运行起来,在操作系统中,就会出现一个对应的进程。也就是说,进程就是一个跑起来的应用程序
我们可以在任务管理器中就把当前运行的所有进程都列出来
除了自己运行的程序是进程外,还有一些系统自带的程序也是进程
右边显示的那些CPU,内存,磁盘…就是当前进程占用了多少对应的系统资源。要想让一个程序运行,就必须给这个进程分配系统资源,包括不限于CPU,内存,硬盘,网络带宽,显卡…
所以在操作系统内部,进程又是操作系统进行资源分配的基本单位。
而进程多了就需要进行管理,做法则是先描述,再组织
- 描述:通过一些 结构体/类 把一个进程的核心信息抽象提取出来并进行表示
- 组织:通过数据结构把多个这样的 结构体/类 的对象串起来,方便进一步增删查改
进程控制块抽象(PCB Process Control Block)
在操作系统中,通常使用 PCB 这样的结构体来描述进程(不同的操作系统实际中的名字是不同的,比如 Linux 的 PCB 实际名字是 task_struct)
PCB 中就需要包含一些进程的核心信息
// 以下代码是 Java 代码的伪码形式,重在说明,⽆法直接运⾏
class PCB {
// 进程的唯⼀标识 —— pid;
// 进程关联的程序信息,例如哪个程序,加载到内存中的区域等
// 分配给该资源使⽤的各个资源
// 进度调度信息
}
这样,每⼀个 PCB 对象,就代表着⼀个实实在在运行着的程序,也就是进程。
操作系统再通过数据结构,例如线性表、搜索树等将 PCB 对象组织起来,方便进行增删查改的操作。
- 任务管理器中的进程列表就是在 组织PCB的数据结构 那里搜索,获取并显示出对应的信息
- 创建新的进程(双击某个程序运行): 创建出一个对应的新的PCB, 并且添加到上述数据结构中
- 销毁某个进程(某个程序退出) 就是把数据结构上对应的 PCB节点删除掉
PCB 这个结构体中包含了很多很多的属性,本文只讨论其中几个
PID(进程的 id /标识符)
在任务管理器进程界面 在名称,状态这一行右键,把 PID勾选上,这样就可以看到进程的 PID了
同一个机器,同一时刻,进程ID 一定是不同的
内存指针
是一组内存指针,不只有一个。
进程运行时,需要消耗一定的硬件资源,内存就是一个关键的资源
程序在运行的时候,数据就会从硬盘加载到内存中
这组指针就是告诉操作系统: 该进程要运行的指令和依赖的数据都在内存哪里
文件描述符表
一个进程运行的时候会操作一些文件,就会通过一个"顺序表"这样的数据结构,记录下当前这个进程打开了哪些文件
接下来介绍的属性更抽象,也更重要,用来支撑 进程 调度
在任务管理器中可以看到,系统中包含了很多进程,每个进程都需要被执行。而执行就需要占用CPU资源去CPU上执行,虽然CPU有多个核心,每个核心干自己的活,互相之间不会影响,但进程的数量是远远多于CPU的核心数量的。操作系统会按照 并行 + 并发的方式运行所有的进程
- 并行执行:一个核心,同一时刻只能执行一个进程,假定有16个核心,同一时刻,同时运行16个进程
- 并发执行(分时复用):一个核心,不同时刻可以执行不同进程。CPU把总的执行时间,分成若干个小的部分,每个部分执行不同的进程,每个部分称为"时间片"。由于时间片较短,CPU切换进程的速度极快,人感知不到,在人的角度来看,就是若干个进程在"同时执行"
而并发执行(在日常中会把并行和并发合称为并发)需要进程调度来负责
PCB 中提供了几个属性,支持 进程调度
状态
随时能执行称为 进程处于"就绪状态"
不能立刻执行称为 进程处于"阻塞状态",往往进程在等待 IO 的时候就会进入阻塞状态,比如C程序里面有scanf , 等待输入
优先级
给不同进程安排的不一定是完全公平的,分配的时间和资源都会存在倾斜
上下文
有时候,一次"时间片"的时间没完成全部任务,下次要能够继续,就需要把这次执行结果保存好,以备下次继续
记账信息
因为有的进程分配的时间多,有的进程时间分配少
此时就需要统计每个进程分配的信息了
操作系统也要避免某个进程一直吃不到CPU资源
线程(Thread)
可以看看此文
为啥要有线程:
当前的 CPU 都是多核心 CPU, 可以通过一些特定的编程技巧,把要完成的任务拆分成多个部分,并且让他们在不同的 CPU 上执行。这种方式称为"并发编程"(并行 + 并发)
- “并发编程” 成为 “刚需”
-
单核 CPU 的发展遇到了瓶颈. 要想提高算力, 就需要多核 CPU. 而并发编程能更充分利用多核 CPU资源.
-
有些任务场景需要 “等待 IO”, 为了让等待 IO 的时间能够去做⼀些其他的工作, 也需要用到并发编程.
- 虽然多进程也能实现 并发编程, 但是线程比进程更轻量.
- 创建线程比创建进程更快.
- 销毁线程比销毁进程更快.
- 调度线程比调度进程更快.
如果是把任务拆分成一个个进程,就有新的麻烦
一个服务器很多时候要同时能够给多个客户端提供服务,每一个客户端连上服务器,服务器就会创建一个进程给客户端提供服务,对应的客户端断开了,服务器再把进程给释放掉
如果这个服务器,频繁地与客户端建立/断开连接,服务器也就需要频繁地创建/销毁进程
引入线程,就是为了解决上述"进程"创建/销毁 开销比较大的问题
线程,也称为"轻量级进程",可以理解成"进程的一部分"
⼀个线程就是⼀个 “执行流”. 每个线程之间都可以按照顺序执行自己的代码. 多个线程之间 “同时” 执行着多份代码
进程和线程的区别
- 进程是包含线程的. 每个进程至少有⼀个线程存在,即主线程。
- 进程和进程之间不共享内存空间. 同⼀个进程的线程之间共享同⼀个内存空间.
- 进程是系统分配资源的最小单位,线程是系统调度执行的最小单位。
- ⼀个进程挂了⼀般不会影响到其他进程. 但是⼀个线程挂了, 可能把同进程内的其他线程⼀起带走(整个进程崩溃).
- 由于同⼀进程的各线程间共享内存和文件资源,可以不通过内核进行直接通信
- 线程的创建、切换及终止效率更高。
事实上,更严格的说,在多线程编程中,一个 PCB其实是描述 一个线程的:
1.PID (每个线程都不一样)
2.内存指针 (在同一个进程的线程这个是一样的)
3.文件描述符表 (在同一个进程的线程这个是一样的)
4.状态,上下文,优先级,记账信息 (每个线程都不一样)
5.同一个进程的 tgid 是同一个,用来区分是不是在同一个进程中
同一个进程中的若干线程之间,是共用相同的内存资源和文件资源的
同一个Java程序中,线程1 new个对象,线程2 是可以访问到的; 线程1 打开一个文件,线程2 也是可以直接使用的。但是每个线程都是独立在CPU上调度执行的
进程拥有一个完整的资源平台,而线程只独享必不可少的资源,如寄存器和栈
上面提到的"进程调度",在多线程编程中,准确的说其实是"线程调度"。
为什么说线程比进程更轻量?为什么说线程创建/销毁的开销比进程小?
核心就在于,创建进程,可能要包含多个线程,这个过程中涉及到完整的资源分配/释放。创建线程,相当于资源已经有了,省去了资源分配/释放的步骤了。同一个进程的线程是共用资源的,只有创建第一个线程的时候(也就是创建进程的时候),去进行资源的申请操作,后续就没有申请资源的过程了
延续刚刚谈到的一个服务器给多个客户端提供服务这个例子
刚才是每个客户端连接成功,服务器就给他分配一个进程处理
引入线程了,就可以只给每个客户端分配一个线程来处理,起到优化的效果
但是如果一个线程抛出异常,并且没有很好的捕获处理好,就会使整个进程退出,其他线程也就无了.。
线程的优点:
- 创建⼀个新线程的开销要比创建⼀个新进程小得多
- 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
- 线程占用的资源要比进程少很多
- 能充分利用多处理器的可并行数量
- 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
- I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。
多线程代码
线程是操作系统中的概念. 操作系统内核实现了线程这样的机制, 并且给用户层提供了⼀些 API 供用户使用(例如 Linux 的 pthread 库)
Java 标准库中 Thread 类可以视为是对操作系统提供的 API 进行进⼀步的抽象和封装.
代码示例(继承Thread类的方式)
先演示一个简单的线程代码:
class MyThread extends Thread {
@Override// 这个表示重写了父类的方法,提醒编译器进行更严格的检查
//这个run方法就是描述了线程具体要做什么
public void run() {
System.out.println("hello Thread");
}
}
public class Test {
public static void main(String[] args) {
Thread t = new MyThread();//创建线程实例
t.start();//启动新的线程
}
}
上述代码,其实是有 2个线程
- t 线程
- main方法所在的线程(主线程),也就是进程启动的时候,JVM自己创建的线程
上述代码不够明显,我们修改一下再来看效果
class MyThread extends Thread {
@Override// 这个表示重写了父类的方法,提醒编译器进行更严格的检查
//这个run方法就是描述了线程具体要做什么
public void run() {
while(true) {
System.out.println("Thread");
}
}
}
public class Test {
public static void main(String[] args) {
Thread t = new MyThread();//创建线程实例
t.start();//启动新的线程
while (true) {
System.out.println("main");
}
}
}
输出结果是循环交替打印 Thread 和 main
此时,t线程 和 主线程就是在并发执行了(并发编程包括并行和并发的方式, 这两个线程是并行还是并发,我们不知道)
我们也可以通过 Java提供的工具,更清楚的看到代码中的线程
JDK中包含了 jconsole工具
在JDK目录下的bin中有很多.exe程序,找到 jconsole运行
记得先运行上述的多线程代码,这样jconsole才会显示
有的电脑打开jconsole之后一个进程都没有,可以尝试使用"管理员权限"运行
选择不安全的连接
在概览那一行标签页中选择线程,就能看到当前Java进程里面包含的所有的线程情况
sleep(休眠当前线程)
咱们写的这个代码在疯狂消耗CPU,我们就可以让线程睡眠/休眠 1秒
参数的单位是ms, 1000ms 就是 1s。这行代码就是让线程主动进入"阻塞状态"(PCB上的状态属性),主动放弃去 CPU上执行。时间到了之后,线程才会解除"阻塞状态",变成"就绪状态",等待重新被调度到 CPU上执行,在执行时是"运行状态"
在main方法中处理sleep有两种选择 (1)throws (2)try-catch
在run中就只能用try - catch 进行处理
这是因为throws其实是方法签名(method signature)的一部分
方法签名包括
- 方法名字
- 方法的参数列表(类型和个数)
- 声明抛出的异常
注意:方法签名不包含 返回值, public/private
在重写方法的时候,就要求方法签名得是一样的,而父类的run中是没有抛出异常的,并且Thread是标准库的类,也改不了。此时,重写的时候,也就无法抛出异常
上述讨论的都是编译器javac的规则,和JVM没有关系
这个是编译器,把Java源文件变成.class 的字节码文件
这个用来加载运行.class,就是JVM
完整代码:
class MyThread extends Thread {
@Override// 这个表示重写了父类的方法,提醒编译器进行更严格的检查
//这个run方法就是描述了线程具体要做什么
public void run() {
while(true) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
public class Test {
public static void main(String[] args) {
Thread t = new MyThread();//创建线程实例
t.start();//启动新的线程
while (true) {
System.out.println("main");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
相比之前,循环打印的速度明显变慢许多
实际开发中,服务器程序消耗CPU资源超出预期,如何排查这个问题?
- 需要先确认哪个线程消耗的CPU比较高,一些第三方工具可以看到每个线程的的CPU消耗情况
- 确定之后,进一步排查线程中是否有类似的"非常快速"的循环
- 确认清楚这里的循环是否就应该这么快。如果应该,说明需要升级更好的CPU了;如果不应该,就说明需要在循环中引入一些"等待"操作(不一定是sleep)
每一秒打印的时候可能是 main 在前面,也可能是 Thread 在前面
多个线程的调度顺序是"无序"的, 在操作系统内部也称为"抢占式执行",也就是说,任何一个线程,在执行的过程中,都可能被其他线程抢占掉它的CPU资源,于是CPU就给其他线程执行了。正是这样的随机性使多线程程序的执行结果难以预测,甚至可能引入bug
- 主流的系统(Windows, Linux…)都属于这种"抢占式"执行方式
- 也有小众的系统(实时操作系统),通过"协商式"进行调度,比如发射卫星,火箭的系统。牺牲了很多功能,换来了调度的实时性
使用sleep控制的是"线程休眠/阻塞的时间",而不是"两个代码执行的间隔时间"
这个方法能获取到当前系统的毫秒级时间戳。
执行到Thread.sleep(1000);
的时候,1000ms之内,当前线程是不会去CPU上执行的(阻塞),当时间到了之后,线程从"阻塞状态"恢复到"就绪状态",不代表线程能立即去CPU上执行,还要等CPU调度
public class Test {
public static void main(String[] args) throws InterruptedException {
System.out.println("begin:" + System.currentTimeMillis());
Thread.sleep(1000);
System.out.println("end:" + System.currentTimeMillis());
}
}
从上面运行的结果看出,两个代码执行的间隔是超过了1秒的。所以,这个sleep只能保证
实际休眠时间是大于等于参数设置的休眠时间的。
start(启动⼀个线程)
start是Thread 类中自带的方法,本质是调用 操作系统 提供的"创建线程"的API, 在内核中创建对应的PCB, 并且把PCB加入到管理PCB的数据结构中。当系统调度到这个线程之后,就会执行run方法中的逻辑
重写的run方法只是描述了线程要做的任务,只是被定义出来,没有被调用
run方法不是 start方法调用的, 而是 在start 创建出来的线程里被调用的
调用 start 方法, 才是真的在操作系统的底层创建出⼀个线程.
我们可以通过 jconsole 看到一些调用关系
一个线程对应一个调用栈,调用栈可以认为是方法栈。 我们的两个线程是有两个不同的独立的调用栈,互不影响。调用栈能够压入/弹出栈帧。
调用栈里面有很多栈帧,每个栈帧对应一个未运行完的方法,栈帧里保存了该方法的返回地址和局部变量…,从逻辑上讲,栈帧就是一个方法执行的环境。
假设有3个方法存在这样的调用关系,方法a调用b,而b调用c。在执行c的时候,这三个方法就对应着三个不同的栈帧,调用栈里的情况就是a的栈帧在栈底,b的栈帧在中间,c的栈帧在栈顶。当c执行完,c的栈帧从调用栈弹出,执行b方法,b执行完,调用栈弹出b的栈帧,执行a。
相同的,如果想要执行b,就将b的栈帧压入调用栈,如果要执行c,就将c的栈帧压入调用栈。
如果不用start方法创建线程,而是直接调用run 方法
class MyThread extends Thread {
@Override// 这个表示重写了父类的方法,提醒编译器进行更严格的检查
//这个run方法就是描述了线程具体要做什么
public void run() {
while(true) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
public class Test {
public static void main(String[] args) {
Thread t = new MyThread();//创建线程实例
//t.start();//启动新的线程
//没有创建出新的线程,是在主线程中执行上述run方法
t.run();
while (true) {
System.out.println("main");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
此时run 方法 和 主线程的循环是"串行执行",不是"并发执行"。只有run中的循环结束,才能执行主线程的循环。
另外, 一个Thread对象,只能start一次
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()-> {
System.out.println("Thread");
});
t.start();
Thread.sleep(1000);
t.start();
}
}
创建线程的其他方式
上述代码中,创建Thread子类,重写run方法只是第一种创建线程的写法
通过实现 Runnable 接口创建线程
Runnable描述了一个"任务",这个任务和具体的执行机制无关,也就是无所谓是通过线程执行还是其他方式
实现的run方法也就是任务的内容
之前是线程自己描述要做的任务,现在是 Runnable 接口来描述任务是啥, Thread负责执行。引入Runnable就是为了解耦合。也就是把任务内容 和 线程 这个概念拆分开了,这样任务就能以其他方式执行,改动成本较低。例如,当前是通过多线程的方式执行,之后可以很方便地改成基于线程池或者虚拟线程的方式执行
class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println("第二种创建线程的写法");
}
}
public class Test {
public static void main(String[] args) {
Thread t = new Thread(new MyRunnable());//不是创建子类实例了
t.start();
}
}
改动前面的写法,使用匿名内部类来实现
这里不是单纯的new Thread,而是把下面的几个操作合在一起了
- 创建了一个 Thread 的子类(不知道啥名字,匿名)
- 同时创建了一个该子类的实例(对于匿名内部类来说,只能创建这一个实例,这个实例创建完之后,再也拿不到这个匿名内部类了)
- 此处的子类内部重写了父类的run方法
public class Test {
public static void main(String[] args) {
Thread t = new Thread() {
@Override
public void run() {
System.out.println("第三种");
}
};
t.start();
}
}
t是定义的变量名/对象名,也可以舍去
public class Test {
public static void main(String[] args) {
new Thread() {
@Override
public void run() {
System.out.println("第三种");
}
}.start();
}
}
上面是针对Thread的匿名内部类,接下来是针对 Runnable 的匿名内部类,是把Runnable 的实例作为参数传入到Thread的构造方法中。相当于下面的几个操作:
- 创建新的类,实现Runnable接口
- 创建了这个类的实例
- 重写run方法
public class Test {
public static void main(String[] args) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("第四种");
}
});
t.start();
}
}
使用lambda表达式
lambda就是针对匿名内部类的平替,本质上是"匿名函数",一次性的函数
此处的 lambda就是要代替刚才重写的run方法
public class Test {
public static void main(String[] args) {
Thread t = new Thread(()->{
System.out.println("第五种");
});
t.start();
}
}
上述写法都是
- 把线程执行的任务内容表示出来
- 通过Thread的start方法来创建/启动线程
线程创建的方式:
- 继承 Thread
- 使用 Runnable
- 使用 lambda
- 使用线程池
- 使用 Callable
Thread提供的一些属性和方法
Thread 类是 JVM 用来管理线程的⼀个类,换句话说,每个线程都有⼀个唯⼀的 Thread 对象与之关联。
构造方法
方法 | 说明 |
---|---|
Thread () | 创建线程对象 |
Thread(Runnable target) | 使用Runnable对象创建线程对象 |
Thread(String name) | 创建线程对象,并命名 |
Thread(Runnable target,String name) | 使用Runnable对象创建线程对象,并命名 |
Thread(ThreadGroup group, Runnable target) | 线程可以被分组管理 |
是否给线程起名字,对于线程本身的运行效果是没有任何影响的。但是可以在Java进程运行过程中,通过工具看到每个线程的名字。出现问题的时候,更直观的把出现问题的线程和代码关联起来,方便调试
如果没起名字,也有默认的名字 Thread-0,Thread-1,Thread-2…
public class Test {
public static void main(String[] args) {
Thread t = new Thread(()->{
while (true) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "自定义线程");
t.start();
}
}
我们没在线程列表中看到main线程。这是因为main线程执行完毕了,而不是没有创建
属性
属性 | 获取方法 |
---|---|
ID | getId() |
名称 | getName() |
状态 | getState() |
优先级 | getPriority() |
是否后台线程(守护线程) | isDaemon() |
是否存活(运行结束) | isAlive() |
是否被中断 | isInterrupted() |
- 这里的ID和 PCB上的PID是不一样的,这里是JVM自己搞的一套ID体系(Java代码无法获取到PCB的ID),不过还是线程的唯一标识,不同线程不会重复
- 各种调试工具会用到名称
- 虽然Java提供了优先级接口,实际上就算修改了优先级也不一定真的会改。修改是一回事,系统调度是另一回事,这里的优先级修改只能算是一个"建议参考",还是以系统自身为准
- 前台线程:这样的线程如果不运行结束的话,所在的进程一定不会结束。可以有多个前台线程,必须所有前台线程执行完进程才结束
- 后台线程(守护线程):这样的线程,即使继续在执行,也不能阻止进程结束。也就是没有前台线程了,不管后台线程有没有执行完,都会把后台线程结束掉
- 在Java中,main线程就是前台线程; 另外程序猿创建出来的线程,默认情况下都是前台线程,可以通过setDaemon方法把线程设置成后台线程。在jconsole中看到的其他JVM内置的线程就属于后台线程了
public class Test {
public static void main(String[] args) {
Thread t = new Thread(()-> {
for (int i = 0; i < 5; i++) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
}
}
t默认是前台线程,在执行过程中,进程是不能结束的
main执行完start方法就结束了,时间很短,所以只剩下t 这一个前台线程决定进程结不结束
public class Test {
public static void main(String[] args) {
Thread t = new Thread(()-> {
for (int i = 0; i < 5; i++) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
//设置为后台线程
t.setDaemon(true);
t.start();
}
}
此时,进程中只有main线程是前台线程了。只要main线程结束,整个进程就结束了。
注意:关于线程各种属性的设置都要放到这个线程start之前。一旦线程启动,再设置就来不及了
- 是否存活:指的是系统中的线程(PCB)是否还存在,Thread对象的生命周期和PCB的生命周期不一定完全一样
运行这个代码就已经创建了Thread的实例,也就是Thread对象已经诞生了,但此时,内核中的PCB还没有创建
运行这行代码才是真正在系统中创建出线程(PCB才真正创建出来并且加入到管理PCB的数据结构中)
由于t 线程中的内容是空的,所以t 瞬间就执行完毕了, 内核中的线程和PCB就被销毁了。但是在sleep结束之前,t引用指向的Thread对象,仍然是存在的,并没有被GC回收掉。此时系统中的线程先结束了,但是Thread对象还存在
这个写法则是线程还没执行完毕,但是t 指向的对象就被GC回收了
由于Thread对象的生命周期和系统中线程的生命周期不一致(不能说谁长谁短),就可以用isAlive方法判断系统中的线程是否存在
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()-> {
});
t.start();
Thread.sleep(1000);
System.out.println(t.isAlive());
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()-> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t.start();
Thread.sleep(1000);
System.out.println(t.isAlive());
}
}
终止一个线程
在Java中,终止线程都只是"提醒,建议",真正要不要终止,还得线程自己来进行决定
比如:t 线程正在执行,其他线程,只是提醒一下t 是不是要终止了,t 收到提醒之后,还是得自己决定
系统原生的线程中,其实有办法让别的线程被强制终止。
自己实现的控制线程结束的例子
核心思路是让需要终止的线程的 入口方法尽快执行结束(跳出循环, return…)
变量捕获:lambda表达式能捕获到所在封闭块的局部变量和参数。捕获的变量,这个变量要么是被final修饰,要么是"事实"final
如果把下方的修改操作删除,此时 isRunning虽然没有被final修饰,但他是"事实"final(没有被修改),这样就不会报错了
下面这种写法就不是变量捕获了,而是内部类 访问 外部类的成员,lambda本质上就是一个匿名内部类,实现了函数式接口
public class Test {
private static boolean isRunning = true;
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()-> {
while (isRunning) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("t 线程已经结束了");
});
t.start();
Thread.sleep(3000);
//3秒之后,主线程修改 isRunning 的值,从而通知 t结束
System.out.println("控制t 线程结束");
isRunning = false;
}
}
如果把while条件的isRunning换成true, 那么别的线程就只能建议,最终决定结不结束的是t 线程自己
这种写法中,假设t 线程是sleep 10s,甚至更长。此时 main线程是无法 及时的把t 线程终止掉的(要等待下次循环判定),所以推荐下面这种
使用Thread提供的 interrupt方法和 isInterrupted方法
Thread 内部包含了⼀个 boolean 类型的变量作为线程是否被中断的标记
方法 | 说明 |
---|---|
public void interrupt () | 中断对象关联的线程。如果线程没有被阻塞,这时调用 interrupt()将不起作用,直到执行到wait(),sleep(),join()时(被阻塞),才马上会抛出异常 。实际上只是给线程设置一个中断标志,线程仍会继续运行。 |
interrupt方法除了能设置 boolean值,还可以唤醒 sleep等阻塞的方法。
在sleep(10000)中,刚休眠1秒。
- 按照第一种写法,必须再等 9秒才能让线程结束(sleep结束了,才能继续进行循环判定)
- 第二种写法则会立即让sleep抛出一个 InterruptedException异常,不会再等待立即就唤醒了
public class Test {
public static void main(String[] args) {
Thread t = new Thread(()-> {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
t.interrupt();
}
}
唤醒sleep就是让sleep抛出异常,从而进入catch中
这段代码还会有些问题,是以抛出异常来终止程序的,这样不太好
修改下线程中catch里的内容:
throw e和e.printStackTrace()的区别
public class Test {
public static void main(String[] args) {
Thread t = new Thread(()-> {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
t.interrupt();
}
}
明明已经修改了标志位,把while里的条件改成true了,下一轮循环,条件应该不成立了才对,但现在打印异常之后还会执行
出现这个现象是sleep在搞鬼,如果代码没有sleep,确实是直接修改了标志位就结束了。触发interrupt的时候,线程正在sleep。sleep被唤醒的同时,就会清除刚才的标志位(改回false),之所以要改回来,就是把控制权转交给程序猿自己,当前线程是要继续执行还是要立即结束,还是要等会结束.
- 不结束的代码:
- 立即结束的代码:
- 等会结束的代码:
(join)线程的等待
多个线程的调度顺序,在系统中是无序的(抢占式执行),我们就希望程序的结果是稳定的,不应该是"随机的"。
我们可以通过线程等待的方式确定线程结束的先后顺序。
public class Test {
public static void main(String[] args) {
Thread t = new Thread(()-> {
for(int i = 0; i < 5; i++) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("Thread end");
});
t.start();
for(int i = 0; i < 5; i++) {
System.out.println("main");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("main end");
}
}
上述代码中,main线程 和 t 线程 谁先结束是不确定的
如果希望t 线程先结束, main线程后结束,就可以在 main中使用线程等待(join)。
main线程调用 t.join
就是让 main 等待 t。
main中调用上述 join方法,有以下3种可能:
- 如果t 线程此时已经结束了, join就会立即返回,main继续执行
- 如果t 线程此时还没结束, join就会"阻塞等待"。一直等到t 线程结束之后,join才解除"阻塞",继续执行
- 如果t 线程没start,join就会直接返回,相当于第一种情况
阻塞:该线程暂时不参与 CPU 调度执行
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()-> {
for(int i = 0; i < 5; i++) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("Thread end");
});
t.start();
for(int i = 0; i < 5; i++) {
System.out.println("main");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
t.join();
System.out.println("main end");
}
}
这样t 线程就会在main线程之前结束
再举一个例子:
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()-> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for(int i = 0; i < 2; i++) {
System.out.println("t1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("t1 end");
});
Thread t2 = new Thread(()-> {
//t2线程一启动就先等待 t1 结束
try {
t1.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for(int i = 0; i < 3; i++) {
System.out.println("t2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("t2 end");
});
t1.start();
t2.start();
//main线程只等待 t2
t2.join();
System.out.println("main end");
}
}
这就是:
- t2 等 t1
- main 等 t2
我们还可以尝试让t 等待main:
方法 | 说明 |
---|---|
currentThread() | 哪个线程调用,就返回哪个线程的引用 |
public class Test {
public static void main(String[] args) {
System.out.println(Thread.currentThread().getName());
}
}
注意: main线程是主线程,和main方法没有关系,只是同名而已。
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread main = Thread.currentThread();//main这个引用就是指向 main线程的 Thread对象
Thread t = new Thread(()-> {
try {
main.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t线程 结束");
});
t.start();
Thread.sleep(5000);
System.out.println("main 结束");
}
}
方法 | 说明 |
---|---|
join() | 死等 |
join(long millis) | 最多等millis毫秒 |
join(long millis, int nanos) | 最多等待millis毫秒nanos纳秒 |
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()-> {
for (int i = 0; i < 5; i++) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("t end");
});
t.start();
t.join(3000);
System.out.println("main end");
}
}
线程的状态
之前谈到的"进程的状态",更准确的说,应该是"线程的状态"或者是"PCB的状态"
线程的状态是⼀个枚举类型 Thread.State
public class Test {
public static void main(String[] args) {
for (Thread.State state : Thread.State.values()) {
System.out.println(state);
}
}
}
在Java,线程的状态大致分成6个
NEW
Thread对象有了,但还没调用start(系统内部的线程还没创建)
public class Test {
public static void main(String[] args) {
Thread t = new Thread(()-> {
System.out.println("Thread");
});
System.out.println(t.getState());
t.start();
}
}
TERMINATED
线程已经终止了(内核中的线程已经销毁了),但Thread对象还在
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()-> {
System.out.println("Thread");
});
t.start();
t.join();//确保t 线程执行完毕
System.out.println(t.getState());
}
}
RUNNABLE
就绪状态,也就是随叫随到,有两种情况:
- 这个线程正在CPU上执行
- 这个线程虽然没在CPU上执行,但随时可以被调度到CPU上执行
有些书上可能进一步区分,第一种叫做 RUNNING,第二种叫做 RUNNABLE / READY, 但这不一定是针对Java的线程进行描述的,而且区分了也干预不了,所以可以不用管
阻塞的三个状态:
WAITING :死等进入的阻塞
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()-> {
while (true) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
//死等版本的join
t.join();
}
}
在jconsole查看线程的状态
TIMED_WAITING : 带有超时时间的等进入的阻塞
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()-> {
while (true) {
System.out.println("Thread");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
t.join(3600*1000);
}
}
sleep也是带有时间的,也是TIMED_WAITING
BLOCKED : 进行锁竞争的时候产生的阻塞
下面会介绍到
只要线程出现上述三种中的任意一个状态,都是阻塞,不过是产生这几个状态的原因不一样。
如果发现"某个线程卡死了",这个时候就需要去关注线程的状态,通过状态就能看到线程是在哪一行代码卡住了(阻塞),原因大概是什么
线程安全问题
线程是随机调度,抢占式执行的。这样的随机性导致了程序的执行顺序不确定,会产生不同的结果,有时候,这就是bug(不符合需求)
多线程代码引起了bug,这样的问题就是"线程安全问题"; 存在线程安全问题的代码就认为是线程"不安全"
一个线程不安全的经典例子:
public class Test {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
count++;
}
});
Thread t2 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
count++;
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count = " + count);
}
}
预期是10万,但每次执行的结果都不正确且不一样,充满随机性
很明显,这就是非常严重的bug。这是典型的多线程并发导致的问题,如果让两个线程串行执行是没有问题的
public class Test {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
count++;
}
});
Thread t2 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
count++;
}
});
t1.start();
t1.join();
t2.start();
t2.join();
System.out.println("count = " + count);
}
}
理解上述问题,就需要理解CPU指令
count++
这一行代码,可以粗略地认为是3个CPU指令
- 把内存中count的值,读取到CPU寄存器中,这个指令我们取名为load
- 把寄存器中的值+1,还是继续保存在寄存器中,这个指令我们取名为add
- 把寄存器中计算后的值写回内存的count中,这个指令我们取名为save
现在是两个线程并行或并发地执行count++
,这样就会存在变数:
- 并行执行会让指令的顺序变得不确定
- 某个线程执行指令的过程中,所在的CPU核心都有可能被其他线程抢占走(并发与抢占式执行导致的问题),这也会导致指令的顺序不确定
下列顺序就是我们假设的一种执行方式,实际上系统调度的时候可能会产生这种顺序,也可能产生其他顺序
上述执行过程,虽然是两次count++
,最后内存存储的count值仍然是1
这是因为后一次计算结果把前一次计算结果覆盖掉了
由于当前线程执行的顺序不确定,有些时候执行的结果正确,有些时候像上述这样出现bug。
结果也有可能出现小于50000的情况。因为可能t1线程执行比较快,进行了十多次count++
,而t2 线程才刚刚执行完一次count++
把count的值覆盖了,这就导致结果小了十几
- 一个线程的save在另一个线程的load之前,就是OK的
- 一个线程的save在另一个线程的load之后,都是有问题的
出现线程不安全的原因
- 线程在系统中是并行并发执行的
- 多个线程同时修改同一个变量
- 一个线程修改同一个变量=>没事
- 多个线程读取同一个变量=>没事
- 不同线程分别修改不同的变量=>没事
- 当前代码中线程针对变量的修改操作,不是"原子"的
count++
对应着CPU三个指令,不是原子操作(对应一个CPU指令)
除了我们的例子体现的这三个问题还有另外两个
- 内存可见性问题导致的线程不安全
- 指令重排序,引起的线程不安全问题
解决上述问题(synchronized 关键字)
要从原因入手,原因1无法干预,原因2是一个切入点,但这种做法不是很普适,因为可能就是要多线程同时修改同一个变量。原因3则是解决线程安全问题最普适的方案
锁,本质上也是操作系统提供的功能,Java对于这样的系统API进行了封装
关于锁的操作主要有两个方面:
- 加锁。
- 解锁。
通过"加锁"就起到了"互斥"的效果(锁竞争/锁互斥)
锁的主要特性:互斥
如果是两个线程分别对不同的对象加锁不会产生互斥,只有对同一对象加锁才有互斥,比如:
t1 线程对locker对象加上锁之后,t2 也尝试对locker对象进行加锁,t2 就会阻塞等待(BLOCKED状态), 直到t1 解锁了之后,t2 才有可能拿到锁(加锁成功)
- 之所以是可能拿到锁,是因为尝试竞争锁的线程不一定只有一个。
- 如果只有一个线程在等,那就是一定能拿到锁
synchronized 的特性
互斥
synchronized 会起到互斥效果, 某个线程执行到某个对象的 synchronized 中时, 其他线程如果也执行到同⼀个对象 synchronized 就会阻塞等待.
• 进入 synchronized 修饰的代码块, 相当于 加锁
• 退出 synchronized 修饰的代码块, 相当于 解锁
Java中随便一个对象,都可以作为加锁对象。其他大部分主流语言都只是极少数特定的对象可以当锁对象
注意:
- 上⼀个线程解锁之后, 下⼀个线程并不是立即就能获取到锁。要靠操作系统来 “唤醒”, 这也是操作系统线程调度的⼀部分工作.
- 如果有多个线程同时竞争锁, 如果不写代码指定哪个线程拿到锁,那就是随机调度(Java的synchronized是这样的非公平锁,公平锁就遵循先来后到的原则)
可重入
synchronized 同步块对同⼀条线程来说是可重入的,也就是不会出现自己把自己锁死的问题
在下面的"死锁"部分会进一步介绍
解决问题的代码
写一个代码,两个线程针对同一个对象加锁
public class Test {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
//先创建出一个对象,使用这个对象作为锁
Object locker = new Object();
Thread t1 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
synchronized (locker) {
count++;
}
}
});
Thread t2 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
synchronized (locker) {
count++;
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count = " + count);
}
}
这两个线程每次进行count++
是存在锁竞争的,会变成"串行"执行;但是判定for循环的条件和i++仍然是并发执行的。也就是此处两个线程仍然是并发执行的,少数逻辑下会"串行"
这就是最朴素的加锁操作
- synchronized是Java的关键字,不是方法,里面的功能都是JVM内部实现的
- synchronized后面的()里面就写的是"锁对象"
锁对象的用途,有且只有一个,就是用来区分两个线程是否是针对同一个对象加锁。如果是,就会出现锁竞争/锁冲突/互斥,就会引起阻塞等待。如果不是,就不会出现锁竞争,也就不会阻塞等待。
和对象具体是啥类型,和它里面有啥属性和方法,接下来是否要操作这个对象…统统没有关系
- synchronized下面的{}
当进入到代码块,就是给上述()里面的锁对象进行了加锁操作
当出了代码块,就是给上述()里面的锁对象进行解锁操作
没有形如其他语言的lock(),unlock()方法
执行过程:
这样的阻塞,就使t2 的load出现在t1 的save之后,强行构造出了"串行执行"效果
public class Test {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
//先创建出一个对象,使用这个对象作为锁
Object locker = new Object();
Thread t1 = new Thread(()-> {
synchronized (locker) {
for (int i = 0; i < 50000; i++) {
count++;
}
}
});
Thread t2 = new Thread(()-> {
synchronized (locker) {
for (int i = 0; i < 50000; i++) {
count++;
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count = " + count);
}
}
这个情况,就是t1 把5万次coun++
循环完t2 才开始循环 。此时,判定for循环的条件和i++就不能并发执行了。虽然结果对,但还不如直接写单线程
class Counter {
private int count = 0;
public void add() {
count++;
}
public int getCount() {
return count;
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread t1 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
counter.add();
}
});
Thread t2 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
counter.add();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count = " + counter.getCount());
}
}
这段代码依然可以通过加锁的方式解决线程安全问题
class Counter {
private int count = 0;
public void add() {
count++;
}
public int getCount() {
return count;
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Object locker = new Object();
Thread t1 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
synchronized (locker) {
counter.add();
}
}
});
Thread t2 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
synchronized (locker) {
counter.add();
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count = " + counter.getCount());
}
}
我们可以直接对counter对象进行加锁
具体是针对哪个对象加锁并不重要,重要的是 是否是针对同一个对象来加锁
class Counter {
private int count = 0;
public void add() {
count++;
}
public int getCount() {
return count;
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread t1 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
synchronized (counter) {
counter.add();
}
}
});
Thread t2 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
synchronized (counter) {
counter.add();
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count = " + counter.getCount());
}
}
把锁加到方法外头和加到方法里头没什么区别
class Counter {
private int count = 0;
public void add() {
synchronized (this) {
count++;
}
}
public int getCount() {
return count;
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread t1 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
counter.add();
}
});
Thread t2 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
counter.add();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count = " + counter.getCount());
}
}
加锁的生命周期和方法的生命周期一样的时候,就可以把synchronized写到方法上
class Counter {
private int count = 0;
//这个写法就相当于一进入方法就针对this加锁.锁对象就是this,只是省略了
synchronized public void add() {
count++;
}
public int getCount() {
return count;
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread t1 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
counter.add();
}
});
Thread t2 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
counter.add();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count = " + counter.getCount());
}
}
synchronized修饰普通方法,就相当于针对this加锁,this可能指向不同的对象
如果是修饰static方法(没有this),就相当于针对该类的类对象加锁。一旦有多个线程调用都会触发锁竞争
也就是
在Java中,就可以通过类名.class的方式拿到这个类对象。一个Java进程中,某个类只能有唯一的类对象
死锁
class Counter {
private int count = 0;
public void add() {
synchronized (this) {
count++;
}
}
public int getCount() {
return count;
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread t1 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
counter.add();
}
});
Thread t2 = new Thread(()-> {
for (int i = 0; i < 50000; i++) {
synchronized (counter) {
counter.add();
}
}
});
t2.start();
t2.join();
System.out.println("count = " + counter.getCount());
}
}
方法里头加了锁, t2 线程又加了锁
相当于
假定我们不启动t1 线程,只启动t2 线程.。t2 第一次加锁肯定能成功,当尝试第二次加锁时counter对象已经处于被锁定的状态了,根据之前的理解,尝试对已经被锁定的对象加锁会阻塞等待到锁对象被解锁为止
- 第二把锁要获取到锁对象就需要执行完外层的大括号
- 要想执行完外层的大括号,就需要第二把锁获取到锁对象执行完内层的大括号
矛盾了,这种情况就叫"死锁"。实际上,Java的synchronized对这种情况做出了特殊处理而不会出现"死锁",如果是C++/Python的锁就会出现"死锁"
Java的每个锁对象里,会记录当前是哪个线程持有了自己这个锁对象。已经被加锁之后又有线程想加锁,会先判定一下当前尝试加锁的线程是否已经对自己加过锁了,如果不是就阻塞;如果是就不会阻塞。
注意:当有多层synchronized同时对一个对象加锁时,执行完最外层的大括号才会解锁
在可重入锁的内部, 包含了 “线程持有者” 和 “计数器” 两个信息.
• 如果某个线程加锁的时候, 发现锁已经被人占用, 但是恰好占用的正是自己, 那么仍然可以继续获取到锁, 并让计数器自增.
• 解锁的时候计数器递减为 0 的时候, 才真正释放锁. (才能被别的线程获取到)
synchronized这样的机制,就叫做"可重入",为了避免这样的"死锁"(条件是锁是不可重入锁,并且一个线程对同一个锁对象连续加锁多次)
还有两种典型的"死锁"
- 两个线程两把锁
先让两个线程分别获取到一把锁,然后去尝试获取对方的锁
public class Test {
public static void main(String[] args) throws InterruptedException {
Object locker1 = new Object();
Object locker2 = new Object();
Thread t1 = new Thread(()-> {
synchronized (locker1) {
try {
//引入sleep,为了更好地控制线程的执行顺序
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (locker2) {
System.out.println("t1 获取了两把锁");
}
}
});
Thread t2 = new Thread(()-> {
synchronized (locker2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (locker1) {
System.out.println("t2 获取了两把锁");
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
}
进程没有结束,也没有打印任何线程中的内容,这就是"死锁"了
- t1 尝试对locker2加锁就会阻塞等待,等待t2 释放locker2
- t2 尝试对locker1加锁也会阻塞等待,等待t1 释放locker1
可以在jconsole里看到实际的效果
如果遇到死锁问题,就可以通过上述调用栈+状态进行定位了
- N个线程,M把锁
哲学家就餐问题:
下面是百度百科的介绍
死锁会使线程卡住,没法继续执行了。更可怕的是,死锁往往是概率性出现的,指不定什么时候出现
死锁的四个必要条件(缺一不可):
- 锁具有互斥特性: 一个线程拿到锁之后,其他线程尝试获取就得阻塞等待
- 锁不可抢占(剥夺): 一个线程拿到锁之后,除非它自己主动释放,否则别人抢不走
- 请求和保持:一个线程拿到一把锁之后,不释放这个锁的前提下,尝试获取其他锁(嵌套加锁)
- 循环等待:多个线程获取多个锁的过程中,出现了循环等待,比如A等待B,B又等待A
第一第二点因为synchronized固定改不了,第三点也不具有普适方法,有时候就需要嵌套加锁。只要破除循环等待,即使出现嵌套,也不会"死锁"。也就是约定好加锁的顺序,让所有的线程都按照固定的顺序来获取锁就可以了
例如,两个线程两把锁那里约定必须先获取locker1后获取locker2
public class Test {
public static void main(String[] args) throws InterruptedException {
Object locker1 = new Object();
Object locker2 = new Object();
Thread t1 = new Thread(()-> {
synchronized (locker1) {
try {
//引入sleep,为了更好地控制线程的执行顺序
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (locker2) {
System.out.println("t1 获取了两把锁");
}
}
});
Thread t2 = new Thread(()-> {
synchronized (locker1) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (locker2) {
System.out.println("t2 获取了两把锁");
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
}
哲学家就餐问题就可以给每一个餐叉从小到大顺时针定个编号,约定每个哲学家必须先获取编号小的筷子后获取编号大的筷子
只要约定好加锁顺序,就可以有效避免死锁了
内存可见性引起的线程安全问题
可见性: ⼀个线程对共享变量值的修改,能够及时地被其他线程看到
Java 内存模型 (JMM): Java虚拟机规范中定义了Java内存模型,目的是屏蔽掉各种硬件和操作系统的内存访问差异,以实现Java程序在各种平台下都能达到⼀致的并发效果.
- 线程之间的共享变量存储在 主内存 (Main Memory).
- 每⼀个线程都有只属于自己的 “工作内存” (Working Memory) .
- 当线程要读取⼀个共享变量的时候, 会先把变量从主内存拷贝到工作内存, 再从工作内存读取数据.
- 当线程要修改⼀个共享变量的时候, 也会先修改工作内存中的副本, 再同步回主内存
这些工作内存中的内容相当于同⼀个共享变量的 “副本”. 此时修改线程1 的工作内存中的值, 线程2 的工作内存不⼀定会及时变化.
此时引入了两个问题:
- 为啥整这么多内存?
实际上并没有这么多 “内存”. 这只是 Java 规范中的⼀个术语, 是属于 “抽象” 的叫法.
所谓的 “主内存” 才是真正硬件角度的 “内存”. 而所谓的 “工作内存”, 则是指 CPU 的寄存器和高速缓存.
- 为啥要这么麻烦的拷来拷去?
因为 CPU 访问自身寄存器的速度以及高速缓存的速度, 远远超过访问内存的速度(快了 3 ~ 4 个数量级,也就是几千倍, 上万倍).
比如某个代码中要连续 10 次读取内存中某个变量的值, 如果 10 次都从内存读, 速度是很慢的. 但是如果只是第⼀次从内存读, 读到的结果缓存到 CPU 的某个寄存器中, 那么后 9 次读数据就不必直接访问内存了,效率就大大提高了
public class Test {
private static int count = 0;
public static void main(String[] args) {
Thread t1 = new Thread(()-> {
while (count == 0) ;
System.out.println("t1 执行结束");
});
Thread t2 = new Thread(()-> {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入一个整数:");
count = scanner.nextInt();
});
t1.start();
t2.start();
}
}
当t2 线程读入到一个不为0的整数时,我们预期t1 就会结束循环
但没有结束
上述代码是一个线程写,一个线程读就会出问题
这行代码大致可以认为是两个CPU指令
- load:从内存读取数据到CPU寄存器
- cmp: 比较,同时产生跳转。条件成立,继续顺序执行;条件不成立就跳转到另外一个地址执行
当前循环速度很快,短时间内会出现大量的 load 和 cmp 反复执行的过程。而load 执行消耗的时间会比 cmp 多很多
因为load速度慢很多,另外JVM还发现每次load执行的结果其实是一样的(在t2 修改之前)。
由于这两个原因,JVM就把load操作优化掉了,只有第一次进行真正的load(从主内存读取),后续再执行就不是真正的load了,而是直接读取已经load过的寄存器里的值。这样把速度慢的优化掉,使程序执行速度更快了,但是这样的优化引入了bug(上述把load优化掉,导致后续t2修改count, t1也感知不到了)
所以上述问题是多线程和JVM优化产生的问题
正常来说,优化要保证逻辑是等价的。JVM在单线程代码中的优化是靠谱的,但多线程就难说了
为什么循环体里有个打印就没问题呢?
public class Test {
private static int count = 0;
public static void main(String[] args) {
Thread t1 = new Thread(()-> {
while (count ==0) System.out.println("我在循环");
System.out.println("t1 执行结束");
});
Thread t2 = new Thread(()-> {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入一个整数:");
count = scanner.nextInt();
});
t1.start();
t2.start();
}
}
如果循环体内存在IO操作或者阻塞操作(sleep…),这样就会使循环的速度大幅降低了,与之相比load花的时间就可以忽略不计了,没必要对其进行优化了
那么针对内存可见性问题,我们可以用volatile关键字不让它触发优化
给变量修饰上volatile关键字后,此时编译器就知道这个变量是不能优化的
代码在写入 volatile 修饰的变量的时候,
• 改变线程工作内存中volatile变量副本的值
• 将改变后的副本的值从工作内存刷新到主内存
代码在读取 volatile 修饰的变量的时候,
• 从主内存中读取volatile变量的最新值到线程的工作内存中
• 从工作内存中读取volatile变量的副本
线程的等待通知机制(wait 和 notify)
由于线程之间是抢占式执行,随机调度的, 因此线程之间执行的先后顺序难以预知。我们无法主动让某个线程被调度,但可以主动让某个线程等待。这样可以让线程一定程度地按照预期的顺序来执行。
join是等待线程结束,此处谈到的是等待代码中给我们进行显示的通知(不一定要结束),这样可以更精细地控制线程之间的执行顺序
可以解决"线程饿死"问题:某个线程频繁地获取释放锁,由于获取太快,以至于其他线程一直被阻塞。虽然不会像"死锁"那样卡死,但是可能会卡住一下下,影响程序效率。
等待通知机制通过条件判定当前逻辑是否能够执行,如果不能执行,就主动wait(主动阻塞等待),把执行的机会给其他线程了,避免线程进行一些无意义的重试,等到合适的时机,其他线程进行通知,让阻塞的线程被唤醒
wait方法是Object类提供的,也就是任何一个对象都有这个方法。wait方法也会被Interrupt唤醒,也能自动清空标志位
public class Test {
public static void main(String[] args) throws InterruptedException {
Object object = new Object();
System.out.println("等待之前");
object.wait();
System.out.println("等待之后");
}
}
抛出异常的原因是wait是解锁的同时进行等待;相比之下,sleep也是阻塞等待,但和锁无关。
得先加上锁,才能谈释放,所以wait必须在synchronized内部使用。哪个线程调用wait哪个线程阻塞
public class Test {
public static void main(String[] args) throws InterruptedException {
Object object = new Object();
System.out.println("等待之前");
synchronized (object) {
object.wait();
}
System.out.println("等待之后");
}
}
wait 结束等待的条件:
• 其他线程调用该对象的 notify 方法.
• wait 等待时间超时 (wait 方法提供⼀个带有 timeout 参数的版本, 来指定等待时间).
• 其他线程调用该等待线程的 interrupt 方法, 导致 wait 抛出 InterruptedException 异常.
这里我们通过另外一个线程,调用notify来唤醒wait阻塞的线程
public class Test {
public static void main(String[] args) throws InterruptedException {
Object locker = new Object();
Thread t1 = new Thread(()-> {
synchronized (locker) {
System.out.println("t1 等待之前");
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t1 等待之后");
}
});
Thread t2 = new Thread(()-> {
Scanner scanner = new Scanner(System.in);
synchronized (locker) {
System.out.println("t2 通知之前");
//借助scanner控制阻塞,用户输入之前,都是阻塞状态
scanner.next();
locker.notify();
System.out.println("t2 通知之后");
}
});
t1.start();
t2.start();
}
}
用户输入内容后,执行notify,唤醒wait操作从而使t1 能够回到RUNNABLE状态,并且参与调度.
t1 被唤醒后不是立即能够执行的,t1 要重新获取到锁(就要等t2 解锁)
public class Test {
public static void main(String[] args) throws InterruptedException {
Object locker = new Object();
Thread t1 = new Thread(()-> {
synchronized (locker) {
System.out.println("t1 等待之前");
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t1 等待之后");
}
});
Thread t2 = new Thread(()-> {
synchronized (locker) {
System.out.println("t2 通知之前");
locker.notify();
System.out.println("t2 通知之后");
}
});
t2.start();
Thread.sleep(1000);
t1.start();
}
}
如果locker上没有wait,此时直接notify不会有任何效果(也不会抛出异常)
这段代码先执行t2, 此时t1 还没有wait。后续执行t1 进入wait的时候没人能唤醒了。
notify只能随机唤醒多个等待线程其中一个
public class Test {
public static void main(String[] args) throws InterruptedException {
Object locker = new Object();
Thread t1 = new Thread(()-> {
synchronized (locker) {
System.out.println("t1 等待之前");
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t1 等待之后");
}
});
Thread t2 = new Thread(()-> {
synchronized (locker) {
System.out.println("t2 等待之前");
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t2 等待之后");
}
});
Thread t3 = new Thread(()-> {
synchronized (locker) {
System.out.println("t3 通知之前");
locker.notify();
System.out.println("t3 通知之后");
}
});
t1.start();
t2.start();
Thread.sleep(1000);
t3.start();
}
}
多个notify就可以都唤醒了,但还是无法指定唤醒哪个线程。如果要指定就需要引入更多锁对象,分别进行wait和notify
public class Test {
public static void main(String[] args) throws InterruptedException {
Object locker = new Object();
Thread t1 = new Thread(()-> {
synchronized (locker) {
System.out.println("t1 等待之前");
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t1 等待之后");
}
});
Thread t2 = new Thread(()-> {
synchronized (locker) {
System.out.println("t2 等待之前");
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t2 等待之后");
}
});
Thread t3 = new Thread(()-> {
synchronized (locker) {
System.out.println("t3 通知之前");
locker.notify();
locker.notify();
System.out.println("t3 通知之后");
}
});
t1.start();
t2.start();
Thread.sleep(1000);
t3.start();
}
}
notifyAll就是唤醒所有等待的线程。虽然是同时唤醒, 但是唤醒的线程需要竞争锁. 所以并不是同时执行, 仍然是有先有后的执行.
public class Test {
public static void main(String[] args) throws InterruptedException {
Object locker = new Object();
Thread t1 = new Thread(()-> {
synchronized (locker) {
System.out.println("t1 等待之前");
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t1 等待之后");
}
});
Thread t2 = new Thread(()-> {
synchronized (locker) {
System.out.println("t2 等待之前");
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("t2 等待之后");
}
});
Thread t3 = new Thread(()-> {
synchronized (locker) {
System.out.println("t3 通知之前");
locker.notifyAll();
System.out.println("t3 通知之后");
}
});
t1.start();
t2.start();
Thread.sleep(1000);
t3.start();
}
}
wait操作也提供了带超时时间的版本,上述不加任何参数的版本是死等
超过1秒还没有被notify就自动唤醒
wait 和 sleep 的对比:
其实理论上 wait 和 sleep 完全是没有可比性的,因为⼀个是用于线程之间的通信的,⼀个是让线程阻塞⼀段时间,唯⼀的相同点就是都可以让线程放弃执行⼀段时间.
- wait 需要搭配 synchronized 使用; sleep 不需要.
- wait 是 Object 的方法 sleep 是 Thread 的静态方法.
单例模式
单例模式是一种经典的设计模式,整个进程中的某个类有且只有一个对象,这样的对象称为"单例"
单例模式能保证某个类在程序中只存在唯⼀⼀份实例, 而不会创建出多个实例
代码中有些类就不应该有多个实例
比如:服务器要从硬盘上加载很多数据到内存中,肯定要写一个类封装上述加载操作,并且写一些获取/处理数据的业务逻辑。
这样的类应该是单例的,一个实例就占很大的内存了,多个实例机器就吃不消了
需要编译器帮我们做一个强制检查。通过编码上的技巧,使编译器可以自动发现代码中是否已经存在实例,并且在尝试创建多个实例的时候直接编译出错,从而保证对象是唯一实例
饿汉模式
类加载的同时, 创建实例
唯一实例创建时机非常早,类似于饿了很久的人,看到吃的就赶紧开始吃
class Singleton {
//static 成员是在 类加载 的时候初始化的,而且只有一个
private static Singleton instance = new Singleton();
//如果需要使用这个类的实例就可以通过getInstance来获取
public static Singleton getInstance() {
return instance;
}
//类之外的代码尝试new的时候势必调用构造方法,由于构造方法是私有的无法调用,就会编译出错
//这样就禁止外部代码来创建该类的实例.
private Singleton() {
}
}
public class Test {
public static void main(String[] args) {
Singleton s1 = Singleton.getInstance();
Singleton s2 = Singleton.getInstance();
System.out.println(s1 == s2);
}
}
懒汉模式
不是 类加载 的时候创建实例,而是在第一次使用的时候创建(如果不使用,就把创建实例的开销节省下来了)
class SingletonLazy {
private static SingletonLazy instance = null;
public static SingletonLazy getInstance() {
if(instance == null) {
instance = new SingletonLazy();
}
return instance;
}
private SingletonLazy() {
}
}
public class Test {
public static void main(String[] args) {
SingletonLazy s1 = SingletonLazy.getInstance();
SingletonLazy s2 = SingletonLazy.getInstance();
System.out.println(s1 == s2);
}
}
如果代码中存在多个单例类,使用饿汉模式就会导致这些实例都是在程序启动的时候扎堆创建的,可能把程序启动时间拖慢;如果是懒汉模式,首次调用时机是分散的,不太会卡顿
考虑是否线程安全
当前只考虑getInstance这一步操作,至于拿到实例后做什么,是否安全,就另当别论
饿汉模式安全,懒汉模式不安全
- 饿汉模式
因为创建实例的时候比调用main线程还早,所以后续代码创建线程肯定比实例创建晚。后续调用getInstance方法相当于多个线程读取同一个变量,所以是安全的
- 懒汉模式
满足线程不安全的三个条件:
- 线程在系统中是并行并发执行的
- 多个线程同时修改同一个变量
- 线程针对变量的修改操作,不是"原子"的
创建实例有两步:if判断和创建实例
如果当前还没有创建实例,并且有两个线程(t1 和 t2)同时调用getInstance方法,这时候if判断都是true,就会创建两个实例,虽然第一个实例的地址会被第二个实例的地址覆盖。但上述提到过实例可能会很大,占很多内存,多个实例机器可能就会吃不消。
通过加锁的方式来保证懒汉模式下getInstance是线程安全的
class SingletonLazy {
private static SingletonLazy instance = null;
private static Object locker = new Object();
public static SingletonLazy getInstance() {
if(instance == null) {
synchronized (locker) {
instance = new SingletonLazy();
}
}
return instance;
}
private SingletonLazy() {
}
}
这样加锁也不安全,不是加了锁就线程安全,也不是不加锁,线程就不安全
如果当前还没有创建实例,并且有两个线程(t1 和 t2)同时调用getInstance方法,这时候if判断都是true,就算一个线程阻塞,等另一个线程释放锁之后还是会再创建对象。所以应该把if判断 和 new实例打包成一个"原子"操作
class SingletonLazy {
private static SingletonLazy instance = null;
private static Object locker = new Object();
public static SingletonLazy getInstance() {
synchronized (locker) {
if(instance == null) {
instance = new SingletonLazy();
}
}
return instance;
}
private SingletonLazy() {
}
}
假定是t1 线程加锁, t2 尝试加锁就会阻塞直到t1 释放锁(执行完创建实例), t2 拿到锁之后进行条件判断,此时instance非空,就不会再创建实例了
上述锁加到方法上也是可以的
class SingletonLazy {
private static SingletonLazy instance = null;
synchronized public static SingletonLazy getInstance() {
if(instance == null) {
instance = new SingletonLazy();
}
return instance;
}
private SingletonLazy() {
}
}
此时调用getInstance方法,就会先加锁再执行方法
懒汉模式只有最开始调用getInstance会存在线程安全问题.一旦把实例创建好,后续再调用就只是读操作了,不存在线程安全问题。明明没有线程安全问题却还要加锁就是画蛇添足(加锁/解锁的开销比较大)
我们就可以使用双重 if 判定, 降低锁竞争的频率。
class SingletonLazy {
private static SingletonLazy instance = null;
private static Object locker = new Object();
public static SingletonLazy getInstance() {
if(instance == null) {
synchronized (locker) {
if (instance == null) {
instance = new SingletonLazy();
}
}
}
return instance;
}
private SingletonLazy() {
}
}
- 第一个if判定是否需要加锁
- 第二个if判定是否要创建对象
可能还会触发优化(内存可见性和指令重排序),所以此处还要加上volatile
class SingletonLazy {
private static volatile SingletonLazy instance = null;
private static Object locker = new Object();
public static SingletonLazy getInstance() {
if(instance == null) {
synchronized (locker) {
if (instance == null) {
instance = new SingletonLazy();
}
}
}
return instance;
}
private SingletonLazy() {
}
}
指令重排序
编译器会根据实际情况,生成的二进制指令的执行顺序和程序猿写的代码的执行顺序可能存在差别。调整顺序主要目的是提高效率(前提是保证逻辑是等价的)
单线程下编译器进行指令重排序的操作一般都是没问题的,编译器可以准确识别出哪些操作可以重排序而不影响到线程
多线程下判定可能就不准确了,就可能出现重排序后逻辑发生了改变,引起bug
instance = new SingletonLazy();
这一行代码其实可以大致分成三个步骤
- 申请内存空间
- 调用构造方法(对内存空间进行初始化)
- 把此时内存空间的地址赋值给instance引用
在指令重排序优化的策略下,上述执行过程可能是123或者是132 (1一定先执行)
这两种执行方式在单线程中是ok的,就像买房子:
1.买了个房子
2. 装修
3. 拿到钥匙
123是精装房,开发商直接装好
132是毛坯房,得自己装修
这两种都是可以的
如果是132,在多线程下可能出现问题
这其实是双重if带来的问题
要解决上述问题,就需要引入volatile。volatile不仅能解决内存可见性问题,也能禁止针对这个变量读写操作的指令重排序的问题
很多地方都有重排序,volatile是针对某个对象的读写操作过程中不会出现重排序
阻塞队列
特点: 先进先出,线程安全,带有阻塞功能
- 队列为空,出队列操作就会阻塞到队列不空为止
- 队列为满,入队列操作就会阻塞到队列不为满为止
还有一种是消息队列,不是普通的先进先出,而是通过topic这样的参数来对数据进行归类,出队列的时候,每种topic下的数据是先进先出的,消息队列也有阻塞功能
BlockingQueue就是Java标准库提供的阻塞队列。
BlockingQueue
是个接口,可以用ArrayBlockingQueue
,LinkedBlockingQueue
,PriorityBlockingQueue
这三个任意一个创建实例
BlockingQueue
的 offer 和 poll 方法是不带阻塞的,所以我们用 put 和 take 方法(带阻塞)
阻塞过程中,如果其他线程尝试使用Interrupt来终止被阻塞的线程,此时就会抛出异常
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Test {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("aaa");
String elem = queue.take();
System.out.println("elem = " + elem);
elem = queue.take();
System.out.println("elem = " + elem);
}
}
简单的生产者消费者模型:
- 队列空阻塞
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Test {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);
//消费者
Thread t1 = new Thread(()-> {
try {
while (true) {
Integer value = queue.take();
System.out.println("消费: " + value);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
//生产者
Thread t2 = new Thread(()-> {
try {
int count = 1;
while (true) {
queue.put(count);
System.out.println("生产: " + count);
count++;
//每隔 1s 生产一个元素
Thread.sleep(1000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t1.start();
t2.start();
}
}
当前代码生产速度是一秒一个,而消费速度虽然很快,但是被阻塞住,只能等生产一个消费一个
- 队列满阻塞
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Test {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);
//消费者
Thread t1 = new Thread(()-> {
try {
while (true) {
Integer value = queue.take();
System.out.println("消费: " + value);
//每隔 1s 消费一个元素
Thread.sleep(1000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
//生产者
Thread t2 = new Thread(()-> {
try {
int count = 1;
while (true) {
queue.put(count);
System.out.println("生产: " + count);
count++;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t1.start();
t2.start();
}
}
自己实现阻塞队列:
- 写一个普通队列
- 加上线程安全
- 引入阻塞
循环队列进行循环时有两种写法
这种效率更高,条件跳转指令速度非常快的,而且可读性更好
性能更低,求余操作涉及到除法运算,CPU计算乘除速度比计算加减(和条件跳转指令速度差不多)要慢不少
- 写一个普通队列
class MyBlockingQueue {
//创建一个循环队列, 记录两个下标,分别表示队首和队尾,还要记录元素个数区分队列满/空
private String[] elems = null;
private int head = 0;//队首下标
private int tail = 0;//队尾下标
private int size = 0;//元素个数
public MyBlockingQueue(int capacity) {
elems = new String[capacity];
}
void put(String elem) {
if(size >= elems.length) {
//队列满了,第三步处理阻塞
//...
}
elems[tail] = elem;
tail++;
if(tail >= elems.length) tail = 0;
size++;
}
String take() {
if(size == 0) {
//队列空的, 第三步处理阻塞情况
//...
}
String result = elems[head];
head++;
if(head >= elems.length) head = 0;
size--;
return result;
}
}
- 加上线程安全(synchronized和volatile)
class MyBlockingQueue {
//创建一个循环队列, 记录两个下标,分别表示队首和队尾,还要记录元素个数区分队列满/空
private String[] elems = null;
private volatile int head = 0;//队首下标
private volatile int tail = 0;//队尾下标
private volatile int size = 0;//元素个数
public MyBlockingQueue(int capacity) {
elems = new String[capacity];
}
void put(String elem) {
synchronized (this) {
if(size >= elems.length) {
//队列满了,第三步处理阻塞
//...
}
elems[tail] = elem;
tail++;
if(tail >= elems.length) tail = 0;
size++;
}
}
String take() {
synchronized (this) {
if(size == 0) {
//队列空的, 第三步处理阻塞情况
//...
}
String result = elems[head];
head++;
if(head >= elems.length) head = 0;
size--;
return result;
}
}
}
- 引入阻塞
使用wait来阻塞等待
class MyBlockingQueue {
//创建一个循环队列, 记录两个下标,分别表示队首和队尾,还要记录元素个数区分队列满/空
private String[] elems = null;
private volatile int head = 0;//队首下标
private volatile int tail = 0;//队尾下标
private volatile int size = 0;//元素个数
public MyBlockingQueue(int capacity) {
elems = new String[capacity];
}
void put(String elem) throws InterruptedException {
synchronized (this) {
if(size >= elems.length) {
this.wait();
}
elems[tail] = elem;
tail++;
if(tail >= elems.length) tail = 0;
size++;
//唤醒下面 take 中的 wait
this.notify();
}
}
String take() throws InterruptedException {
synchronized (this) {
if(size == 0) {
this.wait();
}
String result = elems[head];
head++;
if(head >= elems.length) head = 0;
size--;
//take成功一个元素,就唤醒上面 put 中的 wait
this.notify();
return result;
}
}
}
此外还有一点问题,可以看一下wait方法源码的说明文档
class MyBlockingQueue {
//创建一个循环队列, 记录两个下标,分别表示队首和队尾,还要记录元素个数区分队列满/空
private String[] elems = null;
private volatile int head = 0;//队首下标
private volatile int tail = 0;//队尾下标
private volatile int size = 0;//元素个数
public MyBlockingQueue(int capacity) {
elems = new String[capacity];
}
void put(String elem) throws InterruptedException {
synchronized (this) {
while (size >= elems.length) {
this.wait();
}
elems[tail] = elem;
tail++;
if(tail >= elems.length) tail = 0;
size++;
//唤醒下面 take 中的 wait
this.notify();
}
}
String take() throws InterruptedException {
synchronized (this) {
while (size == 0) {
this.wait();
}
String result = elems[head];
head++;
if(head >= elems.length) head = 0;
size--;
//take成功一个元素,就唤醒上面 put 中的 wait
this.notify();
return result;
}
}
}
生产者消费者模型:
class MyBlockingQueue {
//创建一个循环队列, 记录两个下标,分别表示队首和队尾,还要记录元素个数区分队列满/空
private String[] elems = null;
private volatile int head = 0;//队首下标
private volatile int tail = 0;//队尾下标
private volatile int size = 0;//元素个数
public MyBlockingQueue(int capacity) {
elems = new String[capacity];
}
void put(String elem) throws InterruptedException {
synchronized (this) {
while (size >= elems.length) {
this.wait();
}
elems[tail] = elem;
tail++;
if(tail >= elems.length) tail = 0;
size++;
//唤醒下面 take 中的 wait
this.notify();
}
}
String take() throws InterruptedException {
synchronized (this) {
while (size == 0) {
this.wait();
}
String result = elems[head];
head++;
if(head >= elems.length) head = 0;
size--;
//take成功一个元素,就唤醒上面 put 中的 wait
this.notify();
return result;
}
}
}
public class Test {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(1000);
Thread t1 = new Thread(()-> {
try {
int count = 1;
while (true) {
queue.put(count + "");
System.out.println("生产" + count);
count++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread t2 = new Thread(()-> {
try {
while (true) {
String result = queue.take();
System.out.println("消费" + result);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t1.start();
t2.start();
}
}
线程池
当前频繁创建/销毁线程的开销也变的越来越明显了
优化的两种方式:
- 线程池
- 协程(纤程)
能提升效率的原因:
直接创建/销毁线程,是需要用户态+内核态配合完成的。而线程池/协程,创建/销毁只通过用户态即可,不需要内核态的配合
线程池
调用API创建/销毁线程,需要内核完成,而内核完成的工作很多时候是不可控的
如果使用线程池,提前把线程都创建好,放到用户态代码中的数据结构里,后面用的时候随时从池子里取,用完放回池子里。这个过程完全是用户态代码,不需要和内核进行交互,操作是可控的
协程
协程本质也是纯用户态操作来去规避内核操作,不是像线程池那样把线程提前创建好,而是用一个内核的线程来表示多个协程,在用户态进行协程之间的调度
标准库线程池(ThreadPoolExecutor)
构造方法的参数:
- corePoolSize: 核心线程数
- maximumPoolSize : 最大线程数(核心线程数 + 非核心线程数)
动态扩展:一个线程池,一开始被创建出来的时候,里面就会创建corePoolSize这么多的的线程。线程池会提供一个submit方法添加任务,每个任务都是一个Runnable对象。如果当前添加的任务较少,corePoolSize这么多的的线程足以处理,就不会多创建线程了; 如果添加的任务较多,corePoolSize这么多的的线程处理不过来(有很多任务在等待执行),这时线程池就会创建出新的线程来支撑更多任务,但创建出来的线程总数不超过maximumPoolSize,过一段时间任务没那么多了,多余的非核心线程就会被释放掉,至少会保证线程池中线程数目不少于corePoolSize。这样既保证任务多时的效率,也能保证任务少的时候的系统开销
设置的线程数不是固定的,这与电脑配置和程序的实际特点有关
程序的实际特点大致可以分成两类:
- CPU密集型:代码逻辑都是要通过CPU来完成的.如果代码是CPU密集型,那么线程数不应该超过CPU逻辑核心数,超过了也没什么意义,反而会因为系统调度变得更慢
- IO密集型: 代码大部分时间在等待IO(等待IO是不消耗CPU不参与调度的),如果是IO密集型的代码,瓶颈不在CPU上,每个线程只会消耗一点点核心的资源,而不会占满核心,可以有很多线程,更多的考虑其他方面提升性能(比如网卡带宽的瓶颈)
实际开发中,很多时候一个程序既包含 CPU操作,也包含 IO操作.所以我们要根据实际情况找到一个合适的线程数
- keepAliveTime表示时间数值,unit表示时间单位(秒,分,小时…)。两个组合起来就表示允许非核心线程的最长空闲时间.非核心线程在线程池不忙的时候会被回收,但不是立刻,这个就是设定的保留时间
- workQueue: 加入线程池的任务队列
- threadFactory: 标准库提供的用来创建线程的工厂类,可以把线程的属性初始化好
工厂模式,也是一种设计模式,主要是解决基于构造方法创建对象太坑了的问题,
比如有个类是坐标点,有笛卡尔坐标和极坐标两种方式
而工厂模式的核心思路是不再使用构造方法创建对象,而是给构造方法包装一层
makePointByXY和makePointByRA这样的方法就称为"工厂方法",这种写代码的套路就是"工厂模式"
如果把方法单独放在一个类中
提供方法的PointBuilder类也就是"工厂类"
- handler:此处不是句柄的意思,而是拒绝策略(其实是一个枚举类型),表示如果当前任务队列满了,仍要继续添加任务该怎么办。
标准库提供了四种拒绝策略:
ThreadPoolExecutor.AbortPolicy
:直接抛出异常,让程序猿快速知道任务处理不过来了
ThreadPoolExecutor.CallerRunsPolicy
:哪个线程添加任务,哪个线程就负责执行这个任务,线程池本身不管了
ThreadPoolExecutor.DiscardOldestPolicy
:丢弃掉最早的任务,把新的任务添加进来
ThreadPoolExecutor.DiscardPolicy
:把要新添加的任务丢弃,按照线程池里原来的任务执行
代码案例
由于标准库自己也知道 ThreadPoolExecutor
使用起来比较费劲,于是标准库自己提供了几个工厂类对于上述线程池进一步封装了
这些方法就是标准库提供的创建线程池的工厂方法
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
//能够根据任务的数目,自动进行线程扩容
Executors.newCachedThreadPool();
//创建固定线程数目的线程池
Executors.newFixedThreadPool(4);
//创建只包含单个线程的线程池
Executors.newSingleThreadExecutor();
//创建一个固定线程数目,但是任务延时执行的线程池
Executors.newScheduledThreadPool(4);
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello " + Thread.currentThread().getName());
}
});
}
}
当使用线程池的时候,main线程执行结束而进程没有结束,这是因为 线程池中的线程是前台线程,会阻止进程结束
这样会有变量捕获问题:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++) {
int id = i;
service.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello " + id + "," + Thread.currentThread().getName());
}
});
}
}
}
可以看到线程的数量有很多,把newCachedThreadPool改成newFixedThreadPool方法,newFixedThreadPool后面的参数就是最大的线程数
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
int id = i;
service.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello " + id + "," + Thread.currentThread().getName());
}
});
}
}
}
这样线程数目就不会超过10个了
实现一个简单的线程池
线程池由下面三个部分组成:
- 有若干个线程
- 有任务队列
- 提供submit方法
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class MyThreadPoll {
private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
//初始化线程池
public MyThreadPoll(int n) {
//创建n个核心线程
for (int i = 0; i < n; i++) {
Thread t = new Thread(()-> {
try {
while (true) {
Runnable runnable = queue.take();
runnable.run();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t.start();
}
}
//把任务添加到线程池中
void submit(Runnable runnable) throws InterruptedException {
queue.put(runnable);
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
MyThreadPoll threadPoll = new MyThreadPoll(10);
for (int i = 0; i < 1000; i++) {
int id = i;
threadPoll.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello " + id + ", " + Thread.currentThread().getName());
}
});
}
}
}
加入非核心线程
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class MyThreadPoll {
private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
private int maxPoolSize = 0;//记录最大线程数
private List<Thread> threadList = new ArrayList<>();
//初始化线程池
public MyThreadPoll(int corePoolSize, int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
//创建n个核心线程
for (int i = 0; i < corePoolSize; i++) {
Thread t = new Thread(()-> {
try {
while (true) {
Runnable runnable = queue.take();
runnable.run();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t.start();
threadList.add(t);
}
}
//把任务添加到线程池中
void submit(Runnable runnable) throws InterruptedException {
queue.put(runnable);
//此处判断当前任务队列的元素个数是否比较多
//如果队列元素比较多,说明已有线程不太处理过来了,创建新的线程
//如果队列元素不是很多,没必要创建新的线程
//这里的阈值都是随便取的数
if(queue.size() >= 500 && threadList.size() < maxPoolSize) {
//创建新的线程
Thread t = new Thread(()-> {
try {
while (true) {
Runnable task = queue.take();
task.run();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t.start();
threadList.add(t);
}
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
MyThreadPoll threadPoll = new MyThreadPoll(10, 20);
for (int i = 0; i < 1000; i++) {
int id = i;
threadPoll.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello " + id + ", " + Thread.currentThread().getName());
}
});
}
}
}
比较麻烦的是回收非核心线程,这需要引入更多的数据结构和"定时器"
定时器
定时器相当于"闹钟"的效果。指定一个任务(Runnable),并且指定一个时间,此时这个任务不会立即执行,而是在时间到了之后,再去执行
短信验证中,验证码是有时效的,可以用定时器实现这个功能。发送验证码的时候就把验证码保存起来,用定时器设定多少时间后删除保存的验证码
Java标准库的定时器:Timer
使用 schedule
方法,有两个参数
-
第一个参数TimerTask是实现了 Runnable接口的抽象类
-
第二个参数delay表示多长时间之后执行。以调用
schedule
的时刻为基准,继续等delay毫秒之后再执行
下面写一个代码是延时3秒打印:
import java.util.Timer;
import java.util.TimerTask;
public class Test {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello 3000");
}
}, 3000);
}
}
打印完程序没结束也是因为Timer里面有前台线程阻止了进程结束
实现简单的定时器
需求:
- 能够延时执行任务/指定时间执行任务
- 能够管理多个任务
怎么实现:
- 定义一个类表示一个任务
- 通过一定的数据结构保存多个任务
- 需要有一个线程在指定的时间执行任务
import java.util.PriorityQueue;
class MyTimerTask implements Comparable<MyTimerTask> {
private Runnable runnable;
//此处的 time 保存的不是delay, 而是计算后的 ms 级别的时间戳
private long time;
public MyTimerTask(Runnable runnable, long delay) {
this.runnable = runnable;
//手动换算一下时间
this.time = System.currentTimeMillis() + delay;
}
public void run() {
runnable.run();
}
public long getTime() {
return time;
}
@Override
public int compareTo(MyTimerTask o) {
return (int)(this.time - o.time);
}
}
class MyTimer {
private PriorityQueue<MyTimerTask> taskQueue = new PriorityQueue<>();
public MyTimer() {
//创建一个线程负责不停地扫描堆顶元素来确定是否要执行任务
Thread t = new Thread(()-> {
try {
while (true) {
synchronized (this) {
if(taskQueue.size() == 0) this.wait();
MyTimerTask task = taskQueue.peek();
long curTime = System.currentTimeMillis();
if(curTime >= task.getTime()) {
task.run();
taskQueue.poll();
} else {
//时间没到
//此处阻塞不能用sleep,wait可以用notify正常唤醒,用Interrupt唤醒sleep会抛出异常
//而且wait睡眠的时间会释放锁
this.wait(task.getTime() - curTime);
}
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t.start();
}
public void schedule(Runnable runnable, long delay) {
synchronized (this) {
MyTimerTask task = new MyTimerTask(runnable, delay);
taskQueue.offer(task);
//每次添加新任务都要唤醒wait,重新判定堆顶元素和新的等待时间
this.notify();
}
}
}
public class Test {
public static void main(String[] args) {
MyTimer myTimer = new MyTimer();
myTimer.schedule(new Runnable() {
@Override
public void run() {
System.out.println("hello 3000");
}
}, 3000);
myTimer.schedule(new Runnable() {
@Override
public void run() {
System.out.println("hello 2000");
}
}, 2000);
myTimer.schedule(new Runnable() {
@Override
public void run() {
System.out.println("hello 1000");
}
}, 1000);
}
}
总结:
- 创建一个类表示一个任务
- 使用堆保存任务,省去遍历的开销
- 加入执行线程
- 引入锁,把针对堆的操作都加锁了
- 解决忙等问题,引入wait和notify