背景
需要通过flink同时向测试和线上的RocketMQ中写入数据
现象
在程序中分别创建了两个MqProducer,设置了不同的nameServerAddr,分别调用不同的producer向不同环境发消息,返回发送成功,但是在线上MQ中却查不到数据,测试环境是有的。
代码如下:
private DefaultMQProducer testEnvProducer;
private DefaultMQProducer prodEnvProducer;
@Override
public void open(Configuration parameters) throws Exception {
if (testEnvProducer == null) {
testEnvProducer = new DefaultMQProducer("_test");
testEnvProducer.setNamesrvAddr(SINK_ADDRESS);
testEnvProducer.start();
}
if (prodEnvProducer == null) {
prodEnvProducer = new DefaultMQProducer("_prod");
prodEnvProducer.setNamesrvAddr(SOURCE_ADDRESS);
prodEnvProducer.start();
}
}
解决过程及方案
由于不了解flink的运行机制,尝试将发送MQ的逻辑拆分为两个sink,无济于事,在中间遇到了创建DefaultMQProducer时设置的是同一个group,理论上是不同的环境不会有问题,prodProducer在start时却报该group的实例已经创建,当时就有点怀疑是不是两个producer是同一个。后又通过在消息体中增加profile明确区分开线上和测试的数据,发现应该发送到线上的数据却发送到了测试环境,此时断定是两个producer为同一个实例。
查看RocketMQ Client源码发现了factory这个参数
那问题大概率就是这个工厂导致的,工厂内做了缓存,让我们来看一看
内部通过构建了ClientId,再通过clinetId去缓存中查询是否有对应实例,有则直接返回,此时我们肯定要看一看构造clientId是否有可定义的参数
得知是通过ip及instanceName等参数构造的,instanceName又是系统变量,那我们需要做的就是在创建producer实例之前先修改该系统变量,修改后问题解决
public void open(Configuration parameters) throws Exception {
if (testEnvProducer == null) {
//需要覆盖该环境变量,因为mq client有内部缓存,使用了该环境变量作为获取client instance的条件,详情见 org.apache.rocketmq.client.ClientConfig#buildMQClientId
System.setProperty("rocketmq.client.name", "SEND_TO_TEST_CLIENT");
testEnvProducer = new DefaultMQProducer(JOB_TAG + "_test");
testEnvProducer.setNamesrvAddr(SINK_ADDRESS);
testEnvProducer.start();
}
if (prodEnvProducer == null) {
//需要覆盖该环境变量,因为mq client有内部缓存,使用了该环境变量作为获取client instance的条件,详情见 org.apache.rocketmq.client.ClientConfig#buildMQClientId
System.setProperty("rocketmq.client.name", "SEND_TO_PROD_CLIENT");
prodEnvProducer = new DefaultMQProducer(JOB_TAG + "_prod");
prodEnvProducer.setNamesrvAddr(SOURCE_ADDRESS);
prodEnvProducer.start();
}
}
大家在实际开发中如果有这种场景的话也要注意哦!