VEC系列-RabbitMQ 入门笔记

消息队列(MQ)对于开发者来说是一个经常听到的词汇,但在实际开发中,大多数人并不会真正用到它。网上已经有很多关于 MQ 概述和原理的详细讲解,官网文档和技术博客也都介绍得很深入,因此,我在这里就不再赘述。

我一直认为,学习一项技术不仅要知道它是什么,更重要的是知道怎么用,以及在哪些场景下应该用。所以这篇文章主要就是站在一个新手的角度进行描述以及实现MQ的实际运用。

使用MQ的常见情景

  1. 系统解耦:比如电商系统,订单系统 → 库存系统 → 物流系统 订单系统发送“新订单”消息到 MQ,库存系统和物流系统各自订阅处理。即使库存系统或物流系统短暂不可用,消息仍然可以暂存,系统整体不会受影响。这一方面说实话不是架构师也没必要太过关注,毕竟系统的底层普通开发也没这个资格去搭建。只是用于了解,不要因为这段话阻拦学习的脚步。

  2. 流量削峰,降低并发:这个比较好理解,也是最能遇到的情况。用户请求先进入 MQ 队列,由后台的消费端按照数据库的最大承载能力逐步处理请求。确保数据库不会被瞬间压垮,提高系统稳定性。还是电商系统常用些。

  3. 异步任务处理:邮件、短信、推送通知,日志处理等。

理论上MQ能做的不止这些,抛砖引玉,一起深入学习吧。

对MQ进行拆分理解

MQ里常说生产者,消费者等。我会通过简单的例子来描述:

  • 生产者:一个游戏,我是GM,我要发送公告,玩家分为普通玩家和VIP玩家等。在这里,发布公告的人就是消息的生产者。应该很好理解嗷?

  • 交换器:如上述,有普通玩家和VIP玩家等,我的公告在普通玩家面前必然是拽的很啊,但是VIP玩家面前还是要舔下的……那么我会发布一条给普通玩家的消息,和一条给VIP玩家的消息。交换器的作用在我看来就是消息的承载体,类似一条运输船,负责把消息运输给玩家们。产生消息的地方很多,但是交换器不用关心是谁发布了消息,他只承载你的消息。

  • 队列:如上述,有了运输船。那么队列有点像是码头了。普通玩家进普通码头,VIP玩家进黄金码头。各自码头停泊各自的船。总不会在普通码头取出黄金码头的货哦?

补充:交换器是有类型的:Direct(直连交换器)Fanout(扇形交换器)Topic(主题交换器)Headers(头交换器)

概念不多说了。比较常用的是Direct,Fanout

Direct:通过路由键进行匹配,运输船是一艘,但是分为普通区和VIP区,玩家凭借船票(路由键)进行取货(取消息)

Fanout:只要是是绑定了某个交换器的队列都能进行取货。玩家进普通码头就拿普通货,进黄金码头拿黄金货。当然这是举例子,玩家的队列还是要看你如何分配的。

  • 消费者:说了这么多,玩家就是消费者嗷。

MQ代码演示 

最新代码是通过 事件总线 来跨方法传递信息和触发动作。通过发布和订阅事件,模块之间能够解耦通信,使得事件的发布和处理不再依赖于直接调用方法的方式,而是通过事件总线进行跨模块、跨方法的异步传递。这种方式提高了系统的灵活性和扩展性,同时保持了模块之间的松耦合。

长代码警告,有兴趣可以fork仓库进行实际操练 VerEasy.Core

必要的知识点大致如此,通过代码+注释的形式来演示更好理解。

我这里是NETCore项目,所以还是接口的形式方便依赖注入。

