SpringAMQP使用

说明:SpringAMQP(官网:https://spring.io/projects/spring-amqp)是基于RabbitMQ封装的一套模板,并利用了SpringBoot对其实现了自动装配,使用起来非常方便。安装和原始使用参考:http://t.csdn.cn/51qyD

基础操作

创建两个模块,一个用于发送消息(sender),一个用于接收消息(receiver),两个模块拥有共同的父模块

第一步:添加依赖

在父模块的pom.xml文件中,添加依赖,如下:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/>
    </parent>

    <dependencies>
        <!--lombok依赖,用于生成set、get、toString方法-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

第二步:创建配置文件

配置文件(application.yml)内容如下,两个模块的内容一样

spring:
  rabbitmq:
    # MQ ip地址
    host: XXX.XXX.XXX.XXX
    # MQ的端口号
    port: 5672
    # 虚拟主机 每个用户单独对应一个 不同用户之间无法访问彼此的虚拟主机
    virtual-host: /
    # 用户名
    username: root
    # 密码
    password: 123456

第三步:创建Listener类

在接收方,创建监听类,用来接收消息,如下:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitListenerDemo {

    @RabbitListener(queues = "demo.queue")
    public void listenDemoQueueMessage(String msg){
        System.out.println("msg = " + msg);
    }
}

第四步:编写发送端代码

在发送方的测试类中,写测试代码,发消息给接收方,其中RunWith()注解用于构建程序运行的上下文环境;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SenderTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSender() {
        rabbitTemplate.convertAndSend("demo.queue","hello rabbit mq!");
    }
}

第五步:启动

先启动接收方(这是因为,如果队列在RabbitMQ管理平台上不存在的话,先启动发送方会造成消息丢失,而先启动接收方,RabbitMQ会根据队列名先创建出队列),再启动发送方;

可以看到,测试完成,接收方可以接收到消息

在这里插入图片描述

工作队列

实际的业务情况是一个发送方,可能会有多个接收方来接收,而且接收方处理效率可能各不相同。这样,接收方的代码可以写成这样,使用线程休眠模拟接收方执行的效率,再设置变量用于统计各个接收方执行的次数:

(RabbitListenerDemo.java)

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitListenerDemo {
    
    private static int count1 = 0;
    private static int count2 = 0;
    private static int count3 = 0;

    @RabbitListener(queues = "demo.queue")
    public void listenDemoQueueMessage1(String msg) throws InterruptedException {
        System.out.println("msg1 = " + msg + "======= count1 =" + (++count1));
        Thread.sleep(10);
    }

    @RabbitListener(queues = "demo.queue")
    public void listenDemoQueueMessage2(String msg) throws InterruptedException {
        System.out.println("msg2 = " + msg + "======= count2 =" + (++count2));
        Thread.sleep(20);
    }

    @RabbitListener(queues = "demo.queue")
    public void listenDemoQueueMessage3(String msg) throws InterruptedException {
        System.out.println("msg3 = " + msg + "======= count3 =" + (++count3));
        Thread.sleep(50);
    }
}

(SenderTest:循环发送200次,休眠10毫秒)

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SenderTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSender() throws InterruptedException {
        for (int i = 0; i < 200; i++) {
            rabbitTemplate.convertAndSend("demo.queue","hello rabbit mq!======>" + i);
            Thread.sleep(10);
        }
    }
}

启动,可以看到执行效率最低的3号,也和1号、2号接收到了等量的消息量,

在这里插入图片描述

这是因为RabbitMQ有默认的分配策略,使每个接收方都可以接收到等量的消息量,而不是处理越快的处理越多。可以在接收方的配置文件中,添加这个配置,表示每个接收方只能一个消息一个消息处理(可以推测默认是先按照接收方数量,把请求都平均分配好之后,再让它们各自处理的);

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1

重启测试,可以看到,达到了“能者多劳”的效果

在这里插入图片描述

发布/订阅

发布/订阅,是指在消息发给队列前,对消息所绑定的队列信息做判断,然后按照绑定的队列对消息进行分发;

