基于消息中间件的异步通信机制在系统解耦中的优化与实现


✨✨谢谢大家捧场,祝屏幕前的小伙伴们每天都有好运相伴左右,一定要天天开心哦!✨✨ 
🎈🎈作者主页: 喔的嘛呀🎈🎈
✨✨ 帅哥美女们,我们共同加油!一起进步!✨✨ 

目录

引言

一. 选择合适的消息中间件

二. 定义消息格式和通信协议

1. 定义消息格式

消息头

消息体

2. 定义通信协议

发送消息

接收消息

消息处理

3. 示例代码

定义消息格式

发送消息

接收消息

三、发布-订阅模式

1. 定义发布-订阅模式

2. 示例代码

发布消息

订阅消息

3. 运行示例

4. 异步处理消息

5. 解耦系统

6. 实现步骤

7. 实例场景

实例场景:电商系统订单处理

场景描述

实现步骤

示例代码

订单服务发送消息

库存服务接收消息

物流服务接收消息


引言

在现代分布式系统中,异步通信和解耦是非常重要的设计原则。通过使用消息中间件,可以实现系统间的异步通信和解耦,提高系统的可扩展性和可靠性。本文将介绍如何使用消息中间件来实现系统间的异步通信和解耦,并通过一个实际场景来演示。

一. 选择合适的消息中间件

选择合适的消息中间件需要考虑多个因素,包括项目需求、性能要求、可靠性、社区支持等。常见的消息中间件包括 RabbitMQ、Kafka、ActiveMQ、Redis 等,下面针对不同的需求给出一些选择建议:

  1. 消息传递模式

    • 点对点:适合使用 RabbitMQ、ActiveMQ 等传统消息中间件。
    • 发布-订阅:适合使用 RabbitMQ、Kafka 等支持广播消息的中间件。
  2. 可靠性

    • 如果对消息的可靠性要求较高,需要确保消息不会丢失,可以考虑使用 RabbitMQ、Kafka 等提供消息持久化和高可靠性的中间件。
  3. 性能

    • 如果需要处理大量的消息并且需要低延迟,可以考虑使用 Kafka,它是一个高吞吐量的消息中间件,适合大数据场景。
    • 如果对延迟要求较低,可以选择 RabbitMQ、ActiveMQ 等传统消息中间件。
  4. 社区支持和生态系统

    • 考虑选择一个有活跃社区支持和完善生态系统的消息中间件,这样可以更容易地解决问题和扩展功能。
  5. 技术栈兼容性

    • 考虑选择一个与你的技术栈兼容的消息中间件,避免出现集成上的问题。

综合考虑以上因素,可以选择最适合项目需求的消息中间件。

二. 定义消息格式和通信协议

定义消息格式和通信协议是使用消息中间件的关键步骤之一,它涉及到消息的结构、内容和交互方式。下面以 RabbitMQ 为例,演示如何定义消息格式和通信协议。

1. 定义消息格式

在 RabbitMQ 中,消息通常由两部分组成:消息头和消息体。消息头包含一些元数据信息,如消息的类型、路由键等;消息体包含实际的业务数据。

消息头
  • Content-Type:消息体的类型,如 application/jsontext/plain 等。
  • DeliveryMode:消息持久性标志,标识消息是否需要持久化存储,可选值为 1(持久化)和 2(非持久化)。
  • CorrelationId:消息关联标识,用于关联一组相关消息。
  • 其他自定义的消息头字段,根据业务需求定义。
消息体
  • 消息体可以是任意格式的数据,如 JSON、XML、文本等,根据业务需求定义。

2. 定义通信协议

通信协议定义了消息的交互方式,包括消息的发送、接收和处理流程。通信协议可以包括以下几个方面:

发送消息
  • 客户端向消息队列发送消息,包括指定交换机(Exchange)、路由键(Routing Key)和消息体。
接收消息
  • 服务端从消息队列接收消息,根据消息的交换机和路由键接收对应的消息。
消息处理
  • 客户端接收到消息后,根据消息的内容执行相应的业务逻辑。

3. 示例代码

定义消息格式
public class Message {
    private String content;
    private String contentType;
    private int deliveryMode;
    private String correlationId;

