rabbitMq举例

新来个技术总监,把 RabbitMQ 讲的那叫一个透彻,佩服!

生产者

代码举例

public String sendMsg(final String exchangeName,final String routingKey,final String msg) {}

/**
	 * 发送消息
	 * @param exchangeName	exchangeName
	 * @param routingKey	routingKey
	 * @param msg	mq message
	 * @return msgId
	 */
	public String sendMsg(final String exchangeName,final String routingKey,final String msg) {
		final CorrelationData correlationDataInfo = new CorrelationData();
		final String msgId = String.join(":", "yl:dps", exchangeName, routingKey,
				String.valueOf(IdUtil.getSnowflake(1, 3).nextId()));
		correlationDataInfo.setId(msgId);

		//send over callback log
		rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
			if(correlationData!=null) {
				final String id = correlationData.getId();
				if (ack) {
					log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData));
				} else {
					log.error("消息投递失败,消息Id[{}] [{}]", id, cause);
				}
			}else {
				log.error("消息投递失败,correlationData为null!");
			}
		});
		rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);
		

分析代码

这段代码的功能是向 RabbitMQ 发送消息,并且在发送消息后通过回调函数来确认消息的投递状态。下面是对代码的详细分析:

1. 方法签名

java

public String sendMsg(final String exchangeName, final String routingKey, final String msg)

  • 该方法接收三个参数:
    • exchangeName:消息发送的交换机名称。
    • routingKey:消息的路由键,用来确定消息将被路由到哪个队列。
    • msg:待发送的消息内容。

2. CorrelationData 创建与消息 ID 设置

java

final CorrelationData correlationDataInfo = new CorrelationData(); final String msgId = String.join(":", "yl:dps", exchangeName, routingKey, String.valueOf(IdUtil.getSnowflake(1, 3).nextId())); correlationDataInfo.setId(msgId);

  • CorrelationData:RabbitMQ 提供的一个类,用于设置和传递消息的相关信息,主要用来在消息确认回调中标识消息。
  • 生成 msgIdmsgId 是通过拼接一个前缀 "yl:dps"、交换机名 exchangeName、路由键 routingKey,以及通过 IdUtil.getSnowflake(1, 3).nextId() 生成的雪花算法(Snowflake)ID来创建的唯一标识符。这个 ID 用来唯一标识这条消息。
  • setId(msgId):将生成的 msgId 设置到 correlationDataInfo 对象中。

3. 设置回调函数

java

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if(correlationData != null) { final String id = correlationData.getId(); if (ack) { log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData)); } else { log.error("消息投递失败,消息Id[{}] [{}]", id, cause); } } else { log.error("消息投递失败,correlationData为null!"); } });

  • setConfirmCallback:通过 rabbitTemplate.setConfirmCallback() 方法设置消息确认回调函数。在消息发送之后,RabbitMQ 会调用该回调函数来通知消息是否成功投递到交换机。
  • 参数说明
    • correlationData:携带消息相关信息的对象(这里是 correlationDataInfo),包含了消息的 ID 等。
    • ack:布尔值,表示消息是否成功投递(true 表示成功,false 表示失败)。
    • cause:如果 ack 为 falsecause 会包含失败的原因。
  • 回调内容
    • 如果 correlationData 不为空:
      • 如果 ack 为 true,则表示消息投递成功,记录日志。
      • 如果 ack 为 false,则表示消息投递失败,记录失败日志,并包含失败原因。
    • 如果 correlationData 为 null,记录错误日志。

4. 发送消息

 

java

rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);

  • convertAndSend:该方法用于将消息发送到指定的交换机(exchangeName)、路由键(routingKey)和消息内容(msg)。同时,将 correlationDataInfo(包含 msgId)附带发送,以便在回调中进行确认和记录。
  • rabbitTemplate 是 RabbitTemplate 对象,用来发送和接收消息。它是 Spring AMQP 的核心组件,封装了与 RabbitMQ 的交互。

