消息队列10:为RabbitMq添加连接池

环境:

  • win11
  • rabbitmq-3.8.17
  • .net 6.0
  • RabbitMQ.Client 6.8.1
  • vs2022

安装RabbitMq环境参照:

  • window下安装rabbitmq
  • linux下安装rabbitmq

问题:rabbitmq的c#客户端没有自带连接池,所以需要手动实现。

简易实现如下:

using RabbitMQ.Client;
using System.Collections.Concurrent;
using System.Text;

//测试调用
var channel = await ChannelPool.Default.GetChannelAsync("guid", () =>
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        Port = 5672,
        UserName = "test",
        Password = "123456",
        VirtualHost = "/",
    };
    return Task.FromResult(factory.CreateConnection());
});
try
{
    var body = Encoding.UTF8.GetBytes("{\"Name\":\"tom\"}");
    channel.RawChannel.BasicPublish(exchange: "", routingKey: "test-queue", body: body);
}
finally
{
    channel.Return();
}


#region 连接池
/// <summary>
/// rabbitmq 本身没有提供链接池, 且 IModel 的建立和释放也需要发送请求, 所以建立 connection 轮训机制和 IModel 的缓冲池机制<br/>
/// 参考: <seealso href="https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-and-channel-lifespan"/>
/// </summary>
public class ChannelPool
{
    public static ChannelPool Default = new(8, 50);
    private int connectionCount;
    private int channelCountPerConnection;
    public ChannelPool(int connectionCount = 1, int channelCountPerConnection = 5)
    {
        if (connectionCount > 0) this.connectionCount = connectionCount;
        if (channelCountPerConnection > 0) this.channelCountPerConnection = channelCountPerConnection;
    }
    public class ChannelItem
    {
        public int ConnectionIndex { get; set; }
        public HostItem CacheHost { get; set; }
        public IModel RawChannel { get; set; }
        public void Return() => CacheHost.ChannelPools[ConnectionIndex].Return(this);
    }
    public class HostItem
    {
        public SemaphoreSlim HostLocker { get; set; }
        public List<IConnection> Connections { get; set; }
        public int CurrentConnectionIndex { get; set; }
        public List<SemaphoreSlim> ConnectionLockers { get; set; }
        public List<EasyPool<ChannelItem>> ChannelPools { get; set; }
    }
    #region EasyPool
    public sealed class EasyPool<T> : IDisposable where T : class
    {
        private readonly ConcurrentBag<T> _pool;
        private readonly Func<T> _factory;
        private readonly int _maxCount;
        public EasyPool(Func<T> factory, int maxCount)
        {
            _factory = factory;
            _maxCount = maxCount;
            _pool = new ConcurrentBag<T>();
        }
        public T Get()
        {
            if (!_pool.TryTake(out var result)) return _factory();
            return result;
        }
        public bool Return(T item)
        {
            if (_pool.Count >= _maxCount)
            {
                if (item is IDisposable disposable) try { disposable.Dispose(); } catch { }
                return false;
            }
            _pool.Add(item);
            return true;
        }
        public void Dispose()
        {
            T result;
            while (_pool.TryTake(out result))
            {
                if (result is IDisposable disposable)
                {
                    try { disposable.Dispose(); } catch { }
                }
            }
        }
    }
    #endregion
    private readonly Dictionary<string, HostItem> _cacheHosts = new();
    public async Task<ChannelItem> GetChannelAsync(string key, Func<Task<IConnection>> connectionFactoty)
    {
        var connectionCount = this.connectionCount;
        var maxChannelCountPerConnection = this.channelCountPerConnection;
        //获取 HostItem
        if (!_cacheHosts.TryGetValue(key, out var cacheHost))
        {
            lock (_cacheHosts)
            {
                if (!_cacheHosts.TryGetValue(key, out cacheHost))
                {
                    cacheHost = new HostItem
                    {
                        HostLocker = new(1, 1),
                        CurrentConnectionIndex = -1,
                        Connections = new List<IConnection>(connectionCount),
                        ConnectionLockers = new List<SemaphoreSlim>(connectionCount),
                        ChannelPools = new List<EasyPool<ChannelItem>>(connectionCount),
                    };
                    for (int i = 0; i < connectionCount; i++)
                    {
                        cacheHost.Connections.Add(null);
                        cacheHost.ConnectionLockers.Add(new(1, 1));
                        var idx = i;
                        cacheHost.ChannelPools.Add(new EasyPool<ChannelItem>(() => new ChannelItem
                        {
                            ConnectionIndex = idx,
                            RawChannel = cacheHost.Connections[idx].CreateModel(),
                            CacheHost = cacheHost
                        }, maxChannelCountPerConnection));
                    }
                    _cacheHosts.Add(key, cacheHost);
                }
            }
        }
        //轮训得到连接索引
        await cacheHost.HostLocker.WaitAsync();
        int connectionIdx;
        try
        {
            connectionIdx = ++cacheHost.CurrentConnectionIndex;
            if (connectionIdx >= connectionCount) cacheHost.CurrentConnectionIndex = connectionIdx = connectionIdx % connectionCount;
        }
        finally
        {
            try { cacheHost.HostLocker.Release(); } catch { }
        }
        //检查是否初始化链接
        var conn = cacheHost.Connections[connectionIdx];
        if (conn == null)
        {
            var connectionLocker = cacheHost.ConnectionLockers[connectionIdx];
            await connectionLocker.WaitAsync();
            try
            {
                conn = cacheHost.Connections[connectionIdx];
                if (conn == null)
                {
                    conn = await connectionFactoty();
                    cacheHost.Connections[connectionIdx] = conn;
                }
            }
            finally
            {
                try { connectionLocker.Release(); } catch { }
            }
        }
        //得到 Channel
        return cacheHost.ChannelPools[connectionIdx].Get();
    }
}
#endregion

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/887747.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Android 13源码分析】Activity生命周期之onCreate,onStart,onResume-2

