文章目录
- 1. 简介
- 2. 代码实现
1. 简介
生产者消费者模式与保护性暂停模式的GuardObject不同,它不需要产生结果和消费结果的线程一一对应。它使用一个消息队列来平衡生产者和消费者的线程资源。其中生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理数据结果。注意,消息队列的容量是有限的,满时就不会再加入新的数据,空时就不会再消耗数据。JDK中各种阻塞队列采用的实现方式就是这种模式。
2. 代码实现
- 实现消息队列和消息类
final class Message{
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
@Override
public String toString() {
return "message{" +
"id=" + id +
", message=" + message +
'}';
}
}
//消息队列的类
@Slf4j
class MessageQueue{
private LinkedList<Message> list=new LinkedList<>();
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
//获取消息的方法
public Message take(){
synchronized (list) {
while (list.isEmpty()) {
log.info("队列为空,消费者线程等待");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = list.removeFirst();
log.info("已经消费消息,消息为:{}",message.getMessage());
list.notifyAll();
return message;
}
}
//存入消息的方法
public void put(Message e){
synchronized (list){
while(list.size()==capacity){
log.info("队列已满,生产者线程等待");
try {
list.wait();
} catch (InterruptedException m) {
m.printStackTrace();
}
}
list.addLast(e);
log.info("已经生产消息,消息为:{}",e.getMessage());
list.notifyAll();
}
}
}
- 测试代码
@Slf4j
public class jvm {
public static void main(String[] args) throws InterruptedException {
MessageQueue messageQueue=new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int d=i;
new Thread(()->{
messageQueue.put(new Message(d,"value"+d));
},"生产者-"+i).start();
}
new Thread(()->{
while(true){
try {
Thread.sleep(1000);
Message take = messageQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者").start();
}
}