什么是线程安全,造成线程安全的本质是什么?
什么是线程安全呢?
咱们初步去理解话记住一句话就行:如果一个对象可以安全地被多个线程同时使用,那它就是线程安全的。
为什么并发编程会导致线程不安全?
可见性问题
我们先看一个例子来和大家分析一下:
@Slf4j
public class TestUtil {
private static int count;
private static class Thread1 extends Thread {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
count++;
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread1 t1 = new Thread1();
Thread1 t2 = new Thread1();
t1.start();
t2.start();
t1.join();
t2.join();
log.info("count == {}", count);
}
}
代码非常简单,我定义了一个 count 静态全局变量,线程里面有一个方法, for 循环进行自增 1000 次。在 main 方法中我们开启了两个线程执行,最后输出的结果是多少呢?
按照正常的情况,我们是需要它输出 2000 ,但是实际的情况可能和我们想的有点不同,最终输出的值大概率是小于 2000 的,而且每次输出的结果可能和我们想的也有点不同。
这个就是我们的线程安全的问题。线程安全是指在多线程环境下,程序可以始终执行正确的行为,符合预期的逻辑。比如我们刚刚的程序,共两个线程,每个线程对 count 变量累加 1000 次,预期的逻辑是 count 被累加了 2000 次,而代码执行的结果却不是,所以它是线程不安全的。
为什么是不安全的呢?这个是一个非常典型的因为可见性引发的线程安全问题,其核心的本质是因为 CPU 缓存可见性的问题导致的。要理解这个问题,我们得去了解一下 CPU 的运行原理:
在我们以往的单核时代,所有线程都在同一个 CPU 上执行, CPU 缓存与 内存数据的一致性容易解决。 因为所有线程都操作的是同一个 CPU 的缓存,一个线程对缓存的写,对另外一个线程来说一定是可见的。
线程 A 和 线程 B 都操作同一个 CPU 里面的缓存,所以线程 A 更新了 变量 V 的值,那么线程 B 之后访问变量 V 得到的一定是 V 的最新值。(线程 A 写入的值)。一个线程对共享变量的修改,另一个线程能立刻看到,称为「可见性」。
多核时代,每颗 CPU 都有自己的缓存,这时 CPU 缓存与内存的数据一致性就没那么容易解决了。当多个线程在不同 CPU 上执行时,这些线程操作的是不同的 CPU 缓存。 比如下图中,线程 A 操作的是 CPU-1 上的缓存,而线程 B 操作的是 CPU-2 上的缓存,很明显,这个 「线程 A」 对 「变量 V」 的操作对于 「线程 B」 来说就不具备可见性了。
原子性问题
由于 I/O 速度太慢,早期的操作系统就发明了「多进程」,即便在单核 CPU上我们也可以一边听歌一边做别的,给人一种这几件事并行的错觉,其实是 CPU 在以极小的时间分片进行切换,每个进程执行一个「时间片」的时间 例如 50ms , 这就是 多进程的功劳。
在一个时间片内,如果一个进程进行一个 I/O 操作,例如「读取一个文件」,这个时候进程可以把自己标记为 「休眠状态」 并 出让 CPU 的使用权,当文件读取进内存这个 I/O 操作结束后, 「操作系统」会把这个休眠的进程唤醒,唤醒后的进程就有机会重新获得 CPU 的使用权了。
这里的 「进程」 在等待 I/O 时之所以会释放 CPU 使用权,是为了让 CPU 在这段等待的时间里可以做别的事情,这样一来,CPU 的使用率就增加了。
如果这时另一个进程也在读取文件,读取文件的操作就会排队,磁盘驱动在完成一个进程的读操作后,发现有排队的任务,就会立即启动下一个读操作,这样 I/O 的使用率也增加了。
虽然这个逻辑看上去很简单,但是支持 「进程分时复用」 在操作系统的发展史上有着 「里程碑」 的意义,Unix 因为解决了这个问题而名噪天下。
「早期的操作系统」基于「进程」 来调度 CPU,不同的进程之间不共享「内存空间」,所以切换任务就需要切换进程,从而切换「内存映射地址」,而一个进程创建的所有线程,则共享同一个内存空间,所以线程之间的切换成本比进程间切换要低很多。 现代的操作系统都是基于更轻量的线程来调度,所以我们现在提到的「任务切换」 指的都是 「线程切换」。
Java 并发程序都是基于 多线程的,自然也会涉及到 「任务切换」。而同时任务切换也是 Java 并发中 Bug 的源头之一。
「任务切换」 的时机大多数是在 时间片结束的时候,我们现在使用的基本都是高级编程语言,而高级编程语言中的一行代码往往对应着多个 CPU 指令操作。
比如上面代码中的 count += 1 就对应着 3 个操作。
- 指令 1 读取:将 count 内存加载到 CPU 寄存器中
- 指令 2 修改:将 count 在寄存器中进行 +1 操作
- 指令 3 写入:将寄存器中的结果写入内存。(缓存机机制导致可能写入的是 CPU 缓存而不是 内存)
操作系统做任务切换,可以发生在任何一条 「CPU 指令」 执行完成的时候,而不是 高级编程语言中的一条语句。 ↑---【这里对原子性的解释要比 jcip 中更具体一些,我理解的原子性指的是一组高级语句中对应的所有 CPU 指令一起执行完成 才发生切换】
对于上面的三条指令来说,假设 count = 0,如果 线程 A 在 指令 1 执行完成后进行了线程切换,线程 A 和 线程 B 按照下图的顺序执行,那么我们会发现两个线程都执行了 count += 1 的操作,且 count 的值都是以 0 为起点,最终得到的结果是 1 而不是期望的 2。
我们将一个或者多个操作在 CPU 执行的过程中不被中断的特性称为 「原子性」
CPU 能保证的原子操作需要是 CPU 指令级别的,而不是高级语言的操作符。这是违背我们直觉的地方,因此很多时候我们需要在「高级语言层面」保证操作的原子性。
有序性问题
【也就是重排序问题】
并发编程中还有一个容易导致违背直觉性的诡异 BUG 的原因就是有序性
有序性指的是 程序按照代码的先后顺序执行。
但是编译器为了优化性能,有时候会改变程序中语句的先后顺序,例如 a=6,b=7 编译后可能变成了 b=7,a=6 这个顺序。 编译器调整了语句的顺序,但是不影响程序的最终结果。但是有时候编译器和解释器的优化可能导致意想不到的 BUG。
Java 中的一个经典的关于有序性问题的按理就是利用 「双重检查」 创建单例对象,例如下面的代码:
在获取实例 getInstnace() 方法中,首先判断 intance 是否为空,如果为空则锁定 Singleton.class 并再次检查 instance 是否为空,如果还未空则创建 Singleton 的一个实例。
public class Singleton {
static Singleton instance;
static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
假设有两个线程 A、B 同时调用 getInstance() 方法,则同时发现 instance == null , 于是同时对 Singleton.class 加锁,此时 JVM 保证只有一个线程能够加锁成功(假设是 线程 A),此时另外一个线程会处于等待状态,线程 A 创建一个 Singleton 实例,然后释放锁,锁被释放后 线程 B 被唤醒,线程 B 尝试加锁 ,本次加锁可以成功,然后线程 B 检查 instance == null 发现已经有实例被创建,所以线程 B 不会再创建一个 SIngleton 实例。
看上去很完美,但是实际上 getInstance() 方法存在漏洞, 问题出在 new 这个操作符上。
我们以为的使用 new 操作符构造对象的流程:
- 分配一块内存 M
- 在内存上初始化 Singleton 对象
- 将 M 的地址 赋值给 instance 变量
但是实际上被优化后的执行路径却是这样的:
- 分配一块内存 M
- 将 M 的地址 赋值给 instance 变量
- 在内存上初始化 Singleton 对象
两者区别就在于,优化后的情况当对象尚未初始化完成的时候就已经被赋值给栈中的引用,此时就已经可以获得该对象了。
这样将会导致:
线程 A 先执行 getInstance() 方法,当执行完指令 2 时发生了线程切换,此时线程 B 也执行了 getInstance() 方法,进入判断 instance != null 是 true,所以直接返回了 instance,而此时的 instance 因为指令重排序的原因,先将堆内存中的地址值赋值给栈中的引用,下面才对堆中对象真正进行初始化,所以此时的 instance 是一个尚未初始化完成的对象,如果这个时候直接使用的话,就可能触发 「空指针异常」。
所以这里的 B 存在的是可见性的问题,此时它看到的是一个未构建成功的 instance 实例
想写好并发程序,首先要知道并发程序的问题可能出现在哪里,是因为什么导致的。只要我们可以深刻的理解 可见性、原子性、有序性 在并发场景下的原理,就可以对 Bug 进行比较准确的诊断。缓存导致的可见性问题,线程切换带来的原子性问题,编译优化导致的有序性问题。这些手段的目的都是为了提高性能,但是技术在解决一个问题的同事必然会带来新的问题。
Java 创建线程的方式?
继承 Thread 类
通过继承 Thread 类,并重写它的 run 方法,我们就可以创建一个线程。
- 首先定义一个类来继承 Thread 类,重写 run 方法。
- 然后创建这个子类对象,并调用 start 方法启动线程。
public class ThreadA extends Thread {
@Override
public void run() {
log.info("这是一个新线程。。。");
}
public static void main(String[] args) {
new ThreadA().start();
}
}
实现 Runnable 接口
通过实现 Runnable ,并实现 run 方法,也可以创建一个线程。
- 首先定义一个类实现 Runnable 接口,并实现 run 方法。
- 然后创建 Runnable 实现类对象,并把它作为 target 传入 Thread 的构造函数中
- 最后调用 start 方法启动线程
public class ThreadB implements Runnable {
@Override
public void run() {
log.info("这是一个新线程。。。");
}
public static void main(String[] args) {
new Thread(new ThreadB()).start();
}
}
实现 Callable 接口,并结合 Future 实现
- 首先定义一个 Callable 的实现类,并实现 call 方法。call 方法是带返回值的。
- 然后通过 FutureTask 的构造方法,把这个 Callable 实现类传进去。
- 把 FutureTask 作为 Thread 类的 target ,创建 Thread 线程对象。
- 通过 FutureTask 的 get 方法获取线程的执行结果。
public class ThreadC implements Callable<String> {
@Override
public String call() throws Exception {
return "hello callable";
}
}
@Slf4j
public class FutureA {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new ThreadC());
new Thread(task).start();
String message = task.get();
log.info("message == {}", message);
}
}
使用 JDK 自带的 Executors 来创建线程池对象
- 首先,定一个 Runnable 的实现类,重写 run 方法
- 然后创建一个拥有固定线程数的线程池
- 最后通过 ExecutorService 对象的 execute 方法传入线程对象
@Slf4j
public class ThreadD implements Runnable {
@Override
public void run() {
log.info("{} 线程 run...", Thread.currentThread().getName());
}
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executor.execute(new ThreadD());
}
executor.shutdown();
}
}
上面是介绍了 4 种创建线程的方式,那真的就只有四种吗?我在网上看到一些说有 5 种的,有 8 种的。我用 Spring 的方式也可以创建线程、用 Timer 创建定时任务也可以创建线程。其实这都不算错,我们在回答这道面试题时,说出几种其实不算出彩,说明我们还没有抓出创建线程的根本原理。
其实只有一种方式,本质上不论是那种方式,最终都是调用了 Thread.start() 方法或是我们的 run() 方法。而 start()方法最终调用的还是我们的 run() 方法。
/**
* If this thread was constructed using a separate
* <code>Runnable</code> run object, then that
* <code>Runnable</code> object's <code>run</code> method is called;
* otherwise, this method does nothing and returns.
* <p>
* Subclasses of <code>Thread</code> should override this method.
*
* @see #start()
* @see #stop()
* @see #Thread(ThreadGroup, Runnable, String)
*/
@Override
public void run() {
if (target != null) {
target.run();
}
}
所以在这个地方不论说几种其实都是可以的,重要的是要说出你的依据,讲出他们的共同点和区别。
说说线程的生命周期和状态?
线程在生命周期中并不是固定处于某一个状态而是随着代码的执行在不同状态之间切换。
Java 线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态:
- NEW: 初始状态,线程被创建出来但没有被调用 start() 。
- RUNNABLE: 运行状态,线程被调用了 start()等待运行的状态。
- BLOCKED :阻塞状态,需要等待锁释放。
- WAITING:等待状态,表示该线程需要等待其他线程做出一些特定动作(通知或中断)。
- TIME_WAITING:超时等待状态,可以在指定的时间后自行返回而不是像 WAITING 那样一直等待。
- TERMINATED:终止状态,表示该线程已经运行完毕。
由上图可以看出:线程创建之后它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 READY(可运行) 状态。可运行状态的线程获得了 CPU 时间片(timeslice)后就处于 RUNNING(运行) 状态。
在操作系统层面,线程有 READY 和 RUNNING 状态;而在 JVM 层面,只能看到 RUNNABLE 状态,所以 Java 系统一般将这两个状态统称为 RUNNABLE(运行中) 状态 。
为什么 JVM 没有区分这两种状态呢? (摘自:Java 线程运行怎么有第六种状态? - Dawell 的回答 open in new window ) 现在的时分(time-sharing)多任务(multi-task)操作系统架构通常都是用所谓的“时间分片(time quantum or time slice)”方式进行抢占式(preemptive)轮转调度(round-robin 式)。这个时间分片通常是很小的,一个线程一次最多只能在 CPU 上运行比如 10-20ms 的时间(此时处于 running 状态),也即大概只有 0.01 秒这一量级,时间片用后就要被切换下来放入调度队列的末尾等待再次调度。(也即回到 ready 状态)。线程切换的如此之快,区分这两种状态就没什么意义了。
当线程执行 wait() 方法之后,线程进入 WAITING(等待) 状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态。TIMED_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制,比如通过 sleep(long millis)方法或 wait(long millis)方法可以将线程置于 TIMED_WAITING 状态。当超时时间结束后,线程将会返回到 RUNNABLE 状态。当线程进入 synchronized 方法/块或者调用 wait 后(被 notify)重新进入 synchronized 方法/块,但是锁被其它线程占有,这个时候线程就会进入 BLOCKED(阻塞) 状态。线程在执行完了 run()方法之后将会进入到 TERMINATED(终止) 状态。
为什么阿里禁止使用 Executors 来创建线程?
阿里开发手册关于线程池有这样一条规定:线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
简单来说:Executors 会导致资源耗尽
ExecutorService executor = Executors.newSingleThreadExecutor();
我们直接看这张图是不是就可以看到 Executors 实例化的几种线程池的方案了对吧。手册里面直接规定不让用,那说明这几种是不是都存在导致资源耗尽的风险呢?我们一个一个来分析一下:
CachedThreadPool
Java 中 Executor 框架中的一种线程池类型,它在需要时会创建新线程来执行任务,并在任务完成后将其闲置的线程保留在池中,以备下次使用。该线程池可以根据需要动态地调整线程数量,因此适合于执行大量短期异步任务的应用程序。
CachedThreadPool 线程池的主要特点是:
- 当需要执行新任务时,如果当前可用线程数量小于 corePoolSize,则创建新线程来执行任务。
- 如果当前可用线程数量等于或大于 corePoolSize,则将任务放入工作队列中等待可用线程执行。
- 如果工作队列已满,将创建一个新线程来执行任务,即使当前已有许多线程处于空闲状态。
- 当线程空闲时间超过指定的 keepAliveTime 时,线程将被终止并从线程池中移除,以便释放系统资源。
总之,CachedThreadPool 线程池的特点是根据需要动态地创建线程,适合于执行大量短期异步任务,但如果任务执行时间过长,则可能会创建过多的线程,从而导致系统资源的浪费。
源码解读:
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
里面调用的是 ThreadPoolExecutor 这个类来创建的实例,我们发现这个类是不是很熟悉,阿里规范里面提到的建议就是用 ThreadPoolExecutor 来创建线程池对吧,那为什么不建议我们用封装好的呢?我们仔细的来看看里面的几个入参:
字段名称 | 含义 |
corePoolSize | 线程池的基本大小。在没有任务执行时,线程池的大小为 corePoolSize。如果有任务提交,就会创建线程来处理任务,直到线程池的数量达到 corePoolSize |
maximumPoolSize | 线程池允许的最大线程数。当线程池中的线程数目达到 corePoolSize 时,新任务将被放入队列中,如果队列满了,那么就会创建新的线程来处理任务,直到线程数量达到 maximumPoolSize。 |
keepAliveTime | 线程池中超过 corePoolSize 线程的空闲时间的存活时间。也就是说,如果线程池中的线程数量大于 corePoolSize,那么这些多出来的线程在空闲了 keepAliveTime 后会被销毁,直到线程池中的线程数目不超过 corePoolSize。 |
unit | keepAliveTime 的时间单位。 |
workQueue | 任务队列。用于保存等待执行的任务的阻塞队列。常用的有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。 |
threadFactory | 线程工厂,用于创建新线程。 |
handler | 拒绝策略,用于当线程池中的资源已经全部使用,且队列已满的情况下,如何拒绝新的任务。常用的有 AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy。 |
FixedThreadPool
FixedThreadPool 是一种固定大小的线程池,它的特点是创建一个固定大小的线程池,当有新任务到来时,线程池中有空闲线程则立即使用空闲线程执行任务,如果没有空闲线程,则新任务会被暂时放入任务队列中等待有空闲线程时执行。
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
与 CachedThreadPool 相比,FixedThreadPool 的线程数量是固定的,不会根据任务数量的变化而动态调整。这意味着,当任务数量超过线程池大小时,多出的任务会被放入队列中等待有空闲线程时执行,因此 FixedThreadPool 可能会在任务数量较多时出现队列溢出的问题,也可能因为线程数较少而导致处理任务的速度较慢。但是,当任务数量较少时, FixedThreadPool 由于不需要频繁创建和销毁线程,相比 CachedThreadPool 可以提供更好的性能和稳定性。因此,FixedThreadPool 适用于任务数量比较稳定且需要限制线程数的情况,例如并发请求量不高的网络服务器或者后台任务处理等场景。
SingleThreadExecutor
SingleThreadExecutor 是 Java 中的一种线程池实现。它的特点是只创建一个单独的工作线程来处理任务。当任务被提交到线程池时,如果当前没有运行的线程,则会创建一个新的线程来处理该任务。如果当前有一个线程在处理任务,新提交的任务会被加入到队列中等待被执行。该线程池可以保证任务被顺序执行,而且任意时刻只有一个线程在工作,因此也避免了线程安全的问题。
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 可以看做是一种特殊的 FixedThreadPool,它的线程数量为 1。由于它只创建一个线程,因此它可以避免多线程带来的竞争问题和死锁问题。但是,因为它只有一个线程在工作,因此处理速度较慢,不适合处理大量的任务。它适用于需要顺序执行任务的场景,例如顺序执行文件下载任务、顺序处理消息等场景。同时,由于只有一个线程在工作,也适合用于需要保证数据一致性或避免竞态条件的场景。
对 ThreadLocal 的理解
使用场景
使用场景一:代替参数的显式传递
使用场景二:全局存储用户信息(项目中用到)
使用场景三:解决线程安全问题
原理
ThreadLocal 的重要属性:
// 当前 ThreadLocal 的 hashCode,由 nextHashCode() 计算而来,用于 在ThreadLocal 在 ThreadLocalMap 中的索引位置
private final int threadLocalHashCode = nextHashCode();
// 哈希魔法,狐妖与斐波那契散列法以及黄金分割有关
private static final int HASH_INCREMENT = 0x61c88647;
// 返回计算出的下一个哈希值,其值 i * HASH_INCREMENT,其中 i 代表调用次数
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
// 保证了在一台机器中每个 ThreadLocal 的 threadLocalHashCode 是唯一的
private static AtomicInteger nextHashCode = new AtomicInteger();
其中的 HASH_INCREMENT 也不是随便取的,它转化为十进制是 1640531527,2654435769 转换成 int 类型就是 -1640531527,2654435769 等于 (√5-1)/2 乘以 2 的 32 次方。(√5-1)/2 就是黄金分割数,近似为 0.618,也就是说 0x61c88647 理解为一个黄金分割数乘以 2 的 32 次方,它可以保证 nextHashCode 生成的哈希值,均匀的分布在 2 的幂次方上,且小于 2 的 32 次方。
下面是 javaspecialists 中一篇文章对它的介绍:
This number represents the golden ratio (sqrt(5)-1) times two to the power of 31 ((sqrt(5)-1) * (2^31)). The result is then a golden number, either 2654435769 or -1640531527.
下面用例子来证明下:
public class Test {
private static final int HASH_INCREMENT = 0x61c88647;
public static void main(String[] args) {
int n = 5;
int max = 2 << (n -1);
for (int i = 0; i < max; i++) {
System.out.print(i * HASH_INCREMENT & (max - 1));
System.out.print(" ");
}
}
}
运行结果为:0 7 14 21 28 3 10 17 24 31 6 13 20 27 2 9 16 23 30 5 12 19 26 1 8 15 22 29 4 11 18 25 , 可以发现元素索引值完美的散列在数组当中,并没有出现冲突。
ThreadLocalMap
除了上述属性外,还有一个重要的属性 ThreadLocalMap,ThreadLocalMap 是 ThreadLocal 的静态内部类,当一个线程有多个 ThreadLocal 时,需要一个容器来管理多个 ThreadLocal,ThreadLocalMap 的作用就是管理线程中多个 ThreadLocal,源码如下:
static class ThreadLocalMap {
/**
* 键值对实体的存储结构
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
// 当前线程关联的 value,这个 value 闭关没有弱引用追踪
Object value;
/**
* 构造键值对
* @param k k 作 key,作为 key 的 ThreadLocal 会被包装为一个弱引用
* @param v v 作 value
*/
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// 初始容量,必须为 2 的幂
private static final int INITIAL_CAPACITY = 16;
// 存储 ThreadLocal 的键值对实体数组,长度必须为 2 的幂
private Entry[] table;
// ThreadLocalMap 元素数量
private int size = 0;
// 扩容的阈值,默认是数组大小的三分之二
private int threshold;
}
从源码中看到 ThreadLocalMap 其实就是一个简单的 Map 结构,底层是数组,有初始化大小,也有扩容阈值大小,数组的元素是 Entry,Entry 的 key 就是 ThreadLocal 的引用,value 是 ThreadLocal 的值。ThreadLocalMap 解决 hash 冲突的方式采用的是线性探测法,如果发生冲突会继续寻找下一个空的位置。
这样的就有可能会发生内存泄漏的问题,下面让我们进行分析:
ThreadLocal 内存泄漏
ThreadLocal 在没有外部强引用时,发生 GC 时会被回收,那么 ThreadLocalMap 中保存的 key 值就变成了 null,而 Entry 又被 threadLocalMap 对象引用,threadLocalMap 对象又被 Thread 对象所引用,那么当 Thread 一直不终结的话,value 对象就会一直存在于内存中,也就导致了内存泄漏,直至 Thread 被销毁后,才会被回收。
那么如何避免内存泄漏呢?
在使用完 ThreadLocal 变量后,需要我们手动 remove 掉,防止 ThreadLocalMap 中 Entry 一直保持对 value 的强引用,导致 value 不能被回收,其中 remove 源码如下所示:
/**
* 清理当前 ThreadLocal 对象关联的键值对
*/
public void remove() {
// 返回当前线程持有的 map
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null) {
// 从 map 中清理当前 ThreadLocal 对象关联的键值对
m.remove(this);
}
}
remove 方法的时序图如下所示:
remove 方法是先获取到当前线程的 ThreadLocalMap,并且调用了它的 remove 方法,从 map 中清理当前 ThreadLocal 对象关联的键值对,这样 value 就可以被 GC 回收了。
那么 ThreadLocal 是如何实现线程隔离的呢?
ThreadLocal 的 set 方法
ThreadLocal 的 set 方法,源码如下:
/**
* 为当前 ThreadLocal 对象关联 value 值
* @param value 要存储在此线程的线程副本的值
*/
public void set(T value) {
// 返回ThreadLocal当前所在的线程
Thread t = Thread.currentThread();
// 返回当前线程持有的map
ThreadLocalMap map = getMap(t);
if (map != null) {
// 如果 ThreadLocal 不为空,则直接存储<ThreadLocal, T>键值对
map.set(this, value);
} else {
// 否则,需要为当前线程初始化ThreadLocalMap , 并存储键值对<this, firstValue>
createMap(t, value);
}
}
set 方法的作用是把我们想要存储的 value 给保存进去。set 方法的流程主要是:
先获取到当前线程的引用
- 利用这个引用来获取到 ThreadLocalMap
- 如果 map 为空,则去创建一个 ThreadLocalMap
- 如果 map 不为空,就利用 ThreadLocalMap 的 set 方法将 value 添加到 map 中
set 方法的时序图如下所示:
其中 map 就是我们上面讲到的 ThreadLocalMap,可以看到它是通过当前线程对象获取到的 ThreadLocalMap,接下来我们看 getMap 方法的源代码:
/**
* 返回当前线程 thread 持有的 ThreadLocalMap
* @param t 当前线程
* @return ThreadLocalMap
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
getMap 方法的作用主要是获取当前线程内的 ThreadLocalMap 对象,原来这个 ThreadLocalMap 是线程的一个属性,下面让我们看看 Thread 中的相关代码:
/**
* ThreadLocal 的 ThreadLocalMap 是线程的一个属性,所以在多线程环境下 threadLocals 是线程安全的
*/
ThreadLocal.ThreadLocalMap threadLocals = null;
可以看出每个线程都有 ThreadLocalMap 对象,被命名为 threadLocals,默认为 null,所以每个线程的 ThreadLocals 都是隔离独享的。
调用 ThreadLocalMap.set() 时,会把当前 threadLocal 对象作为 key,想要保存的对象作为 value,存入 map。
其中 ThreadLocalMap.set() 的源码如下:
/**
* 在 map 中存储键值对<key, value>
*
* @param key threadLocal
* @param value 要设置 value 的值
*/
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
// 计算 key 在数组中的下标
int i = key.threadLocalHashCode & (len-1);
// 遍历一段连续的元素,以查找匹配的 对象 ThreadLocal
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
// 获取该哈希值处的ThreadLocal对象
ThreadLocal<?> k = e.get();
// 键值ThreadLocal匹配,直接更改map中的value
if (k == key) {
e.value = value;
return;
}
// 若 key 是 null,说明 ThreadLocal 被清理了,直接替换掉
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 直到遇见了空槽也没找到匹配的ThreadLocal对象,那么在此空槽处安排ThreadLocal对象和缓存value
tab[i] = new ThreadLocal.ThreadLocalMap.Entry(key, value);
int sz = ++size;
// 如果没有元素被清理,那么就要检查当前元素数量是否超过了容量阈值(数组大小的三分之二),以便决定是否扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 扩容的过程也是对所有的 key 重新哈希的过程
rehash();
}
到这里,应该对 Thread、ThreadLocal 以及 ThreadLocalMap 的关系有了进一步的理解,下图为三者之间的关系:
ThreadLocal 的 get 方法
ThreadLocal 的 get 方法,源码如下:
/**
* 返回当前 ThreadLocal 对象关联的值
* @return
*/
public T get() {
// 返回当前 ThreadLocal 所在的线程
Thread t = Thread.currentThread();
// 从线程中拿到 ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null) {
// 从 map 中拿到 entry
ThreadLocalMap.Entry e = map.getEntry(this);
// 如果不为空,读取当前 ThreadLocal 中保存的值
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 若 map 为空,则对当前线程的 ThreadLocal 进行初始化,最后返回当前的 ThreadLocal 对象关联的初值,即 value
return setInitialValue();
}
get 方法的主要流程为:
- 先获取到当前线程的引用
- 获取当前线程内部的 ThreadLocalMap
- 如果 map 存在,则获取当前 ThreadLocal 对应的 value 值
- 如果 map 不存在或者找不到 value 值,则调用 setInitialValue() 进行初始化
get 方法的时序图如下所示:
其中每个 Thread 的 ThreadLocalMap 以 threadLocal 作为 key,保存自己线程的 value 副本,也就是保存在每个线程中,并没有保存在 ThreadLocal 对象中。
/**
* 返回 key 关联的键值对实体
* @param key threadLocal
* @return
*/
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
// 若 e 不为空,并且 e 的 ThreadLocal 的内存地址和 key 相同,直接返回
if (e != null && e.get() == key)
return e;
else
// 从 i 开始向后遍历找到键值对实体
return getEntryAfterMiss(key, i, e);
}
ThreadLocalMap 的 resize 方法
当 ThreadLocalMap 中的 ThreadLocal 的个数超过容量阈值时,ThreadLocalMap 就要开始扩容了,源码如下:
/**
* 扩容,重新计算索引,标记垃圾值,方便 GC 回收
*/
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
// 新建一个数组,安扎2倍长度扩容
Entry[] newTab = new Entry[newLen];
int count = 0;
// 将旧数组的值拷贝到新数组上
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
// 计算 ThreadLocal
int h = k.threadLocalHashCode & (newLen - 1);
// 如果发生冲突,使用线性探测往后寻找合适的位置
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
// 设置新的扩容阈值,为数组长度的三分之二
setThreshold(newLen);
size = count;
table = newTab;
}
resize 方法主要是进行扩容,同时会将垃圾值标记方便 GC 回收,扩容后数组大小是原来数组的两倍