RabbitMQ 客户端 连接、发送、接收处理消息

RabbitMQ 客户端 连接、发送、接收处理消息

一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样

RabbitMQ 服务,不是像其他服务器一样,负责逻辑处理,然后转发给客户端
而是所有客户端想要向 RabbitMQ服务发送消息,

第一步:创建一个链接 RabbitMQ 服务的连接

需要传入 RabbitMQ服务地址、用户名、密码,然后在连接代码中传入一个 queue 的字符串作为 标志
连接成功后,RabbitMQ服务上就可以看到这个链接了
如下图,可以看到有一个 Name = queueL1 的连接,后边有链接状态、消息数
Ready 和 Total 都是 0
在这里插入图片描述

向 RabbitMQ 发送消息的:

(1) 如果没有建立连接,执行第一步,建立一个链接
(2) 通过 发送消息接口向 RabbitMQ 服务 发消息
(3) RabbitMQ 服务接收到消息,只是按照连接的 queue 分别把消息放在自己名字的 queue 下, RabbitMQ 服务只是存着客户端发送的消息,服务什么都不处理

向 RabbitMQ 服务发送几条消息
下图可以看到 queueL1 的队列已经接收了 5 条消息,这五条消息如果没有客户端接收处理,就一直在这存着
在这里插入图片描述

接收 RabbitMQ 服务消息:

(1) 如果没有建立连接,执行第一步,建立一个链接
(2) 注册接收消息接口,在 RabbitMQ 中叫 消费消息,可以标记消费消息后是否将 RabbitMQ 的数据删除
(3) 如果 RabbitMQ 服务收到消息,就转发给 注册接收消息接口的 连接,如果接收的连接标记了 AutoDelete,那么发送给客户端后,RabbitMQ 就会将消息从消息队列中删除

注册接收消息,我的客户端就会收到 RabbitMQ 发送过来的消息,消息中包含发送上来的消息内容,还有发送消息的 queue 名字

此时再看,就会发现 Ready 和 Total 又变成 0 了
在这里插入图片描述

为什么上面讲解中将 接收 RabbitMQ 服务消息、向 RabbitMQ 发送消息的 分开说
是因为 RabbitMQ 发送消息就仅仅是发消息,发送完就不管了
而 RabbitMQ 的消费消息(接收消息) 也仅仅是接收消息,它不管是谁发的消息,只要是发送的 RabbitMQ 服务的消息,它都能接收,

(3.1) 比如我创建了 一个 连接,queue名为 xxxA,
它发送了消息 “Hello World”,
xxxA 连接自己又注册了 消费消息(接收消息),那么xxxA 自己就会接收到 xxxA 队列发送的 Hello World 信息

(3.2) 我又创建了 新的连接,queue 名还是 xxxA
那么新的连接也可以收到 (3.1) 发的 消息 HelloWorld

