RabbitMQ:消息中间件

文章目录

  • 概念
  • 管理界面简介
  • 4中常见交换器类型
    • 1.Direct交换器:
    • 2.Fanout交换器
    • 3.Topic交换器
    • 4.headers交换器
  • 对象类型消息传递
  • 同步等待
  • 使用代码创建队列
  • 待续......

概念

在微服务架构中项目之间项目A调用项目B 项目B调用项目C项目C调用项目D。。
用户必须等待项目之间内容依次的运行结束后才会给用户返回结果
这是一个同步的调用,用户等待时间可能比较长,用户体验度比较差

消息中间件: 可以理解成一个队列,把用户需要处理的任务交给放到队列,任务在队列中进行排队等待执行,用户无需等待代码的执行的时间.
在这里插入图片描述

管理界面简介

1.Overview: 此面板为RabbitMQ基础信息展示面板,列举了服务器的信息,如:节点名称、内存占用、磁盘占用等。
2.Connections: 此面板中展示所有连接到RabbitMQ的客户端链接。只展示基于5672端口的链接。
3.Channels: 此面板中展示各链接中的具体信道。标记方式为链接(编号),如:192.168.91.1:12345(1)。
4.Exchanges: 此面板中展示RabbitMQ中已有的交换器,并注明交换器名称、类型等基本信息。其中只有direct交换器有默认交换器(AMQP default),当使用direct交换器时,如果没有明确指定名称,使用AMQP default交换器,也可明确指定名称。但是其他类型交换器没有默认的,都需要指定名称。
5.Queues: 此面板展示RabbitMQ中的队列信息。

4中常见交换器类型

1.Direct交换器:

direct会根据路由键把消息放入到指定的队列中。
在Queues界面创建队列q1,在Exchanges创建direct交换机fs.direct绑定交换机q1,指定路由key,fs.q1
一.生产者
1.导入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

2.配置yml

# 配置RabbitMQ相关信息
# 当创建RabbitMQ容器的时候,不提供用户名和密码配置,自动创建用户guest,密码guest。
# guest用户只能本地访问RabbitMQ。
spring:
  rabbitmq:
    host: localhost # RabbitMQ服务器的IP。默认localhost
    port: 32769 # RabbitMQ服务器的端口。
    username: guest # RabbitMQ的访问用户名。默认guest。
    password: guest # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /

3.在测试包下发送消息:
类型可以是: AmqpTemplate(顶级接口), RabbitOperations(专用子接口),RabbitTemplate(具体实现)
建议使用接口: 优先级是 RabbitOperations > AmqpTemplate

Spring AMQP可以发送的消息类型必须是Message类型。
Spring AMQP可以帮助程序员自动封装消息类型Message对象(默认封装),自动转换封装的消息体类型是Object,只要类型可序列化即可。

消息发送到队列后,无需等待,是异步操作,生产者接着往下执行.

@SpringBootTest
class RabbitmqApplicationTests {
    @Autowired
    private RabbitOperations rabbitOperations;
    @Test
    void contextLoads() {
		//仅发送,不需要等待返回值,异步,推荐
        rabbitOperations.convertAndSend("fs.dire"(交换机名称),"xy.q1"(路由key),"你好 rabbitmq"(消息内容));
        //发送,有返回值,使得rabbitmq变成同步的了,需要等待返回值,失去了rabbitmq的意义
        //rabbitOperations.convertSendAndReceive("","","");
    }
}

二.消费者
1.pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.yml

spring:
  rabbitmq:
    host: localhost
    port: 32769
    username: guest
    password: guest

3.监听队列
修饰符: public
返回值: 异步消息必须是void
方法名: 自定义
参数表: 一个参数,类型可以是Message或者具体的消息体类型(Object)。message是Spring AMQP中消息的唯一类型。代表完整消息,有头和体组成。如果对消息头没有任何处理要求,则直接定义消息体具体类型即可。
方法实现: 根据具体要求,定义即可。

注意:方法可以抛出任意类型的异常。只要抛出异常,则代表消费错误,RabbitMQ不删除队列中的消息。
注解是RabbitListener。

如果有多个消费监听,默认采用轮询消费,消费监听不需要相互等待,并发执行.

@Component
public class StringMessageConsumer {
    @RabbitListener(queues = {"q1"})(队列名称可以指定多个)
    public void received1(String m){
        System.out.println(m);
    }

	@RabbitListener(queues = {"q1"})(队列名称可以指定多个)
    public void received2(String m){
        System.out.println(m);
    }
}

2.Fanout交换器

