【项目实践04】【RocketMQ消息收发拦截器】

文章目录

  • 一、前言
  • 二、项目背景
  • 三、实现方案
    • 1. 关键接口
    • 2. 消息发送方
    • 3. 消息消费方
    • 4. 配置引入类
    • 5. 使用示例
  • 四、思路扩展
    • 1. 消费流程简介


一、前言

本系列用来记录一些在实际项目中的小东西,并记录在过程中想到一些小东西,因为是随笔记录,所以内容不会过于详细。

二、项目背景

原本的目的是想实现 MQ 消息之间的 TraceId 追踪。如下两个拦截器可以实现 MQ 消息之间 TraceId 的传递,不过项目后面转用 TLog 。(但是写都写了,不能浪费

三、实现方案

1. 关键接口

  1. 消息发送方拦截器,当RocketMQ 消息发送的时候会调用对应的方法进行拦截,实现该接口可进行消息发送前后的扩展。

    public interface MqSendInterceptor extends Ordered, Comparable<MqSendInterceptor> {
        /**
         * 消息发送前处理
         *
         * @param message
         */
        default void preHandle(Message message) {
        }
    
        /**
         * 消息发送后处理
         *
         * @param destination
         * @param payload
         * @param e
         */
        default void afterHandle(String destination, org.springframework.messaging.Message<?> payload, Exception e) {
        }
    
    
        /**
         * 拦截器优先级
         *
         * @return
         */
        @Override
        default int getOrder() {
            return Integer.MIN_VALUE;
        }
    
        /**
         * 拦截器排序
         *
         * @param o
         * @return
         */
        @Override
        default int compareTo(MqSendInterceptor o) {
            return getOrder() - o.getOrder();
        }
    }
    
  2. 消息消费方拦截器,当RocketMQ 消息接收消费的时候会调用对应的方法进行拦截,实现该接口可进行消息消费前后的扩展。

    public interface MqReceiveInterceptor extends Ordered, Comparable<MqReceiveInterceptor> {
    
        /**
         * 消息预处理
         *
         * @param conusmerGroup
         * @param messageExt
         * @return
         */
        void preHandle(String conusmerGroup, MessageExt messageExt);
    
        /**
         * 消息后处理
         *
         * @param consumerGroup
         * @param messageExt
         */
        void afterHandle(String consumerGroup, MessageExt messageExt, Exception exception);
    
        /**
         * 拦截器优先级
         *
         * @return
         */
        @Override
        default int getOrder() {
            return Integer.MIN_VALUE;
        }
    
        /**
         * 拦截器排序
         *
         * @param o
         * @return
         */
        @Override
        default int compareTo(MqReceiveInterceptor o) {
            return getOrder() - o.getOrder();
        }
    }
    

2. 消息发送方

消息发送的扩展逻辑实现其实很简单,直接继承的了基本的 RocketMQTemplate ,对一些方法进行了重写。重写的逻辑是在消息发送前后会调用 MqSendInterceptor 的对应方法即可。具体代码如下

@Slf4j
public class RocketMQPlusTemplate extends RocketMQTemplate implements InitializingBean {

    @Autowired(required = false)
    private List<MqSendInterceptor> mqSendInterceptors;


    @Override
    public void afterPropertiesSet() throws Exception {
        if (mqSendInterceptors == null) {
            mqSendInterceptors = Lists.newArrayListWithCapacity(0);
        }
        Collections.sort(mqSendInterceptors);
        super.afterPropertiesSet();
    }

    @Override
    public <T> T sendAndReceive(String destination, Message<?> message, Type type,
                                String hashKey, long timeout, int delayLevel) {
        return handleMsgSupplier(() -> {
            if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                log.error("send request message failed. destination:{}, message is null ", destination);
                throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
            }
            try {
                org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
                if (delayLevel > 0) {
                    rocketMsg.setDelayTimeLevel(delayLevel);
                }
                MessageExt replyMessage;

                if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
                    replyMessage = (MessageExt) getProducer().request(rocketMsg, timeout);
                } else {
                    replyMessage = (MessageExt) getProducer().request(rocketMsg, getMessageQueueSelector(), hashKey, timeout);
                }
                return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
            } catch (Exception e) {
                log.error("send request message failed. destination:{}, message:{} ", destination, message);
                throw new MessagingException(e.getMessage(), e);
            }
        }, destination, message);
    }

    @Override
    public void sendAndReceive(String destination, Message<?> message,
                               RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long t, int delayLevel) {

        handleMsgRunnable(() -> {
            if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                log.error("send request message failed. destination:{}, message is null ", destination);
                throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
            }

            try {
                long timeout = t;
                org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
                if (delayLevel > 0) {
                    rocketMsg.setDelayTimeLevel(delayLevel);
                }
                if (timeout <= 0) {
                    timeout = getProducer().getSendMsgTimeout();
                }
                RequestCallback requestCallback = null;
                if (rocketMQLocalRequestCallback != null) {
                    requestCallback = new RequestCallback() {
                        @Override
                        public void onSuccess(org.apache.rocketmq.common.message.Message message) {
                            rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
                        }

                        @Override
                        public void onException(Throwable e) {
                            rocketMQLocalRequestCallback.onException(e);
                        }
                    };
                }
                if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
                    getProducer().request(rocketMsg, requestCallback, timeout);
                } else {
                    getProducer().request(rocketMsg, getMessageQueueSelector(), hashKey, requestCallback, timeout);
                }
            } catch (
                    Exception e) {
                log.error("send request message failed. destination:{}, message:{} ", destination, message);
                throw new MessagingException(e.getMessage(), e);
            }
        }, destination, message);

    }

    @Override
    public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
        return handleMsgsSupplier(() -> {
            if (Objects.isNull(messages) || messages.size() == 0) {
                log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
                throw new IllegalArgumentException("`messages` can not be empty");
            }

            try {
                long now = System.currentTimeMillis();
                Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
                for (Message msg : messages) {
                    if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
                        log.warn("Found a message empty in the batch, skip it");
                        continue;
                    }
                    rmqMsgs.add(this.createRocketMqMessage(destination, msg));
                }

                SendResult sendResult = getProducer().send(rmqMsgs, timeout);
                long costTime = System.currentTimeMillis() - now;
                if (log.isDebugEnabled()) {
                    log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
                }
                return sendResult;
            } catch (Exception e) {
                log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
                throw new MessagingException(e.getMessage(), e);
            }
        }, destination, messages);
    }

    @Override
    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
        return handleMsgSupplier(() -> {
            if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
                throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
            }
            try {
                long now = System.currentTimeMillis();
                org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
                SendResult sendResult = getProducer().send(rocketMsg, getMessageQueueSelector(), hashKey, timeout);
                long costTime = System.currentTimeMillis() - now;
                if (log.isDebugEnabled()) {
                    log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
                }
                return sendResult;
            } catch (Exception e) {
                log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
                throw new MessagingException(e.getMessage(), e);
            }
        }, destination, message);

    }

    @Override
    public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {

        return handleMsgSupplier(() -> {
            if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                log.error("syncSend failed. destination:{}, message is null ", destination);
                throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
            }
            try {
                long now = System.currentTimeMillis();
                org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
                if (delayLevel > 0) {
                    rocketMsg.setDelayTimeLevel(delayLevel);
                }
                SendResult sendResult = getProducer().send(rocketMsg, timeout);
                long costTime = System.currentTimeMillis() - now;
                if (log.isDebugEnabled()) {
                    log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
                }
                return sendResult;
            } catch (Exception e) {
                log.error("syncSend failed. destination:{}, message:{} ", destination, message);
                throw new MessagingException(e.getMessage(), e);
            }
        }, destination, message);

    }

    @Override
    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
                          int delayLevel) {
        handleMsgRunnable(() -> {
            if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                log.error("asyncSend failed. destination:{}, message is null ", destination);
                throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
            }
            try {
                org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
                if (delayLevel > 0) {
                    rocketMsg.setDelayTimeLevel(delayLevel);
                }
                getProducer().send(rocketMsg, sendCallback, timeout);
            } catch (Exception e) {
                log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
                throw new MessagingException(e.getMessage(), e);
            }
        }, destination, message);

    }

    @Override
    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
                                 long timeout) {
        handleMsgRunnable(() -> {
            if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
                throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
            }
            try {
                org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
                getProducer().send(rocketMsg, getMessageQueueSelector(), hashKey, sendCallback, timeout);
            } catch (Exception e) {
                log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
                throw new MessagingException(e.getMessage(), e);
            }
        }, destination, message);

    }

    @Override
    public void sendOneWay(String destination, Message<?> message) {
        handleMsgRunnable(() -> {

            if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                log.error("sendOneWay failed. destination:{}, message is null ", destination);
                throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
            }
            try {
                org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
                getProducer().sendOneway(rocketMsg);
            } catch (Exception e) {
                log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
                throw new MessagingException(e.getMessage(), e);
            }
        }, destination, message);

    }

    @Override
    public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
        handleMsgRunnable(() -> {

            if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
                throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
            }
            try {
                org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
                getProducer().sendOneway(rocketMsg, getMessageQueueSelector(), hashKey);
            } catch (Exception e) {
                log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
                throw new MessagingException(e.getMessage(), e);
            }
        }, destination, message);

    }

    @Override
    public TransactionSendResult sendMessageInTransaction(final String destination,
                                                          final Message<?> message, final Object arg) throws MessagingException {
        return handleMsgSupplier(() -> {
            try {
                if (((TransactionMQProducer) getProducer()).getTransactionListener() == null) {
                    throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
                }
                org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
                return getProducer().sendMessageInTransaction(rocketMsg, arg);
            } catch (MQClientException e) {
                throw RocketMQUtil.convert(e);
            }
        }, destination, message);

    }

    protected org.apache.rocketmq.common.message.Message createRocketMqMessage(
            String destination, Message<?> message) {
        Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
        final org.apache.rocketmq.common.message.Message rocketMessage =
                RocketMQUtil.convertToRocketMessage(getMessageConverter(), getCharset(),
                        destination, msg);

        preHandle(rocketMessage);
        return rocketMessage;
    }

    private Object doConvertMessage(MessageExt messageExt, Type type) {
        if (Objects.equals(type, MessageExt.class)) {
            return messageExt;
        } else if (Objects.equals(type, byte[].class)) {
            return messageExt.getBody();
        } else {
            String str = new String(messageExt.getBody(), Charset.forName(getCharset()));
            if (Objects.equals(type, String.class)) {
                return str;
            } else {
                // If msgType not string, use objectMapper change it.
                try {
                    if (type instanceof Class) {
                        //if the messageType has not Generic Parameter
                        return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
                    } else {
                        //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
                        //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
                        return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
                    }
                } catch (Exception e) {
                    log.error("convert failed. str:{}, msgType:{}", str, type);
                    throw new RuntimeException("cannot convert message to " + type, e);
                }
            }
        }
    }

    private Type getMessageType(RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQLocalRequestCallback);
        Type matchedGenericInterface = null;
        while (Objects.nonNull(targetClass)) {
            Type[] interfaces = targetClass.getGenericInterfaces();
            if (Objects.nonNull(interfaces)) {
                for (Type type : interfaces) {
                    if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
                        matchedGenericInterface = type;
                        break;
                    }
                }
            }
            targetClass = targetClass.getSuperclass();
        }
        if (Objects.isNull(matchedGenericInterface)) {
            return Object.class;
        }

        Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
        if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
            return actualTypeArguments[0];
        }
        return Object.class;
    }

    /**
     * 消息
     *
     * @param runnable
     * @param destination
     * @param message
     */
    private void handleMsgRunnable(Runnable runnable, String destination, Message<?> message) {
        Exception exception = null;
        try {
            runnable.run();
        } catch (Exception e) {
            exception = e;
            throw e;
        } finally {
            afterHandle(destination, message, exception);
        }
    }


    /**
     * 消息处理
     *
     * @param supplier
     * @param destination
     * @param message
     * @return
     */
    private <R> R handleMsgSupplier(Supplier<R> supplier, String destination, Message<?> message) {
        Exception exception = null;
        try {
            return supplier.get();
        } catch (Exception e) {
            exception = e;
            throw e;
        } finally {
            afterHandle(destination, message, exception);
        }
    }

    /**
     * 消息
     *
     * @param supplier
     * @param destination
     * @param messages
     * @param <T>
     * @param <P>
     * @return
     */
    private <T, P extends Message<?>> T handleMsgsSupplier(Supplier<T> supplier, String destination, Collection<P> messages) {
        Exception exception = null;
        try {
            return supplier.get();
        } catch (Exception e) {
            exception = e;
            throw e;
        } finally {
            for (Message<?> message : messages) {
                afterHandle(destination, message, exception);
            }
        }
    }


    /**
     * 消息前置处理
     *
     * @param message
     */
    protected void preHandle(org.apache.rocketmq.common.message.Message message) {
        if (!Boolean.TRUE.toString().equalsIgnoreCase(message.getUserProperty(MqContants.NOT_INTERCEPT))) {
            for (MqSendInterceptor mqSendInterceptor : mqSendInterceptors) {
                mqSendInterceptor.preHandle(message);
            }
        }
    }


    /**
     * 消息后置处理
     *
     * @param destination
     * @param message
     * @param exception
     */
    protected void afterHandle(String destination, Message<?> message, Exception exception) {
        if (!Boolean.TRUE.equals(message.getHeaders().get(MqContants.NOT_INTERCEPT))) {
            for (MqSendInterceptor mqSendInterceptor : mqSendInterceptors) {
                mqSendInterceptor.afterHandle(destination, message, exception);
            }
        }
        log.warn("[mq发送][topic = {}, payload = {}, exception = {}]", destination, message.getPayload(), exception);
    }
}

