阻塞队列
谈到队列,大家就能想到队列的先进先出原则,但有些特殊的队列,虽然也是先进先出的,但是带有阻塞功能,我们把这种队列叫做阻塞队列.
★如果队列为空,执行出队操作就会阻塞,阻塞到另外一个线程往队列里添加元素(队列不为空)为止.
★如果队列满了,执行入队操作时,也会阻塞,阻塞到另外一个线程从队列里取出元素(队列不满).
想要了解阻塞队列,先了解生产者-消费者模型,这个模型给我们程序带来了非常重要的好处
1.生产者-消费者模型
在一个餐馆中,有多名厨师(生产者)负责烹饪菜品,并将其放在共享的出菜区(缓冲区)上。而服务员(消费者)则从出菜区上取出菜品并端给顾客。这里的厨师可以看作是生产者,出菜区是缓冲区,服务员是消费者。
1. 实现了生产者和消费者之间的 "解耦"
我们写代码要追求低耦合,所谓解耦操作,就是降低耦合的过程.
比如过年一家人一起包饺子. 一般都是有明确分工 , 比如一个人负责擀饺子皮 , 其他人负责包 . 擀饺 子皮的人就是 "生产者", 包饺子的人就是 " 消费者 ".擀饺子皮的人不关心包饺子的人是谁( 能包就行 , 无论是手工包 , 借助工具 , 还是机器包 ), 包饺子的人 也不关心擀饺子皮的人是谁( 有饺子皮就行 , 无论是用擀面杖擀的 , 还是拿罐头瓶擀 , 还是直接从超市买的)
2.可以"削峰填谷",保证系统的稳定性
阻塞队列就相当于一个缓冲区,平衡了生产者消费者的处理能力
比如在 " 秒杀 " 场景下 , 服务器同一时刻可能会收到大量的支付请求 . 如果直接处理这些支付请求 , 服务器可能扛不住( 每个支付请求的处理都需要比较复杂的流程 ). 这个时候就可以把这些请求都放 到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求 .这样做可以有效进行 " 削峰 ", 防止服务器被突然到来的一波请求直接冲垮
2.标准库中的阻塞队列
java标准库中内置了阻塞队列,在使用的时候直接使用标准库中的就行
1.BlockingDeque是一个接口
2.真正实现BlockingDeque的类有三种
3.带有阻塞功能的方法.put()方法用于阻塞队列的入队,take()方法用于阻塞队列的出队.在使用take的时候需要处理异常
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
//创建两个线程,来作为生产者和消费者
Thread customer = new Thread(() -> {
while (true) {
try {
Integer result = blockingQueue.take();
System.out.println("消费元素: " + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
Thread producer = new Thread( () -> {
int count = 0;
while (true) {
try {
blockingQueue.put(count);
System.out.println("生产元素: " + count );
count++;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
上面执行的结果是
上面代码先执行消费元素是因为当customer线程执行到blockingQueue,take()时发生了阻塞,等到producer线程执行blockingQueue.put()后,两个线程并发执行,谁先打印都有可能.
4. 不带阻塞功能的方法 入队列 offer(), 出队列 poll(), 取队首元素 peek().
3.模拟实现阻塞队列
实现阻塞队列的前提是我们要先实现一个普通的队列,实现普通队列的方法有两种,一个是基于链表的队列,采用头删法和尾插法实现一个阻塞队列,另外一个方法是基于数组的循环队列,两个方法的具体实现在下面这篇文章里面.
循环队列的实现.
要模拟实现阻塞队列,前提是在多线程的基础上,通过synchronized关键字对put()和take()方法中的代码进行加锁,然后通过wait()和notify()方法对插入元素和取出元素进行阻塞等待.具体方法是,当插入元素时如果队列满了,就产生阻塞等待,直到有元素取出后唤醒put中的wait().当取出元素的时候,如果队列为空,让take()方法产生阻塞等待,只到有元素插入后,唤醒take()中的wait().
class MyBlockingQueue {
private int[] item = new int[1000];
private int head = 0;
private int tail = 0;
private int size = 0;
//插入元素
public void put(int value) throws InterruptedException {
synchronized (this) {
while(size == item.length) {
//队列满了,产生阻塞等待
this.wait();
}
item[tail] = value;
tail++;
if(tail >= item.length) {
tail = 0;
}
size++;
//唤醒take中的wait()
this.notify();
}
}
//取出元素
public Integer take() throws InterruptedException {
int result = 0;
synchronized (this) {
while(size == 0) {
//队列为空,产生阻塞等待
this.wait();
}
result = item[head];
head++;
if(head >= item.length) {
head = 0;
}
size--;
//唤醒put中的wait()
this.notify();
}
return result;
}
}
定时器
定时器就好比是我们生活中的闹钟,当我们被早八的闹钟叫醒后想在床上在躺十分钟,我们设置闹钟就可以让十分钟再响,而我们的计时器具有类似的功能,是指定一段之间后执行一段代码.
在Java的标准库中有一个Timer类,Timer类的核心方法是schedule()
schedule()中有连个参数,第一个参数指定定时器要执行的任务,第二个参数指定多长时间后执行这个任务
模拟实现定时器
主要实现下面几种功能
1.让被注册的任务,能在指定时间后执行.为了实现这个功能可以单独在定时器内部建一个线程,让这个线程周期性的扫描,判断任务时间是否到了,如果到了,就执行,若没动,就继续等待.这n个任务,可以用优先级队列来保存,按照指定时间小的优先级高,就放在队头,此时扫描线程,就只需要扫描队头元素,对头元素时间没到的话,其他时间更不可能到.
在优先级队列中,我们是基于时间进行比较的,可以实现 Comparable接口来进行比较
在扫描线程中会出现下面的bug.
2.一个定时器可以指定N个任务,N个任务会按照指定时间按顺序执行.
具体实现代码
class MyTask implements Comparable<MyTask>{
//要执行的任务
private Runnable runnable;
//任务在什么时间执行(用毫秒时间戳表示)
private long time;
public MyTask(Runnable runnable, long time) {
this.runnable = runnable;
this.time = time;
}
//获取当前任务的时间
public long getTime() {
return time;
}
//执行任务
public void run() {
runnable.run();
}
@Override
public int compareTo(MyTask o) {
return (int) (this.time - o.time);
}
}
//定时器
class MyTimer {
//扫描线程
private Thread t = null;
//有一个阻塞优先级队列
private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
public MyTimer() {
t = new Thread(() -> {
while (true) {
try {
synchronized (this) {
MyTask myTask = queue.take();
//取出队头元素观察判断是否到时间了
long curTime = System.currentTimeMillis();
if(curTime < myTask.getTime()) {
//时间没到放回队列
queue.put(myTask);
} else {
//时间到了,执行任务
myTask.run();
this.wait(myTask.getTime() - curTime);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} );
t.start();
}
public void schedule (Runnable runnable,long after) {
//第一个参数 是指定的任务
// 第二个参数是在多少毫秒后执行
MyTask myTask = new MyTask(runnable,System.currentTimeMillis()+ after);
//把任务放到队列里
queue.put(myTask);
synchronized (this) {
this.notify();
}
}