在这里插入图片描述

根据分发的情况,可分为以下三种:

  • 广播(Fanout):消息分发给所有队列;

  • 路由(Direct):消息只分发给拥有关键字(RoutingKey)的队列;

  • 主题(Topic):消息只分发给符合条件的队列;

Fanout(广播)

创建一个广播配置类,用于绑定队列与广播交换机(FanoutExchange);

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * 广播配置类
 */
@Configuration
public class FanoutConfig {

    /**
     * 声明交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("essay.fanout");
    }

    /**
     * 生成第一个队列
     * @return
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     *
     * @return
     */
    @Bean
    public Binding bindingQueue1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    /**
     * 生成第二个队列
     * @return
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     *
     * @return
     */
    @Bean
    public Binding bindingQueue2(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
}

接收方代码

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("接收者1接收到了消息:" + msg);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.out.println("接收者2接收到了消息:" + msg);
    }

发送方代码:消息并不直接发送给队列,而是发送个交换机;

    @Test
    public void fanoutExchangeTest(){
    	// 第二个参数是routeKey(路由转发关键字)不能不加,可以为空字符串
        rabbitTemplate.convertAndSend("essay.fanout","", "hello everyone!");
    }

测试结果,每个队列都接收到了消息,并发给各自的接收方

在这里插入图片描述

Direct(路由)

在接收方的接收方法上,创建对应的队列、路由交换机,并设置routeKey(路由关键字),接收者1号(group1, group2),接收者2号(group1, group3)

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "essay.direct",type = ExchangeTypes.DIRECT),
            key = {"group1", "group2"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("接收者1号接收到了消息:" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "essay.direct",type = ExchangeTypes.DIRECT),
            key = {"group1", "group3"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("接收者2号接收到了消息:" + msg);
    }

发送方发送消息,routeKey = group1

	rabbitTemplate.convertAndSend("essay.direct","group1", "hello group1!");

在这里插入图片描述


发送方发送消息,routeKey = group2

	rabbitTemplate.convertAndSend("essay.direct","group2", "hello group2!");

只有接收者1号拥有group2,故只有接收者1号接收到消息

在这里插入图片描述

Topic(主题)

与路由类似,不同的是ExchangeTypes的类型key的组成,key由通配符和关键字组成

  • #:表示一个或多个字符;

  • *:表示一个字符;

如下面的三个key分别表示:

  • group.#:表示以“group”开头的消息都发过来;

  • #.class:表示以“class”结尾的消息都发过来;

  • *.person:表示两个字符,并以“person”结尾的消息都发过来;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),
            key = "group.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("接收者1号接收到了消息:" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),
            key = "#.class"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("接收者2号接收到了消息:" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue3"),
            exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),
            key = "*.person"
    ))
    public void listenTopicQueue3(String msg){
        System.out.println("接收者3号接收到了消息:" + msg);
    }

发送方测试

	// 1号接收
	rabbitTemplate.convertAndSend("essay.topic","group.b.c.d", "hello NO.1!");
	
	// 2号接收
	rabbitTemplate.convertAndSend("essay.topic","b.c.d.class", "hello NO.2!");
	
	// 3号接收
	rabbitTemplate.convertAndSend("essay.topic","b.person", "hello NO.3!");

启动,测试结果如下,可以看到达到了预期结果

在这里插入图片描述

总结

RabbitMQ是一门异步通信的技术,SpringAMQP是基于RabbitMQ的模版,可以省去原始操作RabbitMQ的繁琐(建立连接、设置连接参数、创建通道、创建队列、发送消息/接收消息)。

另外,可以使用SpringAMQP建立工作队列、发布/订阅等模式,其中工作队列可设置spring.rabbitmq.listener.simple.prefetch=1,达到“能者多劳”的效果;

而发布/订阅模式又分为广播、路由和主题,广播模式需要手动建立队列和路由交换机的关联,路由与主题的区别在于路由交换机的类型和路由关键字的格式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/41337.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud(五)Gateway 路由网关

