Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息

1,版本说明

erlang 和 rabbitmq 版本说明
https://www.rabbitmq.com/which-erlang.html
确认需要安装的mq版本以及对应的erlang版本。

2,下载安装文件

RabbitMQ下载地址:
https://packagecloud.io/rabbitmq/rabbitmq-server

Erlang下载地址:
https://packagecloud.io/rabbitmq/erlang

RabbitMQ延迟消息插件下载
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载文件如图

在这里插入图片描述

3,安装步骤

3.1, 查询是否有安装过erlang、rabbitmq, 查询到有的话需要删除。

	rpm -qa | grep rabbitmq-server
	rpm -qa | grep erlang
	# 删除
	yum -y remove rabbitmq-server.noarch

3.2, 本地安装erlang

	yum localinstall erlang-23.2.7-2.el7.x86_64.rpm
	# 查询安装的版本
	erl -version
	# Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version xxx

3.3, 本地安装rabbitmq

	yum localinstall rabbitmq-server-3.9.0-1.el7.noarch.rpm
	# 启动rabbitmq
	systemctl start rabbitmq-server

	# 查看rabbitmq状态
	systemctl status rabbitmq-server

	# 设置rabbitmq服务开机自启动
	systemctl enable rabbitmq-server

	# 关闭rabbitmq服务
	systemctl stop rabbitmq-server

	# 重启rabbitmq服务
	systemctl restart rabbitmq-server

3.4, mq 端口开放:

	firewall-cmd --zone=public --add-port=5672/tcp --permanent
	firewall-cmd --zone=public --add-port=15672/tcp --permanent
	firewall-cmd --reload
	firewall-cmd --zone=public --list-ports

3.5, 安装mq管理界面

	
	# 启用管理界面插件
	rabbitmq-plugins enable rabbitmq_management

	curl http://localhost:15672 就可以打开web管理页面

	# rabbitmq有一个默认的账号密码guest,但该情况仅限于本机localhost进行访问,所以需要添加一个远程登录的用户

	# 添加用户
	rabbitmqctl add_user 用户名 密码

	rabbitmqctl add_user admin 123456

	# 设置用户角色,分配操作权限
	rabbitmqctl set_user_tags 用户名 角色

	rabbitmqctl set_user_tags admin administrator

	# 为用户添加资源权限(授予访问虚拟机根节点的所有权限)
	rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"

	rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

	# 角色有四种:
	# administrator:可以登录控制台、查看所有信息、并对rabbitmq进行管理
	# monToring:监控者;登录控制台,查看所有信息
	# policymaker:策略制定者;登录控制台指定策略
	# managment:普通管理员;登录控制

	# 修改密码
	rabbitmqctl change_ password 用户名 新密码

	# 删除用户
	rabbitmqctl delete_user 用户名

	# 查看用户清单
	rabbitmqctl list_users

3.6, 延迟消息插件安装:

    # 把插件包先复制到	 /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins
    cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins/
	rabbitmq-plugins enable rabbitmq_delayed_message_exchange
	#重启mq		
	systemctl restart rabbitmq-server
	rabbitmq-plugins list

3.7,登录测试

访问地址: ip:15672 账号密码: admin 123456
登录界面

找到交换机 exchange,看看类型是否有延迟消息类型的
在这里插入图片描述

然后就可以写代码去连接发消息了。

4, Java代码

4.1, pom 引入:

         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

4.2, 配置类:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

}

4.3, 消息定义配置类:


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class OrderRabbitMQConfig {

    @Autowired
    private RabbitAdmin rabbitAdmin;
    //================================订单延时=================================
    @Bean
    CustomExchange order_pay_delay_exchange() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("order_pay_delay_exchange", "x-delayed-message", true, false, args);
    }
    @Bean
    public Queue order_pay_delay_queue() {
        Queue queue = new Queue("order_pay_delay_queue", true, false, false);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }
    @Bean
    public Binding order_pay_delay_binding() {
        return BindingBuilder.bind(order_pay_delay_queue())
                .to(order_pay_delay_exchange()).with("order_pay_delay_routing").noargs();
    }

    //================================订单支付通知======================================
    @Bean
    public DirectExchange order_pay_notify_exchange() {
        return new DirectExchange("order_pay_notify_exchange", true, false);
    }
    @Bean
    public Queue order_pay_notify_direct_queue() {
        Map<String, Object> argsMap = new HashMap<>();
        argsMap.put("x-max-priority", 5);
        Queue queue = new Queue("order_pay_notify_queue", true, false, false, argsMap);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }
    @Bean
    public Binding ctc_bidding_auction_pay_notify_binding() {
        return BindingBuilder.bind(order_pay_notify_direct_queue())
                .to(order_pay_notify_exchange()).with("order_pay_notify_routing");
    }
}

4.4, 消息发送类:


