💓 博客主页:从零开始的-CodeNinja之路
⏩ 收录专栏:线程池,定时器以及阻塞队列(生产者/消费者模型)
🎉欢迎大家点赞👍评论📝收藏⭐文章
实现线程池,定时器以及阻塞队列,生产者/消费者模型
- 线程池
- 线程池是什么
- Executors创建线程池的四种方式
- 线程池底层ThreadPoolExecutor的六大参数
- 拒绝策略
- 实现线程池
- 定时器
- 定时器的定义
- 标准库中的定时器
- 定时器的实现
- 阻塞队列
- 阻塞队列的概念
- 生产者消费者模型
- 阻塞队列实现(生产者/消费者模型)
线程池
线程池是什么
虽然创建线程/销毁线程的开销
线程池相当于是一个大池子,池子里放着默认的10个已创建的空闲的线程,每当使用时直接从池子里取出一个空闲的线程进行使用,使用完以后在放回池子里面,这样就减少了每次创建线程和销毁线程的资源浪费.
Executors创建线程池的四种方式
- newFixedThreadPool:创建固定线程数的线程池
- newCachedThreadPool:创建线程数目动态增长的线程池.
- newSingleThreadExecutor:创建只包含单个线程的线程池.
- newScheduledThreadPool:设定延迟时间后执行命令,或者定期执行命令.是进阶版的Timer.
线程池底层ThreadPoolExecutor的六大参数
Executors本质上是ThreadPoolExecutor类的封装.
ThreadPoolExecutor提供了更多的可选参数,可以进⼀步细化线程池行为的设定.
corePoolSize:正式员工的数量.正式员工,⼀旦录用,永不辞退)
maximumPoolSize:正式员工+临时工的数目.(临时工:⼀段时间不干活,就被辞退).
keepAliveTime:临时工允许的空闲时间.
unit:keepaliveTime的时间单位,是秒,分钟,还是其他值.
workQueue:传递任务的阻塞队列
threadFactory:创建线程的工厂,参与具体的创建线程工作.通过不同线程工厂创建出的线程相当于
对⼀些属性进行了不同的初始化设置.
拒绝策略
RejectedExecutionHandler:拒绝策略,如果任务量超出公司的负荷了接下来怎么处理.
- AbortPolicy():超过负荷,直接抛出异常.
- CallerRunsPolicy():调用者负责处理多出来的任务.
- DiscardOldestPolicy():丢弃队列中最老的任务.
- DiscardPolicy():丢弃新来的任务.
实现线程池
核心操作为submit,将任务加入线程池中
-
使⽤Worker类描述一个工作线程.使用Runnable描述⼀个任务.
-
使用一个BlockingQueue组织所有的任务
-
worker线程要做的事情:不停的从BlockingQueue中取任务并执行.
-
指定⼀下线程池中的最大线程数maxWorkerCount;当当前线程数超过这个最大值时,就不再新增 线程了
.
class MyThreadPool {
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
// 通过这个⽅法, 来把任务添加到线程池中.
public void submit(Runnable runnable) throws InterruptedException {
queue.put(runnable);
}
// n 表⽰线程池⾥有⼏个线程.
// 创建了⼀个固定数量的线程池.
public MyThreadPool(int n) {
for (int i = 0; i < n; i++) {
Thread t = new Thread(() -> {
while (true) {
try {
// 取出任务, 并执⾏~~
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
}
}
// 线程池
public class Demo {
public static void main(String[] args) throws InterruptedException {
MyThreadPool pool = new MyThreadPool(4);
for (int i = 0; i < 1000; i++) {
pool.submit(new Runnable() {
@Override
public void run() {
// 要执⾏的⼯作
System.out.println(Thread.currentThread().getName() + " hell
}
});
}
}
}
定时器
定时器的定义
定时器也是软件开发中的⼀个重要组件.类似于⼀个"闹钟".达到⼀个设定的时间之后,就执行某个指定好的代码.
定时器是⼀种实际开发中非常常用的组件.
比如网络通信中,如果对方500ms内没有返回数据,则断开连接尝试重连.
比如⼀个Map,希望里面的某个key在3s之后过期(自动删除).
标准库中的定时器
- 标准库中提供了⼀个Timer类.Timer类的核心方法为 schedule .
- schedule 包含两个参数.第⼀个参数指定即将要执行的任务代码,第二个参数指定多长时间之后执行(单位为毫秒).
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello");
}
}, 3000);
定时器的实现
定时器的构成
- ⼀个带优先级队列(不要使用PriorityBlockingQueue,容易死锁!)
- 队列中的每个元素是⼀个Task对象.
- Task中带有⼀个时间属性,队首元素就是即将要执行的任务
- 同时有⼀个worker线程⼀直扫描队首元素,看队首元素是否需要执行
- Timer类提供的核心接口为schedule,用于注册⼀个任务,并指定这个任务多长时间后执行.
public class MyTimer {
public void schedule(Runnable command, long after) {
}
}
- Task类用于描述⼀个任务(作为Timer的内部类).里面包含⼀个Runnable对象和⼀个time(毫秒时间戳)
这个对象需要放到优先队列中.因此需要实现 Comparable 接口.
class MyTask implements Comparable<MyTask> {
public Runnable runnable;
// 为了⽅便后续判定, 使⽤绝对的时间戳.
public long time;
public MyTask(Runnable runnable, long delay) {
this.runnable = runnable;
// 取当前时刻的时间戳 + delay, 作为该任务实际执⾏的时间戳
this.time = System.currentTimeMillis() + delay;
}
@Override
public int compareTo(MyTask o) {
// 这样的写法意味着每次取出的是时间最⼩的元素.
// 到底是谁减谁?? 俺也记不住!!! 随便写⼀个, 执⾏下, 看看效果~~
return (int)(this.time - o.time);
}
}
- Timer实例中,通过PriorityQueue来组织若干个Task对象.通过schedule来往队列中插⼀入个个Task对象.
class MyTimer {
// 核⼼结构
private PriorityQueue<MyTask> queue = new PriorityQueue<>();
// 创建⼀个锁对象
private Object locker = new Object();
public void schedule(Runnable command, long after) {
// 根据参数, 构造 MyTask, 插⼊队列即可.
synchronized (locker) {
MyTask myTask = new MyTask(runnable, delay);
queue.offer(myTask);
locker.notify();
}
}
}
- Timer类中存在⼀个worker线程,⼀直不停的扫描队首 元素,看看是否能执行这个任务.所谓"能执行"指的是该任务设定的时间已经到达了.
// 在这⾥构造线程, 负责执⾏具体任务了.
public MyTimer() {
Thread t = new Thread(() -> {
while (true) {
try {
synchronized (locker) {
// 阻塞队列, 只有阻塞的⼊队列和阻塞的出队列, 没有阻塞的查看队⾸元素.
while (queue.isEmpty()) {
locker.wait();
}
MyTask myTask = queue.peek();
long curTime = System.currentTimeMillis();
if (curTime >= myTask.time) {
// 时间到了, 可以执⾏任务了
queue.poll();
myTask.runnable.run();
} else {
// 时间还没到
locker.wait(myTask.time - curTime);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
阻塞队列
阻塞队列的概念
阻塞队列是⼀种特殊的队列.也遵守"先进先出"的原则.
阻塞队列能是⼀种线程安全的数据结构,并且具有以下特性:
- 当队列满的时候,继续⼊队列就会阻塞,直到有其他线程从队列中取走元素.
- 当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插入元素.
阻塞队列的⼀个典型应用场景就是"生产者消费者模型".这是⼀种非常典型的开发模型.
生产者消费者模型
生产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
1. 阻塞队列就相当于⼀个缓冲区,平衡了生产者和消费者的处理能力.(削峰填谷)
例如在"秒杀"场景下,服务器同⼀时刻可能会收到⼤量的⽀付请求.如果直接处理这些⽀付请求, 服务
器可能扛不住(每个⽀付请求的处理都需要⽐较复杂的流程).这个时候就可以把这些请求都放到⼀个
阻塞队列中,然后再由消费者线程慢慢的来处理每个⽀付请求. 这样做可以有效进行"削峰",防止服务器被突然到来的⼀波请求直接冲垮.
2. 阻塞队列也能使生产者和消费者之间解耦.
⽐如过年⼀家⼈⼀起包饺⼦.⼀般都是有明确分⼯,⽐如⼀个⼈负责擀饺⼦⽪,其他⼈负责包.擀饺⼦
⽪的⼈就是"⽣产者",包饺⼦的⼈就是"消费者".
擀饺⼦⽪的⼈不关⼼包饺⼦的⼈是谁(能包就⾏,⽆论是⼿⼯包,借助⼯具,还是机器包),包饺⼦的⼈也
不关⼼擀饺⼦⽪的⼈是谁(有饺⼦⽪就⾏,⽆论是⽤擀⾯杖擀的,还是拿罐头瓶擀,还是直接从超市买
的).
阻塞队列实现(生产者/消费者模型)
- • 通过"循环队列"的方式来实现.
- • 使用synchronized进行加锁控制.
- • put插入元素的时候,判定如果队列满了,就进行wait.(注意,要在循环中进行wait被唤醒时不⼀定
队列就不满了,因为同时可能是唤醒了多个线程). - • take取出元素的时候,判定如果队列为空,就进行wait.(也是循环wait)
import java.util.Random;
public class BlockingQueue {
private int[] items = new int[1000];
private volatile int size = 0;
private volatile int head = 0;
private volatile int tail = 0;
public void put(int value) throws InterruptedException {
synchronized (this) {
// 此处最好使⽤ while.
// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能⼜已经队列满了
// 就只能继续等待
while (size == items.length) {
wait();
}
items[tail] = value;
tail = (tail + 1) % items.length;
size++;
notifyAll();
}
}
public int take() throws InterruptedException {
int ret = 0;
synchronized (this) {
while (size == 0) {
wait();
}
ret = items[head];
head = (head + 1) % items.length;
size--;
notifyAll();
}
return ret;
}
public synchronized int size() {
return size;
}
// 测试代码
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQueue = new BlockingQueue();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) {
try {
blockingQueue.put(random.nextInt(10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "⽣产者");
producer.start();
customer.join();
producer.join();
}
}
如果觉得文章不错,期待你的一键三连哦,你个鼓励是我创作的动力之源,让我们一起加油,顶峰相见!!!💓 💓 💓