SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

前言

在现代分布式系统中,消息队列是实现服务解耦和异步处理的关键组件。Spring框架提供了强大的支持,使得与消息队列(如RabbitMQ、Kafka等)的集成变得更加便捷和灵活。本文将深入探讨如何利用Spring的注解驱动方式来配置和管理队列、交换机、消息转换器等组件,从而实现一个高效且可扩展的消息处理架构。

在本博客中,我们将重点介绍:

如何使用Spring的注解方式配置RabbitMQ的队列和交换机。
如何配置消息转换器(如Jackson2JsonMessageConverter)来处理不同格式的消息。
如何根据业务需求对现有代码进行改造,将消息队列引入到系统中,从而实现消息的异步处理与解耦。
通过这篇文章,您将了解如何使用Spring框架的注解配置简化消息队列的管理,同时提升系统的可扩展性和维护性。


基于注解的声明队列交换机

利用SpringAMQP声明DirectExchange并与队列绑定
需求如下:

  1. 在consumer服务中,声明队列direct.queue1和direct.queue2
  2. 在consumer服务中,声明交换机hmall.direct,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

在这里插入图片描述
基于Bean声明队列和交换机代码如下:

package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class DirectConfiguration {
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("hmall.direct")
    }
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queuue1");
    }
    @Bean
    public Binding directQueue1bindingRed( Queue directQueue1, DirectExchange directExchange ){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");

    }
    @Bean
    public Binding directQueue1bindingBlue( Queue directQueue1, DirectExchange directExchange ){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");

    }
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queuue2");
    }
    @Bean
    public Binding directQueue2bindingRed( Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");

    }
    @Bean
    public Binding directQueue2bindingYellow( Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");

    }
}

SpringAMOP还提供了基于@RabbitListener注解来声明队列和交换机的方式

@RabbitListener(bindings =@QueueBinding(
	value = @Queue(name =direct.queue1),
	exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
	key = {"red","blue"}
))
public void listenDirectQueuel(string msg){
	System.out.println("消费者1接收到Direct消息:【+msg+"】");
}

接收者代码如下:

	@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1",durable = "true"),
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listenDirectQueue1(String message)throws Exception {
        log.info("消费者1监听到direct.queue2的消息,["+message+"]");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2",durable = "true"),
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))

消息转换器

消息转换器
需求:测试利用SpringAMQP发送对象类型的消息

  • 声明一个队列,名为object.queue
  • 编写单元测试,向队列中直接发送一条消息,消息类型为Map
  • 在控制台查看消息,总结你能发现的问题
// 准备消息
Map<String,0bject>msg = new HashMap<>();
msg.put("name","Jack");
msg.put("age"21);

创建队列object.queue
在这里插入图片描述
测试代码如下:

	@Test
    public void TestSendObject(){
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "Jack");
        msg.put("age", 18);
        //3.发送消息 参数分别是:交换机名称、RoutingKey(暂时为空)、消息
        rabbitTemplate.convertAndSend("object.queue",msg);
    }

在控制台上找到object.queue中得到消息
在这里插入图片描述
Spring的对消息对象的处理是由org.springframework.amgp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于IDK的ObjectOutputStream完成序列化。存在下列问题:

  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

建议采用JSON序列化代替默认的JDK序列化,要做两件事情:
在publisher和consumer中都要引入jackson依赖:

<dependency>
	<groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
</dependency>

在publisher和consumer中都要配置Messageconverter:

@Bean
public MessageConverter messageConverter(){
	return new Jackson2JsonMessageConverter();
}

在这里插入图片描述
在这里插入图片描述
消费者代码:

	@RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String,Object> msg)throws Exception {
        log.info("消费者监听到pbject.queue的消息,["+msg+"]");
    }

在这里插入图片描述
运行结果如下:
在这里插入图片描述

业务改造

需求:改造余额支付功能,不再同步调用交易服务的0penFeign接口,而是采用异步MO通知交易服务更新订单状态。
在这里插入图片描述
在trade-service微服务消费者配置和pay-service微服务发送者都配置MQ依赖

	<!--消息发送-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

在trade-service微服务和pay-service微服务添加上RabbitMQ配置信息

spring:
  rabbitmq:
    host: 192.168.244.136
    port: 5672
    virtual-host: /hmall
    username: hmall
    password: 1234

因为消费者和发送者都需要消息转换器,故直接将代码写到hm-common服务中,在config包中创建MqConfig类

package com.hmall.common.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

同时trade-service微服务和pay-service微服务是无法自动扫描到该类,采用SpringBoot自动装配的原理,在resource文件夹下的META-INF文件夹下的spring.factories文件中添加类路径:
在这里插入图片描述

在接收者trade-service微服务中创建PayStatusListener

package com.hmall.trade.listener;

import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class PayStatusListener {
    private final IOrderService orderService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("trade.pay.success.queue"),
            exchange = @Exchange(value = "pay.direct"),
            key = "pay.success"
    ))
    public void ListenPaySuccess(Long orderId) {
        orderService.markOrderPaySuccess(orderId);
    }

}

