目录
简介
步骤 1: 定义消息类
步骤 2: 创建发布者
步骤 3: 创建订阅者
步骤 4: 实现发布-订阅模型
前言-与正文无关
生活远不止眼前的苦劳与奔波,它还充满了无数值得我们去体验和珍惜的美好事物。在这个快节奏的世界中,我们往往容易陷入工作的漩涡,忘记了停下脚步,感受周围的世界。让我们一起提醒自己,要适时放慢脚步,欣赏生活中的每一道风景,享受与家人朋友的温馨时光,发现那些平凡日子里隐藏的幸福时刻。因为,这些点点滴滴汇聚起来的,才是构成我们丰富多彩生活的本质。希望每个人都能在繁忙的生活中找到自己的快乐之源,不仅仅为了生存而工作,更为了更好的生活而生活。
送你张美图!希望你开心!
简介
在Java中,实现发布-订阅模型可以通过多种方式完成,包括使用内置的并发工具如BlockingQueue
。这里,我们使用LinkedBlockingQueue
来演示一个简单的发布-订阅系统,其中发布者将消息放入队列,而订阅者从队列中取出消息进行处理。
步骤 1: 定义消息类
首先,定义一个简单的消息类,用于发布者和订阅者传递消息。
public class Message {
private String content;
public Message(String content) {
this.content = content;
}
public String getContent() {
return content;
}
}
步骤 2: 创建发布者
发布者(Producer)将消息放入共享的BlockingQueue
中
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue<Message> queue;
public Producer(BlockingQueue<Message> q) {
this.queue = q;
}
@Override
public void run() {
// 发送消息
for (int i = 0; i < 10; i++) {
Message msg = new Message("" + i);
try {
Thread.sleep(i);
queue.put(msg);
System.out.println("Produced " + msg.getContent());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 发送结束消息
Message msg = new Message("exit");
try {
queue.put(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
步骤 3: 创建订阅者
订阅者(Consumer)从BlockingQueue
中取出消息并处理。
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private BlockingQueue<Message> queue;
public Consumer(BlockingQueue<Message> q) {
this.queue = q;
}
@Override
public void run() {
try {
Message msg;
// 检查消息内容是否为"exit"
while (!(msg = queue.take()).getContent().equals("exit")) {
Thread.sleep(10);
System.out.println("Consumed " + msg.getContent());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
步骤 4: 实现发布-订阅模型
现在,使用一个LinkedBlockingQueue
来连接发布者和订阅者。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class PubSubService {
public static void main(String[] args) {
// 创建共享的阻塞队列
BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
// 创建并启动发布者和订阅者线程
Thread producerThread = new Thread(new Producer(queue));
Thread consumerThread = new Thread(new Consumer(queue));
producerThread.start();
consumerThread.start();
}
}
在这个简单的发布-订阅模型中,Producer
类生成消息并将它们放入队列,而Consumer
类从队列中取出并处理这些消息。使用LinkedBlockingQueue
使得这个过程在多线程环境中是线程安全的,同时还处理了生产者和消费者的速率不匹配问题。
------------------------------------------与正文内容无关------------------------------------
如果觉的文章写对各位读者老爷们有帮助的话,麻烦点赞加关注呗!作者在这拜谢了!
混口饭吃了!如果你需要Java 、Python毕设、商务合作、技术交流、就业指导、技术支持度过试用期。请在关注私信我,本人看到一定马上回复!
这是我全部文章所在目录,看看是否有你需要的,如果遇到觉得不对地方请留言,看到后我会查阅进行改正。
A乐神-CSDN博客