接口部分代码

    public interface IRabbitMQPersistentConnection
    {
        /// <summary>
        /// 是否已经连接:判断MQ是否是连接状态
        /// </summary>
        bool IsConnected { get; }

        /// <summary>
        /// 尝试连接:断连重连方法
        /// </summary>
        /// <returns></returns>
        Task<bool> TryConnectAsync();

        /// <summary>
        /// 唯一通道:发布通道可以随时关闭,消费通道需要保持打开状态,否则无法进行消费。
        /// </summary>
        IChannel Channel { get; }

        /// <summary>
        /// 唯一连接:同理,一个连接可以有N个通道,无需建立过多连接。
        /// </summary>
        IConnection Connection { get; }

        /// <summary>
        /// 释放
        /// </summary>
        /// <returns></returns>
        Task DisposeAsync();

        /// <summary>
        /// 发布:发布消息
        /// </summary>
        /// <param name="msg"></param>
        /// <param name="exChangeName"></param>
        /// <param name="routeKey"></param>
        /// <param name="type"></param>
        /// <returns></returns>
        Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);

        /// <summary>
        /// 订阅:订阅队列。
        /// </summary>
        /// <returns></returns>
        Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);
    }

 接口实现部分代码

    public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection
    {
        //构造函数注入,获取MQ的地址账号密码端口,如果不传就用我默认配置的。
        public RabbitMQPersistentConnection(IConnectionFactory? connectionFactory = null, int retryCount = 5)
        {
            _connectionFactory = connectionFactory ?? new ConnectionFactory
            {
                HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),
                UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),
                Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),
                Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()
            };
            //使用Policy进行重连,这个是重连次数=5
            _retryCount = retryCount;
        }
        //私有变量,获取连接成功时创建的Mq通道。
        private IChannel _channel = default!;

        public IChannel Channel
        {
            get
            {
                return _channel;
            }
        }

        /// <summary>
        /// RabbitMQ 连接工厂
        /// </summary>
        private readonly IConnectionFactory _connectionFactory;

        /// <summary>
        /// 私有变量 RabbitMQ 连接上下文
        /// </summary>
        private IConnection _connection = default!;

        /// <summary>
        /// 重连次数
        /// </summary>
        private readonly int _retryCount;

        /// <summary>
        /// 标志是否已释放
        /// </summary>
        private bool _disposed;

        /// <summary>
        /// 是否有效连接
        /// </summary>
        public bool IsConnected
        {
            get
            {
                return _connection != null && _connection.IsOpen && !_disposed;
            }
        }

        public IConnection Connection
        {
            get
            {
                return _connection;
            }
        }

        /// <summary>
        /// 手动释放
        /// </summary>
        /// <returns></returns>
        public async Task DisposeAsync()
        {
            if (_disposed) return;

            _disposed = true;

            try
            {
                await _connection.DisposeAsync();
            }
            catch (IOException ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// 重连机制
        /// </summary>
        /// <returns></returns>
        public async Task<bool> TryConnectAsync()
        {
            var policy = Policy.Handle<SocketException>()//捕获连接异常
                .Or<BrokerUnreachableException>()//无法连接异常
                .WaitAndRetryAsync(_retryCount, x =>
                TimeSpan.FromSeconds(Math.Pow(2, x)), (ex, time) =>
                {
                    //日志
                });

            try
            {
                await policy.ExecuteAsync(async () =>
                {
                    //重建连接【赋值给私有化变量,通过get同步给接口里的Connection和Channel】
                    _connection = await _connectionFactory.CreateConnectionAsync();
                    _channel = await _connection.CreateChannelAsync();
                });

                //如果连接成功
                if (IsConnected)
                {
                    // 连接成功后,注册连接关闭、异常、阻塞的事件处理程序
                    _connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;
                    _connection.CallbackExceptionAsync += OnCallbackExceptionAsync;
                    _connection.ConnectionBlockedAsync += OnConnectionBlockedAsync;
                    return true;
                }
                else
                {
                    return false;
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"重连失败,最终抛出异常: {ex.Message}");
                return false;
            }
        }

        private async Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e)
        {
            if (_disposed) return;
            Console.WriteLine("RabbitMQ连接关闭,正在尝试重连...");
            await TryConnectAsync();
        }

        private async Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e)
        {
            if (_disposed) return;
            Console.WriteLine($"RabbitMQ连接出现异常,正在尝试重连... 异常信息: {e.Exception.Message}");
            await TryConnectAsync();
        }

        private async Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e)
        {
            if (_disposed) return;
            Console.WriteLine("RabbitMQ连接被阻塞,正在尝试重连...");
            await TryConnectAsync();
        }

        //发布消息
        public async Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout)
        {
            //判断是否连接状态,没有连接就重连
            if (!IsConnected)
            {
                await TryConnectAsync();
            }

            //创建通道,因为是发布消息,通道不用常打开,所以使用了USING
            using var channel = await _connection.CreateChannelAsync();
            //【ExchangeDeclareAsync】声明交换机,exchange:交换机名称,type:交换机类型
            await channel.ExchangeDeclareAsync(exchange: exChangeName, type: type);
            
            //msg就是消息,需要传递Byte[]
            var body = Encoding.UTF8.GetBytes(msg);
            
            //启动消息持久化,我的项目里使用MQ来进行公告的推送,使用的Fanout类型交换机,故此消息保持持久化。
            var properties = new BasicProperties()
            {
                Persistent = true,
            };

            //发布消息
            await channel.BasicPublishAsync(
                exchange: exChangeName,
                routingKey: routeKey,
                mandatory: false,
                basicProperties: properties,
                body: body);
        }

        //订阅消息
        public async Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout)
        {
            if (!IsConnected)
            {
                await TryConnectAsync();
            }
            
            //【queue】队列
            string queueName = string.IsNullOrWhiteSpace(routeKey) ? exChangeName : routeKey;
            //【durable】持久化队列,MQ服务器不会删除它。
            QueueDeclareOk queueDeclareResult = await Channel.QueueDeclareAsync(
                 queue: queueName,
                  durable: true,
                  exclusive: false,
                  autoDelete: false);

            //根据queue,exchange,routingKey 对 交换机和队列进行绑定,如果是Fanout类型不需要routeKey。
            await Channel.QueueBindAsync(queue: queueName, exchange: exChangeName, routingKey: routeKey);
            //创建消费者
            var consumer = new AsyncEventingBasicConsumer(Channel);
            //消费者消费后执行方法
            consumer.ReceivedAsync += async (model, ea) =>
            {
                byte[] body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                //确认消息已被消费,这样后续该消息就不会被该队列继续消费到了。
                await Channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
            };
            //启动消费者队列,将消费者和队列绑定
            await Channel.BasicConsumeAsync(queueName, autoAck: false, consumer: consumer);
        }
    }

 

