一、自定义线程池
1)背景:
- 在 QPS 量比较高的情况下,我们不可能说所有的访问都创建一个线程执行,这会导致内存占用过高,甚至有可能出现 out of memory
- 另外也要考虑 cpu 核数,如果请求超过了cpu核数,那么有一部分线程就会收到限制,然后等cpu时间片结束会进行一个上下文切换,频繁的上下文也会影响性能。
总和以上,我们可以引出线程池的概念,也就是结合前面的享元模式,创建一批线程,复用线程,既可以较少内存占用,又可以较少线程上下文切换。
2)任务数量放不满 Blocking Queue
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.TestPool")
public class Test1AndPoolHandle1 {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
for (int i = 0; i < 5; i ++ ) {
int j = i;
threadPool.execute(() -> {
log.debug("{}", j);
});
}
}
}
@Slf4j(topic = "c.TestPool")
class ThreadPool {
// 线程池大小
private int capacity;
// 队列
private BlockQueue<Runnable> blockQueue;
// 线程队列
private HashSet<Worker> workers = new HashSet<>();
private long timeout;
private TimeUnit timeUnit;
public ThreadPool(int capacity, long timeout, TimeUnit unit, int queueSize) {
this.timeout = timeout;
this.timeUnit = unit;
this.capacity = capacity;
blockQueue = new BlockQueue<>(queueSize);
}
// 执行任务
public void execute(Runnable runnable) {
synchronized (workers) {
if (workers.size() < capacity) {
Worker worker = new Worker(runnable);
log.debug("开始创建线程...{}", worker);
workers.add(worker);
worker.start();
} else {
log.debug("线程已满...加入队列");
blockQueue.put(runnable);
}
}
}
class Worker extends Thread{
private Runnable runnable;
public Worker(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
//执行任务
// 1) 当任务不为空是,执行任务
// 2) 当任务执行完毕,接着从队列中获取任务执行
//这里有赋值不用上锁,是因为poll跟take实现都加了锁
// while ((runnable != null) || (runnable = blockQueue.take()) != null) {
while ((runnable != null) || (runnable = blockQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("正在执行。。。{}", runnable);
runnable.run();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
runnable = null;
}
}
synchronized (workers) {
log.debug("{} 被移除了", this);
workers.remove(this);
}
}
}
}
@Slf4j(topic = "c.TestPool")
class BlockQueue<T> {
// 队列大小
private int capacity;
//存放队列
private Deque<T> deque = new ArrayDeque<>();
// 一把锁
private ReentrantLock lock = new ReentrantLock();
// 等待条件
private Condition fullCondition = lock.newCondition();
private Condition emptyCondition = lock.newCondition();
public BlockQueue(int capacity) {
this.capacity = capacity;
}
public int size() {
return deque.size();
}
// 获取任务 设置超时时间
public T poll(long time, TimeUnit unit) {
try {
lock.lock();
long nanos = unit.toNanos(time);
while (deque.isEmpty()) {
try {
if (nanos <= 0) {
log.debug("超时结束");
return null;
}
//进行等待
nanos = emptyCondition.awaitNanos(nanos);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return deque.removeFirst();
} finally {
fullCondition.signal();
lock.unlock();
}
}
// 获取任务
public T take() {
try {
lock.lock();
while (deque.isEmpty()) {
try {
emptyCondition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// log.debug("删除队列头...");
return deque.removeFirst();
} finally {
fullCondition.signal();
lock.unlock();
}
}
// 阻塞添加存放任务
public void put(T runnable) {
try {
lock.lock();
while (deque.size() == capacity) {
log.debug("等待加入任务队列 {} ...", runnable);
try {
fullCondition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("存放到队列尾");
deque.addLast(runnable);
emptyCondition.signal();
} finally {
lock.unlock();
}
}
}
2)任务数量放满 Blocking Queue (改进)
1. 带超时时间的 阻塞添加
// 带超时时间的存放任务
public boolean offer(T runnable, long time, TimeUnit unit) {
try {
lock.lock();
long nanos = unit.toNanos(time);
while (deque.size() == capacity) {
log.debug("等待加入任务队列 {} ...", runnable);
try {
if (nanos <= 0) {
log.debug("等待加入任务队列超时 {} ...", runnable);
return false;
}
nanos = fullCondition.awaitNanos(nanos);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("线程已满...加入队列{}",runnable);
deque.addLast(runnable);
emptyCondition.signal();
return true;
} finally {
lock.unlock();
}
}
2. 设计模式 之 策略模式
如果存放的队列已经满了,之前的版本就进入阻塞死等了,那么我们可以改进的方式
- 带超时时间的等待
- 让调用者放弃执行
- 让调用者自己执行
- 让调用者抛出异常
- ... 等
有很多方式,如果写死的话,就会有很多else if,那么定死了,没有扩展性,所以使用策略模式,将权利下放,利用函数式接口,让调用者传进来拒绝策略。
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.TestPool")
public class Test1AndPoolHandle1 {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
// 1、 死等
//queue.put(task);
// 2、 带超时时间等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3、 让调用者放弃
// log.debug("放弃任务{}", task);
// 4、 让调用者抛出异常
// throw new RuntimeException("跑出异常" + task);
// 5、 让调用者自己执行
task.run();
});
for (int i = 0; i < 3; i ++ ) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
log.debug("{}", j);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
}
@FunctionalInterface
interface RejectFunction<T> {
void reject(BlockQueue<T> tBlockQueue, T task);
}
@Slf4j(topic = "c.TestPool")
class ThreadPool {
// 线程池大小
private int capacity;
// 队列
private BlockQueue<Runnable> blockQueue;
// 线程队列
private HashSet<Worker> workers = new HashSet<>();
private long timeout;
private TimeUnit timeUnit;
private RejectFunction<Runnable> rejectFunction;
public ThreadPool(int capacity, long timeout, TimeUnit unit, int queueSize, RejectFunction<Runnable> rejectFunction) {
this.timeout = timeout;
this.timeUnit = unit;
this.capacity = capacity;
blockQueue = new BlockQueue<>(queueSize);
this.rejectFunction = rejectFunction;
}
// 执行任务
public void execute(Runnable runnable) {
synchronized (workers) {
if (workers.size() < capacity) {
Worker worker = new Worker(runnable);
log.debug("开始创建线程...{}", worker);
workers.add(worker);
worker.start();
} else {
// blockQueue.put(runnable);
blockQueue.tryPut(rejectFunction, runnable);
}
}
}
class Worker extends Thread{
private Runnable runnable;
public Worker(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
//执行任务
// 1) 当任务不为空是,执行任务
// 2) 当任务执行完毕,接着从队列中获取任务执行
//这里有赋值不用上锁,是因为poll跟take实现都加了锁
// while ((runnable != null) || (runnable = blockQueue.take()) != null) {
while ((runnable != null) || (runnable = blockQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("正在执行。。。{}", runnable);
runnable.run();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
runnable = null;
}
}
synchronized (workers) {
log.debug("{} 被移除了", this);
workers.remove(this);
}
}
}
}
@Slf4j(topic = "c.BlockQueue")
class BlockQueue<T> {
// 队列大小
private int capacity;
//存放队列
private Deque<T> deque = new ArrayDeque<>();
// 一把锁
private ReentrantLock lock = new ReentrantLock();
// 等待条件
private Condition fullCondition = lock.newCondition();
private Condition emptyCondition = lock.newCondition();
public BlockQueue(int capacity) {
this.capacity = capacity;
}
public int size() {
return deque.size();
}
// 获取任务 设置超时时间
public T poll(long time, TimeUnit unit) {
try {
lock.lock();
long nanos = unit.toNanos(time);
while (deque.isEmpty()) {
try {
if (nanos <= 0) {
log.debug("超时结束");
return null;
}
//进行等待
nanos = emptyCondition.awaitNanos(nanos);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return deque.removeFirst();
} finally {
fullCondition.signal();
lock.unlock();
}
}
// 获取任务
public T take() {
try {
lock.lock();
while (deque.isEmpty()) {
try {
emptyCondition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// log.debug("删除队列头...");
return deque.removeFirst();
} finally {
fullCondition.signal();
lock.unlock();
}
}
// 存放任务
public void put(T runnable) {
try {
lock.lock();
while (deque.size() == capacity) {
log.debug("等待加入任务队列 {} ...", runnable);
try {
fullCondition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("线程已满...加入队列{}",runnable);
deque.addLast(runnable);
emptyCondition.signal();
} finally {
lock.unlock();
}
}
// 带超时时间的存放任务
public boolean offer(T runnable, long time, TimeUnit unit) {
try {
lock.lock();
long nanos = unit.toNanos(time);
while (deque.size() == capacity) {
log.debug("等待加入任务队列 {} ...", runnable);
try {
if (nanos <= 0) {
log.debug("等待加入任务队列超时 {} ...", runnable);
return false;
}
nanos = fullCondition.awaitNanos(nanos);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("线程已满...加入队列{}",runnable);
deque.addLast(runnable);
emptyCondition.signal();
return true;
} finally {
lock.unlock();
}
}
public void tryPut(RejectFunction<T> rejectFunction , T runnable) {
lock.lock();
try {
if (deque.size() == capacity) {
rejectFunction.reject(this, runnable);
} else {
log.debug("线程已满...加入队列{}",runnable);
deque.addLast(runnable);
emptyCondition.signal();
}
} finally {
lock.unlock();
}
}
}
二、ThreadPoolExecutor
1)线程池状态
ThreadPoolExecutor 使用 int 的高3位来表示线程池状态,低29位表示线程数量。
从数字上比较 RUNNING > SHUTDOWN > STOP > TIDYING > TERMINATED
为什么111 比 00小,因为是一个整数的高3位,所以第一位为1表示负数。
问题:
为什么使用一个整数表示线程池状态和线程数量,不用两个变量来表示呢?
答:
因为要保证在对 线程池状态 和 线程数量 进行赋值操作时的原子性
那么他存在一个变量中,就只需要一次cas操作就可以保证原子性,既可以保存状态信息,也可以保存数量信息。
如果存在两个变量中,就需要两次cas操作才可以保证原子性,才可以对两个变量进行赋值操作。
2)构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
假设 核心线程= 2 救急线程=1 阻塞队列=2
- 一开始任务阻塞队列空闲,线程也空闲
- 任务1 创建 核心线程1 执行
- 任务2 创建 核心线程2 执行
- 核心线程满了,任务3 进入 阻塞队列
- 核心线程满了,任务4 进入 阻塞队列
- 核心线程满了, 阻塞队列满了,任务5 创建 救急线程1 执行
- 核心线程满了, 阻塞队列满了,救急线程满了,任务6执行拒绝策略
救急线程空闲一定时间会自动销毁,等待下次用到在创建
核心线程一直运行。
触发救急线程创建的前提 是配合有界队列来使用
如果线程到达 最大线程数量 仍然有新的任务,这时会执行拒绝策略。
拒绝策略 jdk 提供了 4 种实现,其他著名框架也提供了实现。
3)newFixedThreadPool (固定大小线程池)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//可以给第二参数,传线程工厂,提供创建新线程的名字。
特点:
- 核心线程 == 最大线程数 (没有救急线程),因此无需超时时间
- 阻塞队列是无界的,没有指定大小,可以放任务数量任务
评价
适用于任务量已知,相对耗时的任务
4)newCachedThreadPool (带缓冲线程池)
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
特点
- 核心线程为0,最大线程为Integer.MAX_VALUE,救急线程的空闲生存时间 60s
-
- 也就是全部都是救急线程(60s后可以回收)
- 救急线程可以无限创建
- 队列采用 SynchronousQueue ,特点是没有容量,没有线程来获取的时候放不进去任务(类似于 一手交钱、一手交货)
- 刚好当前线程池全部都是救急线程,每个任务都会创建一个新的线程来获取任务,合适
评价
整个线程池会不断创建新的线程,任务量有多少线程就会创建多少,然后空闲60s后释放线程
适用于任务量比较密集,但是任务执行时间比较短的情况
5)newSingleThreadExecutor (单线程线程池)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
使用场景:
希望多个任务串行执行。线程数固定为1,当任务数量多于1时,会存放到阻塞队列中(无界)。任务执行完毕,线程也不会被摧毁释放。
那么跟 固定大小线程池设置成1 或者 自己创建一个线程执行 有什么区别呢
- 自己创建一个线程执行,如果中间发生了异常,那么线程销毁了,阻塞队列中的任务就不执行了,没有什么补救措施,但是如果单线程线程池还会创建一个新的线程,保证正常工作。
- newFixedThreadPool(1) 固定大小线程池设置成1,返回的对象还是可以修改的
-
- 返回的对象还是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize 等方法修改。
- 单线程线程池 newSingleThreadExecutor() 始终线程数量为1,不能修改
-
- 返回的对象 通过new FinalizableDelegatedExecutorService进行了包装,应用了一种装饰器模式,只对外暴露 ExecutorService 接口, 因此不能调用 ThreadPoolExecutor 中特有方法修改。
6)提交任务
1. ALL
2. submit 演示
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2SubmitShow {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建固定大小线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
//带有返回结果的submit执行,可以使用lambda表达式
Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
log.debug("执行");
Thread.sleep(1000);
return "ok";
}
});
//获取值,会等待submit执行完,相当于保护性暂停模式
log.debug("{}", future.get());
}
}
3. invokeAll 演示
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2Show {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建固定大小线程池
ExecutorService pool = Executors.newFixedThreadPool(3);
// 调用 invokeAll 执行一个链表中的所有任务,然后将所有返回值收集成一个list 类型是Future
// 可以设置超时时间, 如果规定时间内没有执行完所有任务,直接终止
List<Future<Object>> futures = pool.invokeAll(Arrays.asList(
() -> {
log.debug("begin");
Thread.sleep(1000);
return "1";
},
() -> {
log.debug("begin");
Thread.sleep(500);
return "2";
},
() -> {
log.debug("begin");
Thread.sleep(2000);
return "3";
}
));
// 便利所有Future,等待所有执行完毕之后输出
// 如果线程数量是2 任务3进入队列等 所以等待时间 500 + 2000
// 如果线程数量是3 任务3直接与信工 等待时间 2000
futures.forEach(f -> {
try {
log.debug("{}", f.get());
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
4. invokeAny 演示
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test2SubmitShow {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建固定大小线程池
ExecutorService pool = Executors.newFixedThreadPool(3);
// 调用 invokeAny 执行一个链表中的所有任务,然后将执行最快的一个任务的返回值返回,然后其他任务不执行了
// 如果只有一个线程, 那么只有第一个任务被执行,所以只会返回第一个任务的结果
String result = pool.invokeAny(Arrays.asList(
() -> {
log.debug("begin");
Thread.sleep(1000);
log.debug("end");
return "1";
},
() -> {
log.debug("begin");
Thread.sleep(500);
log.debug("end");
return "2";
},
() -> {
log.debug("begin");
Thread.sleep(2000);
log.debug("end");
return "3";
}
));
//最后拿到 的是2 然后1 跟 3的end没输出
log.debug("{}", result);
}
}
7)关闭线程池
shutdown
showdownNow
其他方法
三、设计模式 之 异步模式 之 工作线程
1) 定义
- 让有限的工作线程(Worker Thread)可以轮流异步处理无限多的任务,也可以归类为分工模式。
- 他的典型实现是 线程池,体现了经典设计模式中的享元模式。
例如:餐厅的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客户分配一位服务员,那么成本就太高了
(对比另外一种多线程设计模式:Thread-Pre-Message 这个是一个任务创建一个新线程处理)
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。
例如:如果一个餐厅的工人既要招呼客人(任务类型A),又要后厨做菜(任务类型B) 显然效率很低,因为他全都要干,
这时候如果分成招呼客人一批人(线程池A),做菜一批人(线程池B)更为合理,更细致的分工效率跟更高。
2)饥饿
这里的饥饿跟 synchronized 那里的饥饿有点区别,这里不是因为锁导致的,而是因为线程不足导致的,现象类似于死锁,但是不是锁导致的。
一般是固定大小线程池有饥饿现象。
- 有两个工人在同一个线程池中表示两个线程。
- 两个工人是全能的,既能做饭,又能处理点餐,有以下两种阶段情景
-
- 1、有一个客人点餐,工人A处理点餐,等待上菜,工人B做菜,工人A上菜,一气呵成,分工合作很顺利
- 2、有两个客人同时点餐,工人A和工人B同时处理点餐,同时等待上菜,没有额外工人(线程)去做菜,这时候就死锁了。
情况1
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test3Executor {
static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
// 有人点餐,执行任务
pool.execute(() -> {
log.debug("点餐");
Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
log.debug("做菜");
Thread.sleep(1000);
return list.get(0);
}
});
try {
log.debug("做完{}", future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
}
// 21:17:54.256 [pool-1-thread-1] DEBUG c.pool - 点餐
// 21:17:54.261 [pool-1-thread-2] DEBUG c.pool - 做菜
// 21:17:55.262 [pool-1-thread-1] DEBUG c.pool - 做完红烧牛肉1
情况2:
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test3Executor {
static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
// 有人点餐,执行任务
pool.execute(() -> {
log.debug("点餐");
Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
log.debug("做菜");
Thread.sleep(1000);
return list.get(0);
}
});
try {
log.debug("做完{}", future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
// 有人点餐,执行任务
pool.execute(() -> {
log.debug("点餐");
Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
log.debug("做菜");
Thread.sleep(1000);
return list.get(0);
}
});
try {
log.debug("做完{}", future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
}
// 21:19:36.930 [pool-1-thread-1] DEBUG c.pool - 点餐
// 21:19:36.930 [pool-1-thread-2] DEBUG c.pool - 点餐
3)解决饥饿
解决1(不全面):
- 增加核心线程来解决,但是如果同时比较多,还是没有办法解决。
解决2(全面):
- 不同功能的线程放到不同线程池
package com.itheima.test8;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.pool")
public class Test3Executor {
static List<String> list = Arrays.asList("红烧牛肉1", "红烧牛肉2", "牛肉3", "红烧牛肉4");
public static void main(String[] args) {
ExecutorService cookPool = Executors.newFixedThreadPool(1);
ExecutorService waitPool = Executors.newFixedThreadPool(1);
// 有人点餐,执行任务
waitPool.execute(() -> {
log.debug("点餐1");
Future<String> future = cookPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
log.debug("做菜1");
Thread.sleep(1000);
return list.get(0);
}
});
try {
log.debug("做完1{}", future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
// 有人点餐,执行任务
waitPool.execute(() -> {
log.debug("点餐2");
Future<String> future = cookPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
log.debug("做菜2");
Thread.sleep(1000);
return list.get(2);
}
});
try {
log.debug("做完2{}", future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
}
// 21:35:52.924 [pool-2-thread-1] DEBUG c.pool - 点餐1
// 21:35:52.924 [pool-2-thread-2] DEBUG c.pool - 点餐2
// 21:35:52.927 [pool-1-thread-1] DEBUG c.pool - 做菜1
// 21:35:53.928 [pool-1-thread-1] DEBUG c.pool - 做菜2
// 21:35:53.928 [pool-2-thread-1] DEBUG c.pool - 做完1红烧牛肉1
// 21:35:54.930 [pool-2-thread-2] DEBUG c.pool - 做完2牛肉3
4)创建多少线程池合适?
- 过小会导致不能充分利用cpu系统资源,容易导致饥饿
- 过大会导致频繁发生线程上下文切换,占用更多内存
1. CPU 密集型运算
通常采用 cpu核数 + 1 能够实现最优的CPU利用率, +1是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,
额外的这个线程能够顶上去,保证CPU时钟周期不被浪费。常用于数据分析时。
2. I/O 密集型运算
CPU 不总是在繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但是当你执行I/O操作时、远程RPC调用、数据库操作时等,这时候CPU资源就空闲了
你可以利用多线程提高他的利用率。
经验公式如下:
- 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间 +等待时间) / CPU 计算时间
例如
4 核CPU计算时间是50%,其他等待时间是50%,期望cpu被100%利用, 套用公式
4 * 100% * 100% / 50% = 8
四、任务调度线程池
有些时候我们需要延迟执行任务(延迟几秒后执行),或者需要循环执行任务(每隔几秒执行一次),这时候就需要用到任务调度的线程池了。
JDK5的时候加入任务调度线程池
JDK5之前 可以使用java.util.Timer 来实现定时功能
1. Timer
优点:简单易用,
缺点:所有任务都由一个独立线程执行,所有任务都是串行执行的,如果中间有一个任务发生异常,后面的任务也不会运行了。
@Slf4j(topic = "c.pool")
public class Test4 {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task 1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task 2");
try {
Thread.sleep(0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
log.debug("start");
// 一秒后执行 task1 task2
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
}
// 16:17:24.730 [main] DEBUG c.pool - start
// 16:17:25.734 [Timer-0] DEBUG c.pool - task 1
// 16:17:27.735 [Timer-0] DEBUG c.pool - task 2
2. ScheduledExecutorService
解决了Timer的缺点,抛出异常不会终止其他任务,还可以设置多个线程参与调度。
如果ScheduledExecutorService(1) 设置成1,那么也是串行执行,但是不会终止其他任务
a. 延迟任务
package com.itheima.test8;
import lombok.extern.slf4j.Slf4j;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j(topic = "c.pool")
public class Test4 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
pool.schedule(() -> {
log.debug("task1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 1, TimeUnit.SECONDS);
pool.schedule(() -> {
log.debug("task2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 1, TimeUnit.SECONDS);
}
}
// 16:23:45.724 [pool-1-thread-1] DEBUG c.pool - task1
// 16:23:45.724 [pool-1-thread-2] DEBUG c.pool - task2
b. 定时任务
ⅰ. scheduleAtFixedRate
@Slf4j(topic = "c.pool")
public class Test4 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start");
// 延迟1秒执行,然后每隔1秒执行一次
// 如果任务的延迟比定时还长,那么所有任务会在上一个任务执行完之后 紧接着运行
pool.scheduleAtFixedRate(() -> {
log.debug("task1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 1, 1, TimeUnit.SECONDS);
}
}
// 16:33:06.263 [main] DEBUG c.pool - start
// 16:33:07.303 [pool-1-thread-1] DEBUG c.pool - task1
// 16:33:09.303 [pool-1-thread-1] DEBUG c.pool - task1
// 16:33:11.303 [pool-1-thread-1] DEBUG c.pool - task1
ⅱ. scheduleWithFixedDelay
@Slf4j(topic = "c.pool")
public class Test4 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start");
// 不管任务执行多长时间,都会等他工作完成之后再延迟一秒执行一次
pool.scheduleWithFixedDelay(() -> {
log.debug("task1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 1, 1, TimeUnit.SECONDS);
}
}
// 16:35:55.014 [main] DEBUG c.pool - start
// 16:35:56.048 [pool-1-thread-1] DEBUG c.pool - task1
// 16:35:59.051 [pool-1-thread-1] DEBUG c.pool - task1
// 16:36:02.051 [pool-1-thread-1] DEBUG c.pool - task1
正确处理线程池异常
- try catch 手动捕捉
- 普通线程池可以使用配合feture的返回值get()等待,如果有异常,返回值会是异常描述
3. 线程池应用
如何让每周日 18:00:00 定时执行任务?
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TestTaskRun {
// 如何让每周日 18:00:00 定时执行任务?
public static void main(String[] args) {
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
System.out.println("now = " + now);
// 获取本周周日时间
LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
// 如果当前时间 > 本周周四,那么获取的time是不对的,需要是下周的
if (now.isAfter(time)) {
time = time.plusWeeks(1);
}
System.out.println("time = " + time);
long period = 1000 * 60 * 60 * 24 * 7; // 一周时间
long initialDelay = Duration.between(now, time).toMillis(); //获取两个时间之间的毫秒差值
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.scheduleAtFixedRate(() -> {
System.out.println("running...");
}, initialDelay, period, TimeUnit.MILLISECONDS);
}
}
五、Tomcat 线程池
浏览器发出一个请求
首先会经过LimitLatch(作用是限流,控制最大连接数,防止太多连接把服务器压垮),类似于J.U.C 中的 Semaphore 后面再讲
当没有超过最大连接数,到达第二个组件 Acceptor(本质上是一个线程,不断循环看有没有新的连接,主要就是负责接受socket连接,其他不管)
然后Poller (也是一个线程,死循环看连接上是否有这种可读的事件发生,但是也是负责监听是否有,如果有交给一个Executor来执行)来负责看是否有IO的读写操作
一旦Poller 发现可读,会封装一个任务对象(socketProcessor实现了runnable),提交个Executor线程池处理
Executor 线程池中的工作线程(核心救急线程模式)最终负责【处理请求】
Tomcat 线程池扩展了 ThreadPoolExecutor ,行为稍有不同
- 如果总线程数达到 最大线程数 并且队列也满了
-
- 这时不会立刻抛出 RejectedExecutionException 异常
- 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常
源码 tomcat- 7.0.42
tomcat 线程池配置
tomcat 线程池相对于 java原生线程池,tomcat进行了一些改进,主要体现在核心在于Tomcat自己定义的任务队列。
TaskQueue —— offer(Runnable o)方法
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private transient volatile ThreadPoolExecutor parent = null;
public TaskQueue() {
super();
}
public void setParent(ThreadPoolExecutor tp) {
parent = tp;
}
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) {
return super.offer(o);
}
//we are maxed out on threads, simply queue the object
// 如果线程数量已经达到最大数量,则进入队列等待执行
if (parent.getPoolSizeNoLock() == parent.getMaximumPoolSize()) {
return super.offer(o);
}
//we have idle threads, just add it to the queue
// 执行到这里 最大线程数 > 当前线程数 > 核心线程数。
// 如果提交的任务数 <= 当前线程数则说明存在空闲线程,则提交到任务队列,等待执行
if (parent.getSubmittedCount() <= parent.getPoolSizeNoLock()) {
return super.offer(o);
}
//if we have less threads than maximum force creation of a new thread
// 执行到这,说明提交的任务数已经大于当前线程数,需要创建新的线程
if (parent.getPoolSizeNoLock() < parent.getMaximumPoolSize()) {
return false;
}
//if we reached here, we need to add it to the queue
return super.offer(o);
}
}
- submittedCount:Tomcat自定义的ThreadPoolExecutor中使用该字段来标记当前已提交的任务数,提交加一,执行完减一。
- poolSizeNoLock:当前执行的线程数
- 判断当前是否需要创建空闲线程执行任务,而不是全放入队列,因为LinkedBlockingQueue默认是无界的,但也可以设计最大数量;
六、Fork / Join线程池
1)概念
- JDK 1.7 之后加入的新的线程池实现, 使用了分治思想,适用于能够进行任务拆分的 cpu 密集型运算。
- 所谓任务拆分,是一个大任务拆分算法上相同的小任务,直到不能再拆分然后求解,最后合并,类似于归并排序,裴波那契数列等思想
- 分治思想的基础上加入了多线程,每个任务的分解和合并交给不同线程执行,进一步提升运算效率。
- 默认会创建与 cpu 核数大小相同的线程池。
2)使用
1. 拆分不完善(导致串行)
package com.itheima.test8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
@Slf4j(topic = "c.TestForkJoin")
public class TestForkJoin {
public static void main(String[] args) {
ForkJoinPool poll = new ForkJoinPool(4);
// 运行调用很简单
System.out.println(poll.invoke(new MyTask(5)));
}
}
@Slf4j(topic = "c.MyTask")
// 计算机1 - n之间的和
class MyTask extends RecursiveTask<Integer> {
private int n;
public MyTask(int n) {
this.n = n;
}
@Override
public String toString() {
return "MyTask{" +
"n=" + n +
'}';
}
@Override
protected Integer compute() {
// 递归结束条件
if (n == 1) {
log.debug("join() {}", n);
return 1;
}
//分解成子问题
MyTask myTask = new MyTask(n - 1);
myTask.fork(); //分配线程执行
log.debug("fork() {} + {}", n, myTask);
//获取线程返回结果
Integer join = myTask.join();
log.debug("join() {} + {} = {}", n, myTask, n + join);
return n + join; //然后拼接子问题返回答案
}
}
// 21:07:44.838 [ForkJoinPool-1-worker-1] DEBUG c.MyTask - fork() 5 + MyTask{n=4}
// 21:07:44.838 [ForkJoinPool-1-worker-2] DEBUG c.MyTask - fork() 4 + MyTask{n=3}
// 21:07:44.838 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - fork() 2 + MyTask{n=1}
// 21:07:44.838 [ForkJoinPool-1-worker-3] DEBUG c.MyTask - fork() 3 + MyTask{n=2}
// 21:07:44.841 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - join() 1
// 21:07:44.841 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - join() 2 + MyTask{n=1} = 3
// 21:07:44.842 [ForkJoinPool-1-worker-3] DEBUG c.MyTask - join() 3 + MyTask{n=2} = 6
// 21:07:44.842 [ForkJoinPool-1-worker-2] DEBUG c.MyTask - join() 4 + MyTask{n=3} = 10
// 21:07:44.842 [ForkJoinPool-1-worker-1] DEBUG c.MyTask - join() 5 + MyTask{n=4} = 15
// 15
2. 分治思想(并行)
package com.itheima.test8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
@Slf4j(topic = "c.TestForkJoin")
public class TestForkJoin {
public static void main(String[] args) {
ForkJoinPool poll = new ForkJoinPool(4);
// 运行调用很简单
System.out.println(poll.invoke(new AddTask(1, 5)));
}
}
@Slf4j(topic = "c.AddTask")
// 计算机1 - n之间的和
class AddTask extends RecursiveTask<Integer> {
private int start;
private int end;
public AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public String toString() {
return "AddTask{" +
"start=" + start +
", end=" + end +
'}';
}
@Override
protected Integer compute() {
// 递归结束条件
if (start == end) {
log.debug("join() {}", start);
return start;
}
if (end == start + 1) {
log.debug("join() {}", end + start);
return end + start;
}
//分解成子问题
int mid = (start + end) / 2;
AddTask addTask1 = new AddTask(start, mid);
addTask1.fork(); //分配线程执行
log.debug("fork() {}", addTask1);
AddTask addTask2 = new AddTask(mid + 1, end);
addTask2.fork(); //分配线程执行
log.debug("fork() {}", addTask2);
//获取线程返回结果
Integer join = addTask1.join() + addTask2.join();
log.debug("join() {} , {} = {}", start, end, join);
return join; //然后拼接子问题返回答案
}
}
七、AQS原理
1)概述
2)自定义锁
自定义一个同步器组件,例如可重入锁那些都是这么做。在你自定义的同步器组件内部定义一个AQS类的内部类,所有操作交给这个内部类完成,我们的自定义同步组件只需要调用这个内部类的方法
这里要注意,acquire()是AQS实现的模板方法可以直接用,tryAquire()是自定义的加锁方法, 模板会调用此自定义实现方法
它的acquire、release方法是aqs的固定机制,比如怎么把线程阻塞,怎么加入队列等等,而tryxx是自己实现的,自己决定是否可重入,是否可共享等等
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Slf4j(topic = "c.MyLock")
public class Test6Lock {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
log.debug("lock");
lock.lock();
log.debug("lock");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
log.debug("unlock");
lock.unlock();
}
}).start();
// new Thread(() -> {
// lock.lock();
// log.debug("lock");
// try {
// Thread.sleep(1);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// } finally {
// log.debug("unlock");
// lock.unlock();
// }
// }).start();
}
}
// 自定义锁 (不可重入锁)
class MyLock implements Lock {
class MyAQS extends AbstractQueuedSynchronizer {
// 独占锁 同步器类
@Override
protected boolean tryAcquire(int arg) {
// cas 操作是否可以获得锁
if (compareAndSetState(0, 1)) {
//然后锁的Owner 是当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
// 释放锁之后清空Owner
setExclusiveOwnerThread(null);
// 因为是volatile 所以因为写屏障,所以前面不会重排指令到state后面,同步到主存
setState(0);
return true;
}
@Override //是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
MyAQS sync = new MyAQS();
@Override //加锁 (加锁失败,进入队列)
public void lock() {
sync.acquire(1);
}
@Override // 加锁,但是可以打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override // 尝试加锁(一次)
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override // 尝试加锁,带超时
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override //释放锁
public void unlock() {
sync.release(1); //释放锁,底层还会唤醒等待的线程
}
@Override //创建条件变量
public Condition newCondition() {
return sync.newCondition();
}
}
八、ReentrantLock 原理
1)加锁成功流程
非公平锁实现原理:
加锁解锁流程:
构造器方法,默认是非公平锁实现
public ReentrantLock() {
sync = new NonfairSync();
}
NonfairSync 继承自 AQS,原理类似于刚才的AQS实现自定义锁
2)加锁失败流程
第一次尝试:如果已经被别人加锁了,调用acquire
第二次尝试:如果这时候加锁的人已经释放,则不需要继续往下,否则就创建AddWaiter构造一个链表(即head、tail),添加一个Node链表节点,加入队列
- 首次创建会创建两个,第一个Node称为 Dummy(哑元)或哨兵,用来占位,并不关联线程。
- 黄色三角形表示该Node的 waitStatus 状态,默认 0 为正常状态
进入acquireQueue () 逻辑
第三个尝试:循环判断当前节点的前驱节点是不是头结点,说明他是第一个节点,有资格去再次尝试获取锁
进入 shouldParkAfterFailAcquire()逻辑
把前驱节点改成状态改成 -1,代表前驱节点有资格唤醒后继结点,因为如果进行park ,需要有人唤醒,第一次会方法会返回false
第四次尝试:再次尝试获取,获取失败,再次进入shouldParkAfterFailAcquire,因为前驱节点node已经是状态-1,所以这次返回true,进行一个park,阻塞住
3)解锁竞争成功流程
如果有多个线程经历上述过程竞争失败,变成这个样子
这时候 Thread - 0 释放锁,进入 tryRelease 流程,如果成功
- 设置 exclusiveOwnerThread 为 null
- state = 0
然后判断头结点是不是不为空,并且 waitStatus 状态不为0,那么就执行 unparkSuccessor 唤醒距离头结点最近的节点,然后他有资格继续尝试获取锁
获取到锁之后,设置state = 1, 并且 exclusiveOwnerThread = 当前线程,然后将头结点替换成当前节点 】
4)解锁竞争失败流程
因为是非公平锁,此时有另外一个线程Thread4,不是队列中的,而是第一次访问的线程,那么就会发生一个竞争
Thread-1 如果竞争失败,Thread-4 的线程就会设置成 exclusiveOwnerThread ,state = 1
Thread - 1 竞争失败再次进入 acquireQueue 流程,获取锁失败,重新进入 park 阻塞
公平锁的意思也就是 新线程来的时候会不会直接加入队列中。
5)锁重入原理
6)可打断原理
不可打断锁原理:
打断不会影响到获取锁停止,只有获得锁之后才会返回打断标记
可打断原理:
如果打断之后直接抛出异常,不会继续获取锁了
7)公平锁原理
公平锁原理:新线程进来获取锁的时候,判断是不是队列中有线程等待,来判断是否可以进行竞争锁。
8)条件变量 - await
每个条件变量其实对应着一个等待队列, 其实现类是 ConditionObject
开始Thread-0持有锁,调用await,进入 COnditionObject 的 addConditionWaiter 流程
创建 新的 Node 状态为 -2(Node.CONDITION) ,关联 Thread-0,加入等待队列尾部
然后进入 AQS 的fullyRelease 流程, 释放同步器上的锁,也就是当前线程的所有锁
为什么是fullyRelease 不是 因为可能发生了锁重入,所以需要释放所有锁
state = 0, exclusiveOwnerThread = null
然后unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么Thread - 1 竞争成功。
state = 1, exclusiveOwnerThread = thread1
9)条件变量 - signal
假设 Thread- 1 调用 signal 要来唤醒 条件变量中等待的 Thread - 0
调用 signal 方法,然后唤醒 条件变量中的 第一个节点,然后条件变量中的当前点 置为null,
转移成功 然后加入到等待队列队尾等待
转移失败 因为有些时候有时限的等待超时,或者被打断,那么就没有必要加入到队列队尾
加入队列,获取前一个节点,然后设置state 为 -1,因为最后一个是0(前一个节点设置成-1,表示有资格唤醒unpark新加入的线程)
如果前一个节点的状态 > 0 ,表示被取消了,那么需要唤醒当前线程,去找能够当做前驱节点的点。