生产者-消费者模型的核心思想是通过阻塞队列和线程的等待和通知机制实现生产者和消费者之间的协作,确保生产者不会向满队列中添加消息,消费者不会从空队列中获取消息,从而有效地解决了多线程间的同步问题。
需要实现两个方法。方法1向队列中放入消息,如果队列中的数据满了就线程等待,方法2从队列中拿数据,如果队列空了就等待
-
生产者方法:这个方法用于将消息放入队列。如果队列已满,生产者需要等待,直到队列有足够的空间。在等待时,生产者线程通过调用
wait()
方法释放锁,并处于等待状态,直到队列中有足够的空间。一旦队列有空间,其他线程会通过notify()
或notifyAll()
方法通知生产者可以继续执行。 -
消费者方法:这个方法用于从队列中获取消息。如果队列为空,消费者需要等待,直到队列有消息可供消费。与生产者一样,消费者线程在等待时也会通过
wait()
方法释放锁,并处于等待状态,直到队列中有消息可供消费。一旦有消息可供消费,其他线程会通过notify()
或notifyAll()
方法通知消费者可以继续执行。
首先定义一个消息类
/**
* 消息实体类
* @date 2024/02/01
*/
final class Message {
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
然后实现上面的两个方法
/**
* 利用阻塞队列原理实现
* @date 2024/02/01
*/
class ZuseQueue {
/** 消息队列集合*/
private LinkedList<Message> list = new LinkedList<>();
/** 消息队列容量*/
private int capcity;
/** 存入消息*/
public void put(Message m){
synchronized (list) {
while (list.size() == capcity) {
try{
list.wait();
}catch (Exception e){
e.printStackTrace();
}
}
// 将消息加入队列尾部
list.addLast(m);
list.notifyAll();
}
}
/** 获取消息*/
public Message take() {
synchronized (list) {
while (list.isEmpty()) {
try{
list.wait();
}catch (Exception e){
e.printStackTrace();
}
}
// 从队列头部获取消息
Message message = list.removeFirst();
list.notifyAll();
return message;
}
}
// 定义队列容量
public ZuseQueue(int capcity) {
this.capcity = capcity;
}
}
之后进行代码测试,这里提供两种方式开启两个线程
-
使用线程(
Thread
)类:在第一种写法中,显式地创建了两个线程(producerThread
和consumerThread
),并通过start()
方法启动它们。这是传统的 Java 多线程编程方式,通过创建并启动线程来执行任务。 -
使用
CompletableFuture
:在第二种写法中,使用了 Java 8 中的CompletableFuture
来创建和管理异步任务。CompletableFuture
提供了更高级的异步编程方式,可以更灵活地组合和管理多个异步操作。
性能上的差异主要取决于具体的应用场景和需求。如果需要更高级的控制和管理异步任务,以及更多的组合和操作,使用 CompletableFuture
可能更有优势。它允许你更容易地处理异常、组合多个异步操作、等待所有任务完成等。
如果你的需求比较简单,只是需要创建和管理一些基本的线程,第一种方式可能更简单和直观。但要注意,在某些情况下,使用 CompletableFuture
可能会更具可读性和维护性,因为它提供了更多的工具和方法来处理异步编程中的常见问题。
public class MessageQueue {
public static void main(String[] args) {
ZuseQueue zuseQueue = new ZuseQueue(3);
// Thread producerThread = new Thread(() -> {
// for (int i = 0; i < 5; i++) {
// Message message = new Message(i, "a");
// zuseQueue.put(message);
// System.out.println("消息存入:" + message.toString());
// }
// });
//
// Thread consumerThread = new Thread(() -> {
// while (true) {
// try {
// Thread.sleep(1); // 为了不过于频繁的消费
// Message message = zuseQueue.take();
// System.out.println("消费成功 消费id:" + message.getId());
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// });
//
// producerThread.start();
// consumerThread.start();
CompletableFuture<Void> producerFuture = CompletableFuture.runAsync(() -> {
for (int i = 0; i < 5; i++) {
Message message = new Message(i, "a");
zuseQueue.put(message);
System.out.println("消息存入:" + message.toString());
}
});
CompletableFuture<Void> consumerFuture = CompletableFuture.runAsync(() -> {
while (true) {
try {
Thread.sleep(1);
Message message = zuseQueue.take();
System.out.println("消费成功 消费id:" + message.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(producerFuture, consumerFuture);
combinedFuture.join();
}
}
完整代码如下:
package test;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
/**
* @author 小如
* @date 2024/02/01
*/
public class MessageQueue {
public static void main(String[] args) {
ZuseQueue zuseQueue = new ZuseQueue(3);
// Thread producerThread = new Thread(() -> {
// for (int i = 0; i < 5; i++) {
// Message message = new Message(i, "a");
// zuseQueue.put(message);
// System.out.println("消息存入:" + message.toString());
// }
// });
//
// Thread consumerThread = new Thread(() -> {
// while (true) {
// try {
// Thread.sleep(1); // 为了不过于频繁的消费
// Message message = zuseQueue.take();
// System.out.println("消费成功 消费id:" + message.getId());
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// });
//
// producerThread.start();
// consumerThread.start();
CompletableFuture<Void> producerFuture = CompletableFuture.runAsync(() -> {
for (int i = 0; i < 5; i++) {
Message message = new Message(i, "a");
zuseQueue.put(message);
System.out.println("消息存入:" + message.toString());
}
});
CompletableFuture<Void> consumerFuture = CompletableFuture.runAsync(() -> {
while (true) {
try {
Thread.sleep(1);
Message message = zuseQueue.take();
System.out.println("消费成功 消费id:" + message.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(producerFuture, consumerFuture);
combinedFuture.join();
}
}
/**
* 利用阻塞队列原理实现
* @date 2024/02/01
*/
class ZuseQueue {
/** 消息队列集合*/
private LinkedList<Message> list = new LinkedList<>();
/** 消息队列容量*/
private int capcity;
/** 存入消息*/
public void put(Message m){
synchronized (list) {
while (list.size() == capcity) {
try{
list.wait();
}catch (Exception e){
e.printStackTrace();
}
}
// 将消息加入队列尾部
list.addLast(m);
list.notifyAll();
}
}
/** 获取消息*/
public Message take() {
synchronized (list) {
while (list.isEmpty()) {
try{
list.wait();
}catch (Exception e){
e.printStackTrace();
}
}
// 从队列头部获取消息
Message message = list.removeFirst();
list.notifyAll();
return message;
}
}
// 定义队列容量
public ZuseQueue(int capcity) {
this.capcity = capcity;
}
}
/**
* 消息实体类
* @date 2024/02/01
*/
final class Message {
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
测试截图: