RabbitMQ的安装使用

RabbitMQ是什么?

MQ全称为Message Queue,消息队列,在程序之间发送消息来通信,而不是通过彼此调用通信。
RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

为什么使用RabbitMQ?

优点:
1、实现应用系统的解耦,客户端只关心发送消息,而不关心处理。
2、异步提升效率,在主业务逻辑发送消息,异步去处理消息
3、流量削峰,将请求放到mq消息队列中,mysql每秒去拉取请求消费,避免请求全部一下子全部打到mysql,请求过多而崩溃

怎么使用RabbitMQ?

1.安装windows的客户端,参考链接3

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.java 代码引入相关jar包
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-amqp</artifactId>
			<version>2.2.3.RELEASE</version>
		</dependency>
3.编写发送,接收消息的工具类
延迟队列配置
package com.next.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @desc 延迟队列配置
 */
@Configuration
public class RabbitDelayMqConfig {

    @Bean("delayDirectExchange")
    public DirectExchange delayDirectExchange() {
        DirectExchange directExchange = new DirectExchange(QueueConstants.DELAY_EXCHANGE, true, false);
        //交换机开启延迟设置true,延迟才会生效
        directExchange.setDelayed(true);
        return directExchange;
    }

    @Bean("delayNotifyQueue")
    public Queue delayNotifyQueue() {
        return new Queue(QueueConstants.DELAY_QUEUE);
    }

    @Bean("delayBindingNotify")
    public Binding delayBindingNotify(@Qualifier("delayDirectExchange") DirectExchange delayDirectExchange,
                                 @Qualifier("delayNotifyQueue") Queue delayNotifyQueue) {
        return BindingBuilder.bind(delayNotifyQueue).to(delayDirectExchange).with(QueueConstants.DELAY_ROUTING);
    }
}

队列配置
package com.next.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * @desc 队列配置
 */
@Configuration
public class RabbitMqConfig {

    @Bean("directExchange")
    @Primary
    public DirectExchange directExchange() {
        return new DirectExchange(QueueConstants.COMMON_EXCHANGE, true, false);
    }

    @Bean("notifyQueue")
    @Primary
    public Queue notifyQueue() {
        return new Queue(QueueConstants.COMMON_QUEUE);
    }

    @Bean("bindingNotify")
    @Primary
    public Binding bindingNotify(@Qualifier("directExchange") DirectExchange directExchange,
                                 @Qualifier("notifyQueue") Queue notifyQueue) {
        return BindingBuilder.bind(notifyQueue).to(directExchange).with(QueueConstants.COMMON_ROUTING);
    }
}

发送消息工具类
package com.next.mq;

import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @desc 客户端工具类 -- 发送消息
 */
@Component
@Slf4j
public class RabbitMqClient {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //发送同步消息
    public void send(MessageBody messageBody) {
        try {
            //生成唯一的消息id
            String uuid = UUID.randomUUID().toString();
            //初始话消息
            CorrelationData correlationData = new CorrelationData(uuid);
            //使用模板工具类rabbitTemplate 来发消息
            rabbitTemplate.convertAndSend(QueueConstants.COMMON_EXCHANGE, QueueConstants.COMMON_ROUTING,
                    JsonMapper.obj2String(messageBody), new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            // 消息持久化
                            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            //记录日志
                            log.info("message send, {}", message);
                            return message;
                        }
                    }, correlationData);
        } catch (Exception e) {
            //日志打印,以便定位问题
            log.error("message send exception, msg:{}", messageBody.toString(), e);
        }
    }


    /**
     * @desc 发送延迟消息
     */
    public void sendDelay(MessageBody messageBody, int delayMillSeconds) {
        try {
            //设置消息延迟时间
            messageBody.setDelay(delayMillSeconds);
            String uuid = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(uuid);
            //延迟交换机和路由
            rabbitTemplate.convertAndSend(QueueConstants.DELAY_EXCHANGE, QueueConstants.DELAY_ROUTING,
                    JsonMapper.obj2String(messageBody), new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化
                            //设置消息延迟的时间(毫秒值)
                            message.getMessageProperties().setDelay(delayMillSeconds);
                            log.info("delay message send, {}", message);
                            return message;
                        }
                    }, correlationData);
        } catch (Exception e) {
            log.error("delay message send exception, msg:{}", messageBody.toString(), e);
        }
    }
}

接收消息工具类
package com.next.mq;

import com.next.dto.RollbackSeatDto;
import com.next.model.TrainOrder;
import com.next.service.TrainOrderService;
import com.next.service.TrainSeatService;
import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.type.TypeReference;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @desc rabbitmq的server端 - 延迟接收消息
 * 用处:在主流程里面发送消息,异步流程里面接收消息,处理。提升代码性能
 */

@Component
@Slf4j
public class RabbitDelayMqServer {

