java springBoot实现RabbitMq消息队列 生产者,消费者

1.RabbitMq的数据源配置文件

# 数据源配置
spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: root
        password: root
        #消息发送和接收确认
        publisher-confirms: true
        publisher-returns: true
        listener:
            direct:
                acknowledge-mode: manual
            simple:
                acknowledge-mode: manual
                retry:
                    enabled: true #是否开启消费者重试
                    max-attempts: 5 #最大重试次数
                    initial-interval: 2000 #重试间隔时间(单位毫秒)

2.maven依赖

<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.RabbitMq文件目录预览

4. RabbitMq的Action文件

package com.zq.cnz.mq.constant;

public enum Action {
	ACCEPT, // 处理成功
	RETRY, // 可以重试的错误
	REJECT, // 无需重试的错误
}

5.RabbitMq的QueueContent文件

package com.zq.cnz.mq.constant;

/**
 * @ClassName: QueueContent
 * @Description: 消息队列名称
 * @author 吴顺杰
 * @date 2023年11月15日
 *
 */
public class QueueContent {
	/**
	 * 测试消息队列
	 */
	public static final String TEST_MQ_QUEUE = "test_mq_queue";


	/**
	 * 测试消息队列交换机
	 */
	public static final String TEST_MQ_QUEUE_EXCHANGE = "test_mq_queue_exchange";

	/**
	 * 测试消息延迟消费队列
	 */
	public static final String TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE = "test_mq_queue_time_delay_exchange";

}

6.消息队列生产者MessageProvider方法

package com.zq.cnz.mq;

import com.alibaba.fastjson.JSONObject;
import com.zq.common.utils.IdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * 消息队列生产
 */
@Component
public class MessageProvider implements RabbitTemplate.ConfirmCallback {
	static Logger logger = LoggerFactory.getLogger(MessageProvider.class);

	/**
	 * RabbitMQ 模版消息实现类
	 */
	protected RabbitTemplate rabbitTemplate;

	public MessageProvider(RabbitTemplate rabbitTemplate) {
		this.rabbitTemplate = rabbitTemplate;
		this.rabbitTemplate.setMandatory(true);
		this.rabbitTemplate.setConfirmCallback(this);
	}

	private String msgPojoStr;

	/**
	 * 推送消息至消息队列
	 *
	 * @param msg
	 * @param queueName
	 */
	public void sendMqMessage(String queueName,String msg) {
		try {
			JSONObject object = JSONObject.parseObject(msg);
			String msgId = IdUtils.fastUUID().toString();
			object.put("msgId", msgId);
			msg = object.toString();
			msgPojoStr = msg;
			logger.info("推送消息至" + queueName + "消息队列,消息内容" + msg);
			rabbitTemplate.convertAndSend(queueName, msg);
		} catch (AmqpException e) {
			e.printStackTrace();
			logger.error("推送消息至消息队列异常 ,msg=" + msg + ",queueName=" + queueName, e);
		}
	}

	/**
	 * 推送广播消息
	 *
	 * @param exchangeName
	 * @param msg
	 */
	public void sendFanoutMsg(String exchangeName, String msg) {
		try {
			JSONObject object = JSONObject.parseObject(msg);
			String msgId = IdUtils.fastUUID().toString();
			object.put("msgId", msgId);
			msg = object.toString();
			msgPojoStr = msg;
			logger.info("推送广播消息至交换机" + exchangeName + ",消息内容" + msg);
			rabbitTemplate.convertAndSend(exchangeName, "", msg);
		} catch (AmqpException e) {
			e.printStackTrace();
			logger.error("推送广播至交换机异常 ,msg=" + msg + ",exchangeName=" + exchangeName, e);
		}
	}