    // 省略getter和setter方法
}
发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SendMessage {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Message message = new Message();
            message.setContent("Hello, RabbitMQ!");
            message.setContentType("text/plain");
            message.setDeliveryMode(1); // 持久化
            message.setCorrelationId("123456");
            String messageJson = toJson(message);
            channel.basicPublish("", QUEUE_NAME, null, messageJson.getBytes());
            System.out.println(" [x] Sent '" + messageJson + "'");
        }
    }

    private static String toJson(Message message) {
        // 将 message 对象转换成 JSON 格式的字符串
        return "{ \"content\": \"" + message.getContent() + "\", \"contentType\": \"" + message.getContentType() + "\", \"deliveryMode\": " + message.getDeliveryMode() + ", \"correlationId\": \"" + message.getCorrelationId() + "\" }";
    }
}
接收消息
import com.rabbitmq.client.*;

public class ReceiveMessage {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String messageJson = new String(delivery.getBody(), "UTF-8");
                Message message = fromJson(messageJson, Message.class);
                System.out.println(" [x] Received '" + messageJson + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        }
    }

    private static <T> T fromJson(String json, Class<T> clazz) {
        // 将 JSON 格式的字符串转换成指定类型的对象
        // 这里可以使用 JSON 框架(如 Jackson、Gson)来实现
        return null;
    }
}

通过以上步骤,可以定义消息格式和通信协议,并使用 RabbitMQ 实现消息的发送和接收。

三、发布-订阅模式

发布-订阅模式是一种常见的消息传递模式,用于实现消息的广播和订阅。在发布-订阅模式中,消息发布者将消息发布到一个主题(Topic),而消息订阅者可以订阅感兴趣的主题,从而接收到相关消息。下面以 RabbitMQ 为例,演示如何使用发布-订阅模式。

1. 定义发布-订阅模式

在发布-订阅模式中,有一个交换机(Exchange)用来接收发布者发布的消息,并根据订阅者的绑定关系将消息路由到对应的队列。订阅者可以创建自己的队列,并将队列绑定到交换机上,从而接收到发布者发布的消息。

2. 示例代码

发布消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Publisher {

    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message = "Hello, subscribers!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
订阅消息
import com.rabbitmq.client.*;

public class Subscriber {

    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
        }
    }
}

3. 运行示例

  1. 先运行订阅者 Subscriber,它会创建一个队列并绑定到交换机上,开始监听消息。
  2. 然后运行发布者 Publisher,它会向交换机发布一条消息。
  3. 订阅者会接收到发布者发布的消息,并输出到控制台。

通过以上步骤,可以实现基于 RabbitMQ 的发布-订阅模式。

4. 异步处理消息

通过消息中间件实现异步处理消息,即发送消息后不需要立即等待结果,而是继续执行其他任务。这样可以提高系统的响应速度和吞吐量。

5. 解耦系统

通过消息中间件,系统之间的通信变成了基于消息的方式,系统不再直接依赖于对方的接口和实现细节,从而实现了系统之间的解耦。

6. 实现步骤

  • 定义消息格式和通信协议:确定消息的格式和通信协议,包括消息的内容结构、消息的生命周期等。
  • 配置消息中间件:在系统中配置和启动消息中间件,确保消息中间件正常运行。
  • 消息的发布和订阅:编写代码实现消息的发布和订阅逻辑,将消息发布到指定的主题,并订阅感兴趣的主题。
  • 处理接收到的消息:编写代码处理接收到的消息,根据消息的内容执行相应的业务逻辑。
  • 测试和验证:对系统进行测试和验证,确保消息的发布、订阅和处理功能正常运行。

7. 实例场景

实例场景:电商系统订单处理
场景描述

假设有一个电商系统,包含订单服务、库存服务和物流服务。当用户下单时,订单服务需要通知库存服务减少库存,通知物流服务发货。为了提高系统的可扩展性和可靠性,我们可以使用消息中间件来实现订单处理的异步通信和解耦。

实现步骤
  1. 定义消息格式和通信协议:定义订单消息的格式,包括订单号、商品信息等,并确定消息的交换机和队列名称。

  2. 配置消息中间件:在消息中间件中配置交换机和队列,并确保消息的持久化。

  3. 订单服务发送消息:订单服务在用户下单后,将订单消息发送到消息队列中。

  4. 库存服务订阅消息:库存服务订阅订单消息队列,接收并处理订单消息,减少库存。

  5. 物流服务订阅消息:物流服务也订阅订单消息队列,接收并处理订单消息,进行发货。