5. 返回消息 ID

return msgId;

  • 返回生成的消息 ID。这个 ID 可以在外部被用于追踪消息投递的状态。

总结:

这段代码的主要作用是:

  • 生成一个唯一的消息 ID,包含了交换机名、路由键和通过 Snowflake 算法生成的 ID。
  • 设置消息确认回调,当消息发送后,RabbitMQ 会通过回调通知消息是否成功投递。
  • 调用 rabbitTemplate.convertAndSend() 方法向指定的交换机发送消息,同时附带 CorrelationData(包含消息 ID)。
  • 返回生成的消息 ID,以便后续追踪或处理。

关键点:

  • 消息确认机制:使用 ConfirmCallback 确保消息是否成功投递,并且通过日志记录投递状态。
  • 消息 ID:通过 Snowflake 算法生成一个唯一的消息 ID,可以用于追踪和日志记录。
  • 回调机制:确保生产者能在消息投递过程中获得详细的确认信息。

消费者

代码举例


    @RabbitHandler
    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(value = "${rabbitmq.yl.x.queue}", durable = "true"),
                    exchange = @Exchange(value = "${rabbitmq.yl.x.exchange}"),
                    key = "${rabbitmq.yl.x.routing.key}"))
    public void stateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        log.info("MQ-收到【x】状态变更消息:{} ", payload);

        final String objDeliveryTag = String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG));

        if (NumberUtil.isNumber(objDeliveryTag)) {
            final long deliveryTag= Long.parseLong(objDeliveryTag);
            log.info("MQ-收到【x】MQ消息进行消费【执行ack】[{}]", deliveryTag);
            channel.basicAck(deliveryTag,true);
        }

        if(CharSequenceUtil.isNotBlank(payload)) {}

        }

在这个 Java 方法 leaseStateChange 中,通常是在使用 Spring AMQP 框架与 RabbitMQ 集成时出现的消费消息的处理方法。下面逐个解析这个方法中的各个部分:

方法签名解析

public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException

1. public void leaseStateChange
  • public: 表明这个方法是公开的,可以被外部调用。
  • void: 表明这个方法没有返回值。
  • leaseStateChange: 这是方法的名称。可以推测这个方法是用来处理与“租赁状态变化”相关的消息的。
2. @Payload String payload
  • @Payload: 这是 Spring AMQP 框架中的一个注解,用于将消息体的内容绑定到方法参数中。在这个例子中,payload 是一个 String 类型的参数,代表从 RabbitMQ 队列中接收到的消息内容。通常,这个消息体是通过 JSON 或其他格式的字符串传递的。

    • 注解的作用: @Payload 使得 Spring 能够自动将消息的主体部分注入到方法参数 payload 中。比如,如果消息体是一个 JSON 格式的字符串,Spring 会将其直接赋值给 payload 参数。

    • 示例: 假设接收到的消息体是 "{"state": "active", "leaseId": "12345"}"payload 将会是该字符串。

3. @Headers Map<String, Object> headers
  • @Headers: 这是另一个 Spring AMQP 注解,用来将消息的头部信息注入到方法参数中。RabbitMQ 消息不仅有消息体(payload),还可能包含一些头信息(比如消息的发送时间、路由信息等)。

    • 注解的作用: @Headers 会将消息头部的内容绑定到 headers 参数,这个参数是一个 Map<String, Object> 类型,其中键是头部的名称,值是相应的值。头部信息常常用于传递一些附加信息(例如消息的优先级、发送者标识等)。

    • 示例: 如果消息头包含如下信息:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}

      那么 headers 将会是一个 Map,其内容是:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}