	/**
	 * 发送延时消息
	 *
	 * @param queueName
	 * @param msg
	 */
	public void sendTimeDelayMsg(String queueName, String exchangeName, String msg, Integer time) {
		try {
			JSONObject object = JSONObject.parseObject(msg);
			String msgId = IdUtils.fastUUID().toString();
			object.put("msgId", msgId);
			msg = object.toString();
			msgPojoStr = msg;
			logger.info("推送延时消息至" + exchangeName + "," + queueName + "消息队列,消息内容" + msg + ",延时时间" + time + "秒");
			rabbitTemplate.convertAndSend(exchangeName, queueName, msg, new MessagePostProcessor() {
				@Override
				public Message postProcessMessage(Message message) throws AmqpException {
					message.getMessageProperties().setHeader("x-delay", time * 1000);
					return message;
				}
			});
		} catch (AmqpException e) {
			e.printStackTrace();
			logger.error("推送消息至消息队列异常 ,msg=" + msg + ",exchangeName=" + exchangeName + ",queueName=" + queueName
					+ ",time=" + time, e);
		}
	}

	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		if (ack) {
			logger.info(msgPojoStr + ":消息发送成功");
		} else {
			logger.warn(msgPojoStr + ":消息发送失败:" + cause);
		}
	}

}

7.消息队列消费者RabbitMqConfiguration文件配置

package com.zq.cnz.mq;

import com.zq.cnz.mq.constant.QueueContent;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfiguration {

	@Resource
	RabbitAdmin rabbitAdmin;


	// 创建初始化RabbitAdmin对象
	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
		// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
		rabbitAdmin.setAutoStartup(true);
		return rabbitAdmin;
	}

	/**
	 * 测试消息队列
	 *
	 * @return
	 */
	@Bean
	public Queue TEST_QUEUE() {
		return new Queue(QueueContent.TEST_MQ_QUEUE);
	}

	/**
	 * 测试交换机
	 *
	 * @return
	 */
	@Bean
	FanoutExchange TEST_MQ_QUEUE_EXCHANGE() {
		return new FanoutExchange(QueueContent.TEST_MQ_QUEUE_EXCHANGE);
	}


	/**
	 * 测试延迟消费交换机
	 *
	 * @return
	 */
	@Bean
	public CustomExchange TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE() {
		Map<String, Object> args = new HashMap<>();
		args.put("x-delayed-type", "direct");
		return new CustomExchange(QueueContent.TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
	}

	/**
	 * 测试延迟消费交换机绑定延迟消费队列
	 *
	 * @return
	 */
	@Bean
	public Binding banTestQueue() {
		return BindingBuilder.bind(TEST_QUEUE()).to(TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE()).with(QueueContent.TEST_MQ_QUEUE).noargs();
	}


	// 创建交换机和对列,跟上面的Bean的定义保持一致
	@Bean
	public void createExchangeQueue() {
		//测试消费队列
		rabbitAdmin.declareQueue(TEST_QUEUE());
		//测试消费交换机
		rabbitAdmin.declareExchange(TEST_MQ_QUEUE_EXCHANGE());
		//测试延迟消费交换机
		rabbitAdmin.declareExchange(TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE());
	}

}

8.TestQueueConsumer 消息队列消费+延迟消费

package com.zq.cnz.mq.MessageConsumer;

import com.alibaba.druid.util.StringUtils;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.zq.cnz.mq.MessageProvider;
import com.zq.cnz.mq.constant.Action;
import com.zq.cnz.mq.constant.QueueContent;
import com.zq.common.utils.IdUtils;
import com.zq.common.utils.RedisUtils;
import com.zq.common.utils.spring.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 测试消息队列消费
 */
@Component
@RabbitListener(queues = QueueContent.TEST_MQ_QUEUE)
public class TestQueueConsumer {
    @Autowired
    private RedisUtils redisUtils;
    static final Logger logger = LoggerFactory.getLogger(TestQueueConsumer.class);