扇形交换器: 会把消息发送给所有的绑定在当前交换器上的队列,可以不写路由键
1.在Queues界面创建队列f1,f2,在Exchanges创建fanout交换机fs.fanout绑定交换机f1,f2,不指定路由
2.编写测试代码

    @Autowired
    private RabbitOperations rabbitOperations;
    @Test
    void contextLoad1s() {
    	//没有绑定路由键,设为null即可
        rabbitOperations.convertAndSend("fs.fanout",null,"你好 rabbitmq");
    }

3.f1,f2队列都多了一个消息

3.Topic交换器

主题交换器: 路由键可包括特殊字符实现通配。特殊字符包括: 星号和 ‘#’。
星号 : 代表一个单词。多个单词使用’.'分割。
‘#’ : 代表0~n个字符,即任意字符串。//如abc.# 代表以 abc. 开头的所有路由key
1.在Queues界面创建队列t1,t2,t3在Exchanges创建Topic交换机fs.topic绑定交换机t1,t2,t3
2.指定t1路由键: abc.t1,指定t2路由键: abc.*,指定t3路由键: abc.#
3.编写测试代码

    @Autowired
    private RabbitOperations rabbitOperations;

    @Test
    void contextLoawd1s() {
        rabbitOperations.convertAndSend("fs.topic","abc.t1","你好 rabbitmq");//t1,t2,t3
        rabbitOperations.convertAndSend("fs.topic","abc.123","你好 rabbitmq");//t2,t3
        rabbitOperations.convertAndSend("fs.topic","abc.123.234","你好 rabbitmq");//t3
    }

4.headers交换器

headers交换器和direct交换器的主要区别是在传递消息时可以传递header部分消息
生产消息
在Queues界面创建队列h1,在Exchanges创建headers交换机fs.headers绑定交换机h1,指定路由key,fs.h1

    @Autowired
    private RabbitOperations rabbitOperations;
    
    @Test
    void conwtextLoawd1s() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("fs","java");
        Message message = new Message("我是消息".getBytes(), messageProperties);
        rabbitOperations.convertAndSend("fs.headers","fs.h1",message);
    }

消费消息

    @Test
    void conwtextLoawd1s() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("fs","java");
        Message message = new Message("我是消息".getBytes(), messageProperties);
        rabbitOperations.convertAndSend("fs.headers","fs.h1",message);
    }

对象类型消息传递

1.生产者
创建User类
注意要继承序列化接口Serializable
同时指定序列化版本号serialVersionUID且和消费者一致(如果不指定,消费者的User类必须和生产者的User类完全一致,否则,不会认为是统一类型)
User类在生产者和消费者中的包路径必须一致

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    private final Long serialVersionUID = 1L;
    private String name;
    private Integer age;
}

发送消息
    	@Autowired
    	private RabbitOperations rabbitOperations;
    
        User user = new User("张三",18);
        rabbitOperations.convertAndSend("fs.direct","xy.q1",user);

2.消费者
在消费者中创建User类
注意要继承序列化接口Serializable
同时指定序列化版本号serialVersionUID且和消费者一致(如果不指定,消费者的User类必须和生产者的User类完全一致,否则,不会认为是统一类型)
User类在生产者和消费者中的包路径必须一致
直接用对应类型接收

@Component
public class StringMessageConsumer {
    @RabbitListener(queues = {"q1"})
    public void received(User(类型和发送来的一致即可) user){
        System.out.println(user);
    }
}

统一用Message对象接收,常用语Headers交换器

@Component
public class StringMessageConsumer {
    @RabbitListener(queues = {"q1"})
    public void received(Message message) throws IOException, ClassNotFoundException {
        byte[] body = message.getBody();
        
        ByteArrayInputStream bai = new ByteArrayInputStream(body);
        ObjectInputStream ois = new ObjectInputStream(bai);
        
        Object object = ois.readObject();
        if(object instanceof User){
            User user = (User) object;
            System.out.println(user);
        }
    }
}

同步等待

等待结果返回,才能往下执行
默认等待5s,可以配置,超时返回null
yml配置

spring:
  rabbitmq:
    host: localhost # RabbitMQ服务器的IP。默认localhost
    port: 32769 # RabbitMQ服务器的端口。
    username: guest # RabbitMQ的访问用户名。默认guest。
    password: guest # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /

生产者:

    @Test
    void contextLowads() {
    	//发送,有返回值,使得rabbitmq变成同步的了,需要等待返回值,等待结果返回,才能往下执行
        String aa = (String)rabbitOperations.convertSendAndReceive("fs.direct", "xy.q1", "aa");
        System.out.println(aa);
}

消费者:

@Component
public class StringMessageConsumer {
    @RabbitListener(queues = {"q1"})
    public String received(Message message) {
        return "hello";
    }
}

使用代码创建队列

一.在生成者端创建队列
是发送消息时创建,而不是启动项目时。

