文章目录
- 问题
- 思路
- 线程池的默认行为
- 自定义线程池扩容策略
- Code实现
- 小结
问题
Java 线程池是先用工作队列来存放来不及处理的任务,满了之后再扩容线程池。当我们的工作队列设置得很大时,最大线程数这个参数显得没有意义,因为队列很难满,或者到满的时候再去扩容线程池已经于事无补了。
那有没有办法让线程池更激进一点,优先开启更多的线程,而把队列当成一个后备方案呢?
举个例子,任务执行得很慢,需要 10 秒,如果线程池可以优先扩容到 5个最大线程,那么这些任务最终都可以完成,而不会因为线程池扩容过晚导致慢任务来不及处理
思路
- 由于线程池在工作队列满了无法入队的情况下会扩容线程池,那么我们是否可以重写队列的 offer 方法,造成这个队列已满的假象呢?
- 由于我们 Hack 了队列,在达到了最大线程后势必会触发拒绝策略,那么能否实现一个自定义的拒绝策略处理程序,这个时候再把任务真正插入队列呢
线程池的默认行为
Java 的线程池默认行为如下:
- 仅在有任务到来时才初始化核心线程。
- 当核心线程满后,任务会被放入工作队列。
- 如果工作队列满,线程池会扩容直到达到最大线程数。
- 超过最大线程数的任务会根据拒绝策略处理。
- 当线程数超过核心线程数时,超出线程会在空闲时被回收。
自定义线程池扩容策略
为了使线程池在任务到来时更激进地扩容,我们可以考虑以下两步:
-
重写工作队列的
offer
方法:
通过创建一个自定义工作队列,重写offer
方法,使其在插入任务时总是返回false
,从而模拟队列已满的状态。 -
实现自定义拒绝策略:
在达到最大线程数后,我们需要定义一个拒绝策略,在这个策略中,再把任务插入到自定义工作队列中。
Code实现
public int elasticTP() throws InterruptedException {
//这里开始是激进线程池的实现
//创建一个容量为10的阻塞队列,用于存储待执行的任务
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10) {
@Override
public boolean offer(Runnable e) {
//先返回false,造成队列满的假象,让线程池优先扩容
return false;
}
};
//创建一个线程池,核心线程数为2,最大线程数为5,空闲线程存活时间为5秒
//使用自定义的线程工厂设置线程名称格式,拒绝执行处理器为尝试再次提交任务到队列
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, 5,
5, TimeUnit.SECONDS,
queue, new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").build(), (r, executor) -> {
try {
//等出现拒绝后再加入队列
//如果希望队列满了阻塞线程而不是抛出异常,那么可以注释掉下面三行代码,修改为executor.getQueue().put(r);
if (!executor.getQueue().offer(r, 0, TimeUnit.SECONDS)) {
throw new RejectedExecutionException("ThreadPool queue full, failed to offer " + r.toString());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
//激进线程池实现结束
//定期打印线程池的状态信息
printStats(threadPool);
//每秒提交一个任务,每个任务耗时10秒执行完成,一共提交20个任务
//任务编号计数器
AtomicInteger atomicInteger = new AtomicInteger();
//循环提交20个任务到线程池
IntStream.rangeClosed(1, 20).forEach(i -> {
try {
//每秒提交一个任务
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//获取当前任务编号
int id = atomicInteger.incrementAndGet();
try {
//提交任务到线程池执行
threadPool.submit(() -> {
//任务开始时的日志记录
log.info("{} started", id);
try {
//模拟任务执行时间
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
//任务被中断时不做处理
}
//任务结束时的日志记录
log.info("{} finished", id);
});
} catch (Exception ex) {
//提交任务到线程池执行时发生错误
log.error("error submitting task {}", id, ex);
//任务提交失败,编号计数器回退
atomicInteger.decrementAndGet();
}
});
//等待所有任务完成
TimeUnit.SECONDS.sleep(60);
//返回最终的任务编号计数
return atomicInteger.intValue();
}
private void printStats(ThreadPoolExecutor threadPool) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
log.info("=========================");
log.info("Pool Size: {}", threadPool.getPoolSize());
log.info("Active Threads: {}", threadPool.getActiveCount());
log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
log.info("=========================");
}, 0, 1, TimeUnit.SECONDS);
}
BlockingQueue
的 put
和 offer
方法在行为上有一些重要的区别,主要体现在任务添加失败时的处理方式和返回值上。以下是详细的对比:
-
put:
- 用于将元素添加到队列中,如果队列已满,
put
会阻塞当前线程,直到队列有空间可以添加元素。 - 语法:
void put(E e) throws InterruptedException;
- 用于将元素添加到队列中,如果队列已满,
-
offer:
- 尝试将元素添加到队列中,如果队列已满,则根据不同的实现可能会立即返回
false
,或在某些情况下也可以选择阻塞(例如offer(E e, long timeout, TimeUnit unit)
)。 - 语法:
boolean offer(E e); boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
- 尝试将元素添加到队列中,如果队列已满,则根据不同的实现可能会立即返回
返回值
-
put:
put
方法没有返回值,只在成功添加元素后返回。如果操作被中断,会抛出InterruptedException
。
-
offer:
offer
方法返回一个布尔值:- 返回
true
表示成功添加了元素。 - 返回
false
表示队列已满,未能添加元素。
- 返回
offer(E e, long timeout, TimeUnit unit)
方法在指定时间内等待,如果在超时之前队列有空间可以添加元素,则返回true
,否则返回false
。
- 适用场景
-
put:
- 更适合用于需要确保任务被添加到队列中的场景,且在队列满时允许线程等待。
-
offer:
- 更适合用于希望尽量避免阻塞的场景,尤其是在需要快速尝试添加任务但不希望等待的情况下。
示例代码
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
// 使用 put 方法
try {
queue.put(1); // 成功
queue.put(2); // 成功
queue.put(3); // 阻塞,直到有空间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 使用 offer 方法
if (queue.offer(1)) {
System.out.println("成功添加1");
} else {
System.out.println("队列已满");
}
总结
put
方法适用于需要保证添加成功的场景,可能会导致线程阻塞。offer
方法适用于希望快速检查并添加元素的场景,不会阻塞,但可能会失败。
小结
通过重写 offer
方法来让工作队列显示为已满,迫使线程池在任务到来时优先扩容,是一个很巧妙的思路,哈哈哈。
线程池的实现分析
-
自定义工作队列:
- 通过重写
LinkedBlockingQueue
的offer
方法,总是返回false
,让线程池认为工作队列已满。这样,线程池会优先扩容到最大线程数。
- 通过重写
-
自定义拒绝策略:
- 在拒绝策略中,使用
executor.getQueue().offer(r, 0, TimeUnit.SECONDS)
来将任务添加到队列。如果队列已满,会抛出RejectedExecutionException
。 或者选择executor.getQueue().put(r)
- 在拒绝策略中,使用
-
任务提交:
- 通过
AtomicInteger
来计数已提交的任务。由于每个任务需要 10 秒才能完成,每秒提交一个任务,可以模拟任务积压的情况。
- 通过
进一步的改进建议
-
动态调整核心线程数:可以在任务数量超过某个阈值时,考虑动态增加核心线程数,以便更好地利用资源。
-
更智能的拒绝策略:可以根据具体场景实现一个更加复杂的拒绝策略,比如优先选择某些类型的任务进入队列,或者将任务缓存到内存中。
-
监控和反馈机制:增加监控机制,实时跟踪线程池的状态,包括活跃线程数、队列长度等,可以帮助在运行时做出更好的决策。