3. 消息消费方

消息消费方的实现会更加复杂一点,通过 BeanPostProcessor 接口在 DefaultRocketMQListenerContainer 对象创建的时候为其创建代理对象。如下:

public class RocketMqBeanPostProcessor implements InstantiationAwareBeanPostProcessor, InitializingBean {

    @Autowired(required = false)
    private List<MqReceiveInterceptor> mqReceiveInterceptors;

    /**
     * 获取消费者组
     *
     * @param container
     * @return
     */
    private static String getConusmerGroup(DefaultRocketMQListenerContainer container) {
        String consumerGroup = null;
        RocketMQListener<?> rocketMQListener = container.getRocketMQListener();
        if (rocketMQListener != null) {
            final RocketMQMessageListener annotation =
                    rocketMQListener.getClass().getAnnotation(RocketMQMessageListener.class);
            if (annotation != null) {
                consumerGroup = annotation.consumerGroup();
            }
        } else {
            RocketMQReplyListener<?, ?> rocketMQReplyListener = container.getRocketMQReplyListener();
            if (rocketMQReplyListener != null) {
                final RocketMQMessageListener annotation =
                        rocketMQReplyListener.getClass().getAnnotation(RocketMQMessageListener.class);
                if (annotation != null) {
                    consumerGroup = annotation.consumerGroup();
                }
            }
        }
        return consumerGroup;
    }

