一、线程池简介
线程池是一种池化的思想,是将一些共同资源放到池中进行管理和使用,从而避免大量的创建销毁带来的资源浪费等问题,线程池主要优点体现在:
- 降低资源消耗:普通线程创建执行完任务之后即被销毁,下次执行任务又创建新的线程,而线程池可以管理线程,多个线程可以重复使用,从而避免了重复创建以及销毁线程带来的资源消耗
- 提高响应速度:通过直接创建线程的方式当有请求过来时,需要浪费一部分时间去创建线程,使用线程池则可以直接使用现成的线程执行任务,从而提高了响应速度
- 提高线程的可管理性(系统资源、系统稳定性):将线程统一进行管理有利于系统资源的统一管理,同时也增加了系统的稳定性,避免一次性创建过多的线程
- 将创建销毁放到系统空闲时间段进行:使用线程池可以将线程的创建销毁放到相对空闲的时间段进行,例如程序启动时,更大的降低了系统访问的压力
同时使用线程池还可以减小 this 逃逸的风险:线程池执行是延迟的,受当前线程池空闲线程以及任务队列等因素影响;线程池的使用本身并不直接发布对象的引用,而是将任务(通常是实现了Runnable 或 Callable 接口的对象)提交给线程池执行
- this逃逸:类构造函数在返回实例之前,线程便持有该对象的引用
- 在类的构造函数中初始化this对象以及属性,但在构造函数之外访问了该this引用,则可能由于构造器还未完全完成或者指令重排序的原因就可能会访问到该 this 还未被初始化的属性,导致出现空指针的错误
二、线程池参数
- corePoolSize : 核心线程数量;任务队列未达到队列容量时,最大可以同时运行的线程数量。
- 默认核心线程是来一个新任务且无其它空闲核心线程创建一个直到达到上限,也可设置线程池启动时提前创建并启动所有核心线程
- 执行完成后会检查队列中是否有等待任务并执行
- 已创建完成的核心线程在执行完任务之后不销毁,等待下一次执行任务。但也可设置销毁核心线程
- 核心线程执行完当前任务后,如果有新任务或者队列中有任务则继续执行任务
- 每一次执行完核心线程,会保留线程的结构、状态和数据,进行等待新任务,当有新任务会保证在执行新任务前变成可重用状态即清理旧的相关数据
- maximumPoolSize : 最大线程数;任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数
- workQueue : 任务队列;新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中
- 队列未满但核心线程已满时,新任务进来会直接放入队列中,核心线程空闲后会从队列中获取任务执行,获取的方式依照队列设置(通常是先进先出)
- 队列满但最大线程池未满时再有任务来则会创建临时线程执行新进来的任务,执行完后会检查队列中是否有等待任务并执行
- 队列满且已到达最大线程则会采用拒绝策略
- 任务队列:LinkedBlockingQueue(无界队列);SynchronousQueue(同步队列);DelayedWorkQueue(延迟阻塞队列)
- keepAliveTime : 线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,多余的空闲线程即临时线程不会立即销毁,而是会等待,直到等待的空闲时间超过了 keepAliveTime 才会被回收销毁,线程池回收线程时,会对核心线程和非核心线程一视同仁,直到线程池中线程的数量等于 corePoolSize ,回收过程才会停止
- unit : keepAliveTime 参数的时间单位
- threadFactory:线程工厂,可以定制线程对象的创建,例如设置线程名字、是否是守护线程等
- handler拒绝策略:当所有线程都在繁忙,workQueue 也放满时,会触发拒绝策略
- ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException 来拒绝新任务的处理(默认)
- ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用 execute 方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略,提供可伸缩队列
- ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉
- ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求
- execute:用于提交线程,这个是通用的接口方法。在这个方法里主要实现的就是,当前提交的线程是加入到worker、队列还是放弃
- addWorker:主要是类 Worker 的具体操作,创建并执行线程。这里还包括了 getTask() 方法,也就是从队列中不断的获取未被执行的线程
线程池执行任务的顺序:
三、线程池核心类
1. Executor
线程池所有类都会实现 Executor 接口,Executor 接口中只定义了一个 execute 方法
2. ExecutorService
ExecutorService接口中定义了 shutdown 及 submit 等方法
shutdown 和 submit方法的区别:
- execute方法没有返回值,submit 可以使用 Future 类接收返回值,通过这个 Future 对象可以判断任务是否执行成功,并获取任务的返回值(get() 方法会阻塞当前线程直到任务完成,get(long timeout, TimeUnit unit) 多了一个超时时间,如果在 timeout 时间内任务还没有执行完,就会抛出 TimeoutException),可以使用 cancel 方法取消任务等
- execute 通常用于执行 Runable 线程,submit 可以提交 Runnable 或 Callable 任务
- submit 方法中如果抛出异常的话,异常会被封装到 Fature 返回对象中,可以使用get方法获取异常内容,而对于 execute 方法抛出异常的话默认处理方式是打印出堆栈信息并停止线程,也可以使用 UncaughtExceptionHandler 或者 afterExecute() 处理异常
3. ThreadPoolExecutor
ThreadPoolExecutor是最常使用也是最核心的类,ThreadPoolExecutor提供了四个不同参数的构造方法,其中最主要的构造方法如下,其他构造方法也是调用的该构造方法,只是对于部分参数进行了默认值初始化
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
4. ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 类继承了 ThreadPoolExecutor 类,并且实现了ScheduledExecutorService 接口
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService{}
5. Executors
Executors类中封装了多种线程池,可以直接通过该类创建对应线程,这里介绍4种常见的功能线程池
封装的线程池内部都是通过调用 ThreadPoolExecutor 或者 ScheduledThreadPoolExecutor 类进行初始化的,只是传入的线程池参数不同,所以就形成了不同功能的线程
- FixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
从代码中可以看出 FixedThreadPool 线程池是传入一个数值,并且该数值作为初始化的线程的核心线程数和最大线程数,表示该线程池是一个核心线程数等于最大线程数的线程池,即不存在临时线程
由于使用的队列为 LinkedBlockingQueue 无界队列,所以表示该线程池的等待队列永远不会被放满,接收到的任务数量也是不被限制的,可能会有OOM内存溢出的风险,并且只会创建核心线程,不会创建临时线程执行任务
- SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
从代码中可以看出 SingleThreadExecutor 线程池没有传入任何参数,而核心线程数等于最大线程数为1,同时最多只能有一个线程执行任务
同时由于使用的队列也是 LinkedBlockingQueue 无界队列,所以表示该线程池的等待队列永远不会被放满,接收到的任务数量也是不被限制的,可能会有OOM内存溢出的风险,并且只会创建核心线程,不会创建临时线程执行任务
- CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
从代码中可以看出 CachedThreadPool 线程池没有传入任何参数,而核心线程数设置为0,最大线程数设置为了int最大值,表示该线程池没有核心线程,只能创建临时线程执行任务,且最大线程数为最大值,可能导致创建大量的线程,导致出现OOM内存溢出的风险
CachedThreadPool 线程池使用的 SynchronousQueue 同步队列容量为空,所以核心线程满时会直接创建大量的临时线程执行任务,从而导致出现OOM内存溢出的风险
- ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
该线程池是通过 ScheduledThreadPoolExecutor 类创建的,而 ScheduledThreadPoolExecutor类 也是通过调用 ThreadPoolExecutor 类生成的线程池
ScheduledThreadPool 线程池使用的队列为 DelayedWorkQueue 延迟阻塞队列,会按照延迟的时间长短对任务进行排序,每次会将队列中执行时间最靠前的任务取出执行,是一个动态队列,队列元素满时会自动扩容,所以只会创建核心线程,不会创建临时线程
注意:《阿里巴巴 Java 开发手册》强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式可以更好的掌握线程池的运行规则,制定合适参数的线程池,避免资源耗尽的风险
四、线程池实例
- 使用 @Bean("name") 对 ThreadPoolExecutor 进行bean注册
- 使用构造方法传入线程池参数进行 bean 实例注册
- 需要通过名称匹配到对应 bean,使用 @Autowired 搭配 @Qualifier("name") 或者 @Resource(name = "name") 注入(如果程序中只需要一个线程池的话那么可以直接使用Autowired 注解通过类型 bean 注入)
@Component
public class ThreadPoolBean {
@Bean("labelWsThreadPool")
public ThreadPoolExecutor createThreadPoolExecutor() {
return new ThreadPoolExecutor(10, 40, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5000), new ThreadPoolExecutor.CallerRunsPolicy());
}
@Autowired //线程池bean注入
@Qualifier("CustomerThreadPool")
//@Resource(name = "CustomerThreadPool")
private ThreadPoolExecutor threadPoolExecutor;
}
- 或者使用工厂 Factory 类定义一个 ExecutorService 线程池,并定义初始化方法 init() 在该方法中初始化线程池,且在系统启动时调用该方法实现初始化,并提供 execute 方法调用线程池。由于方法是定义的 static 类型所以可以直接通过类名调用
public class ThreadFactory {
private static ExecutorService businessExecutor = null; //业务处理线程池
public static void init(){
LOG.info("初始化线程池");
Iexecutor = new ThreadPoolExecutor(9, 40, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5000),new ThreadPoolExecutor.CallerRunsPolicy());
LOG.info("初始化线程池结束");
}
public static void excute(Runnable run){
businessExecutor.execute(run);
}
public static Future submit(Callable call){
return businessExecutor.submit(call);
}
}
- 实现一个线程 Thread,线程 Thread 通过实现 run 方法自定义方法体,如果有参数传入,可在自定义 Thread 类类中定义全局变量并通过构造函数传入赋值
- 创建线程的四种方式,将创建的线程通过 submit() 或者 execute() 的参数传入执行:
- extends Thread
- executor.execute(new Runnable() { public void run() { } })
- implements Callable<String>,具有返回值Future
- implements Runnable
//@AllArgsConstructor,如果有属性不需要传入初始化,那么可自定义构造方法,将需要的参数放入构造方法
//@Component //如果该类为Spring管理的bean,且该类中只有一个构造方法,那么Spring尝试将构造函数的参数注入bean,未找到bean则会报错
public class MyThread extends Thread{
private List<String> phoneNoList;
private int num = 10000;
public PhoneNoExhaustionThread(List<String> phoneNoList) {
this.phoneNoList = phoneNoList;
}
@Override
public void run(){
//执行任务
}
}
- 再调用 execute 方法传入实现线程的对象即可
threadPoolExecutor.execute(new MyThread(phoneNoList)); //通过bean注入实现
ThreadFactory.excute(new MyThread(phoneNoList)); //通过工厂类实现
五、线程池使用方式
- 执行调用线程池,主线程后续执行其他步骤,主线程和线程池中的任务并发执行,取决于CPU和核数以及CPU时间调度等因素,主线程可直接执行程序返回操作
- 使用 CountDownLatch 计数器等工具类等待线程池中的对应任务执行完毕,可用于多个任务并发执行,但主线程需要等待所有任务执行完毕,主线程依赖多个线程的执行结果,多个线程从串行执行变成并行执行
六、关闭线程池
可在使用完线程池后选择合适的方式关闭线程池,也可通过工厂模式封装关闭线程池,后续在程序关闭前的步骤中对线程池进行关闭
- shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕
- shutdownNow() :关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List