修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:

@Service
@RequiredArgsConstructor
@Slf4j
public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> implements IPayOrderService {
	private final RabbitTemplate rabbitTemplate;
	...
	@Override
	@Transactional
	public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {
    	// 1.查询支付单
   		PayOrder po = getById(payOrderDTO.getId());
   		// 2.判断状态
    	if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){
        	// 订单不是未支付,状态异常
        	throw new BizIllegalException("交易已支付或关闭!");
    	}
    	// 3.尝试扣减余额
    	userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());
    	// 4.修改支付单状态
    	boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());
    	if (!success) {
        	throw new BizIllegalException("交易已支付或关闭!");
    	}
    	// 5.修改订单状态
    	// tradeClient.markOrderPaySuccess(po.getBizOrderNo());
    	try {
        	rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
    	} catch (Exception e) {
        	log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);
    	}
	}
}

在这里插入图片描述
在这里插入图片描述


总结

本文介绍了基于Spring框架的注解方式来配置消息队列、交换机以及消息转换器的实现方法。通过注解配置,开发者可以更轻松地创建和管理RabbitMQ等消息队列的组件,而无需过多的 XML 配置或繁琐的手动配置。具体来说,我们探讨了如何:

使用 @RabbitListener 和 @EnableRabbit 注解配置消息监听器和消息队列。
配置消息转换器,特别是如何通过 Jackson2JsonMessageConverter 将消息转换为JSON格式,从而实现数据的序列化与反序列化。
结合业务需求,讲解如何对现有系统进行改造,集成消息队列,实现异步处理和服务解耦。
通过这些配置和改造,系统的消息处理能力得到了增强,性能和可扩展性也得到了显著提升。消息队列的使用不仅能够减少服务之间的紧耦合,还能够通过异步方式提高系统的响应速度和吞吐量。

希望本博客能够帮助您理解Spring在消息队列方面的强大功能,并为您的业务应用提供参考。随着系统复杂度的增加,合理的使用消息队列将成为构建高可用、高性能系统的关键之一。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/977826.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

速通HTML

目录 HTML基础 1.快捷键 2.标签 HTML进阶 1.列表 a.无序列表 b.有序列表 c.定义列表 2.表格 a.内容 b.合并单元格 3.表单 a.input标签 b.单选框 c.上传文件 4.下拉菜单 5.文本域标签 6.label标签 7.按钮标签 8.无语义的布局标签div与span 9.字符实体 HTML…

ui设计公司兰亭妙微分享:科研单位UI界面设计

科研单位的UI界面设计是一项至关重要的任务&#xff0c;它不仅关乎科研工作的效率&#xff0c;还直接影响到科研人员的用户体验。以下是对科研单位UI界面设计的详细分析&#xff1a; 一、设计目标 科研单位的UI界面设计旨在提升科研工作的效率与便捷性&#xff0c;同时确保科…

蓝桥杯刷题-dp-线性dp(守望者的逃离,摆花,线段)

[NOIP 2007 普及组] 守望者的逃离 题目描述 恶魔猎手尤迪安野心勃勃&#xff0c;他背叛了暗夜精灵&#xff0c;率领深藏在海底的娜迦族企图叛变。 守望者在与尤迪安的交锋中遭遇了围杀&#xff0c;被困在一个荒芜的大岛上。 为了杀死守望者&#xff0c;尤迪安开始对这个荒岛…

【算法设计与分析】(一)介绍算法与复杂度分析

【算法设计与分析】&#xff08;一&#xff09;介绍算法与复杂度分析 前言一、什么是算法&#xff1f;二、算法的抽象机制三、描述算法四、复杂度分析4.1 时间复杂度4.2 空间复杂度 前言 从搜索引擎的高效检索&#xff0c;到推荐系统的个性化推荐&#xff0c;再到人工智能领域…

自动驾驶两个传感器之间的坐标系转换

有两种方式可以实现两个坐标系的转换。 车身坐标系下一个点p_car&#xff0c;需要转换到相机坐标系下&#xff0c;旋转矩阵R_car2Cam&#xff0c;平移矩阵T_car2Cam。点p_car在相机坐标系下记p_cam. 方法1&#xff1a;先旋转再平移 p_cam T_car2Cam * p_car T_car2Cam 需要注…

OpenGL ES -> GLSurfaceView绘制点、线、三角形、正方形、圆(顶点法绘制)

XML文件 <?xml version"1.0" encoding"utf-8"?> <com.example.myapplication.MyGLSurfaceViewxmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"…

基于springboot大学生学科竞赛管理系统(源码+lw+部署文档+讲解),源码可白嫖!

摘要 学科竞赛一直是检测学生学习能力好坏的重要手段&#xff0c;随着社会的发展&#xff0c;学科竞赛已经渗透到各个方面。但是传统方式的竞赛方式已经不能更好的胜任越来越多的需求&#xff0c;所以需要设计一个大学生学科竞赛管理系统&#xff0c;来满足日益重要的学科竞赛…