4. Channel channel
  • Channel: 这是 RabbitMQ 的核心概念之一。Channel 代表一个与 RabbitMQ 服务的连接通道,允许你在该通道上进行消息的消费、确认等操作。

    • 作用: 在 Spring AMQP 中,Channel 通常用来进行消息的确认(acknowledge)操作,或者处理消息处理失败时的重新排队等任务。你可以使用 Channel 来手动确认消息,或者控制消息是否成功消费。

    • 示例: 如果在消息处理过程中出现异常,消费者可能需要通过 channel.basicNack() 方法来拒绝该消息并可能重新入队。

5. throws IOException
  • throws IOException: 表明这个方法可能会抛出 IOException 异常。RabbitMQ 的消息操作可能会遇到 I/O 错误,因此需要在方法签名中声明可能抛出此异常。通常,这类异常会发生在与 RabbitMQ 的连接中断、消息传输过程失败时等。

Spring AMQP 消费者代码示例

假设这是一个处理来自某个队列的消息的方法,下面是该方法的使用场景和完整代码示例:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.MessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

import java.io.IOException;
import java.util.Map;

@Component
public class LeaseStateChangeListener {

    // 监听指定队列的消息
    @RabbitListener(queues = "leaseStateQueue")
    public void leaseStateChange(@Payload String payload, 
                                 @Headers Map<String, Object> headers, 
                                 Channel channel) throws IOException {
        try {
            // 处理消息体
            System.out.println("Received message: " + payload);

            // 获取消息头部信息
            String correlationId = (String) headers.get("correlationId");
            String messageType = (String) headers.get("messageType");
            System.out.println("CorrelationId: " + correlationId + ", MessageType: " + messageType);

            // 模拟处理业务逻辑
            processLeaseStateChange(payload);

            // 确认消息已成功消费
            channel.basicAck(headers.hashCode(), false);  // 手动确认消息
        } catch (Exception e) {
            // 异常处理,拒绝消息并重新入队
            System.err.println("Error processing message: " + e.getMessage());
            channel.basicNack(headers.hashCode(), false, true); // 拒绝并重新入队
        }
    }

    private void processLeaseStateChange(String payload) {
        // 假设这里是处理租赁状态更新的业务逻辑
        // 比如将消息解析为对象,进行租赁状态更新
        System.out.println("Processing lease state change for payload: " + payload);
    }
}

解析

  • @RabbitListener: 注解的作用是声明这个方法是一个 RabbitMQ 消息的消费者,并且该方法监听 leaseStateQueue 队列。当有消息到达这个队列时,这个方法会被调用。

  • 消息体 (payload): 这个方法会接收到一个消息体,@Payload 注解将该消息的内容(通常是 JSON 格式的字符串)自动绑定到方法参数 payload 上。

  • 消息头 (headers): 使用 @Headers 注解将消息的头部信息绑定到 headers 参数上,Map<String, Object> 类型。你可以从中获取如 correlationIdmessageType 等附加信息。

  • Channel: 这个参数用于消息的确认、拒绝等操作。在成功处理完消息后,调用 channel.basicAck() 来确认消息,表示该消息已经被成功消费。如果处理失败,调用 channel.basicNack() 拒绝该消息,并可以选择是否重新入队。

总结

  • 该方法是一个 RabbitMQ 消费者,用于从指定的队列中消费消息。
  • 通过 @Payload 获取消息体内容,使用 @Headers 获取消息头信息。
  • 使用 Channel 来确认消息的处理状态。
  • 使用 @RabbitListener 注解自动监听队列,并处理消费的消息。

这种方式非常适合处理队列中的业务逻辑,并能够灵活处理消息的确认、拒绝等操作。

消息怎么知道发给哪一个队列

先看队列与交换机怎么绑定的

先创建队列,然后绑定到交换机

RabbitMQ系列-6.如何通过控制台创建交换机、队列、死信队列、延迟队列 - 简书

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/936619.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

对话小系统(智能图书助手)

对话小系统&#xff08;智能图书助手&#xff09; 文章说明核心代码效果展示源码下载 文章说明 现在GPT的功能十分强大&#xff0c;是否可以利用开源的接口来实现自己的智能小助手呢&#xff0c;我想到可以提供一些能力接口&#xff0c;然后对问询内容进行意图识别&#xff0c;…

