RabbitMQ消息队列 发送和接受

步骤 1: 安装 RabbitMQ

首先,需要安装 RabbitMQ,并确保它在运行中。

下载erlang语言包OTP。官网地址:Downloads - Erlang/OTP

Rabbitmq官网下载地址:Downloading and Installing RabbitMQ — RabbitMQ

安装MQ注意事项:需要先安装Erlang语言包,然后再安装RabbitMQ,安装成功后默认登录账号和密码是guest

步骤 2: 添加 RabbitMQ 客户端库

在你的 .NET Core Web API 项目中,使用 NuGet 包管理器添加 RabbitMQ.Client 客户端库

创建两个控制台

发送

// 引入RabbitMQ客户端库
using RabbitMQ.Client;
using System.Text;

// 创建连接工厂对象(用于配置连接参数)
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";        // RabbitMQ服务器IP地址
factory.DispatchConsumersAsync = true; // 启用异步消费者模式(即使当前是生产者)

// 定义消息路由关键参数
string exchangeName = "exchange1";    // 直连(Direct)类型交换机名称
string eventName = "myEvent";         // 路由键(RoutingKey),用于消息路由匹配

// 创建持久化连接(TCP连接复用)
using var conn = factory.CreateConnection();

// 持续发送消息的循环
while (true)
{
    // 生成消息内容:当前时间的时分秒
    string msg = DateTime.Now.TimeOfDay.ToString();
    
    // 创建临时信道(Channel),using确保资源释放
    using (var channel = conn.CreateModel())
    {
        // 设置消息属性
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2; // 2=消息持久化(重启后不会丢失)
        
        // 声明直连型交换机(服务端不存在时自动创建)
        // durable参数默认为false,重启后交换机会被删除
        channel.ExchangeDeclare(
            exchange: exchangeName, 
            type: "direct"); 
        
        // 将消息转换为字节数组
        byte[] body = Encoding.UTF8.GetBytes(msg);
        
        // 发布消息到交换机
        channel.BasicPublish(
            exchange: exchangeName,     // 目标交换机
            routingKey: eventName,      // 路由键(决定哪个队列接收)
            mandatory: true,            // 开启消息回退机制(找不到队列时返回消息)
            basicProperties: properties,// 消息属性配置
            body: body);                // 消息体
    }
    
    Console.WriteLine($"已发布消息:{msg}");
    Thread.Sleep(1000); // 每秒发送一次
}

[生产者] → (发布消息到exchange1交换机) 
           ↓
exchange1根据routingKey="myEvent"路由 
           ↓
[队列] ← 需提前绑定队列到该交换机(代码中未体现)
 

接受

// 引入RabbitMQ客户端库
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

// 创建连接工厂(配置服务器参数)
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";         // RabbitMQ服务器地址
factory.DispatchConsumersAsync = true;  // 启用异步消息处理模式(重要!提升吞吐量)

// 定义消息路由参数(需与生产者一致)
string exchangeName = "exchange1";     // 直连型交换机名称
string eventName = "myEvent";          // 路由键匹配规则

// 建立TCP连接和信道(Channel)
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel(); // 信道复用,避免频繁创建开销

// 队列定义与绑定
string queueName = "queue1";
// 声明持久化队列(服务重启后仍存在)
channel.ExchangeDeclare(
    exchange: exchangeName, 
    type: "direct",                     // 直连型交换机
    durable: true);                     // 持久化交换机(建议生产环境开启)

channel.QueueDeclare(
    queue: queueName,
    durable: true,                      // 队列持久化
    exclusive: false,                   // 非排他队列(允许多消费者)
    autoDelete: false,                  // 不自动删除队列(需显式删除)
    arguments: null);                   // 无额外参数

// 绑定队列到交换机(路由键严格匹配)
channel.QueueBind(
    queue: queueName,
    exchange: exchangeName, 
    routingKey: eventName);             // 仅接收routingKey=myEvent的消息

// 配置异步消费者
var consumer = new AsyncEventingBasicConsumer(channel);
// 注册消息接收事件处理器
consumer.Received += Consumer_Received; 

// 启动消费(关闭自动确认)
channel.BasicConsume(
    queue: queueName,
    autoAck: false,                     // 手动消息确认(确保处理完成后再ACK)
    consumer: consumer);

Console.ReadLine(); // 保持程序运行