    @Resource
    private TrainSeatService trainSeatService;
    @Resource
    private TrainOrderService trainOrderService;

    @RabbitListener(queues = QueueConstants.DELAY_QUEUE)
    public void receive(String message) {
        log.info("delay queue receive message, {}", message);

        try {
            MessageBody messageBody = JsonMapper.string2Obj(message, new TypeReference<MessageBody>() {
            });

            if (messageBody == null) {
                return;
            }

            switch (messageBody.getTopic()) {
                case QueueTopic.SEAT_PLACE_ROLLBACK:
                    RollbackSeatDto dto = JsonMapper.string2Obj(messageBody.getDetail(), new TypeReference<RollbackSeatDto>() {
                    });
                    trainSeatService.batchRollbackSeat(dto.getTrainSeat(), dto.getFromStationIdList(), messageBody.getDelay());
                    break;
                case QueueTopic.ORDER_PAY_DELAY_CHECK:
                    TrainOrder trainOrder = JsonMapper.string2Obj(messageBody.getDetail(), new TypeReference<TrainOrder>() {
                    });
                    trainOrderService.delayCheckOrder(trainOrder);
                    break;
                default:
                    log.warn("delay queue receive message, {}, no need handle", message);
            }
        } catch (Exception e) {
            log.error("delay queue message handle exception, msg:{}", message, e);
        }
    }
}

参考链接:
1.rabbitMQ到底是个啥东西?
2.超详细!!!Windows下安装RabbitMQ的步骤详解
3.windows安装rabbitmq和环境erlang(最详细版,包括对应关系,安装错误解决方法)
4.RabbitMQ安装或启动后,无法访问http://localhost:15672/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/339925.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

公网环境调试本地配置的Java支付宝沙箱环境模拟支付场景

文章目录 前言1. 下载当面付demo2. 修改配置文件3. 打包成web服务4. 局域网测试5. 内网穿透6. 测试公网访问7. 配置二级子域名8. 测试使用固定二级子域名访问 前言 在沙箱环境调试支付SDK的时候&#xff0c;往往沙箱环境部署在本地&#xff0c;局限性大&#xff0c;在沙箱环境…

【Proteus仿真】【51单片机】遥控小风扇设计

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真51单片机控制器&#xff0c;使用数码管显示模块、L298N电机驱动模块、按键、直流电机风扇、红外遥控等。 主要功能&#xff1a; 系统运行后&#xff0c;数码管显示风扇速度档位&a…

【操作系统】实验二 Proc文件系统

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的很重要&…

FluoroQuest抗淬灭试剂盒I 适合载玻片成像,能够提高荧光信号的强度和稳定性

您好&#xff0c;欢迎来到新研之家 文章关键词&#xff1a;FluoroQuestAnti-fading Kit I Optimized for Slide Imaging&#xff0c;FluoroQuest抗淬灭试剂盒I 适合载玻片成像 一、基本信息 产品简介&#xff1a;FluoroQuest抗淬灭试剂盒I 适合载玻片成像能够抑制淬灭效应&a…

unity项目《样板间展示》开发:素材导入与整理

第一章&#xff1a;素材导入与整理 前言一、创建项目文件二、导入素材模型三、素材模型整理四、光源模型管理结语 前言 这次带大家从0到1做一个unity项目&#xff1a;《样板间展示》。 顾名思义&#xff0c;项目内容是展示样板间&#xff0c;即玩家可以与房间中的物体、家具进行…

二维码地址门牌管理系统:智慧城市新篇章

文章目录 前言一、轮播广告位&#xff1a;全面信息传达二、智能化管理&#xff1a;应对挑战三、安全保障&#xff1a;市民隐私优先四、广泛应用&#xff1a;助力城市建设 前言 随着科技的飞速发展&#xff0c;城市的智能化已成不可逆转的趋势。二维码地址门牌管理系统作为新一…

ITSS认证有用吗❓属于gj级证书吗❓

&#x1f525;ITSS由中国电子技术标准化研究院推出&#xff0c;包括“IT 服务工程师”和“IT 服务经理”两种认证。该系列认证符合GB/T 28827.1 的评估和ITSS服务资质升级要求。 &#x1f3af;ITSS是受到gj认可的&#xff0c;在全国范围内对IT服务管理人员从业资格为一的权威的…

linux杀毒软件clamav安装使用

1、下载 在下面地址下载&#xff1a;https://www.clamav.net/downloads 2、安装 clamav-1.2.1.linux.x86_64.rpm放在/home路径。 执行&#xff1a; chmod -R 777 /home/clamav-1.2.1.linux.x86_64.rpm rpm -ivh clamav-1.2.1.linux.x86_64.rpm3、下载病毒库 下载路径&am…

解决element-ui中的el-select选择器无法显示选中内容的问题