数智读书笔记系列006 协同进化:人类与机器融合的未来

书名:协同进化&#xff1a;人类与机器融合的未来 作者:[美]爱德华阿什福德李 译者:李杨 出版时间:2022-06-01 ISBN:9787521741476 中信出版集团制作发行 爱德华・阿什福德・李&#xff08;Edward Ashford Lee&#xff09;是一位在计算机科学与工程领域颇具影响力的学者&am…

C# 探险之旅:第二十七节 - 类型class(属性) —— 给你的类穿上“属性”的外衣

嘿&#xff0c;探险家们&#xff01;欢迎再次踏上我们的C#奇幻之旅。今天&#xff0c;我们要聊聊一个超级有趣的话题——类的“属性”。想象一下&#xff0c;如果我们要给类穿上一件酷炫的外衣&#xff0c;那属性就是这件外衣上的各种口袋和装饰&#xff0c;让类变得既实用又拉…

数据保护策略:如何保障重要信息的安全

一、什么是数据安全&#xff1f; 数据安全是保护数字信息免遭盗窃、未经授权的访问和恶意修改的过程。这是一个持续的过程&#xff0c;负责监督信息的收集、存储和传输。 机密性&#xff1a;保护数据免遭未授权方访问。 完整性&#xff1a;保护数据免遭未经授权的修改、损坏…

Mvc、Springmvc框架

一.Mvc&#xff1a; 1.概念&#xff1a; MVC它是一种设计理念。把程序按照指定的结构来划分: Model模型 、View视图 、Controller控制层&#xff1b; 结构图&#xff1a; 二.Springmvc: 1.概念&#xff1a; springmvc框架它是spring框架的一个分支。它是按照mvc架构思想设计…

【CSS in Depth 2 精译_078】12.6 调整字间距,提升可读性 + 12.7 本章小结

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第四部分 视觉增强技术 ✔️【第 12 章 CSS 排版与间距】 ✔️ 12.1 间距设置 12.1.1 使用 em 还是 px12.1.2 对行高的深入思考12.1.3 行内元素的间距设置 12.2 Web 字体12.3 谷歌字体12.4 font-fac…

Python高性能web框架-FastApi教程:(2)路径操作装饰器方法

路径操作装饰器方法 1. fastapi支持的各种请求方式 app.get() app.post() app.put() app.patch() app.delete() app.options() app.head() app.trace()2. 定义不同请求方式的路由 # 定义GET请求的路由 app.get(/get) def get_test():return {method: get方法} app.get(/get)…

PostgreSQL 入门

下载与安装 部分国产数据库采用PostgreSQL作为基础进行研发&#xff0c;因此先尝试了解一下原始数据库情况。 PostgreSQL 简称 PG 官网&#xff1a;https://www.postgresql.org/ PostgreSQL “世界上最先进的开源关系型数据库” 这是官网上的口号。 PostgreSQL: The World…

Java-26 深入浅出 Spring - 实现简易Ioc-02 无IoC与AOP场景下实现业务

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 大数据篇正在更新&#xff01;https://blog.csdn.net/w776341482/category_12713819.html 目前已经更新到了&#xff1a; MyBatis&#xff…

工作流审批功能全解析:提升流程效率的关键要素

1. 引言 在当今数字化时代&#xff0c;企业与组织的运营效率在很大程度上依赖于高效、精准的工作流审批系统。随着业务日益复杂且多样化&#xff0c;审批流程变得愈加细致和灵活。一个完善的工作流审批系统不仅能确保任务在组织内部有序流转、协调各方资源&#xff0c;还能实现…

API接口示例:电商商品评论数据

当然&#xff0c;以下是一个简化的电商商品评论数据API接口的示例。请注意&#xff0c;这只是一个示例&#xff0c;实际的API接口可能会更加复杂&#xff0c;并且会包含更多的验证、错误处理和安全措施。 API接口示例&#xff1a;电商商品评论数据 基础信息 API名称&#xf…