// 消息处理函数(异步)
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
    try
    {
        // 解析消息内容
        var bytes = args.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bytes);
        Console.WriteLine($"{DateTime.Now} 处理消息:{msg}");
        
        // 模拟业务处理耗时
        await Task.Delay(800);
        
        // 手动确认消息(删除队列中的消息)
        channel.BasicAck(
            deliveryTag: args.DeliveryTag, 
            multiple: false);           // 不批量确认
        
        Console.WriteLine($"消息 {msg} 处理完成");
    }
    catch (Exception ex)
    {
        // 拒绝消息(requeue:true 重新入队)
        channel.BasicReject(
            deliveryTag: args.DeliveryTag,
            requeue: true);             // 允许消息重新投递
        
        Console.WriteLine($"消息处理失败,已重新入队。错误:{ex.Message}");
    }
}

运行效果

代码参考:杨中科老师

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/968076.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2025最新版Node.js下载安装~保姆级教程

1. node中文官网地址:http://nodejs.cn/download/ 2.打开node官网下载压缩包: 根据操作系统不同选择不同版本(win7系统建议安装v12.x) 我这里选择最新版win 64位 3.安装node ①点击对话框中的“Next”,勾选同意后点…

Spring Boot 3.4 中 MockMvcTester 的新特性解析

引言 在 Spring Boot 3.4 版本中,引入了一个全新的 MockMvcTester 类,使 MockMvc 测试可以直接支持 AssertJ 断言。本文将深入探讨这一新特性,分析它如何优化 MockMvc 测试并提升测试的可读性。 Spring MVC 示例 为了演示 MockMvcTester 的…

WEB攻防-文件下载文件读取文件删除目录遍历目录穿越

目录 一、文件下载漏洞 1.1 文件下载案例(黑盒角度) 1.2 文件读取案例(黑盒角度) 二、文件删除 三、目录遍历与目录穿越 四、审计分析-文件下载漏洞-XHCMS 五、审计分析-文件读取漏洞-MetInfo-函数搜索 六、审计分析-…

01.Docker 概述

Docker 概述 1. Docker 的主要目标2. 使用Docker 容器化封装应用程序的意义3. 容器和虚拟机技术比较4. 容器和虚拟机表现比较5. Docker 的组成6. Namespace7. Control groups8. 容器管理工具9. docker 的优缺点10. 容器的相关技术 docker 官网: http://www.docker.com 帮助文档…

IDEA中常见问题汇总

🍓 简介:java系列技术分享(👉持续更新中…🔥) 🍓 初衷:一起学习、一起进步、坚持不懈 🍓 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正🙏 🍓 希望这篇文章对你有所帮助,欢…

基于蜘蛛蜂优化算法的无人机集群三维路径规划Matlab实现

代码下载:私信博主回复基于蜘蛛蜂优化算法的无人机集群三维路径规划Matlab实现 《基于蜘蛛蜂优化算法的无人机集群三维路径规划》 摘要 本研究针对无人机集群三维路径规划问题,提出了一种基于蜘蛛蜂优化算法的解决方案。以5个无人机构成的集群为研究对…

路由过滤方法与常用工具

路由过滤 定义:路由器在发布或者接收消息时,可能需要对路由信息进行过滤。 作用:控制路由的传播与生成;节省设备和链路资源消耗,保护网络安全。 举例:学习汇总后的路由,而不学习汇总时的明细路由…

仿 RabbitMQ 实现的简易消息队列

文章目录 项目介绍开放环境第三⽅库介绍ProtobufMuduo库 需求分析核⼼概念实现内容 消息队列系统整体框架服务端模块数据管理模块虚拟机数据管理模块交换路由模块消费者管理模块信道(通信通道)管理模块连接管理模块 客户端模块 公共模块日志类其他工具类…

【天梯赛】L1-104 九宫格(C++)

易忽略的错误&#xff1a;开始习惯性地看到n就以为是n*n数组了&#xff0c;实际上应该是9*9的固定大小数组&#xff0c;查了半天没查出来 题面 L1-104 九宫格 - 团体程序设计天梯赛-练习集 代码实现 #include<bits/stdc.h> using namespace std; //易错&#xff1a;开…

CSS 小技巧 —— CSS 实现 Tooltip 功能-鼠标 hover 之后出现弹层

