目录
- 线程池
- 使用线程池的目的
- 线程池工作原理
- 线程池常用方法
- 自定义线程池
- 等待队列
- 拒绝策略
- 线程工厂
线程池
使用线程池的目的
- 资源复用,降低开销。重复利用已创建的线程,避免线程频繁地创建和销毁带来的性能开销。
- 方便线程的可管理性。线程是稀缺资源,使用线程池可以进行统一的分配,调优和监控。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
线程池工作原理
工作原理(借用guide哥的图):
注意:
- 刚开始创建线程池的时候不会立马创建好核心线程,而是有任务来了之后再创建核心线程
- 当线程池小于corePoolSize时,就算有线程空闲,也会一定会创建新的线程
- 当等待队列也满了的时候,又来一个任务,会新创建一个非核心线程,且用于执行刚刚到来的任务,而不是队头的任务
- 如果设置了allowCoreThreadTimeOut(true),那么核心线程空闲时间达到keepAliveTime也会关闭,但默认不会
线程池常用方法
//使用工具类创建线程池
ExecutorService executor1 = Executors.newFixedThreadPool(3);
//定义任务
Runnable task = ()->{
System.out.println("线程"+Thread.currentThread().getName()+"正在执行");
};
executor.excute(XXX); //执行任务,没有返回值
executor.submit(XXX);//执行任务,返回一个Future,用于获取线程的执行情况
executor.shutdown();//正常关闭,停止提交新任务,而已提交的任务可以正常执行,非阻塞
executor.shutdownNow();//立即强制关闭,停止提交新任务,正在运行的任务停止,等待队列也销毁,非阻塞
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); // 阻塞等待所有任务完成并且shutdown,如果超过到达时间,则返回false
当等待队列是无界队列时,非核心线程会创建吗?
不会,因为非核心线程只有在队列满的时候才会创建,使用了无界队列就相当于默认只有核心线程生效
自定义线程池
总结四种线内置程池都相对比较极端,所以一般都建议使用自定义线程池,保证一定的核心线程数(保证活跃)和一定的非核心线程数(提供伸缩)。
工作流程概述:刚开始没有线程,添加任务后创建核心线程,核心线程满了就加到等待队列,等待队列也满了就创建临时线程去执行,临时线程也满了就执行拒绝策略。
ThreadPoolExector线程池完整参数如下:
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//非核心线程(临时线程)空闲存活时间
TimeUnit unit,//存活时间单位
BlockingQueue<Runnable> workQueue,//等待队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler//拒绝策略
)
各个参数详解:
- corePoolSize:核心线程数,线程池中保持活跃的线程数量。即使线程空闲,核心线程也不会被回收。
- maximumPoolSize:最大线程数,线程池允许创建的最大线程数量。超过此数量的请求将被放入等待队列
- keepAliveTime:非核心线程的空闲时间,超过此时间后,非核心线程会被回收。
- unit:
keepAliveTime
的时间单位,例如秒、毫秒等。 - workQueue:等待队列,用于存放被提交但尚未被执行的任务。可以是不同类型的队列,如
LinkedBlockingQueue
、ArrayBlockingQueue
等。 - handler:饱和策略,当线程池和等待队列都满时,任务提交失败时所使用的处理策略。
- threadFactory:用于创建新线程的工厂,允许自定义线程的创建,例如设置线程名称或优先级。
注意:前面五个参数是必须指定的,而最后两个参数可以不指定,因为有默认值。
三大核心参数:核心线程用于保证快速响应和资源复用,而非核心线程用于应对突发情况,等待队列则是作为一个缓冲,同时控制处理任务的数量和顺序。
等待队列
默认
- LinkedBlockingQueue:一个基于链表的阻塞队列,支持 FIFO(先进先出)排序。可以设置容量,默认是
Integer.MAX_VALUE
。适用于任务较多且队列可以扩展的场景。 - ArrayBlockingQueue:一个基于数组的阻塞队列,容量固定,适用于任务数量可预估的场景。
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作都必须等待另一个线程的对应移除操作,适合高并发场景
- PriorityBlockingQueue:支持优先级排序的阻塞队列,适合需要按优先级处理任务的场景。
ArrayBlockingQueue和带有容量的LinkedBlockingQueue有什么区别,怎么选择?
LinkedBlockingQueue插入删除方便,对高并发场景友好,而ArrayBlockingQueue是用循环数组实现,插入删除的代价比较大,涉及到模运算(取余)。
ArrayBlockingQueue内存占用更小,因为是基于数组实现。更适合生产者-消费者模型中的任务数量相对固定的场景。
SynchronousQueue队列特殊在哪?什么时候用?
特殊之处:
- SynchronousQueue` 的容量为 0,这意味着它不存储任何元素。每个插入操作必须等待一个对应的移除操作才能完成,反之亦然。
- 直接交付:当一个线程试图插入一个元素到
SynchronousQueue
时,它会被阻塞,直到另一个线程尝试移除这个元素。这种设计实现了线程之间的直接交付。 - 没有内部缓冲:与其他阻塞队列(如
ArrayBlockingQueue
或LinkedBlockingQueue
)不同,SynchronousQueue
不维护任何内部元素,因此不能进行批量处理或存储任务。
使用效果:当核心线程满了的时候,每提交一个任务,的认为阻塞队列是满的,当非核心线程充足时,会马上创建一个非核心线程去执行这个任务。(内部的CachedThreadPool就是这样)
适用场景:在需要低延迟和高并发的应用场景中,SynchronousQueue
可以减少任务的等待时间,因为生产者和消费者必须在同一时间交互。
拒绝策略
当线程池和等待队列都满时,可以选择不同的拒绝策略对新任务进行处理:
- AbortPolicy(默认):拒绝任务并且抛出
RejectedExecutionException
,适合希望知道任务失败的场景。 - DiscardPolicy:丢弃任务,不抛出异常,适合对任务丢失不敏感的场景。
- CallerRunsPolicy:将任务交给线程池的调用者(比如主线程或者其它不属于本线程池管理的线程)自己去执行,适合希望减轻线程池压力的场景。
- DiscardOldestPolicy:丢弃掉等待队列中队头的任务,也就是最旧的任务,然后加入等待队列。适合希望保留最新任务的场景。
- DelayQueue:一个支持延迟处理的阻塞队列,任务必须在指定延迟后才能获取,适合需要定时执行的任务场景。
- 自定义拒绝策略,实现RejectedExecutionHandler接口。
注意:如上队列都属于阻塞队列,能够保证线程安全,并且符合生产者消费者模式,队列满的时候,会阻塞入队操作;队列空的时候,会阻塞出队操作。
自定义拒绝策略可以有哪些思路?
拒绝任务,并且打印一些有用日志。
import java.util.logging.Logger;
public class LoggingRejectPolicy implements RejectedExecutionHandler {
private static final Logger logger = Logger.getLogger(LoggingRejectPolicy.class.getName());
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.warning("Task " + r.toString() + " rejected. Current pool size: " + executor.getPoolSize());
}
}
等待一段时间后再尝试提交任务。
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RateLimitPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
Thread.sleep(100); // 限流,稍等后重试
executor.execute(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
线程工厂
线程工程用于控制线程池中的线程是如何创建的,也就是给各个程取一个名字。
这个参数一般不需要传递,因为有一个Executors有一个默认的线程工厂(是静态类,所有线程池共用)
默认规则:pool-线程池序号-thread-线程序号
实现逻辑:执行一次DefaultThreadFactory构造方法,线程池的序号就+1(原子操作),而且这个序号是static修饰,所有工厂对象共享;然后每调用一次newThread,线程的序号就+1(原子操作),这个序号不是static,各个工厂都从1开始
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
//原子类,static,记录线程池的个数
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
//原子类,记录某个线程的个数
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
//创建线程的京具体方法
public Thread newThread(Runnable r) {
//拼接出线程的名字
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())//设置线程都是飞守护线程,不会随着主线程终止而终止
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
自定义线程工厂实例
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3,
10,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1),
//自定义线程工厂
new ThreadFactory() {
AtomicInteger number = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"myThreadPool_"+number.getAndIncrement());
}
});
// 提交多个任务
for (int i = 0; i < 10; i++) {
int taskId = i;
threadPoolExecutor.execute(() -> {
System.out.println("任务 " + taskId + " 在 " + Thread.currentThread().getName() + " 执行");
try {
Thread.sleep(5000); // 模拟任务执行
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭线程池
threadPoolExecutor.shutdown();
}
}
```在这里插入图片描述
### 线程池大小该如何设置?
假设核心数为n,可参考公式:
CPU密集类型(IO比较少),可以设置n+1
IO密集类型(读取文件或者数据库的IO比较多),可以设置为线程的k的倍数。比如从2 * n开始,慢慢尝试4 * n、8 * n ...
至于k取多少,可以根据如下公式确定:
![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6b85ef0bdffa4153a660e6986ee6c85d.png)
> cpu密集型任务为什么设置n+1?
当某个线程故障或者缺页中断了,此时剩余一个线程就可以顶上
> IO密集类型为什么设置n的整数倍?
**因为对于IO密集类型的任务来说,CPU不是一直在运行的,可能在等待**比如CPU运行1秒,IO运行3秒,此时完全可以设置4个线程去执行任务,让3s的等待时间也利用起来。
> Windows电脑假设是4核心,8个逻辑处理器,那么最多支持8个线程并发执行吗?为什么java中可以创建100个线程同时执行?
不是,八个逻辑处理器意味着最多可以有八个线程**并行执行**(同一时刻),而创建的100个线程会在这八个逻辑处理器中进行并发执行(不断切换)。操作系统会通过上下文切换在不同线程之间切换,从而有效利用处理器资源。虽然同时运行的线程数有限,但通过快速切换,多个线程仍能“看起来”像是在同时执行。
> 如果cpu只有一个核心,可以支持多线程吗?
就算只有一个核心(假设逻辑处理器也是1),也是可以支持多个线程的,只不过是并发执行,线程数量越多,各个线程的等待其它线程时间片的时间就越长,性能开销就越大。