Dify私有化部署自己的AI Agent

1、下载Dify git clone gitgithub.com:langgenius/dify.git 2、创建Dify配置 进入dify目录下的docker目录中,复制.env.example为 .env 3、使用Docker命令进行部署Dify docker compose up -d 4、访问Dify http://localhost/install 5、 设置模型供应商 配置环境变量&#xff1…

【Deepseek+Browser-Use搭建 Web UI自动化】

参考文档&#xff1a;browser-use WebUI DeepSeek V3 把浏览器整成自动化了!_browser use webui 执行run agent chrome没出来-CSDN博客 1、 安装完成&#xff1a; 三、安装步骤&#xff08;适用于macOs、windows、linux&#xff09; 1、拉取WebUI项目 git clone https://gi…

DeepSeek + Mermaid编辑器——常规绘图

下面这张图出自&#xff1a;由清华大学出品的 《DeepSeek&#xff1a;从入门到精通》。 作为纯文本生成模型&#xff0c;DeepSeek虽不具备多媒体内容生成接口&#xff0c;但其开放式架构允许通过API接口与图像合成引擎、数据可视化工具等第三方系统进行协同工作&#xff0c;最终…

解决数据库建表错误:ERROR 1064 (42000) You have an error in your SQL

[TOC](解决数据库建表错误&#xff1a;ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ‘desk tb_user’ at line 1) 运用MySQL命令运行sql语句进行建表时&am…

compare-form.vue 的 v 来源(来自父组件index.vue中的row行数据)

文章目录 compare-form.vue 的父组件compare-form.vue 的 v 来源相关代码片段1. value 的 Prop 定义2. Watch(value) 及其 watchValue 方法3. 与 value 间接相关的代码&#xff08;影响 v 的初始化或使用&#xff09; 总结 子组件 compare-form.vue父组件 index.vue 以下是关于…

【深度学习神经网络学习笔记(三)】向量化编程

向量化编程 向量化编程前言1、向量化编程2、向量化优势3、正向传播和反向传播 向量化编程 前言 向量化编程是一种利用专门的指令集或并行算法来提高数据处理效率的技术&#xff0c;尤其在科学计算、数据分析和机器学习领域中非常常见。它允许通过一次操作处理整个数组或矩阵的…

基于 SpringBoot Vue 的生鲜商城系统设计和实现(源码+文档+部署讲解)

技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论…

电机控制的空间矢量调制 (SVPWM)

目录 概述 1 电机控制的空间矢量调制 (SVPWM)介绍 2 实现原理 2.1 设计要求 2.2 SVPWM 的实现 3 SVPWM的C语言 3.1 代码文件 3.2 STM32G4平台上验证 4 源代码文件 概述 本文主要介绍电机控制的空间矢量调制 (SVPWM)&#xff0c;空间矢量调制 (SVPWM) 是感应电机和永磁…

服务器离线部署DeepSeek

目标 本次部署的目标是在本地服务器上部署DeepSeek。但是该服务不能连接外网&#xff0c;因此只能使用离线部署的方式。为了一次完成部署。现在云服务器上进行尝试。 云服务器部署尝试 云服务器配置 CentOS72080Ti 11GB 安装准备 1、上传iso并配置为本地yum源 安装前先将…

Unity打包APK报错 using a newer Android Gradle plugin to use compileSdk = 35

Unity打包APK报错 using a newer Android Gradle plugin to use compileSdk 35 三个报错信息如下 第一个 WARNING:We recommend using a newer Android Gradle plugin to use compileSdk 35This Android Gradle plugin (7.1.2) was tested up to compileSdk 32This warning…

Ubuntu 22.04安装K8S集群

以下是Ubuntu 22.04安装Kubernetes集群的步骤概要 一、设置主机名与hosts解析 # Master节点执行 sudo hostnamectl set-hostname "k8smaster" # Worker节点执行 sudo hostnamectl set-hostname "k8sworker1"# 所有节点的/etc/hosts中添加&#xff1a; ca…

《AI 大模型 ChatGPT 的传奇》

《AI 大模型 ChatGPT 的传奇》 ——段方 某世界 100 强企业大数据/AI 总设计师 教授 北京大学博士后 助理 &#xff1a;1三6三二四61四五4 1 AI 大模型的概念和特点 1.1 什么是”大模型、多模态“&#xff1f; 1.2 大模型带来了什么&#xff1f; 1.3 大模型为什么能产生质变&am…

期权帮|股指期货多单和空单有什么区别?

锦鲤三三每日分享期权知识&#xff0c;帮助期权新手及时有效地掌握即市趋势与新资讯&#xff01; 股指期货多单和空单有什么区别&#xff1f; 一、股指期货多单和空单定义与操作方向&#xff1a; &#xff08;1&#xff09;股指期货多单定义&#xff1a;投资者买入股指期货合…