文章目录
- 线程池介绍
- 线程池核心参数
- 核心线程数(Core Pool Size)
- 最大线程数(Maximum Pool Size)
- 队列(Queue)
- 线程空闲超时时间(KeepAliveTime)
- 拒绝策略(RejectedExecutionHandler)
- 线程池执行流程
- 快速消费线程池
- 快速消费线程池组件
- 相关依赖
- 快速消费队列
- 快速消费线程池
- 获取配置文件的配置
- 配置线程池Bean到容器中
- 说明
线程池介绍
线程池作为多线程编程中的重要工具,旨在通过复用已创建的线程来减少线程创建与销毁的开销,提升系统资源利用率和并发性能。要有效地使用线程池,理解和配置其核心参数至关重要。
线程池核心参数
创建一个线程池的代码如下,可以看到构造方法需要传递几个参数,下文会详细展示每个参数的含义:
// 导包
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// 创建线程池
return new ThreadPoolExecutor(poolConfigProperties.getCoreSize(),
poolConfigProperties.getMaxSize(),
poolConfigProperties.getKeepAliveTime(),
TimeUnit.SECONDS,
//队列的最大容量
new LinkedBlockingDeque<>(600),
//使用默认的工程
Executors.defaultThreadFactory(),
//使用拒绝新来的拒绝策略
new ThreadPoolExecutor.CallerRunsPolicy()
);
核心线程数(Core Pool Size)
核心线程数是指线程池在初始化时创建并保持活动状态的线程数量。即使这些线程当前没有任务执行,它们也不会被回收。核心线程数通常根据系统资源、预期并发负载和任务特性来设定。核心线程在池中长期存在,能够快速响应新提交的任务,减少任务提交后的等待时间。
最大线程数(Maximum Pool Size)
最大线程数限制了线程池能同时容纳的线程总数。当核心线程数无法满足当前任务需求时,线程池会创建额外的线程直至达到最大线程数。超过这个阈值后,线程池将采取拒绝策略处理新提交的任务。合理设置最大线程数,既能防止资源过度消耗导致系统过载,又能确保在高并发场景下有足够的线程处理任务。
队列(Queue)
线程池通常配合任务队列使用,用于暂存待处理的任务。当所有核心线程都处于忙碌状态且未达到最大线程数时,新提交的任务会被放入队列中等待。常见的队列类型包括无界队列(如 LinkedBlockingQueue)、有界队列(如 ArrayBlockingQueue)和优先级队列(如 PriorityBlockingQueue)。队列的选择和容量大小直接影响线程池的阻塞策略和任务调度效率。
线程空闲超时时间(KeepAliveTime)
当线程池中存在超出核心线程数的非核心线程,并且这些线程在一段时间内(即 KeepAliveTime)没有执行任何任务,则会自动终止。这个参数有助于释放闲置资源,避免资源浪费。对于长期存在大量任务的系统,可以适当增大或关闭这个超时时间。
拒绝策略(RejectedExecutionHandler)
当线程池和队列都无法接纳新任务时,需要采用拒绝策略来处理。常见的拒绝策略有:
- AbortPolicy:默认策略,直接抛出 RejectedExecutionException。
- CallerRunsPolicy:由提交任务的线程自行执行任务。
- DiscardPolicy:默默地丢弃任务,不抛出异常也不执行。
- DiscardOldestPolicy:丢弃队列中最旧的任务,尝试提交新任务。
线程池执行流程
-
初始阶段
:线程池创建并启动核心线程数指定数量的线程。此时,如果有任务提交,直接由这些核心线程执行。 -
核心线程饱和
:当所有核心线程都在执行任务且任务队列尚未满时,新提交的任务被放入队列等待。 -
队列满载
:若任务提交速率持续高于线程处理速度,队列达到其容量上限。此时,线程池开始创建新的线程(不超过最大线程数),直接执行新提交的任务。 -
达到最大线程数
:若任务增长仍然无法遏制,线程池达到最大线程数。此时,新提交的任务将触发拒绝策略。 -
任务减少与线程收缩
:当任务提交速率降低,线程池中的线程开始完成任务并变得空闲。对于非核心线程,若在 KeepAliveTime 时间内未获得新任务,将被终止。系统逐渐回归到更低的线程数,直至仅保留核心线程。
在任务量增长的过程中,线程池通过动态调整线程数量和利用任务队列,既保证了系统的响应能力,又防止了资源过度消耗。
快速消费线程池
快速消费线程池通过对上述线程池进行改造,当核心线程饱和时,再提交的任务不是先加入到队列中,而是直接创建非核心线程来执行新提交任务。快速消费线程池可以加快任务的执行,减少任务的堆积。
快速消费线程池组件
相关依赖
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
快速消费队列
该类继承自LinkedBlockingQueue,并对其offer方法进行定制,以配合EagerThreadPoolExecutor实现更灵活的任务调度策略。主要目的是在满足特定条件时,促使线程池创建非核心线程以快速处理任务,而非直接将任务放入队列等待处理。
import lombok.Data;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
* 快速消费任务队列
*/
@Data
public class EagerTaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private EagerThreadPoolExecutor executor;
/**
* 构造函数,传入队列容量参数,用于初始化LinkedBlockingQueue。
*
* @param capacity 队列的最大容量
*/
public EagerTaskQueue(int capacity) {
super(capacity);
}
/**
* 重写父类LinkedBlockingQueue的offer方法,实现自定义的任务入队逻辑
* 当没有到达最大线程时,返回false,让其创建非核心线程
*
* @param runnable 待添加的任务对象
* @return 如果任务成功加入队列或触发线程池创建非核心线程,则返回true;否则返回false
*/
@Override
public boolean offer(Runnable runnable) {
// 获取当前线程池的线程数量
int currentPoolThreadSize = executor.getPoolSize();
// 检查是否有核心线程处于空闲状态(已提交任务数小于当前线程数)
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
// 如果有核心线程正在空闲,将任务加入阻塞队列,由核心线程进行处理任务
return super.offer(runnable);
}
// 检查当前线程池线程数量是否小于最大线程数
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
// System.out.println("线程池线程数量小于最大线程数,返回 False,线程池会创建非核心线程");
// 当前线程池线程数量小于最大线程数,返回false,触发线程池创建非核心线程处理任务
return false;
}
// 如果当前线程池数量大于最大线程数,任务加入阻塞队列,等待线程池中的已有线程处理
return super.offer(runnable);
}
/**
*
* @param runnable 待添加的任务对象
* @param timeout 等待加入队列的超时时间
* @param timeUnit 超时时间单位
* @return 如果任务成功加入队列或触发线程池创建非核心线程,则返回true;否则返回false
* @throws InterruptedException 如果在等待过程中线程被中断
* @throws RejectedExecutionException 如果线程池已关闭
*/
public boolean retryOffer(Runnable runnable, long timeout, TimeUnit timeUnit) throws InterruptedException {
// 如果线程池已关闭,则抛出RejectedExecutionException异常。
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(runnable, timeout, timeUnit);
}
}
快速消费线程池
该类继承自ThreadPoolExecutor,并对其进行定制,以实现更灵活的任务调度策略。主要特点包括:
- 使用自定义的EagerTaskQueue作为工作队列,支持根据线程池状态动态调整任务入队逻辑。
- 维护正在处理的任务数量计数器(submittedTaskCount),以便EagerTaskQueue判断是否有核心线程处于空闲状态。
- 在execute方法中,处理任务提交失败的情况,尝试将任务重新投递到队列或使用拒绝策略。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 快速消费线程池
*/
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* 使用AtomicInteger记录当前正在处理的任务数量,提供线程安全的计数操作。
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
/**
* 构造函数,接受线程池相关的配置参数,包括核心线程数、最大线程数、线程存活时间、时间单位、工作队列、线程工厂和拒绝策略。
* 工作队列类型为自定义的EagerTaskQueue,用于实现特殊的任务入队逻辑。
*
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 线程空闲后的存活时间
* @param unit 时间单位
* @param workQueue 工作队列,类型为EagerTaskQueue
* @param threadFactory 线程工厂,用于创建新线程
* @param handler 拒绝策略,当线程池和队列无法接受新任务时的处理方式
*/
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
EagerTaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* 创建一个EagerThreadPoolExecutor实例的便捷方法
* 包括创建EagerTaskQueue并设置其与线程池的关联
*
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 线程空闲后的存活时间
* @param unit 时间单位
* @param queueCapacity 队列容量
* @param threadFactory 线程工厂,用于创建新线程
* @param handler 拒绝策略,当线程池和队列无法接受新任务时的处理方式
* @return 创建的EagerThreadPoolExecutor实例
*/
public static EagerThreadPoolExecutor createEagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
int queueCapacity,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
EagerTaskQueue eagerTaskQueue = new EagerTaskQueue(queueCapacity);
EagerThreadPoolExecutor eagerThreadPoolExecutor = new EagerThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, eagerTaskQueue, threadFactory, handler);
eagerTaskQueue.setExecutor(eagerThreadPoolExecutor);
return eagerThreadPoolExecutor;
}
/**
* 获取当前正在处理的任务数量。
*
* @return 当前正在处理的任务数量
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
/**
* 重写父类的afterExecute方法,当任务执行完成后,将正在执行的任务数量减一。
* 这是ThreadPoolExecutor提供的钩子方法,用于在任务执行结束后进行清理或其他操作。
*
* @param r 执行完毕的任务
* @param t 执行过程中抛出的异常(如果有的话)
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 任务执行完成,将正在执行数量-1
submittedTaskCount.decrementAndGet();
}
/**
* 重写父类的execute方法,用于提交任务到线程池。
* 在提交任务之前,先将正在执行的任务数量加一。若提交失败,根据具体情况尝试重新投递任务或使用拒绝策略。
*
* @param command 待提交的任务
* @throws RejectedExecutionException 如果任务无法被接受,且无法重新投递到队列
*/
@Override
public void execute(Runnable command) {
// System.out.println("使用快速消费线程池执行任务");
// 将正在执行任务数量 + 1
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException ex) {
// 任务被拒绝,间隔一定时间,将任务重新投递到队列
EagerTaskQueue eagerTaskQueue = (EagerTaskQueue) super.getQueue();
try {
// 将任务重新投递到队列
if (!eagerTaskQueue.retryOffer(command, 10, TimeUnit.MILLISECONDS)) {
// 队列已满,使用拒绝策略,并减少计数
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", ex);
}
} catch (InterruptedException iex) {
// 重试失败,将正在执行任务数量 - 1
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(iex);
}
} catch (Exception ex) {
// 执行失败,将正在执行任务数量 - 1
submittedTaskCount.decrementAndGet();
throw ex;
}
}
}
获取配置文件的配置
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@ConfigurationProperties(prefix = "sss.thread")
@Component//将该配置放到容器中
@Data
public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}
配置线程池Bean到容器中
import com.dam.eager.EagerThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
public class MyThreadConfig {
/**
* @param poolConfigProperties 如果需要使用到ThreadPoolConfigProperties,一定要使用Component将其加入到容器中
* @return
*/
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties poolConfigProperties) {
// 普通线程池
// return new ThreadPoolExecutor(poolConfigProperties.getCoreSize(),
// poolConfigProperties.getMaxSize(),
// poolConfigProperties.getKeepAliveTime(),
// TimeUnit.SECONDS,
// //队列的最大容量
// new LinkedBlockingDeque<>(600),
// //使用默认的工程
// Executors.defaultThreadFactory(),
// //使用拒绝新来的拒绝策略
// new ThreadPoolExecutor.CallerRunsPolicy()
// );
// 快速消费线程池
return EagerThreadPoolExecutor.createEagerThreadPoolExecutor(
poolConfigProperties.getCoreSize(),
poolConfigProperties.getMaxSize(),
poolConfigProperties.getKeepAliveTime(),
TimeUnit.SECONDS,
// 队列的最大容量
600,
// 使用默认的工程
Executors.defaultThreadFactory(),
// 使用拒绝新来的拒绝策略
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
说明
快速线程池的实现参考马哥 12306 的代码,代码仓库为12306,该项目含金量较高,有兴趣的同学可以去学习一下。