忽然有一天&#xff0c;我想要做一件事&#xff1a;去代码中去验证那些曾经被“灌输”的理论。                                                                                  – 服装…

微服务Sleuth解析部署使用全流程

目录 1、Sleuth链路追踪 1、添加依赖 2、修改日志配置文件 3、测试 2、zipkin可视化界面 1、docker安装 2、添加依赖 3、修改配置文件 4、查看页面 5、ribbon配置 1、Sleuth链路追踪 sleuth是链路追踪框架&#xff0c;用于在微服务架构下开发&#xff0c;各个微服务之…

[水墨:创作周年纪念] 特别篇!

本篇是特别篇&#xff01;&#xff01; 个人主页水墨不写bug // _ooOoo_ // // o8888888o // // 88" . "88 …

GO网络编程(二):客户端与服务端通信【重要】

本节是新知识&#xff0c;偏应用&#xff0c;需要反复练习才能掌握。 目录 1.C/S通信示意图2.服务端通信3.客户端通信4.通信测试5.进阶练习&#xff1a;客户端之间通信 1.C/S通信示意图 客户端与服务端通信的模式也称作C/S模式&#xff0c;流程图如下 其中P是协程调度器。可…

《CUDA编程》5.获得GPU加速的关键

从本章起&#xff0c;将关注CDUA程序的性能&#xff0c;即执行速度 1 用CUDA事件计时 在前几章中&#xff0c;使用的是C的<time.h>库进行程序运行计时&#xff0c;CUDA也提供了一种基于CUDA event的计时方式&#xff0c;用来给一段CUDA代码进行计时&#xff0c;这里只介…

系统架构设计师-下午案例题(2021年下半年)

1.试题一(共25分) 阅读以下关于软件架构设计与评估的叙述在答题纸上回答问题1和问题2。 【说明】某公司拟开发一套机器学习应用开发平台支持用户使用浏览器在线进行基于机器学习的智能应用开发活动。该平台的核心应用场景是用户通过拖拽算法组件灵活定义机器学习流程&#xf…

【含开题报告+文档+PPT+源码】基于SSM + Vue的养老院管理系统【包运行成功】

开题报告 随着社会的发展和经济的进步&#xff0c;人口老龄化问题逐渐凸显。统计数据显示&#xff0c;全球范围内的老龄人口比例正在逐年上升&#xff0c;养老需求也随之增长。养老院作为提供专业养老服务的机构&#xff0c;承担着照料老人、提供医疗保健和社交活动等责任。传…

什么是pip? -- Python 包管理工具

前言 不同的编程语言通常都有自己的包管理工具&#xff0c;这些工具旨在简化项目的依赖管理、构建过程和开发效率&#xff0c;同时促进代码的复用和共享。每个包管理工具都有其独特的特点和优势&#xff0c;开发者可以根据自己的编程语言和项目需求选择合适的包管理工具。 pip是…