CSS 小技巧 —— CSS 实现 Tooltip 功能-鼠标 hover 之后出现弹层 1. 两个元素实现 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><title>纯 CSS 实现 Tooltip 功能-鼠标 hover 之后出现弹层</titl…

【转载】开源鸿蒙OpenHarmony社区运营报告(2025年1月)

●截至2025年1月31日&#xff0c;开放原子开源鸿蒙&#xff08;OpenAtom OpenHarmony&#xff0c;简称“开源鸿蒙”或“OpenHarmony”&#xff09;社区累计超过8200名贡献者&#xff0c;共63家成员单位&#xff0c;产生51.2万多个PR、2.9万多个Star、10.5万多个Fork、68个SIG。…

03:Spring之Web

一&#xff1a;Spring整合web环境 1&#xff1a;web的三大组件 Servlet&#xff1a;核心组件&#xff0c;负责处理请求和生成响应。 Filter&#xff1a;用于请求和响应的预处理和后处理&#xff0c;增强功能。 Listener&#xff1a;用于监听 Web 应用中的事件&#xff0c;实…

ASP.NET Core 如何使用 C# 向端点发出 POST 请求

使用 C#&#xff0c;将 JSON POST 到 REST API 端点&#xff1b;如何从 REST API 接收 JSON 数据。 本文需要 ASP .NET Core&#xff0c;并兼容 .NET Core 3.1、.NET 6和.NET 8。 要从端点获取数据&#xff0c;请参阅本文。 使用 . 将 JSON 数据发布到端点非常容易HttpClien…

大语言模型需要的可观测性数据的关联方式

可观测性数据的关联方式及其优缺点 随着现代分布式架构和微服务的普及&#xff0c;可观测性&#xff08;Observability&#xff09;已经成为确保系统健康、排查故障、优化性能的重要组成部分。有效的可观测性数据关联方式不仅能够帮助我们实时监控系统的运行状态&#xff0c;还…

渗透利器:Burp Suite 联动 XRAY 图形化工具.(主动扫描+被动扫描)

Burp Suite 联动 XRAY 图形化工具.&#xff08;主动扫描被动扫描&#xff09; Burp Suite 和 Xray 联合使用&#xff0c;能够将 Burp 的强大流量拦截与修改功能&#xff0c;与 Xray 的高效漏洞检测能力相结合&#xff0c;实现更全面、高效的网络安全测试&#xff0c;同时提升漏…

C语言_通讯录

“我若成佛&#xff0c;天下无魔&#xff1b;我若成魔&#xff0c;佛奈我何。” “小爷是魔&#xff0c;那又如何&#xff1f;” 下面我和一起来攻克通讯录的难关&#xff01;&#xff01; 明确通讯录的基本结构 实现一个通讯录: 人的信息: 名字年龄性别电话地址 实现通讯录的…

STM32 Flash详解教程文章

目录 Flash基本概念理解 Flash编程接口FPEC Flash擦除/写入流程图 Flash选项字节基本概念理解 Flash电子签名 函数读取地址下存放的数据 Flash的数据处理限制部分 编写不易&#xff0c;请勿搬运&#xff0c;感谢理解&#xff01;&#xff01;&#xff01; Flash基本概念…

Flutter项目试水

1基本介绍 本文章在构建您的第一个 Flutter 应用指导下进行实践 可作为项目实践的辅助参考资料 Flutter 是 Google 的界面工具包&#xff0c;用于通过单一代码库针对移动设备、Web 和桌面设备构建应用。在此 Codelab 中&#xff0c;您将构建以下 Flutter 应用。 该应用可以…

第六篇:数字逻辑的“矩阵革命”——域控制器中的组合电路设计

副标题 &#xff1a;用卡诺图破解车身域控制器的逻辑迷宫&#xff0c;揭秘华为DriveONE的“数字特工” ▍ 开篇&#xff1a;黑客帝国世界观映射 > "Welcome to the Real World." —— Morpheus > 在数字逻辑的世界里&#xff0c;组合电路就是构建Matr…

成为高能量体质:从身体神庙到精神圣殿的修炼之路

清晨五点&#xff0c;当城市还在沉睡&#xff0c;瑜伽垫上的汗水已经折射出第一缕阳光。这不是苦行僧的自虐&#xff0c;而是高能量体质者的日常仪式。在这个能量稀缺的时代&#xff0c;如何把自己修炼成一座小型核电站&#xff1f;答案就藏在身体的每个细胞里。 一、能量管理…