发送失败的RocktMQ消息,你遇到过吗?

背景

需要通过flink同时向测试和线上的RocketMQ中写入数据

现象

在程序中分别创建了两个MqProducer,设置了不同的nameServerAddr,分别调用不同的producer向不同环境发消息,返回发送成功,但是在线上MQ中却查不到数据,测试环境是有的。
代码如下:

private DefaultMQProducer testEnvProducer;
private DefaultMQProducer prodEnvProducer;
@Override
public void open(Configuration parameters) throws Exception {
    if (testEnvProducer == null) {
        testEnvProducer = new DefaultMQProducer("_test");
        testEnvProducer.setNamesrvAddr(SINK_ADDRESS);
        testEnvProducer.start();
    }
    if (prodEnvProducer == null) {
        prodEnvProducer = new DefaultMQProducer("_prod");
        prodEnvProducer.setNamesrvAddr(SOURCE_ADDRESS);
        prodEnvProducer.start();
    }
}

在这里插入图片描述

解决过程及方案

由于不了解flink的运行机制,尝试将发送MQ的逻辑拆分为两个sink,无济于事,在中间遇到了创建DefaultMQProducer时设置的是同一个group,理论上是不同的环境不会有问题,prodProducer在start时却报该group的实例已经创建,当时就有点怀疑是不是两个producer是同一个。后又通过在消息体中增加profile明确区分开线上和测试的数据,发现应该发送到线上的数据却发送到了测试环境,此时断定是两个producer为同一个实例。
查看RocketMQ Client源码发现了factory这个参数
image.png
在这里插入图片描述

那问题大概率就是这个工厂导致的,工厂内做了缓存,让我们来看一看
image.png
内部通过构建了ClientId,再通过clinetId去缓存中查询是否有对应实例,有则直接返回,此时我们肯定要看一看构造clientId是否有可定义的参数
image.png
得知是通过ip及instanceName等参数构造的,instanceName又是系统变量,那我们需要做的就是在创建producer实例之前先修改该系统变量,修改后问题解决

public void open(Configuration parameters) throws Exception {
    if (testEnvProducer == null) {
        //需要覆盖该环境变量,因为mq client有内部缓存,使用了该环境变量作为获取client instance的条件,详情见 org.apache.rocketmq.client.ClientConfig#buildMQClientId
        System.setProperty("rocketmq.client.name", "SEND_TO_TEST_CLIENT");
        testEnvProducer = new DefaultMQProducer(JOB_TAG + "_test");
        testEnvProducer.setNamesrvAddr(SINK_ADDRESS);
        testEnvProducer.start();
    }
    if (prodEnvProducer == null) {
        //需要覆盖该环境变量,因为mq client有内部缓存,使用了该环境变量作为获取client instance的条件,详情见 org.apache.rocketmq.client.ClientConfig#buildMQClientId
        System.setProperty("rocketmq.client.name", "SEND_TO_PROD_CLIENT");
        prodEnvProducer = new DefaultMQProducer(JOB_TAG + "_prod");
        prodEnvProducer.setNamesrvAddr(SOURCE_ADDRESS);
        prodEnvProducer.start();
    }
}

大家在实际开发中如果有这种场景的话也要注意哦!
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/135178.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分布式理论基础:CAP定理

什么是CAP CAP原则又称CAP定理,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性)这三个基本需求,最多只能同时…

【学习辅助】Axure手机时间管理APP原型,告别手机控高保真模板

作品概况 页面数量:共 30 页 兼容软件:Axure RP 9/10,不支持低版本 应用领域:时间管理、系统工具 作品申明:页面内容仅用于功能演示,无实际功能 作品特色 本品为「手机时间管理」APP原型,…

RK3568平台 在alsa框架中添加音频功放芯片

一.alsa框架概述 ALSA,全称是Advanced Linux Sound Architecture,是Linux中提供声音设备驱动的内核组件,应用可以通过ALSA接口实现音频播放、录音、设备通路控制、音量控制、通话等功能。 在 Linux 内核设备驱动层,ALSA 提供了 …

焕新古文化传承之路,AI为古彝文识别赋能

目录 1 古彝文与古典保护 2 古文识别的挑战 2.1 西文与汉文OCR 2.2 古彝文识别难点 3 合合信息:古彝文保护新思路 3.1 图像矫正 3.2 图像增强 3.3 语义理解 3.4 工程技巧 4 总结 1 古彝文与古典保护 彝文指的是云南、贵州、四川等地的彝族人使用的文字&am…

AI小镇Generative Agents: Interactive Simulacra of Human Behavior

文章目录 1 Introduction2 Related Works2.1 Human-AI Interaction2.2 Belivable Proxies for Human Behavior2.3 Large Language Model and Human Behavior 3 Generative agent behavior and interaction(行为与交互)3.1 Agent Avatar and Communicatio…