@Configuration
public class RabbitMQConfig {
    // 发送消息时如果不存在这个队列,会自动创建这个队列。
    // 注意:是发送消息时,而不是启动项目时。
    // 相当于:可视化操作时创建一个队列
    // 如果队列创建完成后,没有绑定(没有另外两个方法),默认绑定到AMQP default交换器
    @Bean
    public Queue queue(){
        return new Queue("queue.second");
    }

    // 如果没有这个交换器,在发送消息创建这个交换器
    // 配置类中方法名就是这个类型的实例名。相当于<bean id="" class="">的id属性,返回值相当于class
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct.first.ex");
    }

    // 配置类中方法参数,会由Spring 容器自动注入
    @Bean
    public Binding directBingding(DirectExchange directExchange,Queue queue){
        // with(“自定义路由键名称”)
        return BindingBuilder.bind(queue).to(directExchange).with("routing.key.2");
        // withQueueName() 表示队列名就是路由键名称
        // return BindingBuilder.bind(queue).to(directExchange).withQueueName();
    }
}

二.消费者

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.second",autoDelete = "false", durable = "true"),
                    exchange = @Exchange(name = "direct.second.ex",autoDelete = "false", type = ExchangeTypes.DIRECT),
                    key = {"routing.key.second.1"}
            )
    })
    public void onMessage(String messageBody){
        System.out.println("第二个消息消费者监听,处理消息:" + messageBody);
    }

待续…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/13215.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux:centos:系统服务基础控制(systemctl)基础使用 图形化工具ntsysv使用

基础使用的办法为&#xff1a; systemctl控制类型服务名称 控制常用类型为一下几个 start 启动 stop 停止 enable 开机自启 disable 开机不自启 restart 重新启动 reload 重新加载 status 查看服务状态 systemc…

智加科技与舍弗勒签订商用车先进转向系统量产合作协议,将率先量产行业首个正向开发的智能重卡冗余转向

自动驾驶已经成为当前汽车行业的重要发展趋势之一。在此背景下&#xff0c;在2023上海国际汽车展期间&#xff0c;智加科技与舍弗勒集团签订量产合作协议&#xff0c;双方将在自动驾驶商用车先进转向系统领域展开合作&#xff0c;共同推动重卡自动驾驶的技术应用和创新发展。 图…

死锁---银行家算法例题

1、知识点 1.银行家算法使用的四个必要的数据结构是: 可用资源向量Available&#xff0c;最大需求矩阵Max&#xff0c;分配矩阵Allocation&#xff0c;需求矩阵Need。 2.银行家算法是不是破坏了产生死锁的必要条件来达到避免死锁的目的&#xff1f;若是&#xff0c;请简述破…

浅析商场智能导购系统功能与实施效益

商场智能导购系统是一种基于物联网技术和人工智能算法的解决方案&#xff0c;旨在提供商场内部的智能导购服务&#xff0c;为消费者提供个性化的购物导引和推荐&#xff0c;提升用户购物体验&#xff0c;增加商场的客流量和销售额。 商场智能导购系统的方案一般包括以下主要功能…

初识 MongoDB

文章目录 一、简介二、体系结构三、数据类型四、特点五、应用场景 提示&#xff1a;以下是本篇文章正文内容&#xff0c;MongoDB 系列学习将会持续更新 一、简介 MongoDB 是一个文档数据库&#xff0c;是由字段和值对&#xff08;field:value&#xff09;组成的数据结构&…

同态随机基加密的量子多方密码-数学公式

众所周知&#xff0c;信息和信息处理的完全量子理论提供了诸多好处&#xff0c;其中包括一种基于基础物理的安全密码学&#xff0c;以及一种实现量子计算机的合理希望&#xff0c;这种计算机可以加速某些数学问题的解决。这些好处来自于独特的量子特性&#xff0c;如叠加、纠缠…

第一节 法学

目录 法学的概念法学的性质 实践性构成了法学的学问性质 法学的研究对象 1.法律制度问题&#xff08;X法律制度&#xff09;2. 社会现实或社会生活关系问题 (Y社会现实/社会关系)3.法律制度与社会现实之间如何对应的问题 &#xff08;Yf(x) f为什么函数&#xff09; 法学的概…

耗时半月,终于把牛客网上的软件测试面试八股文整理成了PDF合集(测试基础+linux+MySQL+接口测试+自动化测试+测试框架+jmeter测试+测试开发)

大家好&#xff0c;最近有不少小伙伴在后台留言&#xff0c;近期的面试越来越难了&#xff0c;要背的八股文越来越多了&#xff0c;考察得越来越细&#xff0c;越来越底层&#xff0c;明摆着就是想让我们徒手造航母嘛&#xff01;实在是太为难我们这些程序员了。 这不&#xf…

shell中的for循环和if判断