二. 客户端连接服务器
  1. 实例化一个 连接 RabbitMQ 服务的客户端连接
    实例化需要传入 服务地址、端口、用户名、密码

    using RabbitMQ.Client;
    using System;
    using System.Threading.Tasks;
    using RabbitMQ.Client.Exceptions;
    using UnityEngine;
    using System.Text;
    using RabbitMQ.Client.Events;

    namespace Network
    {
    ///
    /// RabbitMQ 创建一个链接
    /// 供 RabbitMQReceive、RabbitMQSend 使用
    ///
    public class RabbitMQConnect
    {
    private RabbitMQConnectData connectData;

        private ConnectionFactory factory;
        private IChannel channel;
        private IConnection connection;
    
        private NetWorkState state;
    
        private Action<string, byte[]> receivedCallBack;
    
        private const int TimeOut = 10; //连接超时 10 秒
        private bool dispose = false;
    
        public RabbitMQConnect(RabbitMQConnectData connectData)
        {
            this.connectData = connectData;
            State = NetWorkState.Disconnected;
            dispose = false;
        }
    
        public string Queue
        {
            get { return connectData.queue; }
        }
    
        public NetWorkState State
        {
            get { return state; }
            private set { state = value; }
        }
    
        public IChannel Channel
        {
            get { return channel; }
        }
    
        /// <summary>
        /// 网络是否连接中
        /// </summary>
        public bool IsConnect
        {
            get
            {
                if (null == channel || null == connection)
                {
                    return false;
                }
                return channel.IsOpen && connection.IsOpen;
            }
        }
    
        public async Task StartConnect()
        {
            if (State == NetWorkState.Connecting)
            {
                await Task.Delay(TimeOut * 1000);
            }
    
            if (State == NetWorkState.Connected)
            {
                return;
            }
            // 创建连接工厂
            // 如果初始化失败,不会启动恢复连接
            //factory = new ConnectionFactory()
            //{
            //    HostName = hostName, // 替换为你的 RabbitMQ 服务器地址
            //    UserName = userName, // 替换为用户名
            //    Password = password  // 替换为密码
            //};
    
            string url = $"amqp://{connectData.userName}:{connectData.password}@{connectData.hostName}:{connectData.port}"; //string.Format("amqp://unity:unity@139.9.137.14:5672");
            factory = new ConnectionFactory()
            {
                Uri = new Uri(url)
            };
    
            // 自动恢复连接
            factory.AutomaticRecoveryEnabled = true;
            // 如果由于异常导致恢复失败(例如RabbitMQ节点仍然不可达),它将在固定的时间间隔(默认为5秒)后重试。间隔时间可配置如下
            // Connection.CloseAsync 关闭的连接不会启动自动恢复连接
            factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
            factory.TopologyRecoveryEnabled = true;
    
            while (State != NetWorkState.Connected)
            {
                if (!dispose)
                {
                    await Connect();
                }
            }
    
            await Task.Delay(1);
    
            if (!string.IsNullOrEmpty(connectData.receiveQueeu))
            {
                await BasicConsumer();
            }
        }
    
        private async Task Connect()
        {
            try
            {
                State = NetWorkState.Connecting;
                // 异步创建连接
                connection = await factory.CreateConnectionAsync();
                channel = await connection.CreateChannelAsync();
                // 声明队列
                QueueDeclareOk queueDeclareOk = await channel.QueueDeclareAsync(
                    queue: connectData.queue,
                    durable: false,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
    
                /*
                    autoDelete = true:没有消费者时队列自动删除,通常用于临时或一次性的队列。
                    autoDelete = false:队列不会自动删除,通常用于需要长期存在的队列。
                    选择是否设置 autoDelete = true 取决于你是否希望队列在没有消费者时自动删除。如果你的队列是临时的、一次性的,那么使用 autoDelete = true 会更适合;如果队列是长期需要使用的,则设置为 autoDelete = false 会更为合适 
                */
    
                State = NetWorkState.Connected;
    
                // 设置消费者的预取计数为10,允许同时处理10条消息
                await channel.BasicQosAsync(
                    prefetchSize: 0, 
                    prefetchCount: 10, 
                    global: false);
                Debug.Log("RabbitMQ Connect Success");
                GameNotifycation.GetInstance().Notify<NetWorkState>(ENUM_MSG_TYPE.MSG_NETWORK_STATE_CHANGE, State);
            }
            catch (BrokerUnreachableException e)
            {
                await Task.Delay(5000);
                State = NetWorkState.ConnectFailed;
                Debug.LogError("ConnectError:" + e.ToString());
                // apply retry logic
            }
    
            await Task.Delay(1);
        }
    
        /// <summary>
        /// 发送消息
        /// exchange:   要发布消息的交换机名称。
        /// routingKey: 路由键,决定消息应该路由到哪个队列。
        /// mandatory:  如果设置为 true,RabbitMQ 会确保消息至少被投递到一个队列。如果没有队列接收该消息,RabbitMQ 会触发 basic.return。
        /// immediate:  如果设置为 true,RabbitMQ 会在消息无法立即被消费时丢弃消息。
        /// basicProperties: 消息的属性,类型为 IBasicProperties。这些属性可以设置消息的优先级、持久性等。
        /// body: 消息体的字节数组。
        /// 
        /// BasicPublishAsync 方法 没有返回消息投递的结果。它仅仅表示“请求已经被成功发送到 RabbitMQ 的交换机”。如果发布操作成功,Task 会正常完成,不会抛出异常。你可以通过异常处理来捕获潜在的错误。
        /// </summary>
        /// <param name="msg"></param>
        public async Task SendAsync(string message)
        {
            if (!IsConnect)
            {
                UnityEngine.Debug.Log("Send not IsConnect");
                await StartConnect();
            }
    
            try
            {
                IChannel channel = Channel;
                var body = Encoding.UTF8.GetBytes(message);
                var props = new BasicProperties();
                props.ContentType = "text/plain";
                props.DeliveryMode = DeliveryModes.Transient;
                await channel.BasicPublishAsync(
                    exchange: "",
                    routingKey: Queue,
                    mandatory: false,
                    basicProperties: props,
                    body: body).ConfigureAwait(false);
    
                //Debug.Log($"[x] Sent: Complete");
            }
            catch (Exception ex)
            {
                UnityEngine.Debug.LogError($"Error publishing message: {ex.Message}");
            }
        }
    
        /// <summary>
        /// 设置接收消息回调
        /// </summary>
        /// <param name="receivedCallBack"></param>
        public void SetReceive(Action<string, byte[]> receivedCallBack)
        {
            this.receivedCallBack = receivedCallBack;
        }
    
        /// <summary>
        /// 创建异步消费者
        /// </summary>
        /// <returns></returns>
        public async Task<string> BasicConsumer()
        {
            if (!IsConnect)
            {
                await StartConnect();
            }
    
            var consumer = new AsyncEventingBasicConsumer(Channel);
            // 处理消息的异步回调逻辑
            consumer.ReceivedAsync += ReceivedAsync;
    
            // 开始消费
            string result = await Channel.BasicConsumeAsync(
                queue: connectData.receiveQueeu,  // 指定消费者要监听的队列名称
                autoAck: false,        // 决定是否自动确认消息。如果 true,消息在交付时会自动确认。如果 false,则需要手动调用 BasicAck 确认消息
                consumer: consumer);  // 指定消息的处理方式,通过实现 IBasicConsumer 接口来定义如何处理从队列中接收到的消息
    
            /*
                autoAck = true:消息一旦传递给消费者,RabbitMQ 就认为该消息被成功处理,无需再确认。
                autoAck = false:消费者需要显式地调用 channel.BasicAck 来确认消息的处理,通常用于消息处理失败时能够重试消息。
            */
    
            return result;
        }
    
        /// <summary>
        /// 异步接收消息
        /// 如果 Channel.BasicConsumeAsync 方法中 autoAck 设置为 true,那么 channel.BasicAckAsync 调用是不允许的
        /// 想在  Channel.BasicConsumeAsync 消费消息收到消息时 调用 channel.BasicAckAsync,必须将 Channel.BasicConsumeAsync 方法中 autoAck 设置为 false
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="eventArgs"></param>
        /// <returns></returns>
        private async Task ReceivedAsync(object sender, BasicDeliverEventArgs eventArgs)
        {
            try
            {
                //Debug.Log("ReceivedAsync");
                AsyncEventingBasicConsumer consumer = sender as AsyncEventingBasicConsumer;
                string queue = consumer.Channel.CurrentQueue;
                var body = eventArgs.Body.ToArray();
                receivedCallBack?.Invoke(queue, body);
                // 模拟异步任务处理(比如访问数据库或调用其他服务)
                await channel.BasicAckAsync(eventArgs.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                Debug.LogError($"Error processing message: {ex.Message}");
                // 如果处理失败,可以拒绝并重新入队(可选)
                //await Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: true);
            }
            await Task.Delay(1);
        }
    
        /// <summary>
        /// 关闭连接
        /// </summary>
        public async void Dispose()
        {
            dispose = true;
            // 先关闭通道、再关闭连接
            if (channel != null)  // 通道关闭
            {
                await channel.CloseAsync();
                channel = null;
            }
    
            if (connection != null)  // 连接关闭
            {
                UnityEngine.Debug.Log("ConnectDispose");
                await connection.CloseAsync();
                connection = null;
            }
            await Task.Delay(1);
        }
    }
    

    }

RibbitMQ 服务通过 queue 来区分每一个连接的客户端,代码部分如下

QueueDeclareOk queueDeclareOk = await channel.QueueDeclareAsync(
                    queue: queue,
                    durable: false,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
  1. 客户端实例

  2. 测试用例

    using UnityEngine;
    using Network;
    using LitJson;
    using System.Text;
    using System.Collections;
    using System.Collections.Generic;

    public class RabbitMQDemo : MonoBehaviour
    {
    // 客户端
    private RabbitMQConnect rabbitMQConnect;
    private Queue receiveQueue = new Queue();

    void Start()
    {
        RabbitMQConnectData connectData = new RabbitMQConnectData();
        connectData.queue = "TestA";
        connectData.receiveQueeu = "TestA";
        connectData.hostName = "XXX.XXX.XXX.XXX";
        connectData.port = "5672";
        connectData.userName = "unity";
        connectData.password = "unity";
    
        // 实例化
        rabbitMQConnect = new RabbitMQConnect(connectData);
        rabbitMQConnect.SetReceive(Receive);
        StartConnect();
    }
    
    private async void StartConnect()
    {
        await rabbitMQConnect.StartConnect();
    }
    
    private async void Send(string meg)
    {
        await rabbitMQConnect.SendAsync(meg);
    }
    
    private void Receive(string queue, byte[] byteData)
    {
        var json = Encoding.UTF8.GetString(byteData);
        UnityEngine.Debug.Log($"[x] ReceivedAsync: {json}");
        receiveQueue.Enqueue(netWorkData);
    }
    
    private int number = 1000;
    // Update is called once per frame
    void Update()
    {
        if (Input.GetKeyDown(KeyCode.A))
        {
            ++number;
            Send("Hello RabbitMQ:" + number);
        }
    
        DispatchMessage();
    }
    
    private void DispatchMessage()
    {
        if (receiveQueue.Count <= 0)
        {
            return;
        }
    
        string json = receiveQueue.Dequeue();
    }
    
    private void OnDestroy()
    {
        Debug.LogError("OnDestroy");
        rabbitMQConnect.Dispose();
    }
    

    }

    /// <summary>
    /// 网络连接状态
    /// </summary>
    public enum NetWorkState
    {
    	// init
    	/// <summary>
    	/// 关闭/断开连接
    	/// </summary>
    	Closed,
    
    	// client
    	/// <summary>
    	/// 已经建立连接
    	/// </summary>
    	Connected,
    
    	/// <summary>
    	/// 正在请求连接
    	/// </summary>
    	Connecting,
    
    	/// <summary>
    	/// 连接失败
    	/// </summary>
    	ConnectFailed,
    
    	// both
    	/// <summary>
    	/// 连接超时
    	/// </summary>
    	Timeout,
    
    	/// <summary>
    	/// 断开连接
    	/// </summary>
    	Disconnected,
    }
    

扩展
可以在 网页上 Overview 页面,找到 Ports and contexts 部分
可以看到每种协议对应的端口是不一样的
每种协议都有一种独立的连接方式
需要根据自己选择的协议拼接路径

比如 我上面代码使用的 http 方式

    string localHost = "localhost"; // ip如 xxx.xxx.xxx.xxx
    string userName = "用户名";
    string password = "密码";
	// 创建连接工厂
	// 如果初始化失败,不会启动恢复连接
	factory = new ConnectionFactory()
	{
	    HostName = hostName, // 替换为你的 RabbitMQ 服务器地址
	    UserName = userName, // 替换为用户名
	    Password = password  // 替换为密码
	};

amqp 协议连接方式如下

	string url = $"amqp://{userName}:{password}@{hostName}:{port}"; 
	factory = new ConnectionFactory()
	{
	    Uri = new Uri(url)
	};

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/948631.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

仿生的群体智能算法总结之二(十种)

群体智能算法是一类通过模拟自然界中的群体行为来解决复杂优化问题的方法。以下是10种常见的群体智能算法,接上文https://blog.csdn.net/lzm12278828/article/details/144933367仿生的群体智能算法总结之一(十种)-CSDN博客https://blog.csdn.net/lzm12278828/article/detail…

Jenkins(持续集成与自动化部署)

Jenkins 是一个开源软件项目&#xff0c;是基于Java开发的一种持续集成工具。 官网&#xff1a;https://www.jenkins.io/ GitLab安装使用 安装前提&#xff1a;内存至少需要4G 官方网站&#xff1a;https://about.gitlab.com/ 安装文档&#xff1a;https://docs.gitlab.c…

Luma AI 简单几步生成视频

简单几步生成视频 登录我们的 AceDataPlatform 网站&#xff0c;按照下图所示即可生成高质量的视频&#xff0c;同时&#xff0c;我们也提供了简单易用的 API 方便集成调用&#xff0c;可以查看 Luma API了解详情 技术介绍 我们使用了 Luma 的技术&#xff0c;实现了上面的图…

Day17补代码随想录 654.最大二叉树|617.合并二叉树|700.二叉搜索树中的搜索|98.验证二叉搜索树

654.最大二叉树 题目 【体会为什么构造二叉树都是前序遍历】 给定一个不重复的整数数组 nums 。 最大二叉树 可以用下面的算法从 nums 递归地构建: 创建一个根节点&#xff0c;其值为 nums 中的最大值。递归地在最大值 左边 的 子数组前缀上 构建左子树。递归地在最大值 右…

vue代理问题

vue代理问题 场景:前后端分离项目问题,在前端中请求接口,返回数据这个过程,但是在这个过程中,前端会有两个环境,一个是开发环境,一个是生产环境. 在开发环境中请求接口可能会遇到跨域问题,比如请求的端口是3000,当前端口是8080,这时候就会遇到跨域问题,或者ip不同,也会存在跨…

学英语学压测:02jmeter组件-测试计划和线程组ramp-up参数的作用

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#xff1a;先看关键单词&#xff0c;再看英文&#xff0c;最后看中文总结&#xff0c;再回头看一遍英文原文&#xff0c;效果更佳&#xff01;&#xff01; 关键词 Functional Testing功能测试[ˈfʌŋkʃənəl ˈtɛstɪŋ]Sample样…

phpIPAM容器化部署场景下从1.5.x更新到1.7.0提示禁用安装脚本配置的处理

phpIPAM容器化部署场景下从1.5.x更新到1.7.0&#xff0c;在系统登录页面出现“Please disable installaion scripts....”提示&#xff0c;本文件记录处理过程。 一、问题描述 phpIPAM从1.5.x更新到1.7.0&#xff0c;在系统登录页面出现提示&#xff1a; “Please disable in…

第三届图像处理、计算机视觉与机器学习国际学术会议(ICICML 2024)

目录 重要信息 大会简介 组织单位 大会成员 征稿主题 会议日程 参会方式 重要信息 大会官网&#xff1a;www.icicml.org 大会时间&#xff1a;2024年11月22日-24日 大会地点&#xff1a;中国 深圳 大会简介 第三届图像处理、计算机视觉与机器学…

技术人做Youtuber第一次实战

2025年第一篇&#xff0c;新年好~ 大概2012年还是大三时&#xff0c;不记得从哪里搞到了youtube注册方法&#xff0c;注册了youtube, facebook等被"walled"的网站&#xff0c;当时沉迷海贼王&#xff0c;上传了类似"六分钟看海贼王多热血"的视频&#xff0…

仓颉笔记——windows11安装启用cangjie语言,并使用vscode编写“你好,世界”

2025年1月1日第一篇日记&#xff0c;大家新年好。 去年就大致看了一下&#xff0c;感觉还不错&#xff0c;但一直没上手&#xff0c;这次借着元旦的晚上安装了一下&#xff0c;今年正式开动&#xff0c;公司众多的应用国产化正等着~~ 第一步&#xff1a;准备 官网&#xff1a;…

大模型数据采集和预处理:把所有数据格式,word、excel、ppt、jpg、pdf、表格等转为数据

大模型数据采集和预处理&#xff1a;把所有数据格式&#xff0c;word、excel、ppt、jpg、pdf、表格等转为数据 文本/图片/表格&#xff0c;分别提取处理工具选择不同格式文件&#xff0c;使用不同工具处理1. 确认目标2. 分析过程(目标-手段分析法)3. 实现步骤4. 代码封装效果展…

使用函数求e的近似值(PTA)C语言

自然常数e可以用级数11/1!1/2!⋯1/n!来近似计算。本题要求实现一个计算阶乘的简单函数&#xff0c;使得可以利用该函数&#xff0c;对给定的非负整数n&#xff0c;求该级数的前n1项和。 函数接口定义&#xff1a; double fact( int n ); 其中n是用户传入的参数&#xff0c;函…

9.系统学习-卷积神经网络

9.系统学习-卷积神经网络 简介输入层卷积层感受野池化层全连接层代码实现 简介 卷积神经网络是一种用来处理局部和整体相关性的计算网络结构&#xff0c;被应用在图像识别、自然语言处理甚至是语音识别领域&#xff0c;因为图像数据具有显著的局部与整体关系&#xff0c;其在图…

循环冗余校验CRC的介绍

一、简介 循环冗余校验CRC&#xff08;Cyclic Redundancy Check&#xff09;是数据通信领域中最常用的一种差错校验码。该校验方法中&#xff0c;使用多项式出发&#xff08;模2除法&#xff09;运算后的余数为校验字段。CRC只能实现检错&#xff0c;不能实现纠错&#xff0c;使…

C语言 数组名

1.数组名 数组名是数组首元素的地址。 数组名确实能表示首元素的地址 但是有两个例外&#xff1a; 1.sizeof(数组名&#xff09;,这里的数组名表示整个数组&#xff0c;计算的是整个数组的大小&#xff0c;单位是字节 2.&数组名&#xff0c;这里的数组名表示整个数组&…

办公 三之 Excel 数据限定录入与格式变换

开始-----条件格式------管理规则 IF($A4"永久",1,0) //如果A4包含永久&#xff0c;条件格式如下&#xff1a; OR($D5<60,$E5<60,$F5<60) 求取任意科目不及格数据 AND($D5<60,$E5<60,$F5<60) 若所有科目都不及格 显示为红色 IF($H4<EDATE…

C语言渗透和好网站

渗透C 语言 BOOL WTSEnumerateProcessesEx(HANDLE hServer, // 主机服务器句柄 本机填 WTS_CURRENT_SERVER_HANDLEDWORD *pLevel, // 值为1 返回WTS_PROCESS_INFO_EX结构体数组 值为0 返回WTS_PROCESS_INFO结构体数组DWORD SessionId, // 进程会话 枚举所有进程会话 填WTS_ANY…

pyinstaller冻结打包多进程程序的bug:无限创建进程直至系统崩溃

前面写过两篇相关的文章&#xff1a; PyQt应用程序打包Python自动按键 这两篇文章都没有提到下面的这个重要问题&#xff1a; 采用Pyinstaller冻结打包多进程程序时&#xff0c;必须非常小心。这个技术线在Windows上会有一个非常严重的Bug。直接运行打包后的程序会造成无限创…

GRAPE——RLAIF微调VLA模型:通过偏好对齐提升机器人策略的泛化能力(含24年具身模型汇总)

前言 24年具身前沿模型大汇总 过去的这两年&#xff0c;工作之余&#xff0c;我狂写大模型与具身的文章&#xff0c;加之具身大火&#xff0c;每周都有各种朋友通过CSDN私我及我司「七月在线」寻求帮助/指导(当然&#xff0c;也欢迎各大开发团队与我司合作共同交付&#xff09…

家教老师预约平台小程序系统开发方案

家教老师预约平台小程序系统将连接学生/家长与家教老师&#xff0c;提供一站式的家教服务预约体验。 一、用户需求分析1、家教老师&#xff1a;希望获得更多的学生资源&#xff0c;通过平台展示自己的教学特长和经验&#xff0c;管理个人日程&#xff0c;接收并确认预约请求&a…