    @Override
    public void afterPropertiesSet() {
        if (mqReceiveInterceptors == null) {
            mqReceiveInterceptors = Lists.newArrayListWithCapacity(0);
        }
        Collections.sort(mqReceiveInterceptors);
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 对 DefaultRocketMQListenerContainer 处理,代理 messageListener 对象
        // 引入RocketMq 对消息的处理是交由 DefaultRocketMQListenerContainer 中的 messageListener 对象的
        if (bean instanceof DefaultRocketMQListenerContainer) {
            final DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
            final DefaultMQPushConsumer consumer = container.getConsumer();
            if (consumer != null) {
                final MessageListener messageListener = consumer.getMessageListener();
                if (messageListener instanceof MessageListenerOrderly
                        || messageListener instanceof MessageListenerConcurrently) {
                    // 重新设置代理后的对象
                    consumer.setMessageListener((MessageListener)
                            createProxy(getConusmerGroup(container), messageListener));
                }
            }
        }
        return bean;
    }

    /**
     * 为 MessageListener 创建代理对象
     *
     * @param consumerGroup
     * @param messageListener
     * @return
     */
    private Object createProxy(String consumerGroup, MessageListener messageListener) {
        return Proxy.newProxyInstance(MessageListener.class.getClassLoader(), messageListener.getClass().getInterfaces(),
                (proxy, method, args) -> {
                	// 代理对象判断如果调用的是 consumeMessage 方法则进行增强,调用前后调用消息拦截器。
                    if ("consumeMessage".equals(method.getName())) {
                        Exception exception = null;
                        try {
                            for (MessageExt message : (List<MessageExt>) args[0]) {
                                if (!Boolean.TRUE.toString().equalsIgnoreCase(
                                        message.getUserProperty(MqContants.NOT_INTERCEPT))) {
                                    for (MqReceiveInterceptor mqReceiveInterceptor : mqReceiveInterceptors) {
                                        mqReceiveInterceptor.preHandle(consumerGroup, message);
                                    }
                                }
                            }
                            return method.invoke(messageListener, args);
                        } catch (Exception e) {
                            exception = e;
                            throw e;
                        } finally {
                            for (MessageExt message : (List<MessageExt>) args[0]) {
                                if (!Boolean.TRUE.toString().equalsIgnoreCase(
                                        message.getUserProperty(MqContants.NOT_INTERCEPT))) {
                                    for (MqReceiveInterceptor mqReceiveInterceptor : mqReceiveInterceptors) {
                                        mqReceiveInterceptor.afterHandle(consumerGroup, message, exception);
                                    }
                                }
                            }
                        }
                    }
                    return method.invoke(messageListener, args);
                });
    }

}