问题描述&#xff1a; 排查方法&#xff1a; 检查数据控制台是否报错&#xff0c;无报错 检查change是否触发&#xff0c;会触发 最后开始百度&#xff0c;查看文档 官方文档有这么一段话&#xff0c;就是属性一定要挂载到data上&#xff0c;不然无法检测。 最后解决&#…

K8S的helm

helm的作用 在没有helm之前&#xff0c;deploymen service ingress &#xff0c;helm的作用就是通过打包的方式&#xff0c;把deployment&#xff0c;service&#xff0c;ingress 这些打包在一块&#xff0c;一键式的部署服务&#xff0c;类似yum 官方提供的一个类似于安装仓库…

python|写一个简单的web应用框架

写应用框架需要写底层服务器么? 这个要区分2种情况&#xff0c;如果应用框架&#xff0c;你没有参考WSGI标准&#xff0c;那么在写应用框架之前&#xff0c;你就必须要定义一套属于自己的服务器&#xff0c;当然本文不采取这种方式&#xff0c;专业的事情应该专业的人来做。我…

JavaEE 文件操作IO

文件操作&IO 文章目录 文件操作&IO1. 认识文件2. 文件操作2.1 File 类2.2 文件读写2.2.1 FileInputStream2.2.2 FileOutputStream2.2.3 FileReader2.2.4 FileWriter2.2.5 Scanner读取文件 3. 案例练习3.1 案例一3.2 案例二3.3 案例三 在进行文件操作之前&#xff0c;我…

Oladance、韶音、南卡开放式耳机究竟哪款更胜一筹?揭秘超强机型对比!

​探寻音乐的美妙&#xff0c;我来亲测市面上热门三大品牌Oladance、韶音、南卡的开放式耳机&#xff01;深度评测音质&#xff0c;真实还原音乐细腻之处。从我自己测评过的开放式耳机中挑选&#xff0c;告别劣质产品带来的音乐质量风险。严选精品&#xff0c;守护你的听觉健康…

CERT_HAS_EXPIRED

npm 安装报错&#xff0c;提示证书过期&#xff1a; npm ERR! code CERT_HAS_EXPIRED npm ERR! errno CERT_HAS_EXPIRED npm ERR! request to https://r.cnpmjs.org/md5 failed, reason: certificate has expired npm ERR! A complete log of this run can be found in: npm…

【openGauss/MogDB使用mog_xlogdump解析 xlog文件内容】

openGauss/MogDB的mog_xlogdump工具类似于PostgreSQL的pg_xlogdump/pg_waldump&#xff0c;可以解析xlog日志&#xff0c;获取xlog里的相关记录。可以通过MogDB的官网下载对应的版本使用&#xff0c; https://www.mogdb.io/downloads/mogdb 一、 创建表并增加主键&#xff08;…

【HarmonyOS】体验鸿蒙电商平台的未来之旅!

从今天开始&#xff0c;博主将开设一门新的专栏用来讲解市面上比较热门的技术 “鸿蒙开发”&#xff0c;对于刚接触这项技术的小伙伴在学习鸿蒙开发之前&#xff0c;有必要先了解一下鸿蒙&#xff0c;从你的角度来讲&#xff0c;你认为什么是鸿蒙呢&#xff1f;它出现的意义又是…

OceanBase在作业帮业务的应用实践

作业帮成立于 2015 年&#xff0c;致力于用科技手段助力教育普惠&#xff0c;运用人工智能、大数据等技术&#xff0c;为学生、老师、家长提供学习、教育解决方案&#xff0c;智能硬件产品等。 在业务初期&#xff0c;作业帮使用阿里云 ECS 自建 MySQL&#xff0c;同时最大程度…

揭开Spring MVC的真面目

官方对于Spring MVC的描述为&#xff1a; Spring Web MVC是基于Servlet API框架构建的原始Web框架&#xff0c;从一开始就包含在Spring框架中。它的正式名称“Spring Web MVC”来自其源模块的名称&#xff08;Spring-webmvc&#xff09;&#xff0c;但它通常被称为“Spring-MVC…

网络通信(Socket/TCP/UDP)

一、Socket 1.概念: Socket(又叫套接字)是通信的基石,是支持TCP/IP协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接协议,客户端的IP地址,客户端的端口,服务器的IP地址,服务器的端口。 一个Socket是一对IP地址…

OpenGPTs:一款外挂般的GPTs管理器,由ChatPaper团队开源!

OpenGPTs-非常好用的开源GPTs管理器. 一句话介绍 非常好用的GPTs管理器&#xff0c;ChatPaper团队开源一款功能强大的浏览器插件&#xff0c;适合所有拥有Plus权限的朋友。 为什么要做OpenGPTs&#xff1f; &#x1f914;&#x1f4a1; 众所周知&#xff0c;OpenAI官网的GPT…