    @RabbitHandler
    public void handler(String msg, Channel channel, Message message) throws IOException {
        if (!StringUtils.isEmpty(msg)) {
            JSONObject jsonMsg = JSONObject.parseObject(msg);
//            logger.info("TestQueueConsumer:"+jsonMsg.toJSONString());
            Action action = Action.RETRY;
//			获取消息ID
            String msgId = jsonMsg.getString("msgId");
//			消费次数+1
            redisUtils.incr("MQ_MSGID:" + msgId, 1);
            redisUtils.expire("MQ_MSGID:" + msgId, 60);
            try {
                logger.info("测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));
                action = Action.ACCEPT;
            } catch (Exception e) {
                logger.error("MQ_MSGID:" + msgId + ",站控权限请求关闭接口异常,msg=" + msg, e);
            } finally {
                // 通过finally块来保证Ack/Nack会且只会执行一次
                if (action == Action.ACCEPT) {
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } else if (action == Action.RETRY) {
//					判断当前消息消费次数,已经消费3次则放弃消费
                    if ((Integer) redisUtils.get("MQ_MSGID:" + msgId) >= 3) {
                        logger.error("MQ_MSGID:" + msgId + ",异步处理超出失败次数限制,msg=" + msg);
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                    } else {
//						回归队列重新消费
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    }
                } else {
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                }
            }
        }
    }
}

9.TestExchangeConsumer 交换机广播模式 

package com.zq.cnz.mq.MessageConsumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.rabbitmq.client.Channel;
import com.zq.cnz.mq.constant.Action;
import com.zq.cnz.mq.constant.QueueContent;
import com.zq.common.utils.IdUtils;
import com.zq.common.utils.RedisUtils;
import com.zq.common.utils.StringUtils;
import com.zq.common.utils.spring.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;

/**
 * 测试交换机消费
 */
@Component
@RabbitListener(bindings = @QueueBinding(value = @Queue(), exchange = @Exchange(value = QueueContent.TEST_MQ_QUEUE_EXCHANGE, type = ExchangeTypes.FANOUT)))
public class TestExchangeConsumer {
    static final Logger logger = LoggerFactory.getLogger(TestExchangeConsumer.class);
    @Resource
    private RedisUtils redisUtils;

