RabbitMQ消息队列

简介

MQ(message queue),从字面意思上看就个 FIFO 先入先出的队列,只不过队列中存放的内容是 message 而已,它是一种具有接收数据、存储数据、发送数据等功能的技术服务。

作用:流量削峰、应用解耦、异步处理。

在这里插入图片描述
生产者将消息发送到消息队列中,消息队列负责转发消息给消费者,消费者在处理完消息后会对消息队列进行应答,消息队列收到应答信息会将相应的消息进行丢弃。

批量应答会导致高并发时消息的丢失,所以尽力以channel.ack()进行手动应答。

docker安装

  1. 拉取镜像并后台运行
docker run -id --name=rabbitmq -v rabbitmq-home:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=yi -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq

需要将RABBITMQ_DEFAULT_USER、RABBITMQ_DEFAULT_PASS改成自己的用户名、密码。

  1. 开启manager插件,可以在网页进行管理。
 docker exec -it 容器id /bin/bash  #这里可以用docker ps 查询刚刚开启的容器id
 #进入容器后输入,开启
 rabbitmq-plugins enable rabbitmq_management

可以登录 http://服务器IP:15672 访问web管理界面,访问成功则代表开启成功。

JAVA环境搭建

jar包:

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-simple</artifactId>
              <version>1.7.25</version>
        </dependency>
    </dependencies>

Helloworld实例

生产者

public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("yi");
        connectionFactory.setPassword("123456");
        //获取连接
        Connection connection = connectionFactory.newConnection();

        //获取信道,一个连接中有多个信道
        Channel channel = connection.createChannel();
        //声明一个队列 String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        String message="hello world";
        //(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("发送成功");
    }

消费者

public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("yi");
        connectionFactory.setPassword("123456");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };
        CancelCallback cancelCallback=(String var1)->{
            System.out.println("消息消费被中断");
        };

        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }

工作队列(任务队列)

RabbitMQ默认为工作队列模式,消费者C1,C2为竞争关系,接收到的消息将轮询发送给C1,C2处理,即C1一条C2一条依次循环。
在这里插入图片描述

手动应答ack

因为自动应答不会考虑消息是否处理成功,所以可能会导致消息丢失,需要在代码中将自动应答改为手动应答。批量应答在高并发的时候也容易丢失消息,也应该关闭。

生产者的代码无需修改。
消费者:

public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("work2 waiting:");

        DeliverCallback deliverCallback= (String s, Delivery delivery)->{
            System.out.println(new String(delivery.getBody()));
            // do something
            //手动回复ack,false为关闭批量应答
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback=(s)->{
            System.out.println("消息被打断");
        };
        //false表示不自动应答ack
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

    }

不公平分发

会存在有些线程能力差耗时长,有些能力强耗时短的情况,不公平分发将实现能者多劳。
设立channel的basicQos即可实现不公平分发, basicQos的数值意味着channel的最大存储上限,channel为1时,消费者最多同时缓存一条待处理消息。

channel.basicQos(1);

发布确认

在开启队列持久化、消息持久化后,RabbitMQ服务器仍然可能在将消息存储在磁盘前宕机,需要发布确认才能保证消息不丢失,即RabbitMQ在存储磁盘成功后,发送确认给生产者。

单个发布确认

每条消息存储在磁盘后进行发布确认,只有发送者在接收到消费者对应的发布确认消息后才会给此消费者发送下一条消息。

public static void publicMsgIndividual()throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        //开启持久化
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        channel.confirmSelect();//开启发布确认

        long begin = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());       
            boolean flag = channel.waitForConfirms(); //等待发布确认
            if(flag){
                System.out.println("消息发送成功");
             }
        }
        long end = System.currentTimeMillis();
        System.out.println("发布1000条耗时:"+(end-begin)+"ms");
    }

批量发布确认

每发送100条消息进行一次发布确认。速度快,但是不知道具体是哪一条消息发送失败了。

public static void publicMsgIndividual()throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        //开启持久化
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        channel.confirmSelect();//开启发布确认

        long begin = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());
            if(i%100==0){
                boolean flag = channel.waitForConfirms(); //等待发布确认
                if(flag){
                    System.out.println("消息发送成功");
                }
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("发布1000条耗时:"+(end-begin)+"ms");
    }

异步发布确认

推荐使用,需要加入确认发布监听器confirmListener,并且记录序列号与消息的关联(ConcurrentSkipListMap)。

 public static void publicMsgAsync()throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        //开启持久化
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        channel.confirmSelect();//开启发布确认