4. 配置引入类

项目实际上是做了一个 spring-boot-starter 的jar 包供其他服务引用的因此,这里是通过自动装配将 RocketMQAutoConfig 引入,然后 RocketMQAutoConfig 通过 @Import 引入了 RocketmqConfig 类。

@Import(RocketmqConfig.class)
public class RocketMQAutoConfig {
}

其中 RocketmqConfig 的实现如下:

@Configuration
@Configuration
public class RocketmqConfig implements ApplicationContextAware {

    public static final String PRODUCER_BEAN_NAME = "defaultMQProducer";
    public static final String CONSUMER_BEAN_NAME = "defaultLitePullConsumer";
    private ApplicationContext applicationContext;
    @Value("${rocketmq.name-server}")
    private String nameServer;
    @Value("${rocketmq.producer.group:default}")
    private String producerGroup;
    @Value("${rocketmq.producer.sendMessageTimeout}")
    private int sendMessageTimeout;


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * DefaultMQProducer 注入
     * @return
     */
    @Bean("defaultMQProducer")
    public DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setProducerGroup(producerGroup);
        defaultMQProducer.setNamesrvAddr(nameServer);
        defaultMQProducer.setVipChannelEnabled(false);
        defaultMQProducer.setSendMsgTimeout(sendMessageTimeout);
        return defaultMQProducer;
    }

    /**
     * MQ 消息转换器
     * @return
     */
    @Bean
    @ConditionalOnProperty(prefix = "oms", name = "rocket.bak", havingValue = "true")
    @ConditionalOnMissingBean(RocketMQMessageConverter.class)
    public RocketMQMessageConverter rocketmqMessageConverter() {
        return new RocketMQMessageConverter();
    }

    /**
     * mq 扩展,支持发送前后拦截
     * 这里直接参考 org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#rocketMQTemplate(org.apache.rocketmq.spring.support.RocketMQMessageConverter) 的实现
     *
     * @param rocketMQMessageConverter
     * @return
     */
    @Bean(destroyMethod = "destroy")
    @ConditionalOnProperty(prefix = "oms", name = "rocket.plus", havingValue = "true")
    public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
        RocketMQTemplate rocketMQTemplate = new RocketMQPlusTemplate();
        if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) {
            rocketMQTemplate.setProducer((DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME));
        }
        if (applicationContext.containsBean(CONSUMER_BEAN_NAME)) {
            rocketMQTemplate.setConsumer((DefaultLitePullConsumer) applicationContext.getBean(CONSUMER_BEAN_NAME));
        }
        rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        return rocketMQTemplate;
    }

    /**
     * mq 扩展,支持接收前后拦截
     *
     * @return
     */
    @Bean
    @ConditionalOnProperty(prefix = "oms", name = "rocket.plus", havingValue = "true")
    public RocketMqBeanPostProcessor rocketMqBeanPostProcessor() {
        return new RocketMqBeanPostProcessor();
    }

}