MQ服务注入

            if (Appsettings.AppStr("RabbitMQ:Enable").ObjToBool())
            {
                services.AddSingleton<IRabbitMQPersistentConnection>(x =>
                {
                    var connectionFactory = new ConnectionFactory()
                    {
                        HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),
                        UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),
                        Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),
                        Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()
                    };
                    var mq = new RabbitMQPersistentConnection(connectionFactory);

                    return mq;
                });
            }

 


我在注入各种服务时,添加了一些日志进行输出,效果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/982820.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

大模型应用开发学习笔记

Huggingface 下载模型&#xff1a; model_dirr"G:\python_ws_g\code\LLMProject\session_4\day02_huggingface\transformers_test\model\uer\uer\gpt2-chinese-cluecorpussmall\models--uer--gpt2-chinese-cluecorpussmall\snapshots\c2c0249d8a2731f269414cc3b22dff021…

使用 Elasticsearch 进行集成测试初始化​​数据时的注意事项

作者&#xff1a;来自 Elastic piotrprz 在创建应该使用 Elasticsearch 进行搜索、数据聚合或 BM25/vector/search 的软件时&#xff0c;创建至少少量的集成测试至关重要。虽然 “模拟索引” 看起来很诱人&#xff0c;因为测试甚至可以在几分之一秒内运行&#xff0c;但它们实际…