//       将序列号与信息相关联,
        ConcurrentSkipListMap concurrentSkipListMap = new ConcurrentSkipListMap<Long,String>();

        //加入确认监听器
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long msgTag, boolean multiply) throws IOException {
                System.out.println("消息发送成功:"+msgTag);
                if(multiply) { //如果是批量确认,批量删除
                    //headMap返回小于msgTag的map视图
                    ConcurrentNavigableMap concurrentNavigableMap = concurrentSkipListMap.headMap(msgTag);
                    //清理已经标记的Map
                    concurrentNavigableMap.clear();
                }else {
                    concurrentSkipListMap.remove(msgTag);
                }

            }

            @Override
            public void handleNack(long msgTag, boolean multiply) throws IOException {
                System.out.println("未确认的消息:"+concurrentSkipListMap.get(msgTag));
            }
        });
        long begin = System.currentTimeMillis();


        for (int i = 0; i < 1000; i++) {
            channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());
            //记录发送的信息与其序列号
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(),new String(i+" "));
        }
        long end = System.currentTimeMillis();
        System.out.println("发布1000条耗时:"+(end-begin)+"ms");
    }

发布/订阅模式(fanout交换机)

首先要弄明白交换机和队列的关系,交换机负责信息的接收,通过不同的RountingKey将消息转发到不同的队列,每个队列上的接收者都是竞争关系(即队列上的消息只会被处理一次),那么当一个交换机对应多个队列时,每个队列仅有一个消费者,这个时候即发布/订阅模式,消息会被每个消费者接收。
在这里插入图片描述

生产者代码:向交换机中发送消息

public static final String EXCHANGE_NAME="logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String next = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"", null,next.getBytes());
        }
    }

消费者代码:声明匿名队列,将队列绑定到交换机上,不同的消费者用相同的RountingKey,以便同时接收到消息。

public static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //FANOUT煽出,就是发布订阅模式

        String queue = channel.queueDeclare().getQueue(); //声明匿名队列

        channel.queueBind(queue,EXCHANGE_NAME,""); //将队列绑定到交换机上,RountingKey为“”

        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println("接收到消息:"+new String(message.getBody()));
        };
        channel.basicConsume(queue,true,deliverCallback, (consumerTag)->{});
    }

Direct交换机

与fanout模式相比,不同的队列有不同的Rounting key,通过Rounting Key能够直接向指定队列发送消息。

Topic交换机

rountingKey作为匹配串,发送消息时,匹配上的则能进行发送。
routingKey必须是单词列表,用.隔开。如aa.bb.cc
*可以代表一个单词 ,#可以代表若干个单词
比如向rountingKey为aa.orange.rabbit发送消息,Q1和Q2都能接收到消息,而向aa.orange.bb发送消息则只有Q1能够接收到消息。
在这里插入图片描述
当队列的rountingKey绑定的#,则相当于fanout煽出交换机。
当队列的rountingKey绑定不带#*时,相当于direct交换机。

死信队列

在队列中1消息超时、2无法处理、3队列已满时,消息会被送入死信队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/197049.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

黑马点评-Feed流的实现方案,基于推拉结合模式实现笔记推送

Feed流实现方案 我们关注了博主之后,当用户发布了动态后我们应该把这些数据推送给粉丝,关注推送也叫作Feed(投喂)流,通过无限下拉刷新获取新的信息 传统的模式内容检索: 粉丝需要主动通过搜索引擎或者是其他方式去查找想看的内容新型Feed流的效果: 系统分析用户到底想看什么,…

okhttp系列-拦截器的执行顺序