这里为 RocketMQ 收发增强功能做了一个配置项,如果需要开启增强功能则需要再服务的配置项中增加如下配置 :

# mq 扩展功能开启
oms.rocket.plus=true

5. 使用示例

  1. 编写两个MQ 日志打印拦截器

    // 日志打印的两个拦截器
    @Slf4j
    public class LogMqSendInterceptor implements MqSendInterceptor {
    
        @Override
        public void afterHandle(String destination, Message<?> payload, Exception e) {
            try {
                log.info("[{} 发送消息][payLoad = {}, exception = {}]",
                        destination, JSON.toJSONString(payload.getPayload()), e);
            } catch (Exception ex) {
                log.debug("[{} 日志打印错误]", destination, ex);
            }
        }
    }
    
    
    @Slf4j
    public class LogMqReceiveInterceptor implements MqReceiveInterceptor {
    
        @Override
        public void preHandle(String conusmerGroup, MessageExt messageExt) {
            try {
                log.info("[{} 收到消息][conusmerGroup = {}, message = {}]",
                        messageExt.getTopic(), conusmerGroup, new String(messageExt.getBody()));
            } catch (Exception e) {
                log.debug("[{} 日志打印错误]", messageExt.getTopic(), e);
            }
        }
    
        @Override
        public void afterHandle(String consumerGroup, MessageExt messageExt, Exception exception) {
    
        }
    }
    
    /******************************************/
    // 日志追踪的两个拦截器
    @Slf4j
    public class TraceMqSendInterceptor implements MqSendInterceptor {
    
        @Value("${spring.application.name:''}")
        private String applicationName;
    
        @Override
        public void preHandle(Message message) {
            String traceId = MDC.get(RpcConstants.MDC_TRACE_ID);
            if (StringUtils.isBlank(traceId)) {
                traceId = UUID.randomUUID().toString();
                log.info("[发送到Mq 消息][traceId 为空,随机生成 traceId = {}]", traceId);
            }
            message.putUserProperty(RpcConstants.MDC_TRACE_ID, traceId);
            if (StringUtils.isNotBlank(applicationName)) {
                message.putUserProperty(RpcConstants.APPLICATION_NAME, applicationName);
            }
        }
    
        @Override
        public void afterHandle(String destination, org.springframework.messaging.Message<?> message, Exception e) {
    
        }
    
    }
    
    @Slf4j
    public class TraceMqReceiveInterceptor implements MqReceiveInterceptor {
    
        @Override
        public void preHandle(String conusmerGroup, MessageExt messageExt) {
            String traceId = messageExt.getUserProperty(RpcConstants.MDC_TRACE_ID);
            final String applicationName = messageExt.getUserProperty(RpcConstants.APPLICATION_NAME);
    
            log.info("[接收到Mq 消息][messageId = {}, traceId = {}, sendAppName = {}]",
                    messageExt.getMsgId(), traceId, applicationName);
            if (StringUtils.isBlank(traceId)) {
                traceId = UUID.randomUUID().toString();
                log.info("[接收到Mq 消息][traceId 为空, messageId = {}, 随机生成 traceId = {}]",
                        messageExt.getMsgId(), traceId);
            }
            MDC.put(RpcConstants.MDC_TRACE_ID, traceId);
        }
    
        @Override
        public void afterHandle(String consumerGroup, MessageExt messageExt, Exception exception) {
    
        }
    }
    
  2. 创建MQ消息收发者

    // 消费者
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "topic01", consumerGroup = "cg01")
    public class DemoMQListener implements RocketMQListener<String> {
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;
    
        @Override
        public void onMessage(String message) {
            log.info("收到消息 {}", message);
        }
    
    }
    
    // 生产者
    @Component
    public class DemoRunner implements ApplicationRunner {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            rocketMQTemplate.send("topic01",  MessageBuilder.withPayload("hello world").build());
        }
    }
    
  3. 启用增强功能:服务配置中增加该属性

    oms.rocket.plus=true
    
  4. MQ 收发日志如下(我懒得开两个服务所以一个服务进行收发),收发日志如下:
    在这里插入图片描述
    可以看到:

    1. LogMqSendInterceptor 和 LogMqReceiveInterceptor 对MQ 收发日志进行了打印
    2. TraceMqSendInterceptor 和 TraceMqReceiveInterceptor 进行了 TracceId 传递:生产者由于没有获取到 TraceId 直接随机生成一个TraceId 传递,消费者接收到 TraceId 后存入上下文,当前线程再进行日志打印时携带了TraceId。