车辆重识别(2021ICML改进的去噪扩散概率模型)论文阅读2024/9/29

所谓改进的去噪扩散概率模型主要改进在哪些方面&#xff1a; ①对数似然值的改进 通过对噪声的那个方差和T进行调参&#xff0c;来实现改进。 ②学习 这个参数也就是后验概率的方差。通过数据分析&#xff0c;发现在T非常大的情况下对样本质量几乎没有影响&#xff0c;也就是说…

TIM的PWM模式

定时器的工作流程: 定时器对时钟传来的脉冲次数计数&#xff0c;并且在次数到达范围值时触发中断。如向下计数模式时为0&#xff0c;向上计数为达到自动重装载计时器的值时触发中断。 四个输出比较单元 更改占空比的函数 STM32里面的定时器有多个定时器。 如TIM1、TIM2…

k8s 之安装metrics-server

作者&#xff1a;程序那点事儿 日期&#xff1a;2024/01/29 18:25 metrics-server可帮助我们查看pod的cpu和内存占用情况 kubectl top po nginx-deploy-56696fbb5-mzsgg # 报错&#xff0c;需要Metrics API 下载 Metrics 解决 wget https://github.com/kubernetes-sigs/metri…

nginx 负载均衡1

遇到的问题 大型网站都要面对庞大的用户量&#xff0c;高并发&#xff0c;海量数据等挑战。为了提升系统整体的性能&#xff0c;可以采用垂直扩展和水平扩展两种方式。 垂直扩展&#xff1a;在网站发展早期&#xff0c;可以从单机的角度通过增加硬件处理能力&#xff0c;比如 C…

LeetCode讲解篇之239. 滑动窗口最大值

文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 我们维护一个长度为k的窗口&#xff0c;然后窗口从数组最左边一直移动到最右边&#xff0c;记录过程中窗口中的最大值&#xff0c;就是答案 我们每次查询长度为k的窗口最大值是什么时间复杂度是O(k)的&#xff0…

黑神话:仙童,数据库自动反射魔法棒

黑神话&#xff1a;仙童&#xff0c;数据库自动反射魔法棒 Golang 通用代码生成器仙童发布了最新版本电音仙女尝鲜版十一及其介绍视频&#xff0c;视频请见&#xff1a;https://www.bilibili.com/video/BV1ET4wecEBk/ 此视频介绍了使用最新版的仙童代码生成器&#xff0c;将 …

使用 Python 遍历文件夹

要解决这个问题&#xff0c;使用 Python 的标准库可以很好地完成。我们要做的是遍历目录树&#xff0c;找到所有的 text 文件&#xff0c;读取内容&#xff0c;处理空行和空格&#xff0c;并将处理后的内容合并到一个新的文件中。 整体思路&#xff1a; 遍历子目录&#xff1…

计算机毕业设计 基于Hadoop的智慧校园数据共享平台的设计与实现 Python 数据分析 可视化大屏 附源码 文档

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

国外电商系统开发-运维系统拓扑布局

点击列表中设备字段&#xff0c;然后定位到【拓扑布局】中&#xff0c;可以看到拓扑发生了变化 再回头&#xff0c;您再次添加一个服务器到系统中&#xff0c;并且选择该服务器的连接节点为您刚才创建的“SDN路由器”&#xff0c;保存后&#xff0c;您可以看到这个服务器连接着…

RabbbitMQ篇(环境搭建 - 下载 安装)(持续更新迭代)

目录 一、Windows 1. 下载安装程序 2. 安装配置erlang 3. 安装rabbitMQ 4. 验证 二、Linux 1. 下载rpm包 1.1. 下载Erlang的rpm包 1.2. 下载socat的rpm包 1.3. 下载RabbitMQ的rpm包 2. 安装 2.1. 安装Erlang 2.2. 安装socat 2.3. 安装RabbitMQ 3. 启动RabbitMQ服…

小程序原生-利用setData()对不同类型的数据进行增删改

1. 声明和绑定数据 wxml文件 <view> {{school}} </view> <view>{{obj.name}}</view> <view id"{{id}}" > 绑定属性值 </view> <checkbox checked"{{isChecked}}"/> <!--算数运算--> <view>{{ id …