示例代码
订单服务发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class OrderService {

    private static final String EXCHANGE_NAME = "orders";
    private static final String QUEUE_NAME = "order_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "New order placed";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
库存服务接收消息
import com.rabbitmq.client.*;

public class InventoryService {

    private static final String EXCHANGE_NAME = "orders";
    private static final String QUEUE_NAME = "order_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                // 处理订单消息,减少库存
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
}
物流服务接收消息
import com.rabbitmq.client.*;

public class LogisticsService {

    private static final String EXCHANGE_NAME = "orders";
    private static final String QUEUE_NAME = "order_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                // 处理订单消息,发货
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
}

通过以上步骤的简单演示,订单服务可以异步发送订单消息,库存服务和物流服务可以订阅订单消息并处理,实现了订单处理的异步通信和解耦。

通过以上步骤,可以使用消息中间件实现系统间的异步通信和解耦,提高系统的可扩展性和可维护性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/635202.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker Compose快速入门

本教程旨在通过指导您开发基本Python web应用程序来介绍Docker Compose的基本概念。 使用Flask框架&#xff0c;该应用程序在Redis中提供了一个命中计数器&#xff0c;提供了如何在web开发场景中应用Docker Compose的实际示例。 即使您不熟悉Python&#xff0c;这里演示的概念也…

Llama模型家族之使用 Supervised Fine-Tuning(SFT)微调预训练Llama 3 语言模型(一) LLaMA-Factory简介

LlaMA 3 系列博客 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;一&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;二&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;三&#xff09; 基于 LlaMA…

以色列人Andi Gutmans开发的php zend

虽然目前php语言不行了【相关的文章前几年已经有人发过】&#xff0c;但这不是重点&#xff0c;重点是zend引擎的东西具有极大的技术价值&#xff0c;负责zend引擎实现的大佬都现在差不多都是40&#xff0c;50岁左右了&#xff0c;从1997&#xff0c;1998&#xff0c;2000到202…

Java基础之进制转换和位运算专题

什么是进制&#xff1f; 是数学中的一个概念&#xff0c;就是数据“逢几进位”。 例如&#xff1a;生活中用的计数方法 ---- 十进制。十进制就是数字逢十就要进一位。 例如&#xff1a;一个星期有7天&#xff0c;就是逢七进一&#xff1b;一个月有30天就是逢30进一&#xff1b;…

基于单片机和蓝牙控制的智能小车设计

摘要 &#xff1a; 本文设计了一种以智能手机为平台控制小车的控制系统&#xff0c;该系统以蓝牙为通信模块&#xff0c;手机通过蓝牙发送信号给小 车上的蓝牙模块&#xff0c;从而驱动电机实现小车各种运动&#xff0c;提供了一种无线遥控小车的新思路。设计了该系统的硬件与软…

思维导图-VPN

浏览器集成了受信任的机构的证书

python+selenium - UI自动框架之封装查找元素

单一的元素定位方法不能满足所有元素的定位&#xff0c;可以根据每个元素的特点来找到合适的方法&#xff0c;可以参考下图的方法&#xff1a; elementFind.py from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_con…

汇舟问卷:海外问卷项目适合工作室做吗?

这个项目适合工作室操作&#xff0c;国外问卷调查主要是利用填写问卷来赚取奖励。只要完成得越多&#xff0c;挣得也就越多。 这个项目的本质就是在线上进行简单的工作&#xff0c;只不过结算方式是以美元计算。 即使一份问卷只值1美元&#xff0c;但这也意味着收入达到了7元…

鸿蒙ArkUI-X跨平台技术:【SDK结构介绍】

ArkUI-X SDK目录结构介绍 简介 本文档配套ArkUI-X&#xff0c;将OpenHarmony ArkUI开发框架扩展到不同的OS平台&#xff0c;比如Android和iOS平台&#xff0c;让开发者基于ArkUI&#xff0c;可复用大部分的应用代码&#xff08;UI以及主要应用逻辑&#xff09;并可以部署到相…

ngnix 入门 二,docker启动nginx, 安装ssl 证书,使用配置文件,映射后端服务 ,提供给前端项目访问

搭建生产环境真不是人做的事&#xff0c;特别是对于一知半解的人。仅以此文献给各位技术人 说一下背景&#xff1a;项目前后端分离&#xff0c;前端 vue3 、小程序端 &#xff0c;后端 go 提供服务。 微信小程序需要使用 https 请求。 这就必须让我们想到nginx 了 想要达到的…

代码随想录算法训练营第二天| 977.有序数组的平方 、209.长度最小的子数组、 59.螺旋矩阵II

977. 有序数组的平方 题目链接&#xff1a;977. 有序数组的平方 文档讲解&#xff1a;代码随想录 状态&#xff1a;so easy 刚开始看到题目第一反应就是平方之后进行排序&#xff0c;数据量在 1 0 4 10^4 104&#xff0c;可以使用O(nlogn)的排序。但是更好的方式是使用双指针&a…

ArrayList与LinkedList

内存 内存缓存 预先将数据写到容器等数据存储单元中&#xff0c;就是软件内存缓存。 内存缓存淘汰机制 FIFO&#xff08;First in ,First Out&#xff09;&#xff08;先进先出&#xff09; LFU (Least Frequently Used) (频繁的最后淘汰) LRU(Least Recently Used) &#…

Socket同步通讯

目录 引言 1. 建立连接 2. 数据传输 3. 同步机制 4. 处理延迟 5. 安全性 6、一对一Socket同步通讯 客户端 代码分析 服务端 代码分析 7、服务端操作 1、首先我们先运行客户端代码 2、服务端点击Connect连接客户端 3、服务端输入信息传输到客户端 4、断开连接 引…

【笔记】软件架构师要点记录(1)

【笔记】软件架构师要点记录 20240517 20240517 连续性&#xff1a;恢复能力&#xff1b;可用性&#xff1a;保持稳定态的时长 增量开发模式&#xff1a;在增量开发中&#xff0c;每个增量都有明确的范围和功能&#xff0c;并按照特定的功能顺序完成。增量之间的范围划分在开发…

Flask CORS: 解决跨域资源共享问题的利器

文章目录 安装和启用 CORS配置 CORS拓展 在本文中&#xff0c;我们介绍了如何使用 Flask-CORS 扩展来解决跨域问题。Flask-CORS 是一个方便的工具&#xff0c;可以帮助我们轻松地实现跨域资源共享支持。 安装和启用 CORS 要开始使用 Flask-CORS&#xff0c;我们需要先安装它。…

腹部多器官分割的眼动引导双路径网络

文章目录 标题摘要方法实验结果 标题 摘要 这项研究提出了一种新的方法&#xff0c;名为眼动引导双路径网络&#xff08;Eye-Guided Dual-Path Network&#xff0c;EG-DPN&#xff09;&#xff0c;用于腹部多器官分割。这项工作的主要目标是提高医学影像分析中的多器官分割准…

express.js--token中间件验证及token解析(三)

主要作用 访问路由接口时&#xff0c;哪些需要校验token 通过token解析身份信息&#xff0c;就可以知道是哪个人 框架基本搭建express.js--基本用法及路由模块化(一)-CSDN博客 如何生成tokenexpress.js--生成token(二)-CSDN博客 middleware/index.js const jwt require(…

Vue 离线地图实现

效果图&#xff1a; 一、获取市的地图数据 DataV.geoAtlas 获取市地图数据 点击地图缩放至想要的市区域&#xff0c;通过右侧的链接打开网址&#xff0c;复制json数据。 二、获取镇地图数据 选择你想要的镇数据&#xff0c;点击下载 选择级别&#xff08;清晰度&#xff09…

如何搭建Sphinx文档

环境准备 Linux CentOS 7 方案 搭建一个文档网站&#xff0c;本文档使用的是tomcatsphinx。 Tomcat可以快速搭建出http服务&#xff0c;也可以使用apache httpd。 Sphinx作为文档网页自动生成工具&#xff0c;可以从reStructured文档转换为html文件。 Tomcat安装 创建/…

App玩转oCPX投放,打造低成本高转化的广告模型

随着广告主考核目标逐渐深化&#xff0c;以激活、注册等浅层指标为考核已经无法满足大部分广告主的投放诉求&#xff0c;越来越多的后端深化指标成为了广告主的核心诉求。OCPX应需而生&#xff0c;更好的助力广告主优化投放&#xff0c;全面提升转化效率。 在投放实践中&#…