一、线程池
提前创建一系列的线程,保存在这个线程池中,有任务要执行的时候,从线程池中取出线程来执行。没有任务的时候,线程池放回去。
二、为什么要使用线程池
线程使用上的问题:
-
线程的频繁创建 和 销毁
-
线程的数量过多,会造成CPU资源的开销
-
上下文切换(消耗CPU资源)
-
那么如何实现线程的复用呢? 池化技术
三、线程池的设计猜想
3.1线程池的设计思考?
需求: 实现线程的重复使用
分解:
-
如何使用线程的复用?
-
让线程实现复用的唯一方法,就是让线程不结束
-
那如何让线程执行新的任务呢?也就是说,任务怎么给他执行?
-
[共享内存]-> List.add()
-
-
线程一直处于运行状态,合理吗?
-
有任务来的时候,执行,没有任务的时候,阻塞
-
-
-
结论: 通过阻塞队列的方式,来实现线程池中线程的复用
思考: 通过阻塞队列的方式,如果队列满了,可以阻塞主线程/生产者线程吗?显然不能,那么怎么办:
-
1.增加消费者线程的数量(扩容) 也就是增加线程去执行任务,消费者多了,阻塞队列自然被消费的也快了,就不容易阻塞主线程了
-
2.如果扩容解决不了问题,那只能采用拒绝策略
-
报错
-
直接丢弃这个任务
-
直接普通线程调用task.run(直接重新开启一个线程)
-
队列中头部的等待最久的任务丢弃,然后把当前任务添加到阻塞队列
-
存储起来,后续等待空闲之后重试(自定义去完成)
-
结束的方法:让线程执行结束(run方法执行结束),也就是跳出while循环
3.2 线程池的核心参数
a.线程数量(核心线程数) [初始化时创建]
b. 最大线程数 [还能够扩容多少个线程 与核心线程数的差]
c. 存活时间 [扩容的线程要有一个存活时间]
d. 存活时间单位
e.阻塞队列(使用哪一种阻塞队列)
f.线程工厂(生产线程的工厂)(有默认值)
h.阻绝策略(有默认值 默认抛出异常)
四、Java中提供的线程池
Exectors
-
newScheduledThreadPool 提供周期的线程池
-
newFixedThreadPool 固定线程数量
-
newSingleThreadExecutor 只有一个线程的线程池
-
newCachedThreadPool 可以缓存的线程池->理论上来说,有多少情况,就构建多少个线程
五、自定义线程池源码分析
线程池中的核心线程是延迟初始化的
-
先初始化核心线程
-
调用阻塞队列的方法,把task存进去
-
如果true,说明当前的请求量不大,核心线程就可以搞定
-
false,增加工作线程(非核心线程)
-
如果添加失败,说明当前的工作线程数量达到了最大的线程数,直接调用拒绝策略
-
-
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 前3位记录运行状态 后29位记录线程数
int c = ctl.get();
// 1.判断当前工作线程数是否小于核心线程数(延迟初始化)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 添加工作线程,并执行任务
return;
c = ctl.get();
}
// 2.添加到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 增加工作线程
else if (!addWorker(command, false))
reject(command); // 4. 增加失败,则执行拒绝策略
}
5.1 addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
// 1. 修改工作线程记录数
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) //1.1 CAS操作进行修改
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 2. 创建线程并启动
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 3. 添加失败 则回滚
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
分析: addWorker主要做3件事,.a.使用CAS操作修改原子值中的工作线程数 b.将任务放到Worker对象中,并启动线程 c.如果线程启动失败,则回滚。线程启动后,则自动调用Worker对象中的run方法。
5.2 runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 1. while循环保证当前线程不结束,直到task为空
while (task != null || (task = getTask()) != null) {
// 2.这里是因为表示当前线程正在运行一个任务,其它地方要执行shutdown 你要等我执行结束
w.lock(); // worker继承了AQS 实现了互斥锁
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); // 是否应该中断 在gettask中处理
try {
// 空实现,方便扩展
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 空实现,方便扩展
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
5.3 getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//1. 判断如果线程池已经结束,直接返回Null,需要清理掉所有的工作线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 2. 是否允许超时 allowCoreThreadTimeOut 为true 也就是说设置为true 核心线程也
// 可以被销毁
// 或者工作线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // cas操作减少工作线程
return null;
continue;
}
try {
//* 执行任务
Runnable r = timed ?
// 超时阻塞
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 如果阻塞队列为空,则会阻塞在这个地方
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
分析: getTask中从队列中取出任务并执行。注意:allowCoreThreadTimeOut 设置为true,则核心线程也可以被销毁
5.4 拒绝策略
a. AbortPolicy(抛出异常)
// 报错 抛出异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
b. CallerRunsPolicy (创建一个普通线程,并执行)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
c. DiscardOldestPolicy (从队列头部抛弃一个,执行当前的)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
d. DiscardPolicy (什么也不做)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}