震惊!!原来阻塞队列&&消息队列这样理解会更简单!!!
- 一:阻塞队列
- 二:消息队列
- 2.1:生产者消费者模型
- 2.1.1:解耦合:
- 2.1.2:削峰填谷:
- 三:消息队列代码
- 3.1.1
- 3.1.2:
- 3.1.3:生产慢,消费快,消费阻塞
- 3.1.3:生产快,消费慢,生产阻塞
- 二级目录
- 二级目录
一:阻塞队列
阻塞队列:先进先出,线程安全,并且带有阻塞功能.
1:队列为空时,当尝试出队列,出队列操作就会阻塞,一直阻塞到队列 不为空为止
2:队列满了,当尝试入队时,入队列就会阻塞,一直阻塞到队列不为满为止.
BlockingQueue就是标准库提供的阻塞队列
二:消息队列
不是普通的先进先出,而是通过topic这样的参数来进行分类,分类后,根据某个topic进行先进先出(消息队列往往也会带有阻塞特性).
举个例子:
即使上图:检查心脏的人,排在第二位,但医生没有喊"检查心脏的患者,那检查心脏的患者"只能继续等待,而检查肾,做产检的患者,按照顺序看医生.
由于消息队列这样的数据结构(本身就是一种数据结构)太好用了,因此在实际开发中,经常把这样的数据结构,封装成单独的服务器程序,单独部署,这样的服务器程序,同样也称为消息队列.
消息队列能够起到的作用,就是实现"生产者消费者模型"
普通的阻塞队列,也可以实现生产者消费者模型.
如果在一个进程内,实现生产者消费者模型,直接使用阻塞队列即可
如果需要在分布式系统中,实现生产者消费者模型,则需要使用单独部署的消息队列服务器
2.1:生产者消费者模型
生产者消费者模型,在开发中主要有两方面的意义:
1:能够让程序解耦合
2:能够让程序"削峰填谷"
2.1.1:解耦合:
2.1.2:削峰填谷:
客户端发来的请求,个数多少,没办法提前预知,遇到某些突发事件,就可能导致客户端给服务器的请求激增
正常情况下,A收到一个客户端请求,就同样请求一次B,A收到的请求多了,B的请求也就会增多,但由于A做的工作比较简单,消耗的资源比较少,B做的工作复杂,消耗的资源比较多,一旦请求量大了,B服务器就容易崩(服务器每次处理一个请求,都会消耗一定的系统资源,如果同一时刻,要处理的请求太多,消耗的总资源数目超出机器能提供的上限,机器可能就会卡死).
通过引入消息队,无论A给队列写多快,B都可以按照固有的节奏来消费数据,B的节奏,就不一定完全跟着A了,相当于队列把B保护起来了.
三:消息队列代码
3.1.1
public class Demo2 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue=new ArrayBlockingQueue<>(100);
queue.put("aaa");
String str=queue.take();
System.out.println("取出的数据是 :"+str);
// String str2=queue.take();
// System.out.println("取出的数据是 :"+str2);
}
}
3.1.2:
public class Demo2 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue=new ArrayBlockingQueue<>(100);
queue.put("aaa");
String str=queue.take();
System.out.println("取出的数据是 :"+str);
String str2=queue.take();
System.out.println("取出的数据是 :"+str2);
}
}
当第二次取数据的时候,由于队列中没有数据了,只能进行阻塞等待,等待到队列中又存有数据,
3.1.3:生产慢,消费快,消费阻塞
public class Demo3 {
public static int count=1;
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
Thread t1 = new Thread(() -> {
while(true) {
try {
queue.put(count);
System.out.println("t1 生产:" + count);
count++;
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t2 = new Thread(()->{
while(true) {
int num = 0;
try {
num = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2 消费:" + num);
}
});
t1.start();
t2.start();
}
}
3.1.3:生产快,消费慢,生产阻塞
public class Demo3 {
public static int count=1;
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
Thread t1 = new Thread(() -> {
while(true) {
try {
queue.put(count);
System.out.println("t1 生产:" + count);
count++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t2 = new Thread(()->{
while(true) {
int num = 0;
try {
num = queue.take();
System.out.println("t2 消费:" + num);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}