四、思路扩展

1. 消费流程简介

消息发送方的原理很简单,这里不再赘述。主要看消息接受方的原理,RocketMQ 消费者启动消费的大体流程:

  1. 服务器启动时 ListenerContainerConfiguration#afterSingletonsInstantiated 方法中会为所有被 @RocketMQMessageListener 注解修饰的对象创建一个 DefaultRocketMQListenerContainer 实例注册到容器中,并调用 DefaultRocketMQListenerContainer#start 方法启动
  2. DefaultRocketMQListenerContainer实例创建时会调用 afterPropertiesSet 方法初始化DefaultRocketMQListenerContainer#DefaultMQPushConsumer对象。在初始化的过程中会赋值messageListener 属性为 DefaultMessageListenerOrderly 或 DefaultMessageListenerConcurrently 对象。
  3. 当消费者进行消息消费时会调用 DefaultMessageListenerOrderly 或 DefaultMessageListenerConcurrently 的 consumeMessage 方法,而在该方法中会去调用我们自己注入的被 @RocketMQMessageListener 注解修饰的类的 onMessage 方法。

其中具体代码如下:

    @Override
    public void afterSingletonsInstantiated() {
    	// 扫描容器中所有被 RocketMQMessageListener 注解修饰的类
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
            .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
		// 对消费方进行注册
        beans.forEach(this::registerContainer);
    }

  private void registerContainer(String beanName, Object bean) {
		... 
        if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
        }
		// 获取并解析注解信息
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);

        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());
		... 
        // 注解校验
        validate(annotation);
		// 获取当前对象在 容器中的beanName,其中通过 AtomicLong 的自增来确保 BeanName 的唯一性
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
		// 创建 DefaultRocketMQListenerContainer 并注册到容器中。
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        // 获取容器中刚注册的 DefaultRocketMQListenerContainer 判断其如果没有启动则启动。
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

	// 创建 DefaultRocketMQListenerContainer 
    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
        RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

        container.setRocketMQMessageListener(annotation);

        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        String tags = environment.resolvePlaceholders(annotation.selectorExpression());
        if (!StringUtils.isEmpty(tags)) {
            container.setSelectorExpression(tags);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        // 将实际的 Bean 保存到 DefaultRocketMQListenerContainer#rocketMQListener 或 rocketMQReplyListener 属性中
        if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            container.setRocketMQListener((RocketMQListener) bean);
        } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
            container.setRocketMQReplyListener((RocketMQReplyListener) bean);
        }
        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        container.setName(name);

        return container;
    }