HCIA-Access V2.5_2_2_2网络通信基础_IP编址与路由

网络层数据封装 首先IP地址封装在网络层&#xff0c;它用于标识一台网络设备&#xff0c;其中IP地址分为两个部分&#xff0c;网络地址和主机地址&#xff0c;通过我们采用点分十进制的形式进行表示。 IP地址分类 对IP地址而言&#xff0c;它细分为五类&#xff0c;A,B,C,D,E,…

Microsemi Libero使用技巧11——CoreUARTAPB RX管脚分配时不显示

调用串口IP核CoreUARTAPB&#xff0c;并例化到顶层设计&#xff0c;发现UART_RX管脚在进行管脚分配时没有显示出来&#xff0c;最后发现是CoreAPB3总线IP核配置不对导致&#xff0c;改为如下配置后正常。

SEGGER | 基于STM32F405 + Keil - RTT组件01 - 移植SEGGER RTT

导言 RTT(Real Time Transfer)是一种用于嵌入式中与用户进行交互的技术&#xff0c;它结合了SWO和半主机的优点&#xff0c;具有极高的性能。 使用RTT可以从MCU非常快速输出调试信息和数据&#xff0c;且不影响MCU实时性。这个功能可以用于很多支持J-Link的设备和MCU&#xff0…

SpringBoot集成JWT和Redis实现鉴权登录功能

目前市面上有许多鉴权框架&#xff0c;鉴权原理大同小异&#xff0c;本文简单介绍下利用JWT和Redis实现鉴权功能&#xff0c;算是抛砖引玉吧。 主要原理就是“令牌主动失效机制”&#xff0c;主要包括以下4个步骤&#xff1a; (1)利用拦截器LoginInterceptor实现所有接口登录拦…

29.在Vue 3中使用OpenLayers读取WKB数据并显示图形

在Web开发中&#xff0c;地理信息系统&#xff08;GIS&#xff09;应用越来越重要&#xff0c;尤其是在地图展示和空间数据分析的场景中。OpenLayers作为一个强大的开源JavaScript库&#xff0c;为开发者提供了丰富的地图展示和空间数据处理能力。在本篇文章中&#xff0c;我将…

【bWAPP】 HTML Injection (HTML注入)

我们都是在一条铺满荆棘的新路上摸索着前行&#xff0c;碰个鼻青眼肿几乎不可避免&#xff0c;而问题在于&#xff0c;我们能不能在这条路上跌倒之后&#xff0c;爬起来继续走下去。 HTML Injection - Reflected (GET) get方式的html代码注入 漏洞url&#xff1a;http://ran…

内网是如何访问到互联网的(华为源NAT)

私网地址如何能够访问到公网的&#xff1f; 在上一篇中&#xff0c;我们用任意一个内网的终端都能访问到百度的服务器&#xff0c;但是这是我们在互联网设备上面做了回程路由才实现的&#xff0c;在实际中&#xff0c;之前也说过运营商是不会写任何路由过来的&#xff0c;那对于…

tomcat的优化和动静分离

tomcat的优化 1.tomcat的配置优化 2.操作系统的内核优化 注意&#xff1a;设置保存后&#xff0c;需要重新ssh连接才会看到配置更改的变化 vim /etc/security/limits.conf # 65535 为Linux系统最大打开文件数 * soft nproc 65535 * hard nproc 65535 * soft nofile 65535 *…

粗略的过一下StableDiffusion3的一些方面

什么是Stable Diffusion 3 Stable Diffusion 3是由Stability AI开发的最新且最先进的文本生成图像模型之一&#xff0c;在图像保真度、多主体处理和文本遵循性方面实现了显著提升。该模型采用了全新的多模态扩散变压器&#xff08;MMDiT&#xff09;架构&#xff0c;并为图像和…