【问题系列】消费者与MQ连接断开问题解决方案(一)

1. 问题描述

当使用RabbitMQ作为中间件,而消费者为服务时,可能会出现以下情况:在长时间没有消息传递后,消费者与RabbitMQ之间出现连接断开,导致无法处理新消息。解决这一问题的方法是重启Python消费者服务,之后连接恢复正常。

2. 解决步骤

为了排查和处理这个问题,可以采取以下步骤:

  1. 连接设置审查:
  2. 网络状况检查:
  3. 消费者代码审查:
  4. RabbitMQ服务器检查:
  5. 监控和报警设置:
  6. 版本兼容性:

2.1 连接设置审查

  • 心跳超时: RabbitMQ 默认有一个心跳机制,如果在一段时间内没有收到消费者的心跳,就会关闭连接。确保你的连接设置中心跳时间合理,避免被误判为不活跃而关闭连接。
  • 连接超时: 检查连接参数中的超时时间,确保它足够长,以防止在长时间没有消息的情况下断开连接。

1. 心跳设置示例:

import pika

# RabbitMQ 服务器地址
rabbitmq_host = 'localhost'

# RabbitMQ 服务器端口
rabbitmq_port = 5672

# RabbitMQ 虚拟主机
rabbitmq_virtual_host = '/'

# RabbitMQ 用户名和密码
rabbitmq_credentials = pika.PlainCredentials(username='guest', password='guest')

# 创建连接参数
connection_params = pika.ConnectionParameters(
    host=rabbitmq_host,
    port=rabbitmq_port,
    virtual_host=rabbitmq_virtual_host,
    credentials=rabbitmq_credentials,
    heartbeat=600,  # 设置心跳时间,以秒为单位
)

# 创建连接
connection = pika.BlockingConnection(connection_params)

# 创建通道
channel = connection.channel()

# 在这里添加你的消费者逻辑
# ...

# 关闭连接
connection.close()

 2. 连接超时示例

import pika

# RabbitMQ 服务器地址
rabbitmq_host = 'localhost'

# RabbitMQ 服务器端口
rabbitmq_port = 5672

# RabbitMQ 虚拟主机
rabbitmq_virtual_host = '/'

# RabbitMQ 用户名和密码
rabbitmq_credentials = pika.PlainCredentials(username='guest', password='guest')

# 设置连接超时时间,以秒为单位
connection_timeout = 10

# 创建连接参数
connection_params = pika.ConnectionParameters(
    host=rabbitmq_host,
    port=rabbitmq_port,
    virtual_host=rabbitmq_virtual_host,
    credentials=rabbitmq_credentials,
    connection_attempts=3,  # 设置尝试连接的次数
    retry_delay=5,  # 设置重试连接的延迟时间,以秒为单位
    socket_timeout=connection_timeout,
)

# 创建连接
connection = pika.BlockingConnection(connection_params)

# 创建通道
channel = connection.channel()

# 在这里添加你的消费者逻辑
# ...

# 关闭连接
connection.close()

在上面的示例中,socket_timeout 参数被设置为 connection_timeout,表示连接超时时间。可以根据实际需求将这个值调整为你认为合适的数值。此外,还设置了 connection_attemptsretry_delay 参数,分别表示尝试连接的次数和重试连接的延迟时间。

根据具体情况修改连接参数,确保连接超时设置符合你的预期。连接超时时间要足够长以确保在网络不稳定或服务器繁忙时仍能够成功建立连接。

2.2  网络状况检查

  • 确保RabbitMQ服务端口在防火墙中是开放的,不会阻止连接。
  • 检查网络稳定性,排除因网络不稳定导致的连接问题。

检查和设置防火墙规则,假设 RabbitMQ 默认使用的是5672端口:

1. 查看已有防火墙规则

sudo iptables -L

这将列出当前的防火墙规则。确保有关 RabbitMQ 端口(默认是5672)的规则没有被阻止。

2. 开放 RabbitMQ 端口

sudo iptables -A INPUT -p tcp --dport 5672 -j ACCEPT

2.3 消费者代码审查

  • 确保消费者代码中有健壮的异常处理机制,防止异常导致连接中断。
  • 添加自动重连机制,确保连接断开后能够重新建立连接。

在消费者代码中加入自动重连机制可以提高系统的稳定性。

异常处理和自动重连机制:
import pika
import time

def consume_callback(ch, method, properties, body):
    try:
        # 在这里添加你的消息处理逻辑
        print("Received message:", body.decode('utf-8'))
    except Exception as e:
        # 捕获并处理任何可能的异常
        print(f"Error processing message: {str(e)}")