DefaultRocketMQListenerContainer 实现了 InitializingBean 接口,因此在其实例创建的时候会调用 afterPropertiesSet 方法,在启动会初始化 DefaultRocketMQListenerContainer #DefaultMQPushConsumer 对象,这里我们主要关注 DefaultMessageListenerOrderly 和 DefaultMessageListenerConcurrently 两个类。

    @Override
    public void afterPropertiesSet() throws Exception {
        initRocketMQPushConsumer();

        this.messageType = getMessageType();
        this.methodParameter = getMethodParameter();
        log.debug("RocketMQ messageType: {}", messageType);
    }

	//初始化 consumer 对象
    private void initRocketMQPushConsumer() throws MQClientException {
		...
		// 消费模式:顺序还是并发,根据不同的模式赋值不同的 MessageListener
        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }
		
		... 

    }

而在 DefaultMessageListenerConcurrently 和 DefaultMessageListenerOrderly中消息的处理是交由 rocketMQListener#onMessage 来处理。


    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

 public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

    private void handleMessage(
        MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
        if (rocketMQListener != null) {
            rocketMQListener.onMessage(doConvertMessage(messageExt));
        } else if (rocketMQReplyListener != null) {
            Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
            Message<?> message = MessageBuilder.withPayload(replyContent).build();

            org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
            consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
                @Override public void onSuccess(SendResult sendResult) {
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                    } else {
                        log.debug("Consumer replies message success.");
                    }
                }

                @Override public void onException(Throwable e) {
                    log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
                }
            });
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/437036.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Web】浅聊JDBC的SPI机制是怎么实现的——DriverManager

目录 前言 分析 前言 【Web】浅浅地聊JDBC java.sql.Driver的SPI后门-CSDN博客 上篇文章我们做到了知其然&#xff0c;知道了JDBC有SPI机制&#xff0c;并且可以利用其Driver后门 这篇文章希望可以做到知其所以然&#xff0c;对JDBC的SPI机制的来源做到心里有数 分析 先是…

如何实现数据中心布线变更管理?

前言 随着科技的不断发展&#xff0c;数据中心作为企业的核心基础设施之一&#xff0c;承载着大量重要的业务数据。在数据中心运维过程中&#xff0c;变更管理流程变得尤为重要&#xff0c;它是确保数据中心基础设施稳定运行和保障数据安全的关键环节。变更管理的定义是指在维…

电商效果图云渲染优势是什么?

电商效果图云渲染指的是利用云计算技术&#xff0c;将电商所需的效果图渲染任务转移至云服务器进行处理。这些云服务器凭借其卓越的计算能力与庞大的存储空间&#xff0c;能够迅速完成复杂的渲染任务&#xff0c;从而释放本地电脑资源&#xff0c;提升工作效率。 电商效果图云…

常见四种限流算法详解(附:javaDemo)

限流简介 现代互联网很多业务场景&#xff0c;比如秒杀、下单、查询商品详情&#xff0c;最大特点就是高并发&#xff0c;而往往我们的系统不能承受这么大的流量&#xff0c;继而产生了很多的应对措施&#xff1a;CDN、消息队列、多级缓存、异地多活。 但是无论如何优化&…

今日学习总结2024.3.2

最近的学习状态比较好&#xff0c;感觉非常享受知识进入脑子的过程&#xff0c;有点上头。 实验室一个星期唯一一天的假期周六&#xff0c;也就是今天&#xff0c;也完全不想放假出去玩啊&#xff0c;在实验室泡了一天。 很后悔之前胆小&#xff0c;没有提前投简历找实习&…

基于毕奥-萨伐尔定律的交流电机的4极旋转磁场matlab模拟与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 基于毕奥-萨伐尔定律的交流电机的4极旋转磁场&#xff0c;对比不同定子半径&#xff0c;对比2级旋转磁场。 2.系统仿真结果 3.核心程序与模型 版本&#xff1a;MATLAB2022a…

UE5数字孪生系列笔记(一)

智慧城市数字孪生系统 虚幻引擎连接数据库 将自己的mysql版本的libmysql.dll替换掉插件里面的libmysql.dll 然后将这个插件目录复制到虚幻项目目录下 然后添加这个插件即可 新建一个UMG&#xff0c;添加一个按钮试试&#xff0c;数据库是否连接 将UI添加到视口 打印是否连接…

ChaosBlade故障注入工具--cpu,内存,磁盘占用\IO,网络注入等