Leetcode—2471.逐层排序二叉树所需的最少操作数目【中等】(置换环解法!)

2023每日刷题(二十七) Leetcode—2471.逐层排序二叉树所需的最少操作数目 置换环解题思想 参考自网络 总交换次数 每一层最小交换次数之和 每一层元素个数 - 置换环数 实现代码 /*** Definition for a binary tree node.* struct TreeNode {* …

照片放大软件 Topaz Gigapixel AI mac中文版简介

Topaz Gigapixel AI mac是一款使用人工智能功能扩展图像的桌面应用程序,同时添加自然细节以获得惊人的效果。使用深度学习技术,A.I.Gigapixel™可以放大图像并填写其他调整大小的产品遗漏的细节,使用A.I.Gigapixel™,您可以裁剪照…

【LeetCode刷题-二分查找】--658.找到K个最接近的元素

658.找到K个最接近的元素 方法一:二分查找双指针 假设数组长度为n,数组arr已经按照升序排序,可以将数组arr分为两部分,前一部分所有元素[0,left]都小于x,后一部分[right,n-1]都大于等于x,left与right都可以…

【JS】判断字符串是否为 url 的方法

文章目录 用法解析 用法解析 当你传递一个字符串给 URL 构造函数时: 如果字符串是一个有效的 URL,它将返回一个新的 URL 对象。否则,它将返回一个错误。 const url new URL("https://www.baidu.com/"); console.log(url);函数封装&#xf…

YOLO目标检测——猫狗目标检测数据集下载分享【含对应voc、coco和yolo三种格式标签】

实际项目应用:宠物识别、猫狗分类数据集说明:猫狗分类检测数据集,真实场景的高质量图片数据,数据场景丰富,含有猫和狗图片标签说明:使用lableimg标注软件标注,标注框质量高,含voc(xm…

安装纯净版Linux后的必备设置

目录 一:网络设置 1,设置yum源 2,配置网络 二:samba服务设置 1,安装samba 2,设置samba 3,windows上挂载 三:安装必备的开发软件 1,GCC安装 2,Pyth…

jdk安装

.概览 1.jdk下载 JDK(Java Development Kit) 是 Java 语言的软件开发工具包(SDK)。 安装JDK后,会在电脑中同时安装:java的运行环境jre 和 开发环境jdk。 安装 JDK时,不建议安装太旧或太新的版本。目前的最新版本是jdk9。目前jdk8比较稳定&am…

【ARM入门】ARM、SOC、ARM授权 概念篇

什么是ARM ARM前身是Acorn公司设计的第一款微处理器,叫ARM:Acorn RISC Machine ARM公司的名字叫ARM:Advanced RISC Machines ARM内核 包括了寄存器组、指令集、总线、存储器映射规则、中断逻辑和调试组件等 内核是有ARM公司设计并以销售方…

从C++软件调试实战的角度去看多线程编程中的若干细节问题

目录 1、线程与线程函数基础知识 1.1、创建线程的函数返回时不代表代码执行到线程函数中了 1.2、创建线程的函数返回后要调用CloseHandle将线程句柄(引用计数)释放掉 1.3、线程何时退出并结束? 2、线程函数的几个细节 3、回调函数运行在…

扭矩传感器信号模拟地、数据地与电源地

在电子电路中,电源地、信号地、数字地和模拟地都是不同的地(ground)节点,它们在电路中有不同的作用。 电源地(Power Ground)是指用于连接电源电源回路的地节点。在大多数电子设备中,电源地通常是…

线性代数理解笔记

一.向量引入: 向量:只由大小和方向决定,不由位置决定。 二.向量加减法 向量的加法是首尾相连,减法是尾尾相连。 而向量v向量w为平行四边形主对角线。 向量v-向量w为平行四边形副对角线。 2.向量内积点乘(内积) 内积…

《数据结构、算法与应用C++语言描述》-队列的应用-工厂仿真

工厂仿真 完整可编译运行代码见:Github::Data-Structures-Algorithms-and-Applications/_19Factory simulation/ 问题描述 一个工厂有m台机器。工厂的每项任务都需要若干道工序才能完成。每台机器都执行一道工序,不同的机器执行不同的工序。一台机器一…

如何让useEffet支持async/await

前言 刚开始学react写过类似下面的代码,就是想直接在useEffect中使用async/await。然后浏览器就会报错如下图: useEffect(async () > {const res await Promise.resolve({ code: 200, mes: });}, [])报错的意思: useEffect 期望接受一…

算法导论笔记5:贪心算法

P216 第15章动态规划 最优子结构 具有它可能意味着适合应用贪心策略 动态规划(Dynamic Programming)算法的核心思想是:将大问题划分为小问题进行解决,从而一步步获取最优解的处理算法。 剪切-粘贴技术证明 每个子问题的解就是它本身的最优解(利用反证法&#xff0…