def connect_rabbitmq():
    # 创建连接参数
    connection_params = pika.ConnectionParameters(
        host=rabbitmq_host,
        port=rabbitmq_port,
        virtual_host=rabbitmq_virtual_host,
        credentials=rabbitmq_credentials,
    )

    while True:
        try:
            # 创建连接
            connection = pika.BlockingConnection(connection_params)

            # 创建通道
            channel = connection.channel()

            # 声明队列
            channel.queue_declare(queue='your_queue_name', durable=True)

            # 设置消费者回调函数
            channel.basic_consume(queue='your_queue_name', on_message_callback=consume_callback, auto_ack=True)

            # 开始消费消息
            print('Waiting for messages. To exit press CTRL+C')
            channel.start_consuming()

        except Exception as e:
            # 捕获连接过程中的异常
            print(f"Error connecting to RabbitMQ: {str(e)}")
            print("Retrying in 5 seconds...")
            time.sleep(5)

        finally:
            # 在最终块中确保关闭连接
            if connection and connection.is_open:
                connection.close()

# RabbitMQ 服务器地址
rabbitmq_host = 'localhost'

# RabbitMQ 服务器端口
rabbitmq_port = 5672

# RabbitMQ 虚拟主机
rabbitmq_virtual_host = '/'

# RabbitMQ 用户名和密码
rabbitmq_credentials = pika.PlainCredentials(username='guest', password='guest')

if __name__ == "__main__":
    connect_rabbitmq()

综合采取以上策略,可以大大提高消费者与消息队列连接的稳定性,确保系统能够正常处理消息并做出相应的响应。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/198075.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

redis运维(二十二)redis 的扩展应用 lua(四)

一 最佳实践 ① 铺垫 最佳实践:1、把redis操作所需的key通过KEYS进行参数传递2、其它的lua脚本所需的参数通过ARGV进行传递. redis lua脚本原理 Redis Lua脚本的执行原理 ② 删除指定的脚本缓存 ③ redis集群模式下使用lua脚本注意事项 1、常见报错现象 C…

草图大师sketchup道路怎么快速种树?

草图大师sketchup道路怎么快速种树?草图大师中的道路图纸想要在道路两旁种树,该怎么快速给道路种树呢?下面我们就来看看详细的教程,需要的朋友可以参考下 草图大师sketchup中想要快速种树,该怎么种多棵树呢&#xff1…

别太担心,人类只是把一小部分理性和感性放到了AI里

尽管人工智能(AI)在许多方面已经取得了重大进展,但它仍然无法完全复制人类的理性和感性。AI目前主要侧重于处理逻辑和分析任务,而人类则具有更复杂的思维能力和情感经验。 人类已经成功地将一些可以数据化和程序化的理性和感性特征…

JavaEE进阶学习:Bean 作用域和生命周期

1.Bean 作用域 .通过一个案例来看 Bean 作用域的问题 假设现在有一个公共的 Bean,提供给 A 用户和 B 用户使用,然而在使用的途中 A 用户却“悄悄”地修改了公共 Bean 的数据,导致 B 用户在使用时发生了预期之外的逻辑错误。 我们预期的结果…

leaflet对线设置渐变色