前言&#xff1a; 本文介绍一款开源的故障注入工具chaosblade&#xff0c;该工具原本由阿里研发&#xff0c;现已开源&#xff1b;工具特点&#xff1a;功能强大&#xff0c;使用简单。 该工具故障注入包含&#xff1a;cpu&#xff0c;内存&#xff0c;磁盘io&#xff0c;磁盘…

第一讲 计算机组成与结构(初稿)

计算机组成与结构 计算机指令常见CPU寄存器类型有哪些&#xff1f;存储器分类&#xff1f;内存&#xff1f;存储器基本组成&#xff1a; 控制器的基本组成主机完成指令的过程以取数指令为例以存数指令为例ax^2bxc程序的运行过程 机器字长存储容量小试牛刀&#xff08;答案及解析…

Chapter20-Ideal gases-CIE课本要点摘录、总结(编辑中)

20.1 Particles of a gas Brownian motion Fast modules 速率的数值大概了解下&#xff1a; average speed of the molecules:400m/s speed of sound:approximately 330m/s at STP&#xff08;standard temperature and pressure&#xff09; Standard Temperature and Pres…

【论文阅读】(2024.03.05-2024.03.15)论文阅读简单记录和汇总

(2024.03.05-2024.03.15)论文阅读简单记录和汇总 2024/03/05&#xff1a;随便简单写写&#xff0c;以后不会把太详细的记录在CSDN&#xff0c;有道的Markdown又感觉不好用。 目录 &#xff08;ICMM 2024&#xff09;Quality Scalable Video Coding Based on Neural Represent…

JAVA开发第一个Springboot WebApi项目

一、创建项目 1、用IDEA新建一个SpringBoot项目 注意JDK与Java版本的匹配,如果想选择jdk低版本,先要更改服务器URL:start.aliyun.com 2、添加依赖 (1)、Lombok (2)、Spring Web (3)、Mybatis Framework (4)、MySqlDriver 项目中的配置 pom.xml 如下 <?…

Jellyfin影音站点搭建并结合内网穿透实现远程观看本地影视资源

文章目录 1. 前言2. Jellyfin服务网站搭建2.1. Jellyfin下载和安装2.2. Jellyfin网页测试 3.本地网页发布3.1 cpolar的安装和注册3.2 Cpolar云端设置3.3 Cpolar本地设置 4.公网访问测试5. 结语 1. 前言 随着移动智能设备的普及&#xff0c;各种各样的使用需求也被开发出来&…

LeetCode每日一题之 快乐数

目录 题目介绍&#xff1a; 算法原理&#xff1a; 鸽巢原理&#xff1a; 如何找到环里元素&#xff1a; 代码实现&#xff1a; 题目介绍&#xff1a; 题目链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 算法原理&#xff1a; 我先简单举两个例子&#xff…

让照片说话唱歌的软件,盘点这3款!

在数字时代&#xff0c;我们总是渴望找到新的方式来表达自我、分享生活。近年来&#xff0c;随着人工智能和图像处理技术的飞速发展&#xff0c;一种新型的软件应运而生&#xff0c;它们能够让照片“说话”甚至“唱歌”&#xff0c;给我们的生活带来了无限乐趣和创意空间。那么…

光线追踪10 - Dielectrics( 电介质 )

水、玻璃和钻石等透明物质都属于电介质。当光线射入这些物质时&#xff0c;会分为反射光线和折射&#xff08;透射&#xff09;光线。我们将通过随机选择反射或折射来处理这一现象&#xff0c;每次相互作用只生成一条散射光线。11.1 Refraction 最难调试的部分是折射光线。通常…

SpringBoot 热部署。

SpringBoot 热部署。 文章目录 SpringBoot 热部署。 pom.xml。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional…

.Net利用Microsoft.Extensions.DependencyInjection配置依赖注入

一、概述 为了让接口程序更加模块化和可测试&#xff0c;采用依赖注入的方式调用接口方法。 二、安装Microsoft.Extensions.DependencyInjection 在NuGet里面搜索Microsoft.Extensions.DependencyInjection&#xff0c;并进行安装。 三、代码编写 3.1 创建Service 实现类…

SpringMVC-异步调用,拦截器与异常处理

1.异步调用 1.发送异步请求 <a href"javascript:void(0);" id"testAjax">访问controller</a> <script type"text/javascript" src"js/jquery-3.7.1.js"></script> <script type"text/javascript&qu…

mysql 数据库查询 查询字段用逗号隔开 关联另一个表并显示

文章目录 问题描述解决方案 问题描述 如下如所示&#xff1a; 表一&#xff1a;wechat_dynamically_config表&#xff0c;重点字段&#xff1a;wechat_object 表二&#xff1a;wechat_object表&#xff0c;重点字段&#xff1a;wxid 需求&#xff1a;根据wechat_dynamically_…