1.将拦截器添加到ArrayList final class RealCall implements Call {Response getResponseWithInterceptorChain() throws IOException {//将Interceptor添加到ArrayListList<Interceptor> interceptors new ArrayList<>();interceptors.addAll(client.intercept…

注意力机制(Attention Mechanism)

目录 1. 简介&#xff1a;探索注意力机制的世界 2. 历史背景 3. 核心原理 4. 应用案例 5. 技术挑战与未来趋势 6. 图表和示例 7. Conclusion 1. 简介&#xff1a;探索注意力机制的世界 在当今的人工智能&#xff08;AI&#xff09;和机器学习&#xff08;ML&#xff09;…

戴尔科技推出全新96核Precision 7875塔式工作站

工作站行业一直是快节奏且充满惊喜的。在过去25年中,戴尔Precision一直处于行业前沿,帮助创作者、工程师、建筑师、研究人员等将想法变为现实,并对整个世界产生影响。工作站所发挥的作用至关重要,被视为化不可能为可能的必要工具。如今,人工智能(AI)和生成式AI(GenAI)的浪潮正在…

npm管理发布包-创建与发布

创建与发布 我们可以将自己开发的工具包发布到 npm 服务上&#xff0c;方便自己和其他开发者使用&#xff0c;操作步骤如下 创建文件夹&#xff0c;并创建文件indexjs&#xff0c;在文件中声明函数&#xff0c;使用 module.exports 暴露npm初始化工具包&#xff0c;package.j…

浅谈硬件连通性测试几大优势

硬件连通性测试是确保硬件系统正常运行、提高系统可靠性和降低生产成本的关键步骤。在现代工程和制造中&#xff0c;将连通性测试纳入生产流程是一个明智的选择&#xff0c;有助于确保硬件产品的质量和性能达到最优水平。本文将介绍硬件连通性测试的主要优势有哪些! 一、提高系…

Java基础之集合类

Java基础之集合类 一、集合的框架1.1、集合概述1.2、集合与数组区别1.3、数组的缺点&#xff1a;1.4、常用集合分类1.5、Collection常用方法 二、List集合2.1、ArrayList2.2、LinkedList2.3、Vector2.4、区别 三、Set集合3.1、HashSet集合3.2、LinkedHashSet集合3.3、TreeSet集…

Unity 接入TapADN播放广告时闪退 LZ4JavaSafeCompressor

通过跟踪安卓日志&#xff0c;发现报如下错误 Didnt find class "com.tapadn.lz4.LZ4JavaSafeCompressor" 解决方案&#xff1a; 去掉Minify这边的勾选&#xff0c;再打包即可。

国内高速下载huggingface上的模型

前提 Python版本至少是3.8 安装 安装hugging face官方提供的下载工具 pip install -U huggingface_hub hf-transfer Windows设置环境变量 在当前窗口设置临时环境变量&#xff08;cmd.exe&#xff09; set HF_HUB_ENABLE_HF_TRANSFER 1 你也可以设置永久的环境变量&am…

MySQL基础进阶篇

进阶篇 存储引擎 MySQL体系结构&#xff1a; 存储引擎就是存储数据、建立索引、更新/查询数据等技术的实现方式。存储引擎是基于表而不是基于库的&#xff0c;所以存储引擎也可以被称为表引擎。 默认存储引擎是InnoDB。 相关操作&#xff1a; -- 查询建表语句 show create …

uniapp 导航分类

商品分类数据&#xff0c;包括分类名称和对应的商品列表点击弹出 列表的内容 展示效果如下&#xff1a; 代码展示 ①div部分 <view class"container"><view class"menu-bar"><view class"menu"><view class"menu-sc…

差异性分析方法汇总与pk

在数据研究中&#xff0c;常见的数据关系可以分为四类&#xff0c;分析是相关关系&#xff0c;因果关系、差异关系以及其它。本次所进行研究的关系为差异关系。对于差异性分析方法常见可以分为三类&#xff1a;参数检验、非参数检验以及可视化图形。 一、参数检验 1、参数检验…

Flask Session 登录认证模块

Flask 框架提供了强大的 Session 模块组件&#xff0c;为 Web 应用实现用户注册与登录系统提供了方便的机制。结合 Flask-WTF 表单组件&#xff0c;我们能够轻松地设计出用户友好且具备美观界面的注册和登录页面&#xff0c;使这一功能能够直接应用到我们的项目中。本文将深入探…

Redis(二):常见数据类型:String 和 哈希

引言 Redis 提供了 5 种数据结构&#xff0c;理解每种数据结构的特点对于 Redis 开发运维⾮常重要&#xff0c;同时掌握每 种数据结构的常⻅命令&#xff0c;会在使⽤ Redis 的时候做到游刃有余。 Redis 的命令有上百种&#xff0c;我们不可能全部死记硬背下来&#xff0c;但是…

linaro交叉编译工具链下载与使用笔记

笔记 文章目录 笔记确定目标 &#xff08;aarch64&#xff09;选择版本&#xff08;7.5&#xff09;选择目标&#xff08;aarch64-linux-gnu&#xff09;下载地址工具链&#xff08;gcc-linaro-7.5.0-2019.12-x86_64_aarch64-linux-gnu.tar.xz&#xff09;编译测试 &#xff08…

Selenium+Python做web端自动化测试框架与实例详解教程

最近受到万点暴击&#xff0c;由于公司业务出现问题&#xff0c;工作任务没那么繁重&#xff0c;有时间摸索seleniumpython自动化测试&#xff0c;结合网上查到的资料自己编写出适合web自动化测试的框架&#xff0c;由于本人也是刚刚开始学习python&#xff0c;这套自动化框架目…

python爬虫实习找工作练习测试(以下内容仅供参考学习)

要求&#xff1a;获取下图指定网站的指定数据 空气质量状况报告-中国环境监测总站 输入&#xff1a;用户输入下载时间范围&#xff0c;格式为2022-10 输出&#xff1a;将更新时间在2022年10月1日到31日之间的文件下载到本地目录&#xff08;可配置&#xff09;&#xff0c;并…

WIFI模块(esp-01s)实现天气预报代码实现

目录 前言 实现图片 一、串口编程的实现 二、发送AT指令 esp01s.c esp01s.h 三、数据处理 1、初始化 2、cjson处理函数 3、核心控制代码 四、修改堆栈大小 前言 实现图片 前面讲解了使用AT指令获取天气与cjson的解析数据&#xff0c;本章综合将时间显示到屏幕 一、…

分布式锁之基于zookeeper实现分布式锁(三)

3. 基于zookeeper实现分布式锁 实现分布式锁目前有三种流行方案&#xff0c;分别为基于数据库、Redis、Zookeeper的方案。这里主要介绍基于zk怎么实现分布式锁。在实现分布式锁之前&#xff0c;先回顾zookeeper的相关知识点 3.1. 知识点回顾 3.1.1. 安装启动 安装&#xff1a…

python回溯求解电话号码组合

给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按 任意顺序 返回。 给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何字母。 输入&#xff1a;digits "23" 输出&#xff1a;["ad&qu…