一、路由网关 官网地址&#xff1a;https://docs.spring.io/spring-cloud-gateway/docs/current/reference/html/ 我们需要连接互联网&#xff0c;那么就需要将手机或是电脑连接到家里的路由器才可以&#xff0c;而路由器则连接光猫&#xff0c;光猫再通过光纤连接到互联网&a…

按键控制led变化

文章目录 按键控制led变化一、简介二、代码三、仿真代码四、仿真结果五、总结 按键控制led变化 一、简介 使用按键控制开发板上一个led灯的亮灭&#xff0c;当按键按下的时候led灯就亮&#xff0c;当再一次按下按键的时候led就不亮了。由于按键存在抖动&#xff0c;按键松开的…

idea 有时提示找不到类或者符号,日志报java: 找不到符号的解决

解决一&#xff1a; idea maven编译成功&#xff0c;运行失败提示找不到符号&#xff0c;主要是get和set方法找不到符号&#xff0c;此时就是idea的lombok版本冲突 IDEA版本导致的Lombok失效&#xff0c;需要更新lombok版本到1.18.14及之后版本得到解决 <dependency>&…

Linux:squid透明代理

在传统代理上进行修改并添加网卡 这次不使用手动代理&#xff0c;而是把网关搞成代理 在下面这个链接里的文章实验下进行修改 Linux&#xff1a;squid传统代理_鲍海超-GNUBHCkalitarro的博客-CSDN博客 完成以后不用再win10上去配置&#xff0c;代理的那一步&#xff0c;然后…

3、Java入门教程【数据类型】

一、概述 java中数据类型分为两大类&#xff1a;【基本数据类型】和【引用数据类型】 二、基础数据类型 数据类型含义默认值取值范围存储大小&#xff08;字节&#xff09;整型byte字节型0-128 到 1271整型short短整型0-2^15 到 2^15-12整型int【默认】整形0-2^31 到 2^31-14…

1186. 删除一次得到子数组最大和;1711. 大餐计数;1834. 单线程 CPU

1186. 删除一次得到子数组最大和 解题思路&#xff1a;如果没做过还不是很好想&#xff0c;当时自己第一反应是双指针&#xff0c;结果是个动态规划的题。 核心就是dp的定义&#xff0c;dp[i][k]表示以arr[i]结尾删除k次的最大和。看到这里其实就有一点思路了 dp[i][0]表示以…

⛳ Git安装与配置

Git安装配置目录 ⛳ Git安装与配置&#x1f3ed; 一&#xff0c;git的安装&#x1f3a8; 1&#xff0c;下载git&#x1f463; 2&#xff0c;下载完成之后&#xff0c;双击安装即可。&#x1f4bb; 3&#xff0c;更改安装目录&#xff08;没有中文且没有空格&#xff09;&#x…

Netty核心技术十一--用Netty 自己 实现 dubbo RPC

1. RPC基本介绍 RPC&#xff08;Remote Procedure Call&#xff09;:远程 过程调用&#xff0c;是一个计算机 通信协议。该协议允许运 行于一台计算机的程序调 用另一台计算机的子程序&#xff0c; 而程序员无需额外地为这 个交互作用编程 两个或多个应用程序都分 布在不同的服…

【已解决】html元素如何使字体占据相同的元素显得整齐

本博文源于自身的亲身实践&#xff0c;让html的文本元素对齐&#xff0c;如果不让其对齐就会变得很丑陋&#xff0c;如下图&#xff0c;那么如何设置才能让元素占据相同呢&#xff1f; 文章目录 1、问题来源2、问题解决思路3、问题解决方案4、问题完整源码及效果 1、问题来源 …

摩尔投票算法(Moore‘s Voting Algorithm)及例题

摩尔投票算法&#xff08;Moores Voting Algorithm&#xff09;及例题 摩尔投票算法简介摩尔投票算法算法思想摩尔投票算法经典题目169. 多数元素229. 多数元素 II6927. 合法分割的最小下标 上午打力扣第 354 场周赛最后十五分钟用摩尔投票算法直接秒了第三题。 摩尔投票算法简…

