RabbitMQ消息队列快速入门

RabbitMQ消息队列快速入门

初始MQ

MQ全称为Message Queue,即消息队列,是在消息的传输过程中保存消息的容器。它是典型的生产者-消费者模型
生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。消息的生产和消费都是异步的,可以解耦发送者和接收者之间的通信,提高系统的可扩展性和可靠性

技术选型

目比较常见的MQ实现有:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka
RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
Messaging that just works — RabbitMQ

安装

基于Docker来安装RabbitMQ

docker pull rabbitmq

运行

docker run \
 -e RABBITMQ_DEFAULT_USER=daybreak \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq

在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

运行成功后,访问http://ip:15672,输入username和password即可进入管理控制台。

RabbitMQ架构

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

Spring AMQP

由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:Spring AMQP

SpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

交换机

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

声明队列和交换机

基于Bean方式声明
package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfiguration {

    /**
     * 声明交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("daybreak.fanout");
    }

    /**
     * 声明队列
     * @return
     */
    @Bean
    public Queue fanoutQueue(){
        return new Queue("fanout.queue");
    }

    /**
     * 绑定队列和交换机
     * @param fanoutQueue3
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding FanoutBinding3(Queue fanoutQueue, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
}
基于注解声明

声明Direct模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue", durable = "true"),
        exchange = @Exchange(name = "daybreak.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
))
public void listenDirectQueue(String msg){
    System.out.println("消费者收到了direct.queue的消息:" + msg);
}

声明Topic模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue"),
    exchange = @Exchange(name = "daybreak.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue(String msg){
    System.out.println("消费者接收到topic.queue的消息:【" + msg + "】");
}

快速入门

导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置

在application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.200.130 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: daybreak # 用户名
    password: 123456 # 密码

配置JSON转换器

Spring的消息发送代码接收的消息体是一个Object:

在数据传输时,它会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
然而默认情况下Spring采用的序列化方式是JDK序列化。

JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

使用JSON方式序列化需要引入以下依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

接收端

package com.itheima.consumer.listeners;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "daybreak.queue", durable = "true"),
            exchange = @Exchange(name = "daybreak.direct", type = ExchangeTypes.DIRECT),
            key = "demo"
    ))
    public void listenDirectQueue(String msg){
        System.out.println("消费者收到了direct.queue的消息:" + msg);
    }
}

发送端

package com.itheima.publisher;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class MyPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void myTest(){
        String exchangeName = "daybreak.direct";
        String routingKey = "demo";
        String msg = "Hello,RabbitMQ";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}

启动发送端和接收端后,运行结果如下:

消费者收到了direct.queue的消息:Hello,RabbitMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/173569.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SQL DELETE 语句:删除表中记录的语法和示例,以及 SQL SELECT TOP、LIMIT、FETCH FIRST 或 ROWNUM 子句的使用

SQL DELETE 语句 SQL DELETE 语句用于删除表中的现有记录。 DELETE 语法 DELETE FROM 表名 WHERE 条件;注意&#xff1a;在删除表中的记录时要小心&#xff01;请注意DELETE语句中的WHERE子句。WHERE子句指定应删除哪些记录。如果省略WHERE子句&#xff0c;将会删除表中的所…

战备器材管理系统-部队物资仓库管理系统

一、项目背景 传统的战备物资管理&#xff0c;一般依赖于一个非自动化的、以纸张文件为基础的系统来记录、追踪进出的货物&#xff0c;完全由人工实施仓库内部的管理&#xff0c;因此仓库管理的效率极其低下。对此&#xff0c;我们运用无线射频技术(RFID)的仓库智能管理系统&am…

Fiddler抓包看这篇就够了:fiddler设置弱网测试

弱网测试 概念&#xff1a;弱网看字面意思就是网络比较弱&#xff0c;我们通称为信号差&#xff0c;网速慢。 意义&#xff1a;模拟在地铁、隧道、电梯和车库等场景下使用APP &#xff0c;网络会出现延时、中断和超时等情况。 自动化测试相关教程推荐&#xff1a; 2023最新自…

基于单片机加热炉多参数检测和PID炉温系统

**单片机设计介绍&#xff0c; 基于单片机加热炉多参数检测和PID炉温系统 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的公交安全预警系统可以被设计成能够实时监测公交车辆的行驶状态&#xff0c;并在发生异常情况…

金蝶云星空套打设计

文章目录 金蝶云星空套打设计下载登录打开需要创建套打的单据新建套打模板数据中心-发货通知单-设置预览 金蝶云星空套打设计 下载 登录 打开需要创建套打的单据 KD开头&#xff0c;是标准产品预设。 新建套打模板 默认A4纸 默认插入三行三列。 拖入文本&#xff0c;填写内容…

计算机基础知识54

ORM的介绍 # ORM是什么&#xff1f; 我们在使用Django框架开发web应用的过程中&#xff0c;不可避免地会涉及到数据的管理操作&#xff08;增、删、改、查&#xff09;&#xff0c;而一旦谈到数据的管理操作&#xff0c;就需要用到数据库管理软件&#xff0c;例如mysql、oracle…

map的基础定义及运用

Map 1 使用 1 声明 /*声明map*/map<int, string> myMap {{1, "Apple"}, {2, "Banana"}, {3, "Orange"}};2 插入元素 myMap.insert(make_pair(4, "Graphes"));3 通过访问键查找和访问元素 cout << myMap[2] <<…

Ubuntu Server download

前言 Ubuntu——公共云、数据中心和边缘上最受欢迎的 Linux 发行版。自成立以来&#xff0c;Ubuntu 一直在获得市场份额&#xff0c;截至今天已接近 50%。 Ubuntu Server download VersionUbuntu Server 其它主机型号版本Ubuntu AMD历史版下载百度云Ubuntu Server all Ubuntu…

小白也能看懂的国内外 AI 芯片概述

随着越来越多的企业将人工智能应用于其产品&#xff0c;AI芯片需求快速增长&#xff0c;市场规模增长显著。因此&#xff0c;本文主要针对目前市场上的AI芯片厂商及其产品进行简要概述。 简介 AI芯片也被称为AI加速器或计算卡&#xff0c;从广义上讲只要能够运行人工智能算法…

本机idea连接虚拟机中的Hbase

相关环境&#xff1a; 虚拟机&#xff1a;Centos7 hadoop版本:3.1.3 hbase版本:2.4.11 zookeeper版本:3.5.7 Java IDE:IDEA JDK&#xff1a;8 步骤 步骤一&#xff1a;在idea创建一个maven项目 步骤二&#xff1a;在虚拟机里找到core-site.x…

【C++】map multimap

文章目录 1.map介绍2.map的使用3.multimap介绍4.multimap的使用 1.map介绍 map的文档 翻译&#xff1a; map是关联容器&#xff0c;它按照特定的次序(按照key来比较)存储由键值key和值value组合而成的元素。 在map中&#xff0c;键值key通常用于排序和惟一地标识元素&#x…

四川芸鹰蓬飞:抖店运营的时候注意什么?

抖店作为一个短视频平台&#xff0c;吸引了越来越多的商家加入。在抖店上进行有效的运营是提高销量和曝光度的关键。那么&#xff0c;抖店怎么设置运营呢&#xff1f;有哪些方法可以帮助商家在这个竞争激烈的平台上脱颖而出呢&#xff1f; 一、抖店怎么设置运营&#xff1f; 首…

VC++添加包含目录

安装了QT;它带有很多例子; 新建一个工程,添加一个C++源文件,随便拷个例子的源码过来; 然后VS会提示头2个include语句出错; 因为找不到这两个头文件;这需要添加包含目录; 进入项目属性页;VC++目录 - 包含目录,它之前有2个系统的值;点击右侧的下拉箭头按钮,点…

天空分割技术解决方案

图像处理技术已经成为企业提升用户体验、优化产品和服务的重要工具。美摄科技&#xff0c;作为全球领先的AI图像处理技术提供商&#xff0c;一直致力于研发和应用最先进的技术&#xff0c;以满足企业的各种需求。今天&#xff0c;我们很高兴地向大家介绍我们的新一代产品——美…

【PTA题目】L1-4 稳赢 分数 15

L1-4 稳赢 分数 15 全屏浏览题目 切换布局 作者 陈越 单位 浙江大学 大家应该都会玩“锤子剪刀布”的游戏&#xff1a;两人同时给出手势&#xff0c;胜负规则如图所示&#xff1a; 现要求你编写一个稳赢不输的程序&#xff0c;根据对方的出招&#xff0c;给出对应的赢招。但…

AR远程辅助技术应用到气象部门有何好处?

随着科技的不断发展&#xff0c;人类对于自然环境的理解和掌控能力也在不断提升。其中&#xff0c;AR(增强现实)技术的应用&#xff0c;为气象监控带来了革命性的变化。AR气象远程监控&#xff0c;就是将AR技术与气象监控相结合&#xff0c;通过虚拟与现实的融合&#xff0c;实…

数字化转型导师坚鹏:数字化时代银行网点厅堂营销5大痛点分析

数字化时代银行网点厅堂营销存在以下5大痛点&#xff1a; 1、业务办理时间较长。目前很多银行业务办理时间仍然较长&#xff0c;可能的原因包括银行业务办理流程比较复杂、柜员操作技能不够熟练、银行系统的稳定性欠佳、网点某段时间客户比较多等。 2、现场提交材料太多。银行…

事件溯源(Event Sourcing)和命令查询责任分离(CQRS)经验

这篇文章是实现一个基于 CQRS 和事件溯源原则的应用程序&#xff0c;描述这个过程的方式&#xff0c;我相信分享我面临的挑战和问题可能对一些人有用。特别是如果你正在开始自己的旅程。 业务背景 项目的背景与空中交通管理&#xff08;ATM&#xff09;领域相关。我们为一个 …

Spring框架学习 -- 核心思想

目录 (1) Spring是什么? (2) 什么是IOC容器? (3) 从传统开发认识spring (4) 这种传统开发的缺陷 (5)解决传统开发中的缺陷 (6) 对比总结规律 (7) 理解IOC 创作不易多多支持 (1) Spring是什么? 我们常说的Spring的全称是: Spring Framework(Spring框架), 它是一个开源…

ueditor整合到thinkPHP里

<?phpnamespace app\ueditor\controller;use think\Controller;class Ueditor extends Controller {//首页public function upload(){//header(Access-Control-Allow-Origin: http://www.baidu.com); //设置http://www.baidu.com允许跨域访问//header(Access-Control-Allow…