高并发内存池 · 基本认识

目录 前言&#xff1a; 项目基础认识 内存碎片 效率问题 定长内存池 切内存 给谁切&#xff1f;怎么切&#xff1f; 怎么管理回收内存&#xff1f; 前言&#xff1a; 本文呢开始搞搞项目咯&#xff0c;于是准备从一个最经典的项目入手--tcmalloc&#xff0c;也就是从谷…

通用信息抽取大模型PP-UIE开源发布,强化零样本学习与长文本抽取能力,全面适配多场景任务

背景与简介 信息抽取&#xff08;information extraction&#xff09;是指&#xff0c;从非结构化或半结构化数据&#xff08;如自然语言文本&#xff09;中自动识别、提取并组织出结构化信息。通常包含多个子任务&#xff0c;例如&#xff1a;命名实体识别&#xff08;NER&am…

游戏引擎学习第140天

回顾并为今天的内容做准备 目前代码的进展到了声音混音的部分。昨天我详细解释了声音的处理方式&#xff0c;声音在技术上是一个非常特别的存在&#xff0c;但在游戏中进行声音混音的需求其实相对简单明了&#xff0c;所以今天的任务应该不会太具挑战性。 今天我们会编写一个…

Goby 漏洞安全通告| Ollama /api/tags 未授权访问漏洞(CNVD-2025-04094)

漏洞名称&#xff1a;Ollama /api/tags 未授权访问漏洞&#xff08;CNVD-2025-04094&#xff09; English Name&#xff1a;Ollama /api/tags Unauthorized Access Vulnerability (CNVD-2025-04094) CVSS core: 6.5 风险等级&#xff1a; 中风险 漏洞描述&#xff1a; O…

Python----数据分析(Matplotlib五:pyplot的其他函数,Figure的其他函数, GridSpec)

一、pyplot的其他函数 1.1、xlabel 在matplotlib中&#xff0c; plt.xlabel() 函数用于为当前活动的坐标轴&#xff08;Axes&#xff09;设置x轴的 标签。当你想要标识x轴代表的数据或单位时&#xff0c;这个函数非常有用。 plt.xlabel(xlabel text) 1.2、ylabel 在matplotl…