效果展示: 引用leaflet-polycolor组件 npm install leaflet-polycolor .vue文件中使用 import leafletPolycolor from leaflet-polycolor; leafletPolycolor(L); const latLngs [[37.03, 111.92], [37.53444, 111.98555], [36.88, 112.12], [37.53444, 112.24], […

Redis深入理解-主从架构下内核数据结构、主从同步以及主节点选举

Redis 主从挂载后的内核数据结构分析 主节点中,会通过 clusteNode 中的 slaves 来记录该主节点包含了哪些从节点,这个 slaves 是一个指向 *clusterNode[] 数组的数据结构从节点中,会通过 clusterNode 中的 slaveof 来记录该从节点属于哪个主…

04_Flutter自定义Slider滑块

04_Flutter自定义Slider滑块 一.Slider控件基本用法 Column(mainAxisAlignment: MainAxisAlignment.start,children: <Widget>[Text("sliderValue: ${_sliderValue.toInt()}"),Slider(value: _sliderValue,min: 0,max: 100,divisions: 10,thumbColor: Colors.…

《微信小程序开发从入门到实战》学习三十四

4.2 云开发JSON数据库 MySQL、Oracle之类的“关系型数据库”。JSON数据库是“非关系型数据库”&#xff0c;没有行表列的概念。 4.2.1 JSON数据库基本概念 集合:一个数据库有多个集合&#xff0c;一个集合存储通常是同一类数据&#xff0c;可看作为JSON数组&#xff0c;数组…

webpack具体实现--未完

1、前端模块打包工具webpack webpack 是 Webpack 的核心模块&#xff0c;webpack-cli 是 Webpack 的 CLI 程序&#xff0c;用来在命令行中调用 Webpack。webpack-cli 所提供的 CLI 程序就会出现在 node_modules/.bin 目录当中&#xff0c;我们可以通过 npx 快速找到 CLI 并运行…

Chrome 访问不了项目?10080端口 ERR_UNSAFE_PORT:问题原因 / 解决方案

文章目录 被禁用端口列表解决方法方法一、更换端口 / 使用代理 / 使用域名方法二、对浏览器下手WindowsMac 最近有客户反馈&#xff0c;在chrome浏览器中访问不了项目&#xff0c;其他浏览器都是正常的。 &#xff1f;奇了怪了&#xff0c;难道客户对chrome做了什么操作&#x…

Asp.net core WebApi 配置自定义swaggerUI和中文注释

1.创建asp.net core webApi项目 默认会引入swagger的Nuget包 <PackageReference Include"Swashbuckle.AspNetCore" Version"6.2.3" />2.配置基本信息和中文注释&#xff08;默认是没有中文注释的&#xff09; 2.1创建一个新的controller using Micr…

直接插入排序和希尔排序

前言 我们前面几期介绍了线性和非线性的基本数据结构。例如顺序表、链表、栈和队列、二叉树等~&#xff01;本期和接下来的几期我们来详解介绍各个排序的概念、实现以及性能分析&#xff01; 本期内容 排序的概念以及其运用 常见的排序算法 直接插入排序 希尔排序 一、排序的…

Leetcode算法系列| 3. 无重复字符的最长子串

目录 1.题目2.题解C# 解法一&#xff1a;滑动窗口算法C# 解法二&#xff1a;索引寻找Java 解法一&#xff1a;滑动窗口算法Java 解法二&#xff1a;遍历字符串 1.题目 给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长子串 的长度。 示例1: 输入: s "ab…

力扣141-环形链表

文章目录 力扣141-环形链表示例代码实现要点剖析 力扣141-环形链表 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测…

源码剖析 Spring Security 的实现原理

Spring Security 是一个轻量级的安全框架&#xff0c;可以和 Spring 项目很好地集成&#xff0c;提供了丰富的身份认证和授权相关的功能&#xff0c;而且还能防止一些常见的网络攻击。我在工作中有很多项目都使用了 Spring Security 框架&#xff0c;但基本上都是浅尝辄止&…

C语言——输入两个正整数 m 和 n。求其最大公约数和最小公倍数。

#define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> int main() {int m, n;int i;int x 1;int y 0;printf("请输入两个正整数m和n&#xff1a;\n");scanf("%d,%d", &m, &n);for (i 1; i < m && i < n; i) {if (m % i 0 …

【doccano】文本标注工具——安装运行教程

笔记为自我总结整理的学习笔记&#xff0c;若有错误欢迎指出哟~ 【doccano】文本标注工具 doccano简介安装doccano1. 创建并激活虚拟环境2. 安装doccano 运行Doccano访问Doccano doccano简介 doccano是一个开源的文本注释工具。它为文本分类、序列标记和序列到序列任务提供注释…

Axios 并发请求指南 - 3 种简单实用的方法

在实际开发中&#xff0c;我们经常需要同时发送多个请求&#xff0c;并在所有请求完成后进行处理&#xff0c;这就是所谓的并发请求。实现 Axios 并发请求的关键是使用 Axios.all 方法&#xff0c;它接受一个 Promise 的数组作为参数&#xff0c;当这些 Promise 都 resolve 时&…

【C++】杨辉三角详解和C++代码示例

杨辉三角的每行第i个数是由上一行的第i-1个数和第i个数相加得到的&#xff0c;且每行的第一个数和最后一个数都是1&#xff0c;每行的中间个数等于它两肩上的数字相加。 目录 C代码输出结果8行输出15行输出25行输出 C代码 #include <iostream> #include <vector>…

Python Selenium 图片资源自动搜索保存 项目实践

实现访问首页 from os.path import dirnamefrom selenium import webdriverclass ImageAutoSearchAndSave:"""图片自动搜索保存"""def __init__(self):"""初始化"""self.driver webdriver.Chrome(executable_pa…