使用原生Redis命令实现分布式锁

推荐文章&#xff1a; 1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表; ​ 2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据; 3、java后端接口API性能优化技巧 4、SpringBootMyBatis流式查询,处理大规模数据,提高系统的性能和响应…

一零六四、世界杯数据可视化分析(阿里云天池赛)

目录 赛制官方链接 活动背景 活动时间&#xff1a;即日起-12月31日17点 数据说明 世界杯成绩信息表&#xff1a;WorldCupsSummary 世界杯比赛比分汇总表&#xff1a;WorldCupMatches.csv 世界杯球员信息表&#xff1a;WorldCupPlayers.csv 代码实现 赛制官方链接 世界杯…

Git 学习笔记

Git 仓库中的提交记录保存的是你的目录下所有文件的快照&#xff0c;就像是把整个目录复制&#xff0c;然后再粘贴一样&#xff0c;但比复制粘贴优雅许多&#xff01; Git 希望提交记录尽可能地轻量&#xff0c;因此在你每次进行提交时&#xff0c;它并不会盲目地复制整个目录。…

使用typora+PicGo+Gitee简单实现图片上传功能

本文通过配置PicGoGitee来实现typora图片上传功能&#xff0c;系统是window 注意下载的清单有&#xff1a;PicGo&#xff0c;node.js&#xff0c;配置有&#xff1a;PicGo&#xff0c;node.js&#xff0c;gitee&#xff0c;typora 看着复杂实际上并不难&#xff0c;只是繁琐&am…

ADC 的初识

ADC介绍 Q: ADC是什么&#xff1f; A: 全称&#xff1a;Analog-to-Digital Converter&#xff0c;指模拟/数字转换器 ADC的性能指标 量程&#xff1a;能测量的电压范围分辨率&#xff1a;ADC能辨别的最小模拟量&#xff0c;通常以输出二进制数的位数表示&#xff0c;比如&am…

HttpClient使用MultipartEntityBuilder上传文件时乱码问题解决

HttpClient使用MultipartEntityBuilder是常用的上传文件的组件&#xff0c;但是上传的文件名称是乱码&#xff0c;一直输出一堆的问号&#xff1a; 如何解决呢&#xff1f;废话少说&#xff0c;先直接上代码&#xff1a; public static String doPostWithFiles(HttpClient http…

scripy其他

持久化 # 爬回来&#xff0c;解析完了&#xff0c;想存储&#xff0c;有两种方案 ## 方案一&#xff1a;一般不用 parse必须有return值&#xff0c;必须是列表套字典形式--->使用命令&#xff0c;可以保存到json格式中&#xff0c;csv中scrapy crawl cnblogs -o cnbogs.j…

【精华】maven 生命周期 + 依赖传递+ scope【依赖范围】 + 排除依赖 可选依赖

目录 一 . lifecycle 生命周期 二. 依赖 与 依赖传递 三. scope 依赖范围 scope指定依赖范围 依赖传递依赖与原依赖冲突 四 maven的可选依赖与排除依赖 可选依赖 全部 排除依赖 显式的指定 maven官网技术文档&#xff1a; 一 . lifecycle 生命周期 * clean&…

基于appium的常用元素定位方法

目录 一、元素定位工具 1.uiautomatorviewer.bat 2.appium检查器 二、常用元素定位方法 1.id定位 2.class_name定位 3.accessibility_id定位 4.android_uiautomator定位 5.xpath定位 三、组合定位 四、父子定位 五、兄弟定位 总结&#xff1a; 一、元素定位工具 app应…

postgresql regular lock常规锁申请与释放 内幕 以及fastpath快速申请优化的取舍

​专栏内容&#xff1a; postgresql内核源码分析 手写数据库toadb 并发编程 个人主页&#xff1a;我的主页 座右铭&#xff1a;天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物. 定义 每种常规锁都需要定义几个要素&#xff0c;它由结构体 Lo…