构建python3.8的docker镜像,以便解决: dlopen: /lib64/libc.so.6: version `GLIBC_2.28‘

1、简介 在使用pyinstaller打包工具打包应用为二进制的时候&#xff0c;出现了一个“”: dlopen: /lib64/libc.so.6: version GLIBC_2.28”的问题 2、解决方案 2.1、问题原因 由于使用了官方提供的镜像&#xff0c;而官方提供的镜像编译的机器上、glibc的版本过高&#xff…

音频3A测试--AEC(回声消除)测试

一、测试前期准备 一台录制电脑:用于作为近段音源和收集远端处理后的数据; 一台测试设备B:用于测试AEC的设备; 一个高保真音响:用于播放设备B的讲话; 一台播放电脑:用于模拟设备A讲话,和模拟设备B讲话; 一台音频处理器(调音台):用于录制和播放数据; 测试使用转接线若…

MATLAB程序介绍,三维环境下的IMM(交互式多模型),使用CV和CT模型,EKF作为滤波

本文所述的MATLAB代码为三维的交互式多模型&#xff08;IMM&#xff09;滤波器&#xff0c;结合了匀速直线运动&#xff08;CV模型&#xff09;和匀速圆周运动&#xff08;CT模型&#xff09;的状态估计。使用扩展卡尔曼滤波&#xff08;EKF&#xff09;来处理状态更新与观测数…

upload-labs详解(1-12)文件上传分析

目录 uploa-labs-main upload-labs-main第一关 前端防御 绕过前端防御 禁用js Burpsuite抓包改包 upload-labs-main第二关 上传测试 错误类型 upload-labs-env upload-labs-env第三关 上传测试 查看源码 解决方法 重命名&#xff0c;上传 upload-labs-env第四关…

第一:goland安装

GOPROXY (会话临时性)&#xff0c;长久的可以在配置文件中配置 go env -w GOPROXYhttps://goproxy.cn,direct 长久的&#xff0c;在~/.bashrc文件中添加&#xff1a; export GOPROXYhttps://goproxy.cn,direct &#xff0d;&#xff0d;&#xff0d;&#xff0d;&#xff0d…

ASP使用EFCore和AutoMapper添加导航属性数据

目录 一、不使用自增主键 &#xff08;1&#xff09;下载AutoMapper的nuget包 &#xff08;2&#xff09;配置映射规则 &#xff08;3&#xff09;配置MappingProfile文件 &#xff08;4&#xff09;控制器编写添加控制器 &#xff08;5&#xff09;测试 二、使用自增主…

什么是Jmeter? Jmeter工作原理是什么?

第一篇 什么是 JMeter&#xff1f;JMeter 工作原理 1.1 什么是 JMeter Apache JMeter 是 Apache 组织开发的基于 Java 的压力测试工具。用于对软件做压力测试&#xff0c;它最初被设计用于 Web 应用测试&#xff0c;但后来扩展到其他测试领域。 它可以用于测试静态和动态资源…

汽车零部件厂如何选择最适合的安灯系统解决方案

在现代制造业中&#xff0c;安灯系统作为一种重要的生产管理工具&#xff0c;能够有效提升生产线的异常处理效率&#xff0c;确保生产过程的顺畅进行。对于汽车零部件厂来说&#xff0c;选择一套适合自身生产需求的安灯系统解决方案尤为重要。 一、安灯系统的核心功能 安灯系统…

Ubuntu20.04双系统安装及软件安装(七):Anaconda3

Ubuntu20.04双系统安装及软件安装&#xff08;七&#xff09;&#xff1a;Anaconda3 打开Anaconda官网&#xff0c;在右侧处填写邮箱&#xff08;要真实有效&#xff01;&#xff09;&#xff0c;然后Submit。会出现如图示的Success界面。 进入填写的邮箱&#xff0c;有一封Ana…

为解决局域网IP、DNS切换的Windows BAT脚本

一、背景 为解决公司普通人员需要切换IP、DNS的情况&#xff0c;于是搞了个windows下的bat脚本&#xff0c;可以对有线网络、无线网络进行切换设置。 脚本内容 echo off title 多网络接口IP切换工具:menu cls echo echo 请选择要配置的网络接口: echo echo 1. 有线网络&am…

【OMCI实践】wireshark解析脚本omci.lua文件(独家分享)

引言 omci.lua文件是Wireshark的OMCI协议解析插件的核心组件。它配合BinDecHex.lua&#xff0c;可以解析OMCI协议的数据包&#xff0c;提取出消息类型、受管实体标识、受管实体属性等关键信息&#xff0c;并以人类可读的形式显示在Wireshark的解码视图中&#xff0c;方便研发人…

JPA编程,去重查询ES索引中的字段,对已有数据的去重过滤,而非全部字典数据

一、背景 课程管理界面&#xff0c;查询前&#xff0c;需要把查询元数据给出。 学科列表、学段列表和分类列表&#xff0c;我们把它定义为查询元数据。 一般的业务需求是&#xff1a; 系统维护好多个字典&#xff0c;比如学科、学段等等&#xff0c;相当于属性库。 但是&…

vue3与react、 react hooks

一、Vue3新特性&#xff1a;setup、ref、reactive、computed、watch、watchEffect函数、生命周期钩子、自定义hooks函数、toRef和toRefs、shallowReactive 与 shallowRef、readonly 与 shallowReadonly、toRaw 与 markRaw、customRef、provide 与 inject、Fragment、Teleport、…