    @RabbitHandler
    public void handler(String msg, Channel channel, Message message) throws IOException {
        if (!StringUtils.isEmpty(msg)) {
//            logger.info("接收交换机生产者消息:{}", msg);
            Action action = Action.ACCEPT;
            // 请求参数
            JSONObject jsonMsg = JSONObject.parseObject(msg);
//			获取消息ID
            String msgId = jsonMsg.getString("msgId");

//			消费次数+1
            redisUtils.incr("MQ_MSGID:" + msgId, 1);
            redisUtils.expire("MQ_MSGID:" + msgId, 60);
            try {

                Integer CMD = jsonMsg.getInteger("cmd");
                if (CMD==1) {
                    logger.info("cmd1测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));
                }else if(CMD==2){
                    logger.info("cmd2测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));
                }
                action = Action.ACCEPT;
            } catch (Exception e) {
                action = Action.REJECT;
                e.printStackTrace();
            } finally {
                // 通过finally块来保证Ack/Nack会且只会执行一次
                if (action == Action.ACCEPT) {
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

                } else if (action == Action.RETRY) {
//					判断当前消息消费次数,已经消费3次则放弃消费
                    if ((Integer) redisUtils.get("MQ_MSGID:" + msgId) >= 3) {
                        logger.error("MQ_MSGID::" + msgId + ",换电失败消息队列消费了三次,msg=" + msg);
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                    } else {
//						回归队列重新消费
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    }
                } else {
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                }

            }
        }
    }
}

运行项目 调用RabbitmqTestController生产RabbitMq消息体, TestExchangeConsumer和TestQueueConsumer自动消费

package com.zq.web.controller.tool;
import com.alibaba.fastjson.JSONObject;
import com.zq.cnz.mq.MessageProvider;
import com.zq.cnz.mq.constant.QueueContent;
import com.zq.common.utils.IdUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;


/**
 * 消息队列测试
 */
@RestController
@RequestMapping("/test/mq")
public class RabbitmqTestController {
    @Resource
    private MessageProvider messageProvider;

    /**
     * 查询储能站信息列表
     */
    @GetMapping("/putMq")
    public void putMq(){
        JSONObject obj=new JSONObject();
        obj.put("test","测试数据");
        //推送消息至消息队列
       messageProvider.sendMqMessage(QueueContent.TEST_MQ_QUEUE,obj.toString());
        obj.put("cmd",1);
        obj.put("test","这是广播消费");
        //推送广播消息
        messageProvider.sendFanoutMsg(QueueContent.TEST_MQ_QUEUE_EXCHANGE,obj.toString());
        //发送延时消息
        obj.put("cmd",2);
        obj.put("test","这是延迟消费");
        messageProvider.sendTimeDelayMsg(QueueContent.TEST_MQ_QUEUE,QueueContent.TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE,obj.toString(),2*60);

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/148537.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

mysql---主从复制和读写分离

主从复制 主从复制&#xff0c;修改&#xff0c;表里的数据&#xff1a;主mysql上的数据&#xff0c;新增都会同步到从mysql上面试题&#xff1a;mysql的主从复制的模式&#xff1a; 1、异步复制&#xff1a;mysql的默认复制就是异步复制。只要执行完之后&#xff0c;客户端提…

自动化测试Mock神器:轻松模拟HTTP请求..

一、背景 在日常测试过程中或者研发开发过程中&#xff0c;目前接口暂时没有开发完成&#xff0c;测试人员又要提前介入接口测试中&#xff0c;测试人员不仅仅只是简单的编写测试用例&#xff0c;也可以通过一些mock的方法进行来提前根据接口测试的情况进行模拟返回接口的信息…

软件测试/测试开发丨接口自动化测试学习笔记,加密与解密

点此获取更多相关资料 本文为霍格沃兹测试开发学社学员学习笔记分享 原文链接&#xff1a;https://ceshiren.com/t/topic/28019 一、原理 在得到响应后对响应做解密处理&#xff1a; 如果知道使用的是哪个通用加密算法的话&#xff0c;可以自行解决。如果不了解对应的加密算法…

终于有人把VMware虚拟机三种网络模式讲清楚了!

前段时间VMware更新了&#xff0c;你用上最新版了吗&#xff1f; 有几个网工在操作中遇到过各种各样的问题。 比如说由于公司服务器重启导致出现下面的问题&#xff1a;在Xshell里连接虚拟机映射时连接失败&#xff1b;能够连接上虚拟机的映射地址&#xff0c;但git pull时报…

企业常用的几种FTP传输加速方式,最后一种百倍提速

在数字化时代&#xff0c;FTP传输协议仍然是企业之间进行文件传输的重要方式之一。但是&#xff0c;传统的FTP传输速度较慢&#xff0c;对于大文件和海量数据的传输更是显得力不从心。为了提高FTP传输速度&#xff0c;企业们通常会采取一些加速方式。本文将介绍几种企业常用的F…

考研分享第3期 | 211本378分上岸大连理工电子信息经验贴

考研分享第3期 | 211本378分上岸大连理工电子信息经验贴 一、个人信息 姓名&#xff1a;Ming 本科院校&#xff1a;某211学校电子信息工程学院 电子科学与技术专业 上岸院校&#xff1a;大连理工大学 电子信息与电气工程学部 电子信息&#xff08;0854&#xff09; 择校意…

CSGO游戏搬砖项目需要掌握哪些基础知识?

CSGO搬砖之90%饰品商人都不知道的玄学皮肤盘点 CSGO游戏搬砖主要就是倒卖装备&#xff0c;那具体是哪些装备&#xff0c;以及怎么去区分皮肤类型&#xff0c;今天童话就给大家介绍一下。 CSGO游戏搬砖虽然不要求会玩游戏&#xff0c;但是我们作为一个商人&#xff0c;要知道我…

JAVA基础9:Debug

1.Debug概述 Debug:是供程序员使用的程序调试工具&#xff0c;它可以用于查看程序的执行流程&#xff0c;也可以用于追踪程序执行过程来调试程序。 2.Debug操作流程 Debug调试&#xff0c;又被称为断点调试&#xff0c;断点其实是一个标记&#xff0c;告诉我们从哪里开始查看…

MyBatis配置与映射文件深度解析

文章目录 MyBatis配置文件解析配置文件的组成部分配置数据源和数据库连接信息MyBatis的属性和类型别名 MyBatis映射文件详解映射文件的作用编写简单的映射文件resultMap和resultType的区别 结语 &#x1f388;个人主页&#xff1a;程序员 小侯 &#x1f390;CSDN新晋作者 &…

Postman如何发送Https请求

Postman如果想要发送Https请求&#xff0c;需要从设置中将SSL安全认证禁用

LoadRunner脚本编写之三(事务函数)

关于脚本的这块&#xff0c;前两篇都在讲C语言&#xff0c;其实&#xff0c;要整理点实用的东西挺难&#xff0c;在应用中多对录制的脚本分析&#xff0c;但对于新手学脚本确实无从下手。 先贴一个脚本&#xff1a; 完整代码&#xff1a; 重点代码部分&#xff1a; Action(…

数据中心:精密空调监控,这招太高效了!

在当今日益复杂的工业环境中&#xff0c;精密空调系统的监控和管理变得至关重要。随着科技的迅猛发展&#xff0c;各行各业对温度、湿度和空气质量等参数的高度控制需求不断增加。 精密空调监控系统通过实时数据采集、分析和反馈&#xff0c;为企业提供了可靠的手段来确保生产环…

一步路难倒英雄汉?app自动化测试,怎么从零搭建appium!

不少软件测试想进阶到自动化测试&#xff0c;没有前人知道&#xff0c;只能像个无头的苍蝇&#xff0c;到处乱转&#xff0c;根本不知道从何处下手 特别是自学路上碰到需要安装什么程序、工具的时候&#xff0c;一个报错就需要在百度上查个半天&#xff0c;这么浪费时间的事情…

入站一个月涨粉80万!B站竖屏UP主如何突出重围?

B站仍然秉持着“内容为王”的社区氛围&#xff0c;这也是众多UP主们一同坚持的事。不管是今年宣布的Story Mode竖屏模式开放还是14周年庆上B站董事长兼CEO陈睿宣布作品播放量改播放分钟数等等改动来看&#xff0c;都能感受到B站在向更多优质创作者招手&#xff0c;并维护着优质…

MySQL--视图、存储过程、触发器

1、视图 1、定义&#xff1a; 所谓的视图是一种虚拟存在的表&#xff0c;视图中的数据并不在数据库中实际存在&#xff0c;就是视图只保存了查询的SQL逻辑&#xff0c;不保存查询的结果&#xff0c;所以在创建视图的时候&#xff0c;主要的工作就是落在创建这条SQL查询语句的时…

web 渗透 信息搜集

一 收集域名信息 1.whois查询 whois&#xff08;读作“Who is”&#xff0c;非缩写&#xff09;&#xff0c;标准的互联网协议&#xff0c…

Vue 模板语法 v-bind

红色框里面的都是vue的模板。有了模板就得有模板的特殊语法。上面只是简单的双括号加上表达式&#xff0c;这种叫做插值语法&#xff0c;除了这种语法还有其他语法吗&#xff1f; 插值语法实现的功能很单一&#xff0c;就是将指定的值放到指定的位置。还有一种叫做指令语法&am…

光伏仪器-87110A/B太阳辐照度计

87110A/B 太阳辐照度计 光伏仪器 一款小巧、全数字化的太阳辐照度测试仪表&#xff0c;通过标准太阳电池测试太阳辐照度&#xff0c;并自带温度修正功能。太阳辐照度计集成了环境温度、电池板温度、倾斜角等测试功能&#xff0c;可以通过附带的蓝牙或串行接口连至电脑或智能…

多种格式图片可用的二维码生成技巧,快来学习一下

将图片存入二维码是现在很常见的一种图片展现方式&#xff0c;有效的节省了图片占用内容空间以及获取图片内容的速度&#xff0c;所以现在会有很多人将不同的图片、照片生成二维码展示。如何使用图片二维码生成器来快速生成二维码呢&#xff1f;下面就让小编来给大家分享一下图…

CV计算机视觉每日开源代码Paper with code速览-2023.11.13

精华置顶 墙裂推荐&#xff01;小白如何1个月系统学习CV核心知识&#xff1a;链接 点击CV计算机视觉&#xff0c;关注更多CV干货 论文已打包&#xff0c;点击进入—>下载界面 点击加入—>CV计算机视觉交流群 1.【基础网络架构&#xff1a;Transformer】PolyMaX: Gener…