import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RabbitMQSendUtils {

    private static RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQSendUtils(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * 订单支付延时通知、发送MQ消息
     */
    public static void sendPayDelayMessage(PayOrderNotifyDto dto, final Integer delayTimes) {
        //给延迟队列发送消息
        String msg = JSONUtil.toJsonStr(dto);
        log.info("订单支付延时通知、发送MQ消息: {}, delayTimes={}", msg, delayTimes);
        rabbitTemplate.convertAndSend("order_pay_delay_exchange", "order_pay_delay_routing", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setDelay(delayTimes);
                return message;
            }
        });
    }

    /**
     * 订单支付通知,发送MQ消息
     */
    public static void sendPayNotifyMsg(PayOrderNotifyDto dto) {
        log.info("订单支付通知,发送MQ消息: {}", dto);
        rabbitTemplate.convertAndSend("order_pay_notify_exchange", "order_pay_notify_routing", JSONUtil.toJsonStr(dto));
    }
}

4.5, 消息监听消费类:


import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * MQ消费监听
 */
@Slf4j
@Component
public class OrderMQListener {
    /**
     * 订单延时通知 消息
     */
    @RabbitListener(queues = {"order_pay_delay_queue"})
    public void payDelayNotify(Message message) {
        try {
            String msg = new String(message.getBody());
            log.info("【消费】订单延时通知 MQ 消息内容: {}, Message={}", msg, message);
            //支付订单改成超时未支付》取消
            PayOrderNotifyDto dto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);

        } catch (Exception e) {
            log.error("订单延时通知 消息消费失败:", e);
        }
    }
    /**
     * 订单支付通知 消息
     */
    @RabbitListener(queues = {"order_pay_notify_queue"})
    public void payNotify(Message message) {
        try {
            String msg = new String(message.getBody());
            log.info("订单支付通知 MQ 消息内容:{}, {}", msg, message);
            PayOrderNotifyDto payOrderNotifyDto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);
        } catch (Exception e) {
            log.error("订单支付通知 消息消费失败:", e);
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/107756.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

exFAT文件系统的目录与文件存储

目录与文件存储的差异 在exFAT文件系统中&#xff0c;目录和文件的存储方式是不同的。 目录和文件都是以簇&#xff08;Cluster&#xff09;为单位进行存储&#xff0c;但它们的数据结构和用途不同。 目录的存储&#xff1a;目录&#xff08;子目录&#xff09;是用于组织和管…

Spring Boot进阶(93):体验式教程:手把手教你整合Spring Boot和Zipkin

&#x1f4e3;前言 分布式系统开发中&#xff0c;服务治理是一个比较重要的问题。为了更好地实现服务治理&#xff0c;需要解决服务跟踪问题&#xff0c;即如何对分布式系统中的服务进行监控和追踪。本文将介绍如何使用Zipkin进行服务跟踪&#xff0c;并结合Spring Boot进行整合…

解决cloudflare pages部署静态页面发生404错误的问题

cloudflare pages是一个非常方便的部署静态页面的sass工具。 但是很多人部署上去以后&#xff0c;访问服务会报404错误。什么原因&#xff1f; 原因如下图所示&#xff1a; 注意这个Build output directory, 这个是部署的关键&#xff01; 这个Build output directory目录的…

JVM性能优化 —— 类加载器,手动实现类的热加载

一、类加载的机制的层次结构 每个编写的”.java”拓展名类文件都存储着需要执行的程序逻辑&#xff0c;这些”.java”文件经过Java编译器编译成拓展名为”.class”的文件&#xff0c;”.class”文件中保存着Java代码经转换后的虚拟机指令&#xff0c;当需要使用某个类时&#…

虹科 | 解决方案 | 汽车示波器 索赔管理方案

索赔管理 Pico汽车示波器应用于主机厂/供应商与服务店/4S店的协作&#xff0c;实现产品索赔工作的高效管理&#xff1b;同时收集的故障波形数据&#xff0c;便于日后的产品优化和改进 故障记录 在索赔申请过程中&#xff0c;Pico汽车示波器的数据记录功能可以用于捕捉故障时的…

异步请求池——池式组件

前言 本文详细介绍异步请求池的实现过程&#xff0c;并使用DNS服务来测试异步请求池的性能。            两个必须牢记心中的概念&#xff1a; 同步&#xff1a;检测IO 与 读写IO 在同一个流程里异步&#xff1a;检测IO 与 读写IO 不在同一个流程 同步请求 与 异步请求…

聊聊“JVM 调优JVM 性能优化”是怎么个事?

所谓“调优”就是一个诊断和处理手段&#xff0c;最终的目标是让系统的处理能力&#xff0c;也就是“性能”达到最优化。 计算机系统中&#xff0c;性能相关的资源主要分为这几类&#xff1a; CPU&#xff1a;CPU 是系统最关键的计算资源&#xff0c;在单位时间内有限&#xf…

机器学习之查准率、查全率与F1

文章目录 查准率&#xff08;Precision&#xff09;&#xff1a;查全率&#xff08;Recall&#xff09;&#xff1a;F1分数&#xff08;F1 Score&#xff09;&#xff1a;实例P-R曲线F1度量python实现 查准率&#xff08;Precision&#xff09;&#xff1a; 定义&#xff1a; …

Unity中从3D模型资产中批量提取材质

如何使用 只需在“项目”窗口中创建一个名为“编辑器”的文件夹&#xff0c;然后在其中添加此脚本即可。然后&#xff0c;打开Window-Batch Extract Materials&#xff0c;配置参数并点击“ Extract&#xff01; ”。 在Unity 2019.1上&#xff0c;可以将默认材质重映射条件配…

RSA:基于小加密指数的攻击方式与思维技巧

目录 目录 目录 零、前言 一、小加密指数爆破 [FSCTF]RSA签到 思路&#xff1a; 二、基于小加密指数的有限域开根 [NCTF 2019]easyRSA 思路&#xff1a; 三、基于小加密指数的CRT [0CTF 2016] rsa 思路&#xff1a; 零、前言 最近&#xff0c;发现自己做题思路比较…

SpringCore完整学习教程5,入门级别

本章从第6章开始 6. JSON Spring Boot提供了三个JSON映射库的集成: Gson Jackson JSON-B Jackson是首选的和默认的库。 6.1. Jackson 为Jackson提供了自动配置&#xff0c;Jackson是spring-boot-starter-json的一部分。当Jackson在类路径上时&#xff0c;将自动配置Obj…

Java 将数据导出到Excel并发送到在线文档

一、需求 现将列表数据&#xff0c;导出到excel,并将文件发送到在线文档&#xff0c;摒弃了以往的直接在前端下载的老旧模式。 二、pom依赖 <!-- redission --><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-…

UE5 C++自定义Http节点获得Header数据

一、新建C文件 选择All Classes&#xff0c;选择父类BlueprintFunctionLibrary&#xff0c;命名为SendHttpRequest。 添加Http支持 代理回调的参数使用DECLARE_DYNAMIC_DELEGATE_TwoParam定义&#xff0c;第一参数是代理类型&#xff0c;后面是参数1类型&#xff0c;参数1&…

APP自动化测试 ---- Appium介绍及运行原理

在面试APP自动化时&#xff0c;有的面试官可能会问Appium的运行原理&#xff0c;以下介绍Appium运行原理。 一、Appium介绍 1.Appium概念 Appium是一个开源测试自动化框架&#xff0c;可用于原生&#xff0c;混合和移动Web应用程序测试。它使用WebDriver协议驱动IOS&#xf…

33:深入浅出x86中断机制

背景 我们知道使用0x10号中断&#xff0c;可以在屏幕上打印一个字符。 问题 系统中的 中断 究竟是什么&#xff1f; 生活中的例子 来看一个生活中例子&#xff1a; 小狄的工作方式 在处理紧急事务的时候&#xff0c;不回应同事的技术求助。老板的召唤必须回应&#xff0c;…

C/C++面试常见问题——const关键字的作用和用法

首先我们需要一下const关键字的定义&#xff0c;const名叫常量限定符&#xff0c;当const修饰变量时&#xff0c;就是在告诉编译器该变量只可访问不可修改&#xff0c;而编译器对于被const修饰的变量有一个优化&#xff0c;编译器不会专门为其开辟空间&#xff0c;而是将变量名…

linux入门---多线程的控制

目录标题 线程库pthread_create如何一次性创建多个线程线程的终止线程的等待线程取消分离线程如何看待其他语言支持的多线程线程id的本质线程的局部存储线程的封装 线程库 要想控制线程就得使用原生线程库也可以将其称为pthread库&#xff0c;这个库是遵守posix标准的&#xf…

【AD9361 数字接口CMOS LVDSSPI】B 并行数据之CMOS 续

续【AD9361 数字接口CMOS &LVDS&SPI】B 并行数据之CMOS 数据总线空闲和周转周期 &#xff08;CMOS&#xff09; P0_D[11&#xff1a;0]和P1_D[11&#xff1a;0]总线信号通常由BBP或AD9361有源驱动。在任何空闲期间&#xff0c;两个组件都会忽略数据总线值。但是&…

【Linux】权限完结

个人主页点击直达&#xff1a;小白不是程序媛 系列专栏&#xff1a;Linux被操作记 目录 前言 chown指令 chgrp指令 文件类型 file指令 目录的权限 粘滞位 umask指令 权限总结 前言 上篇文章我们说到对于一个文件所属者和所属组都是同一个人时&#xff0c;使用所属者身…

计算机操作系统重点概念整理-第五章 文件管理【期末复习|考研复习】

第五章 文件管理 【期末复习|考研复习】 计算机操作系统系列文章传送门&#xff1a; 第一章 计算机系统概述 第二章 进程管理 第三章 进程同步 第四章 内存管理 第五章 文件管理 第六章 输出输出I/O管理 文章目录 第五章 文件管理 【期末复习|考研复习】前言五、文件管理5.1 文…