一.编写脚本for1.sh,使用for循环创建20账户&#xff0c;账户名前缀由用户从键盘输入&#xff0c;账户初始密码由用户输入&#xff0c;例如: test1、test2、test3、.....、 test10 1.创建脚本for1.sh [rootserver ~]# vim for1.sh 2.编写脚本for1.sh 3.执行脚本for1.sh [roo…

fzyczn生日赛t1 CZN

fzy&czn生日赛t1 CZN 膜拜hybb首杀 文章目录 fzy&czn生日赛t1 CZN题目背景题目描述分析my codewnags code 题目 题目背景 有一天&#xff0c;czn在机房里面心心念念的pj终于来找他了&#xff0c;pj希望czn能够帮助她来解决一道数学题&#xff0c;czn“十分不乐意”地…

Spring入门案例--bean基础配置

bean基础配置(id与class) 对于bean的基础配置&#xff0c;在前面的案例中已经使用过: 1 <bean id"" class""/> 其中&#xff0c;bean标签的功能、使用方式以及id和class属性的作用&#xff0c;我们通过一张图来描述下 这其中需要大家重点掌握的…

Linux应用编程(进程)

一、进程与程序 注册进程终止处理函数 atexit() #include <stdlib.h> int atexit(void (*function)(void));使用该函数需要包含头文件<stdlib.h>。 函数参数和返回值含义如下&#xff1a; function&#xff1a;函数指针&#xff0c;指向注册的函数&#xff0c;此…

leetcode160. 相交链表

给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 。 图示两个链表在节点 c1 开始相交&#xff1a; 题目数据 保证 整个链式结构中不存在环。 注意&#xff0c;函数返回结果后&…

软件测试工程师需要达到什么水平才能顺利拿到 20k 无压力?

最近有粉丝朋友问&#xff1a;软件测试员需要达到什么水平才能顺利拿到 20k 无压力&#xff1f; 这里写一篇文章来详细说说&#xff1a; 目录 扎实的软件测试基础知识&#xff1a;具备自动化测试经验和技能&#xff1a;熟练掌握编程语言&#xff1a;具备性能测试、安全测试、全…

flv怎么无损转换成mp4格式,3大超级方法分享

flv格式是目前在视频分享媒体播放网站上广泛使用的一种视频文件格式&#xff0c;可以在网站窗口中直接播放&#xff0c;这类视频文件还能够有效保护版权。但是有些时候我们可能需要将flv格式的视频转换为其他格式&#xff0c;比如mp4。但是该怎么操作呢&#xff1f; 其实有很多…

【花雕学AI】深度挖掘ChatGPT角色扮演的一个案例—CHARACTER play : 莎士比亚

CHARACTER play : 莎士比亚 : 52岁&#xff0c;男性&#xff0c;剧作家&#xff0c;诗人&#xff0c;喜欢文学&#xff0c;戏剧&#xff0c;爱情 : 1、问他为什么写《罗密欧与朱丽叶》 AI: 你好&#xff0c;我是莎士比亚&#xff0c;一位英国的剧作家和诗人。我很高兴你对我的…

【状态估计】用于描述符 LTI 和 LPV 系统的分析、状态估计和故障检测的算法(Matlab代码实现)

&#x1f4a5; &#x1f4a5; &#x1f49e; &#x1f49e; 欢迎来到本博客 ❤️ ❤️ &#x1f4a5; &#x1f4a5; &#x1f3c6; 博主优势&#xff1a; &#x1f31e; &#x1f31e; &#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 …

一文看懂数据云平台的“可观测性”技术实践

背景 这是一家大型制造集团。为监控及预测工厂设备运行情况&#xff0c;IT部门在数据云平台DataSimba上按天执行数据作业&#xff0c;每24小时对工厂设备的日志数据进行分析&#xff0c;发现能对业务起到很好的辅助作用&#xff0c;效果不错。 “要不升级为每1个小时跑一次&am…

腾讯云轻量级云服务器Centos7防火墙开放8080端口

腾讯云轻量级云服务器Centos7防火墙开放8080端口 一、centos7防火墙打开端口 因为Centos7以上用firewalld代替了iptables,也就是说firewalld开通了8080端口应该就行了 1.查看8080是否已经放开 sudo firewall-cmd --permanent --zonepublic --list-ports2.查看防火墙状态 s…

EMQX vs NanoMQ | 2023 MQTT Broker 对比

引言 EMQX 和 NanoMQ 都是由全球领先的开源物联网数据基础设施软件供应商 EMQ 开发的开源 MQTT Broker。 EMQX 是一个高度可扩展的大规模分布式 MQTT Broker&#xff0c;能够将百万级的物联网设备连接到云端。NanoMQ 则是专为物联网边缘场景设计的轻量级 Broker。 本文中我们…