文章目录
- 线程池
- 一、线程池概述
- 1、什么是线程池?
- 2、为什么需要线程池?
- 3、线程池的优势
- 4、基本原理
- 二、线程池相关接口与方法
- 1、Executor
- 2、ExecutorService
- 3、ScheduledExecutorService
- 4、Runnable & Callable
- 5、Future & FutureTask
- 6、execute & submit
- 7、shutdown & shutdownNow
- 8、awaitTermination
- 9、isShutDown & isTerminated
- 三、ThreadPoolExecutor
- 1、构造方法与核心参数
- 2、拒接策略
- 3、线程池的 5 种状态
- 4、执行原理
- 5、源码分析
- 1)execute
- 2)addWorker
- 3)runWorker
- 4)getTask
- 6、使用示例
- 四、Executors 工具类(了解)
- 1、newFixedThreadPool
- 2、newSingleThreadExecutor
- 3、newCachedThreadPool
- 4、newScheduledThreadPool
- 五、线程池最佳实践
- 1、正确声明线程池
- 2、线程数量 怎么配置
- 3、阻塞队列 怎么选择
- 4、线程工厂 怎么配置
- 1)默认的线程工厂
- 2)自定义线程工厂
- 5、拒绝策略 怎么配置
- 6、Spring 线程池
- 8、不同业务使用不同的线程池
- 7、线程池 和 ThreadLocal 共用的坑
线程池
一、线程池概述
1、什么是线程池?
线程池其实是一种池化的技术实现,池化技术的核心思想就是实现资源的复用,避免资源的重复创建和销毁带来的性能开销。线程池可以管理一堆线程,让线程执行完任务之后不进行销毁,而是继续去处理其它线程已经提交的任务。
2、为什么需要线程池?
程序启动一个新线程成本是比较高的,因为它涉及到要与操作系统进行交互。
- 启动线程的时候,会在内存中开辟一块空间,消耗系统资源。
- 销毁线程的时候,首先要把和线程相关东西进行销毁,还要把资源还给操作系统。
因此,如果并发的线程数量很多,这样频繁创建单个线程就会大大降低系统的效率。
但是,如果一个任务的时间非常长,就没必要用线程池了,因为不能控制线程池中线程的开始、挂起、和中止。(不是不能,是不宜)
3、线程池的优势
合理利用线程池能够带来三个好处:
-
降低资源消耗。
减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
-
提高响应速度。
当任务到达时,不需要等待线程创建,可以立即执行。
-
提高线程的可管理性。
使用线程池可以进行统一分配、调优和监控。根据系统的承受能力,调整线程池中工作线程的数目。防止消耗过多的系统资源,降低系统的稳定性。(线程开的越多,消耗的内存也就越大,最后导致死机或宕机)
4、基本原理
先了解几个核心概念:
- ThreadPool(线程池):
- 线程池是整个线程管理的核心,负责创建、管理和调度线程的执行。
- 线程池通常包含一组线程,这些线程被称为工作线程(
Worker
)。 - 线程池维护着一个任务队列(
workQueue
),用于存储待执行的任务。
- Worker(工作线程):
Worker
是线程池中的工作线程,它负责执行任务队列中的任务。- 每个
Worker
线程从任务队列中获取任务,并执行任务的run()
方法。 - 当没有任务可执行时,
Worker
线程会阻塞等待,直到任务队列中有新的任务被提交。
- workQueue(任务队列):
- 任务队列是用于存储待执行的任务的缓冲区。
- 当有任务被提交到线程池时,线程池会将任务放入任务队列中等待执行。
- Task(任务):
- 任务是需要线程池执行的工作单元。
- 任务通常实现了
Runnable
接口,表示没有返回值的任务;或者实现了Callable
接口,表示有返回值的任务。 - 每个任务被执行时,会调用其
run()
方法或call()
方法来执行具体的逻辑。
线程池的整体流程如下:
- 当有任务被提交到线程池时,线程池会将任务放入任务队列中。
Worker
线程从任务队列中获取任务,并执行任务的run()
方法或call()
方法。- 执行完任务后,
Worker
线程继续循环从任务队列中获取下一个任务执行。 - 如果任务队列为空,
Worker
线程会阻塞等待,直到有新的任务被提交到任务队列中。 - 如果任务队列已满(对于有界队列),新提交的任务可能会被拒绝或者阻塞,直到队列有空闲位置。
线程池负责管理线程的创建和销毁、工作线程的执行、任务队列的调度等工作,从而实现了任务的并发执行和资源的有效利用。
二、线程池相关接口与方法
从JDK1.5开始,为了把工作单元与执行机制分离开,Executor框架诞生了,这是异步执行任务的线程池框架。
关键类或接口 | 含义 |
---|---|
Executor | 线程池的顶级接口,提供 execute (Runnable command) 方法 |
ExecutorService | Executor的子接口,提供 submit、invoke、shutdown 等方法 |
ScheduledExecutorService | ExecutorService的子接口,提供延迟/定期指定任务的方法,比Timer更灵活 |
ThreadPoolExecutor | 线程池的核心实现类,用来创建线程池执行被提交的任务 |
ForkJoinPool | 通过分而治之的思想来并行执行任务 |
ScheduledThreadPoolExecutor | 可以执行延迟任务和周期性任务,是一个定时执行任务的线程池。 |
相关类或接口 | 含义 |
---|---|
Runnable接口 和 Callable接口 及它们的实现类 | 线程要执行的任务,Runnable无返回值,Callable有返回值 |
Future 和 FutureTask | 代表异步计算的结果 |
1、Executor
Executor
是线程池的顶级接口,定义了一个单一方法 execute(Runnable command)
,用于执行给定的任务。
public interface Executor {
void execute(Runnable command);
}
2、ExecutorService
ExecutorService
是 Executor
的子接口,扩展了 Executor
,提供 submit
、invoke
、shutdown
等方法
public interface ExecutorService extends Executor {
// 平缓关闭线程池
void shutdown();
// 立即关闭线程池
List<Runnable> shutdownNow();
// 判断线程池是否已经调用了 shutdown() 方法进行关闭。
boolean isShutdown();
// 判断线程池中的所有任务是否都已经执行完毕,并且线程池已经完全终止。
boolean isTerminated();
// 阻塞一段时间。返回值:是否在指定时间内 线程池中的所有任务都已经执行完毕,并且线程池已经完全终止。
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交单个任务(支持Runnable和Callable两种任务)
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// 提交多个Callable任务
// 将所有任务提交到线程池中并等待它们执行完毕。
// 返回所有任务的执行结果。当所有任务都执行完毕后,调用线程将继续执行。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException;
// 提交多个Callable任务
// 将所有任务提交到线程池中并等待任意一个任务执行完毕。
// 返回第一个执行完毕的任务的执行结果,并且取消其他任务的执行
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
3、ScheduledExecutorService
ScheduledExecutorService
是 ExecutorService
的子接口,它提供了延迟或周期性执行任务的功能。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
4、Runnable & Callable
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
*
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}
5、Future & FutureTask
FutureTask
是RunnableFuture
的实现类
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
}
RunnableFuture
是Runnable
和Future
的子接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
Future
接口:用来记录线程任务执行完毕后产生的结果。
public interface Future<V> {
// 获取结果(一般获取Callable任务的结果)
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
}
6、execute & submit
execute
是Executor接口的方法;submit
是ExecutorService的方法,ExecutorService接口继承了Executor接口execute
只能提交Runnable类型的任务;submit
既能提交Runnable类型任务也能提交Callable类型任务。execute
没有返回值;submit
会返回Future对象,通过Future的get方法获取返回值。execute
会直接抛出任务执行时的异常;submit
会存下异常,只有当调用Future的get方法才会将任务执行时的异常重新抛出。
看一下 AbstractExecutorService
中 submit
的实现:
public abstract class AbstractExecutorService implements ExecutorService {
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
可以看到:
submit
方法中,首先会判断入参是否为null,如果是就抛出空指针异常,显然线程池不允许提交null任务。submit
会将Runnable
或Callable
入参包装成RunnableFuture
对象,调用execute
方法执行并返回。
7、shutdown & shutdownNow
ExecutorService
接口提供了两个方法,用于关闭线程池。
-
shutdown
:平缓关闭线程池,线程池的状态变为SHUTDOWN
。线程池允许已经提交的任务(包括正在执行和待执行的任务)执行完毕,然后关闭线程池。
-
shutdownNow
:立即关闭线程池,线程的状态变为STOP
。线程池会终止正在执行的任务,然后关闭线程池,返回队列中未执行的任务列表。
注意:调用shutdown()
和shutdownNow()
之后,都不再接受新的任务
- 此时调用
execute()
方法提交新任务会抛出RejectedExecutionException
异常。
8、awaitTermination
awaitTermination
会阻塞调用线程,直到满足以下任一条件:
- 线程池终止,并且所有任务都已经执行完毕(返回true)
- 超过了指定的等待时间(返回false)
- 当前线程被中断
如果在指定的等待时间内线程池终止,则 awaitTermination()
方法返回 true
,否则返回 false
。
public interface ExecutorService extends Executor {
// 阻塞一段时间。返回值:是否在指定时间内 线程池中的所有任务都已经执行完毕,并且线程池已经完全终止。
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
}
一般 shutdown
会配合 awaitTermination
来使用
public class ShutdownDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(() -> {
// 执行任务
for (int i = 0; i < 5; i++) {
try {
TimeUnit.SECONDS.sleep(i);
System.out.println(i + "执行完毕");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
executor.shutdown(); // 关闭线程池
System.out.println("execute shutdown");
try {
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
// 超时处理
System.out.println("线程池未在指定时间内终止");
}
} catch (InterruptedException e) {
// Todo Handle Exception
}
}
}
9、isShutDown & isTerminated
ExecutorService
接口提供了两个方法,用于获取线程池是否关闭。
isShutDown
:当调用shutdown()
方法后返回为 true。isTerminated
:当调用shutdown()
方法后,并且所有提交的任务完成后返回为 true
三、ThreadPoolExecutor
ThreadPoolExecutor
是线程池的核心实现类,Java 主要是通过构建 ThreadPoolExecutor
来创建线程池的。
1、构造方法与核心参数
先看一下 ThreadPoolExecutor
的构造方法,其中有7个核心参数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 存活时间的单位
BlockingQueue<Runnable> workQueue, // 阻塞队列 - 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler) // 拒绝策略
{
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1. 核心线程数 - corePoolSize
线程池中常驻的核心线程数。
2. 最大线程数 - maximumPoolSize
线程池允许创建的最大线程数,可以大于核心线程数。
3. 空闲线程存活时间 - keepAliveTime
任务队列没有任务了 且 工作线程数 > 核心线程数 时,
会销毁空闲时间达到keepAliveTime的临时线程,直到只剩下 corePoolSize 个线程为止。
4. 存活时间的单位 - unit
keepAliveTime的时间单位
5. 任务队列 - workQueue
阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。
6. 线程工厂 - threadFactory
生成线程池中工作线程的线程工厂(一般使用默认,即不配置)
7. 拒绝策略 - handler
当线程池关闭或饱和时,如何来拒绝请求执行的runnable的策略handler,
关闭:执行shutdown()方法关闭线程池
饱和:线程数达到maximumPoolSize 且 任务队列已满
2、拒接策略
当 线程数已达到最大线程数 并且 等待队列也已经排满了,我们就需要拒绝策略机制合理的处理这个时候过来的请求。
jdk自带的四种拒接策略:(ThreadPoolExecutor中的静态内部类)
-
AbortPolicy 中止策略(默认)
直接抛出 RejectedExecutionException 异常,阻止系统正常运行。(比较关键的业务,可以及时发现问题)
-
DiscardPolicy 丢弃策略
默默地丢弃无法处理的任务,不触发任何动作,就是一个空实现。(允许任务丢失时,可以使用)
-
DiscardOldestPolicy 丢弃最旧策略
丢弃队列中等待最久的任务,然后把当前任务加人队列中,尝试再次提交当前任务。(喜新厌旧)
-
CallerRunsPolicy 调用者运行策略
一种调节机制,既不会抛出异常,也不会丢弃任务,而是将任务退回给调用线程处理(如main主线程)。
一般在不允许失败的、对性能要求不高、并发量较小的场景下使用。(性能和效率较低)
可以实现 RejectedExecutionHandle 接口,自定义拒绝策略
- 自定义拒绝策略可以根据具体的业务需求,例如记录日志、将任务放入消息队列/缓存/数据库等。
3、线程池的 5 种状态
线程池内部有 5 个常量来代表线程池的五种状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池状态具体是存在 ctl
成员变量中的,ctl
中不仅存储了线程池的状态还存储了当前线程池中线程数的大小
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
RUNNING
:能够接收新任务,以及对已添加的任务进行处理。SHUTDOWN
:线程池不再接收新任务,但是允许已经提交的任务(包括正在执行和待执行的任务)执行完毕。STOP
:线程池不再接收新任务,也不再处理队列中已经添加的任务,并且会尝试中断正在处理的任务的线程。TIDYING
:SHUTDOWN
和STOP
的收尾工作完成,执行完terminated()
方法就会转变为TERMINATED
状态。TERMINATED
:线程池彻底终止。
4、执行原理
5、源码分析
1)execute
看一下 ThreadPoolExecutor
中 execute
方法的实现
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
// 首先检查提交的任务是否为null,是的话则抛出NullPointerException。
if (command == null)
throw new NullPointerException();
// 获取线程池的当前状态(ctl是一个AtomicInteger,其中包含了线程池状态和工作线程数)
int c = ctl.get();
// 1. 检查当前运行的工作线程数是否少于核心线程数(corePoolSize)
if (workerCountOf(c) < corePoolSize) {
// 如果少于核心线程数,尝试添加一个新的工作线程来执行提交的任务
// addWorker方法会检查线程池状态和工作线程数,并决定是否真的添加新线程
if (addWorker(command, true))
return;
// 重新获取线程池的状态,因为在尝试添加线程的过程中线程池的状态可能已经发生变化
c = ctl.get();
}
// 2. 尝试将任务添加到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查线程池的状态
if (!isRunning(recheck) && remove(command)) // 如果线程池已经停止,从队列中移除任务
reject(command);
// 如果线程池正在运行,但是工作线程数为0,尝试添加一个新的工作线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果任务队列满了,尝试添加一个新的非核心工作线程来执行任务
else if (!addWorker(command, false))
// 如果无法添加新的工作线程(可能因为线程池已经停止或者达到最大线程数限制),则拒绝任务
reject(command);
}
}
2)addWorker
上面提到了,通过 addWorker
方法创建线程来执行任务。
public class ThreadPoolExecutor extends AbstractExecutorService {
private boolean addWorker(Runnable firstTask, boolean core) {
// ...
Worker w = null;
try {
// 创建 Worker 对象的时候,会创建一个线程,和任务一起封装到 Worker 内部
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// ...
if (workerAdded) {
// 启动线程,执行run方法
t.start();
workerStarted = true;
}
}
} // ...
}
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 创建 Worker 对象的时候,会把线程和任务一起封装到 Worker 内部
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 重写了run方法,实际调用的是runWorker方法
public void run() {
runWorker(this);
}
}
}
3)runWorker
可以看到,创建 Worker
对象时,会把线程
和任务
一起封装到 Worker
内部,然后调用 runWorker
方法来让线程执行任务
public class ThreadPoolExecutor extends AbstractExecutorService {
final void runWorker(Worker w) {
// 获取当前工作线程
Thread wt = Thread.currentThread();
// 从 Worker 中取出任务
Runnable task = w.firstTask;
w.firstTask = null;
// 解锁 Worker(允许中断)
w.unlock();
boolean completedAbruptly = true;
try {
// 当有任务需要执行 或者 能够从任务队列中获取到任务时,工作线程就会持续运行
while (task != null || (task = getTask()) != null) {
// 锁定 Worker,确保在执行任务期间不会被其他线程干扰
w.lock();
// 如果线程池正在停止,并确保线程已经中断
// 如果线程没有中断并且线程池已经达到停止状态,中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 在执行任务之前,可以插入一些自定义的操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 实际执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后,可以插入一些自定义的操作
afterExecute(task, thrown);
}
} finally {
// 清空任务,并更新完成任务的计数
task = null;
w.completedTasks++;
// 解锁 Worker
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 工作线程退出的后续处理
processWorkerExit(w, completedAbruptly);
}
}
}
runWorker 内部使用了 while 循环:
- 当 Worker 内部的任务执行完毕,就会不断地通过
getTask
方法从队列中获取任务。 - 只要能获取到任务,就会调用
run
方法继续执行任务(这就是线程能够复用的主要原因) - 如果从
getTask
获取不到任务的话,就会调用finally
中的processWorkerExit
方法,将线程退出。
这里还有一个细节: Worker 继承了 AQS,每次在执行任务之前都会调用 lock 方法,执行完任务之后,会调用 unlock 方法。
因此,如果想知道线程是否正在执行任务,只需要调用 Woker 的 tryLock 方法,根据是否加锁成功就能判断,加锁成功说明当前线程没有加锁,也就没有执行任务了。调用 shutdown
方法关闭线程池的时候,就是用这种方式来判断线程有没有在执行任务,如果没有的话,会尝试打断没有执行任务的线程。
4)getTask
前面讲到,会在 while 循环中不断地通过 getTask
方法获取任务,获取不到就退出。接下来就看一看 getTask 方法的实现。
public class ThreadPoolExecutor extends AbstractExecutorService {
private Runnable getTask() {
// 标志,表示最后一个poll()操作是否超时
boolean timedOut = false;
// 无限循环,直到获取到任务或决定工作线程应该退出
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池状态是SHUTDOWN或更高(如STOP)并且任务队列为空,那么工作线程应该减少并退出
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 检查工作线程是否应当在没有任务执行时,经过keepAliveTime之后被终止
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 工作线程数超出最大线程数 或者 超出核心线程数且上一次poll()超时
if ((wc > maximumPoolSize || (timed && timedOut))
// 工作线程数大于1 或 队列为空
&& (wc > 1 || workQueue.isEmpty())) {
// CAS减少工作线程数
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从任务队列获取任务(根据timed标志,决定使用poll还是take)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时等待
workQueue.take(); // 阻塞等待
// 成功获取到任务
if (r != null)
return r;
// 如果poll()超时,则设置timedOut标志
timedOut = true;
} catch (InterruptedException retry) {
// 如果在等待任务时线程被中断,重置timedOut标志并重新尝试获取任务
timedOut = false;
}
}
}
}
整个 runWorker 及 getTask 方法以及线程超时退出的机制如图所示:
6、使用示例
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 5, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 0; i < 20; i++) {
int index = i + 1;
try {
// 提交任务
executor.execute(() -> {
System.out.println("任务" + index + "正在运行");
while (true) {
// 让线程一直执行
}
});
} catch (RejectedExecutionException e) {
System.out.println("丢弃任务: " + index);
}
}
System.out.println("线程池阻塞队列中的任务数:" + executor.getQueue().size());
}
}
四、Executors 工具类(了解)
Executors
线程池工具类,提供四种快捷创建线程池的静态方法,可以创建多种类型的 ThreadPoolExecutor
。
1、newFixedThreadPool
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
}
newFixedThreadPool
创建一个固定线程数的线程池
- 最大线程数 = 核心线程数 = 指定数量
- 无界阻塞队列
LinkedBlockingQueue
2、newSingleThreadExecutor
public class Executors {
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
}
newSingleThreadExecutor
创建一个只有一个线程的线程池
- 最大线程数 = 核心线程数 = 1
- 无界阻塞队列
LinkedBlockingQueue
3、newCachedThreadPool
public class Executors {
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
}
newCachedThreadPool
创建一个动态调整线程数的线程池
- 最大线程数 =
Integer.MAX_VALUE
(约21亿,可能导致OOM) - 同步阻塞队列
SynchronousQueue
(没有容量)
4、newScheduledThreadPool
public class Executors {
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
0, NANOSECONDS,
new DelayedWorkQueue());
}
}
newScheduledThreadPool
创建一个支持延迟任务和周期性任务的线程池 ScheduledThreadPoolExecutor
- 最大线程数 =
Integer.MAX_VALUE
(约21亿,可能导致OOM) - 延迟阻塞队列
DelayedWorkQueue
五、线程池最佳实践
1、正确声明线程池
《阿里巴巴 Java 开发手册》强制线程池不允许使用
Executors
去创建,而是通过ThreadPoolExecutor
构造函数的方式
首先,使用 Executors
创建线程池有很多弊端:
-
CachedThreadPool
和ScheduledThreadPool
允许创建的最大线程数为
Integer.MAX_VALUE
,可能会创建大量的线程,从而导致OOM。 -
FixedThreadPool
和SingleThreadExecutor
使用默认的
LinkedBlockingQueue
,队列长度默认为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。
除了避免 OOM 的原因之外,不推荐使用 Executors
快捷创建线程池的原因还有:
- 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
- 应该显示地给我们的线程池命名,这样有助于我们定位问题。
因此,正确声明线程池,应该通过 ThreadPoolExecutor
构造函数,结合生产环境,具体问题具体分析。
2、线程数量 怎么配置
线程数的设置主要取决于业务是 IO 密集型
还是 CPU 密集型
。
CPU密集型
定义:任务需要大量的运算,并且没有阻塞,CPU一直在全速运行
配置:最大线程数 = CPU核数 + 1
分析:线程开太多也没有处理器执行,+1是为了防止线程偶发的缺页中断
I/O密集型
定义:任务存在大量的I/O,导致大量的运算能力浪费在等待I/O上,CPU使用率比较低。
配置:最大线程数 = CPU核数 * 2
分析:线程在处理I/O时,不会占用 CPU 来处理,启用其他线程继续使用 CPU,以提高 CPU 的使用率。
// Java获取 CPU 核心数的方法
Runtime.getRuntime().availableProcessors();
控制线程创建数量,避免使用 Integer.MAX_VALUE
导致OOM。
3、阻塞队列 怎么选择
尽量使用有界队列。
ArrayBlockingQueue 对比 LinkedBlockingQueue
-
ArrayBlockingQueue 使用一个单独的 ReentrantLock 来控制对队列的访问
-
LinkedBlockingQueue 使用两个锁(putLock 和 takeLock),一个用于放入操作,另一个用于取出操作。
这可以提供更细粒度的控制,并可能减少线程之间的竞争。
但是 LinkedBlockingQueue 默认构造的长度默认为 Integer.MAX_VALUE
,可能导致OOM,需要使用指定长度的构造。
4、线程工厂 怎么配置
建议通过实现 ThreadFactory
接口,自定义线程工厂(例如设置线程的名称),方便问题排查。
1)默认的线程工厂
创建线程池时,如果不指定 ThreadFactory
,默认使用 Executors.defaultThreadFactory()
public class Executors {
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
// 默认的线程名:pool-poolNumber-thread-n
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
}
}
2)自定义线程工厂
通过实现 ThreadFactory
接口,自定义线程工厂:
/**
* 线程工厂,它设置线程名称,有利于我们定位问题。
*/
public final class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String namePrefix;
/**
* 创建一个带名字的线程池生产工厂
*/
public CustomThreadFactory(ThreadFactory delegate, String namePrefix) {
this.delegate = delegate;
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(namePrefix + " [" + threadNum.incrementAndGet() + "]");
return t;
}
}
@Slf4j
public class CustomThreadFactoryTest {
private static final String namePrefix = "custom-thread-";
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 5, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
// 指定自定义的线程工厂
new CustomThreadFactory(Executors.defaultThreadFactory(), namePrefix),
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 0; i < 20; i++) {
int index = i + 1;
try {
executor.execute(() -> {
log.info("任务{}正在运行", index);
while (true) {
// 让线程一直执行
}
});
} catch (RejectedExecutionException e) {
// Todo Handle Exception
}
}
}
}
运行结果如下所示:
14:35:57.995 [custom-thread- [1]] INFO com.demo.threadpool.CustomThreadFactoryTest - 任务1正在运行
14:35:57.996 [custom-thread- [3]] INFO com.demo.threadpool.CustomThreadFactoryTest - 任务13正在运行
14:35:57.995 [custom-thread- [2]] INFO com.demo.threadpool.CustomThreadFactoryTest - 任务2正在运行
14:35:58.019 [custom-thread- [4]] INFO com.demo.threadpool.CustomThreadFactoryTest - 任务14正在运行
14:35:58.020 [custom-thread- [5]] INFO com.demo.threadpool.CustomThreadFactoryTest - 任务15正在运行
5、拒绝策略 怎么配置
- 不允许任务丢弃:CallerRunsPolicy
- 允许任务丢弃:其他几种
可以实现 RejectedExecutionHandle 接口,自定义拒绝策略
- 自定义拒绝策略可以根据具体的业务需求,例如记录日志、将任务放入消息队列/缓存/数据库等。
@Slf4j
public class CustomRejectPolicy implements RejectedExecutionHandler {
public CustomRejectPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
log.error("线程池正在跑主线程,请注意!");
r.run();
}
}
}
6、Spring 线程池
使用 Spring 内部线程池时,一定要手动自定义线程池,配置合理的参数,不然会出现生产问题(一个请求创建一个线程)。
@Configuration
@EnableAsync
public class ThreadPoolExecutorConfig {
@Bean(name="threadPoolExecutor")
public Executor threadPoolExecutor() {
ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
// 获取当前系统的处理器数量
int processNum = Runtime.getRuntime().availableProcessors();
int corePoolSize = (int) (processNum / (1 - 0.2)); // 核心线程数
int maxPoolSize = (int) (processNum / (1 - 0.5)); // 最大线程数(I/O密集型)
threadPoolExecutor.setCorePoolSize(corePoolSize);
threadPoolExecutor.setMaxPoolSize(maxPoolSize);
threadPoolExecutor.setQueueCapacity(maxPoolSize * 1000);
threadPoolExecutor.setThreadPriority(Thread.MAX_PRIORITY);
threadPoolExecutor.setDaemon(false);
threadPoolExecutor.setKeepAliveSeconds(300);
threadPoolExecutor.setThreadNamePrefix("test-Executor-"); // 线程名字前缀
return threadPoolExecutor;
}
}
8、不同业务使用不同的线程池
如果多个业务使用相同的线程池,可能会造成“死锁”。下面举个例子:
@Service
@Slf4j
public class TestService {
@Resource(name = "threadPoolExecutor")
private ExecutorService taskExecutor;
public void method1() {
taskExecutor.execute(() -> method1());
}
public void method2() {
method3();
method4();
}
public void method3() {
// other operation...
}
public void method4() {
taskExecutor.execute(() -> System.out.println("xxxx"));
}
}
可以看到,method1()
和 method4()
使用了同一个线程池。
试想这样一种极端情况:
- 线程池的核心线程数和最大线程数都为N,
method1()
的任务数也为N。 - 由于
method1()
把线程池的资源用完了,所以method4()
无法获取到线程资源,一直被阻塞在队列中无法正常执行。
此时 method1()
在等 method4()
执行,而 method4()
又在等method1()
释放线程池资源,也就形成了“死锁”
7、线程池 和 ThreadLocal 共用的坑
线程池和 ThreadLocal
共用,可能会导致线程从ThreadLocal
获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal
变量也会被重用,这就导致一个线程可能获取到其他线程的ThreadLocal
值。
解决上述问题比较建议的办法是使用阿里巴巴开源的 TransmittableThreadLocal
(TTL
)。
TransmittableThreadLocal
类继承并加强了 JDK 内置的InheritableThreadLocal
类,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal
值的传递功能,解决异步执行时上下文传递的问题。