文章目录
- 一、前言
- 二、项目背景
- 三、实现方案
- 1. 关键接口
- 2. 消息发送方
- 3. 消息消费方
- 4. 配置引入类
- 5. 使用示例
- 四、思路扩展
- 1. 消费流程简介
一、前言
本系列用来记录一些在实际项目中的小东西,并记录在过程中想到的一些小东西,因为是随笔记录,所以内容不会过于详细。
二、项目背景
原本的目的是想实现 MQ 消息之间的 TraceId 追踪。如下两个拦截器可以实现 MQ 消息之间 TraceId 的传递,不过项目后面转用 TLog 。(但是写都写了,不能浪费 )
三、实现方案
1. 关键接口
-
消息发送方拦截器,当RocketMQ 消息发送的时候会调用对应的方法进行拦截,实现该接口可进行消息发送前后的扩展。
/**
 * Send-side interceptor. Implementations can extend the behaviour before and after a
 * RocketMQ message is sent.
 *
 * <p>Interceptors are comparable by {@link #getOrder()} so that a list of them can be
 * sorted; smaller order values sort first.
 */
public interface MqSendInterceptor extends Ordered, Comparable<MqSendInterceptor> {

    /**
     * Hook invoked before the message is sent.
     *
     * @param message the message about to be sent
     */
    default void preHandle(Message message) {
    }

    /**
     * Hook invoked after the send attempt.
     *
     * @param destination the destination the message was sent to
     * @param payload     the Spring message that was sent
     * @param e           the exception raised by the send, or {@code null}
     */
    default void afterHandle(String destination, org.springframework.messaging.Message<?> payload, Exception e) {
    }

    /**
     * Priority of this interceptor.
     *
     * @return the order value; defaults to {@link Integer#MIN_VALUE}
     */
    @Override
    default int getOrder() {
        return Integer.MIN_VALUE;
    }

    /**
     * Orders interceptors by {@link #getOrder()}.
     *
     * @param o the interceptor to compare with
     * @return a negative, zero or positive value as this order is less than, equal to or
     *         greater than the other order
     */
    @Override
    default int compareTo(MqSendInterceptor o) {
        // Subtraction overflows when one side is Integer.MIN_VALUE (the default order),
        // which would invert the ordering; Integer.compare is overflow-safe.
        return Integer.compare(getOrder(), o.getOrder());
    }
}
-
消息消费方拦截器,当RocketMQ 消息接收消费的时候会调用对应的方法进行拦截,实现该接口可进行消息消费前后的扩展。
/**
 * Consume-side interceptor. Implementations can extend the behaviour before and after a
 * RocketMQ message is consumed.
 *
 * <p>Interceptors are comparable by {@link #getOrder()} so that a list of them can be
 * sorted; smaller order values sort first.
 */
public interface MqReceiveInterceptor extends Ordered, Comparable<MqReceiveInterceptor> {

    /**
     * Hook invoked before the message is consumed.
     *
     * @param conusmerGroup the consumer group of the listener
     * @param messageExt    the received message
     */
    void preHandle(String conusmerGroup, MessageExt messageExt);

    /**
     * Hook invoked after the message was consumed.
     *
     * @param consumerGroup the consumer group of the listener
     * @param messageExt    the received message
     * @param exception     the exception raised during consumption, or {@code null}
     */
    void afterHandle(String consumerGroup, MessageExt messageExt, Exception exception);

    /**
     * Priority of this interceptor.
     *
     * @return the order value; defaults to {@link Integer#MIN_VALUE}
     */
    @Override
    default int getOrder() {
        return Integer.MIN_VALUE;
    }

    /**
     * Orders interceptors by {@link #getOrder()}.
     *
     * @param o the interceptor to compare with
     * @return a negative, zero or positive value as this order is less than, equal to or
     *         greater than the other order
     */
    @Override
    default int compareTo(MqReceiveInterceptor o) {
        // Subtraction overflows when one side is Integer.MIN_VALUE (the default order),
        // which would invert the ordering; Integer.compare is overflow-safe.
        return Integer.compare(getOrder(), o.getOrder());
    }
}
2. 消息发送方
消息发送的扩展逻辑实现其实很简单:直接继承了基本的 RocketMQTemplate ,并对一些方法进行了重写。重写的逻辑是在消息发送前后调用 MqSendInterceptor 的对应方法。具体代码如下:
@Slf4j
public class RocketMQPlusTemplate extends RocketMQTemplate implements InitializingBean {
@Autowired(required = false)
private List<MqSendInterceptor> mqSendInterceptors;
/**
 * Prepares the interceptor list (empty when none were injected, sorted otherwise) and
 * then runs the parent initialization.
 */
@Override
public void afterPropertiesSet() throws Exception {
    if (mqSendInterceptors != null) {
        Collections.sort(mqSendInterceptors);
    } else {
        // No interceptor beans were injected: fall back to an empty list so the
        // pre/after hooks can iterate without null checks.
        mqSendInterceptors = Lists.newArrayListWithCapacity(0);
    }
    super.afterPropertiesSet();
}
/**
 * Sends a request message and blocks for the reply, running the send interceptors
 * around the call.
 *
 * @param destination target destination
 * @param message     message to send; neither it nor its payload may be null
 * @param type        type the reply is converted to
 * @param hashKey     queue-selection key; when null or empty the request is sent without a selector
 * @param timeout     request timeout passed to the producer
 * @param delayLevel  delay level, applied only when greater than zero
 * @return the converted reply, or {@code null} when the producer returned no reply
 */
@Override
public <T> T sendAndReceive(String destination, Message<?> message, Type type,
        String hashKey, long timeout, int delayLevel) {
    final Supplier<T> requestAction = () -> {
        if (message == null || message.getPayload() == null) {
            log.error("send request message failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            org.apache.rocketmq.common.message.Message request = this.createRocketMqMessage(destination, message);
            if (delayLevel > 0) {
                request.setDelayTimeLevel(delayLevel);
            }
            final boolean useSelector = hashKey != null && !hashKey.isEmpty();
            final MessageExt reply = useSelector
                    ? (MessageExt) getProducer().request(request, getMessageQueueSelector(), hashKey, timeout)
                    : (MessageExt) getProducer().request(request, timeout);
            if (reply == null) {
                return null;
            }
            return (T) doConvertMessage(reply, type);
        } catch (Exception e) {
            log.error("send request message failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    };
    return handleMsgSupplier(requestAction, destination, message);
}
/**
 * Sends a request message and delivers the reply to a callback, running the send
 * interceptors around the call.
 *
 * @param destination                  target destination
 * @param message                      message to send; neither it nor its payload may be null
 * @param rocketMQLocalRequestCallback callback receiving the converted reply or the failure; may be null
 * @param hashKey                      queue-selection key; when null or empty no selector is used
 * @param t                            timeout; the producer's send timeout is used when not positive
 * @param delayLevel                   delay level, applied only when greater than zero
 */
@Override
public void sendAndReceive(String destination, Message<?> message,
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long t, int delayLevel) {
    final Runnable requestAction = () -> {
        if (message == null || message.getPayload() == null) {
            log.error("send request message failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            org.apache.rocketmq.common.message.Message request = this.createRocketMqMessage(destination, message);
            if (delayLevel > 0) {
                request.setDelayTimeLevel(delayLevel);
            }
            final long effectiveTimeout = t > 0 ? t : getProducer().getSendMsgTimeout();

            // Adapt the Spring-level callback to the RocketMQ client callback only when one was supplied.
            final RequestCallback clientCallback;
            if (rocketMQLocalRequestCallback == null) {
                clientCallback = null;
            } else {
                clientCallback = new RequestCallback() {
                    @Override
                    public void onSuccess(org.apache.rocketmq.common.message.Message message) {
                        rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
                    }

                    @Override
                    public void onException(Throwable e) {
                        rocketMQLocalRequestCallback.onException(e);
                    }
                };
            }

            final boolean useSelector = hashKey != null && !hashKey.isEmpty();
            if (useSelector) {
                getProducer().request(request, getMessageQueueSelector(), hashKey, clientCallback, effectiveTimeout);
            } else {
                getProducer().request(request, clientCallback, effectiveTimeout);
            }
        } catch (Exception e) {
            log.error("send request message failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    };
    handleMsgRunnable(requestAction, destination, message);
}
/**
 * Synchronously sends a batch of messages, running the send interceptors around the call.
 * Null messages and messages with a null payload are skipped with a warning.
 *
 * @param destination target destination
 * @param messages    messages to send; must not be null or empty
 * @param timeout     send timeout passed to the producer
 * @return the producer's send result
 */
@Override
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
    return handleMsgsSupplier(() -> {
        if (messages == null || messages.isEmpty()) {
            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
            throw new IllegalArgumentException("`messages` can not be empty");
        }
        try {
            final long startedAt = System.currentTimeMillis();
            Collection<org.apache.rocketmq.common.message.Message> batch = new ArrayList<>();
            for (Message candidate : messages) {
                if (candidate != null && candidate.getPayload() != null) {
                    batch.add(this.createRocketMqMessage(destination, candidate));
                } else {
                    log.warn("Found a message empty in the batch, skip it");
                }
            }
            SendResult result = getProducer().send(batch, timeout);
            final long elapsed = System.currentTimeMillis() - startedAt;
            if (log.isDebugEnabled()) {
                log.debug("send messages cost: {} ms, msgId:{}", elapsed, result.getMsgId());
            }
            return result;
        } catch (Exception e) {
            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
            throw new MessagingException(e.getMessage(), e);
        }
    }, destination, messages);
}
/**
 * Synchronously sends a message using the queue selector and hash key, running the send
 * interceptors around the call.
 *
 * @param destination target destination
 * @param message     message to send; neither it nor its payload may be null
 * @param hashKey     key handed to the message queue selector
 * @param timeout     send timeout passed to the producer
 * @return the producer's send result
 */
@Override
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
    final Supplier<SendResult> sendAction = () -> {
        if (message == null || message.getPayload() == null) {
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            final long startedAt = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message outbound = this.createRocketMqMessage(destination, message);
            SendResult result = getProducer().send(outbound, getMessageQueueSelector(), hashKey, timeout);
            final long elapsed = System.currentTimeMillis() - startedAt;
            if (log.isDebugEnabled()) {
                log.debug("send message cost: {} ms, msgId:{}", elapsed, result.getMsgId());
            }
            return result;
        } catch (Exception e) {
            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    };
    return handleMsgSupplier(sendAction, destination, message);
}
/**
 * Synchronously sends a single message, running the send interceptors around the call.
 *
 * @param destination target destination
 * @param message     message to send; neither it nor its payload may be null
 * @param timeout     send timeout passed to the producer
 * @param delayLevel  delay level, applied only when greater than zero
 * @return the producer's send result
 */
@Override
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
    final Supplier<SendResult> sendAction = () -> {
        if (message == null || message.getPayload() == null) {
            log.error("syncSend failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            final long startedAt = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message outbound = this.createRocketMqMessage(destination, message);
            if (delayLevel > 0) {
                outbound.setDelayTimeLevel(delayLevel);
            }
            SendResult result = getProducer().send(outbound, timeout);
            final long elapsed = System.currentTimeMillis() - startedAt;
            if (log.isDebugEnabled()) {
                log.debug("send message cost: {} ms, msgId:{}", elapsed, result.getMsgId());
            }
            return result;
        } catch (Exception e) {
            log.error("syncSend failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    };
    return handleMsgSupplier(sendAction, destination, message);
}
/**
 * Asynchronously sends a message, running the send interceptors around the call.
 * The after-hook runs once the send has been handed to the producer, not when the
 * {@code sendCallback} completes.
 *
 * @param destination  target destination
 * @param message      message to send; neither it nor its payload may be null
 * @param sendCallback callback passed to the producer
 * @param timeout      send timeout passed to the producer
 * @param delayLevel   delay level, applied only when greater than zero
 */
@Override
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
        int delayLevel) {
    handleMsgRunnable(() -> {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("asyncSend failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
            if (delayLevel > 0) {
                rocketMsg.setDelayTimeLevel(delayLevel);
            }
            getProducer().send(rocketMsg, sendCallback, timeout);
        } catch (Exception e) {
            // A failed send is an error, consistent with every other send method in this class
            // (it was previously logged at INFO and easy to miss).
            log.error("asyncSend failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }, destination, message);
}
/**
 * Asynchronously sends a message using the queue selector and hash key, running the send
 * interceptors around the call.
 *
 * @param destination  target destination
 * @param message      message to send; neither it nor its payload may be null
 * @param hashKey      key handed to the message queue selector
 * @param sendCallback callback passed to the producer
 * @param timeout      send timeout passed to the producer
 */
@Override
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
        long timeout) {
    final Runnable sendAction = () -> {
        if (message == null || message.getPayload() == null) {
            log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            org.apache.rocketmq.common.message.Message outbound = this.createRocketMqMessage(destination, message);
            getProducer().send(outbound, getMessageQueueSelector(), hashKey, sendCallback, timeout);
        } catch (Exception e) {
            log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    };
    handleMsgRunnable(sendAction, destination, message);
}
/**
 * Sends a message one-way, running the send interceptors around the call.
 *
 * @param destination target destination
 * @param message     message to send; neither it nor its payload may be null
 */
@Override
public void sendOneWay(String destination, Message<?> message) {
    final Runnable sendAction = () -> {
        if (message == null || message.getPayload() == null) {
            log.error("sendOneWay failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            org.apache.rocketmq.common.message.Message outbound = this.createRocketMqMessage(destination, message);
            getProducer().sendOneway(outbound);
        } catch (Exception e) {
            log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    };
    handleMsgRunnable(sendAction, destination, message);
}
/**
 * Sends a message one-way using the queue selector and hash key, running the send
 * interceptors around the call.
 *
 * @param destination target destination
 * @param message     message to send; neither it nor its payload may be null
 * @param hashKey     key handed to the message queue selector
 */
@Override
public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
    final Runnable sendAction = () -> {
        if (message == null || message.getPayload() == null) {
            log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            org.apache.rocketmq.common.message.Message outbound = this.createRocketMqMessage(destination, message);
            getProducer().sendOneway(outbound, getMessageQueueSelector(), hashKey);
        } catch (Exception e) {
            log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    };
    handleMsgRunnable(sendAction, destination, message);
}
/**
 * Sends a transactional message, running the send interceptors around the call.
 *
 * @param destination target destination
 * @param message     message to send
 * @param arg         argument forwarded to the producer's transactional send
 * @return the transaction send result
 * @throws MessagingException when the client reports an {@code MQClientException}
 */
@Override
public TransactionSendResult sendMessageInTransaction(final String destination,
        final Message<?> message, final Object arg) throws MessagingException {
    final Supplier<TransactionSendResult> sendAction = () -> {
        try {
            final TransactionMQProducer txProducer = (TransactionMQProducer) getProducer();
            if (txProducer.getTransactionListener() == null) {
                throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
            }
            org.apache.rocketmq.common.message.Message outbound = this.createRocketMqMessage(destination, message);
            return getProducer().sendMessageInTransaction(outbound, arg);
        } catch (MQClientException e) {
            throw RocketMQUtil.convert(e);
        }
    };
    return handleMsgSupplier(sendAction, destination, message);
}
/**
 * Converts a Spring message into a RocketMQ message and runs the send interceptors'
 * pre-hook on the converted message.
 *
 * @param destination target destination
 * @param message     Spring message to convert
 * @return the RocketMQ message, after {@link #preHandle} has been applied
 */
protected org.apache.rocketmq.common.message.Message createRocketMqMessage(
        String destination, Message<?> message) {
    Message<?> converted = this.doConvert(message.getPayload(), message.getHeaders(), null);
    final org.apache.rocketmq.common.message.Message outbound = RocketMQUtil.convertToRocketMessage(
            getMessageConverter(), getCharset(), destination, converted);
    // Interceptors see the final RocketMQ message, so they can still add user properties.
    preHandle(outbound);
    return outbound;
}
/**
 * Converts a received message into the requested type.
 *
 * @param messageExt received message
 * @param type       target type: {@code MessageExt}, {@code byte[]}, {@code String}, or any
 *                   type the message converter can produce
 * @return the message itself, its raw body, its body as text, or the converter's result
 * @throws RuntimeException when the converter fails
 */
private Object doConvertMessage(MessageExt messageExt, Type type) {
    if (Objects.equals(type, MessageExt.class)) {
        return messageExt;
    }
    if (Objects.equals(type, byte[].class)) {
        return messageExt.getBody();
    }

    final String text = new String(messageExt.getBody(), Charset.forName(getCharset()));
    if (Objects.equals(type, String.class)) {
        return text;
    }

    // Any other target type is produced by the message converter.
    try {
        if (type instanceof Class) {
            // Target type has no generic parameter.
            return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(text).build(), (Class<?>) type);
        }
        // Target type has a generic parameter: use SmartMessageConverter#fromMessage with the
        // third "conversionHint" argument.
        return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(text).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
    } catch (Exception e) {
        log.error("convert failed. str:{}, msgType:{}", text, type);
        throw new RuntimeException("cannot convert message to " + type, e);
    }
}
/**
 * Resolves the reply type declared by a callback, i.e. the first type argument of its
 * {@code RocketMQLocalRequestCallback<...>} interface.
 *
 * @param rocketMQLocalRequestCallback callback to inspect
 * @return the declared type argument, or {@code Object.class} when none can be found
 */
private Type getMessageType(RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
    Type callbackInterface = null;
    // Walk the whole class hierarchy; a match found further up replaces an earlier one.
    for (Class<?> current = AopProxyUtils.ultimateTargetClass(rocketMQLocalRequestCallback);
            current != null;
            current = current.getSuperclass()) {
        Type[] declared = current.getGenericInterfaces();
        if (declared == null) {
            continue;
        }
        for (Type candidate : declared) {
            boolean isParameterizedCallback = candidate instanceof ParameterizedType
                    && Objects.equals(((ParameterizedType) candidate).getRawType(), RocketMQLocalRequestCallback.class);
            if (isParameterizedCallback) {
                callbackInterface = candidate;
                break;
            }
        }
    }

    if (callbackInterface == null) {
        return Object.class;
    }
    Type[] typeArguments = ((ParameterizedType) callbackInterface).getActualTypeArguments();
    boolean hasArgument = typeArguments != null && typeArguments.length > 0;
    return hasArgument ? typeArguments[0] : Object.class;
}
/**
 * Runs a send action that returns nothing and always invokes the after-hook, passing
 * along the exception the action raised (if any).
 *
 * @param runnable    the send action
 * @param destination destination reported to the after-hook
 * @param message     message reported to the after-hook
 */
private void handleMsgRunnable(Runnable runnable, String destination, Message<?> message) {
    // Same try/catch/finally contract as the supplier variant, just without a result.
    handleMsgSupplier(() -> {
        runnable.run();
        return null;
    }, destination, message);
}
/**
 * Runs a send action that produces a result and always invokes the after-hook, passing
 * along the exception the action raised (if any). The exception is rethrown unchanged.
 *
 * @param supplier    the send action
 * @param destination destination reported to the after-hook
 * @param message     message reported to the after-hook
 * @return whatever the send action returns
 */
private <R> R handleMsgSupplier(Supplier<R> supplier, String destination, Message<?> message) {
    Exception failure = null;
    try {
        final R result = supplier.get();
        return result;
    } catch (Exception thrown) {
        // Remember the failure for the after-hook, then let it propagate.
        failure = thrown;
        throw thrown;
    } finally {
        afterHandle(destination, message, failure);
    }
}
/**
 * Runs a batch send action and always invokes the after-hook once per message, passing
 * along the exception the action raised (if any). The exception is rethrown unchanged.
 *
 * @param supplier    the batch send action
 * @param destination destination reported to the after-hook
 * @param messages    messages of the batch; may be null or contain null elements
 * @param <T>         result type of the send action
 * @param <P>         message type
 * @return whatever the send action returns
 */
private <T, P extends Message<?>> T handleMsgsSupplier(Supplier<T> supplier, String destination, Collection<P> messages) {
    Exception exception = null;
    try {
        return supplier.get();
    } catch (Exception e) {
        exception = e;
        throw e;
    } finally {
        // The send action rejects a null collection and skips null elements. Iterating them
        // here would raise a NullPointerException from the finally block and replace the
        // original exception, so both cases are guarded.
        if (messages != null) {
            for (Message<?> message : messages) {
                if (message != null) {
                    afterHandle(destination, message, exception);
                }
            }
        }
    }
}
/**
 * Pre-send hook: runs every send interceptor on the outgoing message unless the message
 * carries the {@code MqContants.NOT_INTERCEPT} user property with the value "true"
 * (case-insensitive).
 *
 * @param message the RocketMQ message about to be sent
 */
protected void preHandle(org.apache.rocketmq.common.message.Message message) {
    final String skipFlag = message.getUserProperty(MqContants.NOT_INTERCEPT);
    if (Boolean.TRUE.toString().equalsIgnoreCase(skipFlag)) {
        return;
    }
    for (MqSendInterceptor interceptor : mqSendInterceptors) {
        interceptor.preHandle(message);
    }
}
/**
 * Post-send hook: runs every send interceptor unless the message opted out through the
 * {@code MqContants.NOT_INTERCEPT} header, then logs the send.
 *
 * <p>This method is called from {@code finally} blocks, so it must not throw for a null
 * message: doing so would replace the validation error raised by the send method.
 *
 * @param destination destination the message was sent to
 * @param message     the Spring message that was sent; may be null when validation failed
 * @param exception   exception raised by the send, or {@code null}
 */
protected void afterHandle(String destination, Message<?> message, Exception exception) {
    if (message == null) {
        // Nothing to hand to the interceptors; just record the failed send.
        log.warn("[mq发送][topic = {}, payload = {}, exception = {}]", destination, (Object) null, exception);
        return;
    }
    // Accept both Boolean.TRUE and the string "true" (any case), matching the check the
    // pre-hook performs on the corresponding user property.
    final Object skipFlag = message.getHeaders().get(MqContants.NOT_INTERCEPT);
    final boolean skip = skipFlag != null && Boolean.parseBoolean(skipFlag.toString());
    if (!skip) {
        for (MqSendInterceptor mqSendInterceptor : mqSendInterceptors) {
            mqSendInterceptor.afterHandle(destination, message, exception);
        }
    }
    log.warn("[mq发送][topic = {}, payload = {}, exception = {}]", destination, message.getPayload(), exception);
}
}
3. 消息消费方
消息消费方的实现会更加复杂一点,通过 BeanPostProcessor 接口在 DefaultRocketMQListenerContainer 对象创建的时候为其创建代理对象。如下:
public class RocketMqBeanPostProcessor implements InstantiationAwareBeanPostProcessor, InitializingBean {
@Autowired(required = false)
private List<MqReceiveInterceptor> mqReceiveInterceptors;
/**
 * Determines the consumer group of a listener container.
 *
 * <p>The group configured on the container is preferred, because it is the value the
 * container was set up with (placeholders already resolved). The
 * {@code @RocketMQMessageListener} annotation on the listener class is only a fallback:
 * its value is the raw attribute and the annotation may not be visible on a proxied class.
 *
 * @param container the listener container
 * @return the consumer group, or {@code null} when it cannot be determined
 */
private static String getConusmerGroup(DefaultRocketMQListenerContainer container) {
    final String configuredGroup = container.getConsumerGroup();
    if (configuredGroup != null && !configuredGroup.isEmpty()) {
        return configuredGroup;
    }

    // Fallback: read the annotation from whichever listener the container holds.
    Object listener = container.getRocketMQListener();
    if (listener == null) {
        listener = container.getRocketMQReplyListener();
    }
    if (listener == null) {
        return null;
    }
    final RocketMQMessageListener annotation =
            listener.getClass().getAnnotation(RocketMQMessageListener.class);
    return annotation != null ? annotation.consumerGroup() : null;
}
/**
 * Prepares the interceptor list: empty when none were injected, sorted otherwise.
 */
@Override
public void afterPropertiesSet() {
    if (mqReceiveInterceptors != null) {
        Collections.sort(mqReceiveInterceptors);
    } else {
        // No interceptor beans were injected: use an empty list so the proxy can iterate
        // without null checks.
        mqReceiveInterceptors = Lists.newArrayListWithCapacity(0);
    }
}
/**
 * Replaces the message listener of every {@code DefaultRocketMQListenerContainer} with a
 * proxy that runs the receive interceptors. RocketMQ hands consumed messages to the
 * {@code messageListener} held by the container's consumer, so that is the object wrapped.
 *
 * @param bean     the initialized bean
 * @param beanName the bean name
 * @return the same bean instance
 */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (!(bean instanceof DefaultRocketMQListenerContainer)) {
        return bean;
    }
    final DefaultRocketMQListenerContainer listenerContainer = (DefaultRocketMQListenerContainer) bean;
    final DefaultMQPushConsumer pushConsumer = listenerContainer.getConsumer();
    if (pushConsumer == null) {
        return bean;
    }

    final MessageListener original = pushConsumer.getMessageListener();
    final boolean proxyable = original instanceof MessageListenerOrderly
            || original instanceof MessageListenerConcurrently;
    if (proxyable) {
        // Install the proxied listener in place of the original.
        final Object proxied = createProxy(getConusmerGroup(listenerContainer), original);
        pushConsumer.setMessageListener((MessageListener) proxied);
    }
    return bean;
}
/**
 * Creates a JDK proxy for a message listener. Calls to {@code consumeMessage} are wrapped
 * with the receive interceptors' pre- and after-hooks; every other method is passed through.
 * Messages carrying the {@code MqContants.NOT_INTERCEPT} user property with the value
 * "true" (case-insensitive) are not intercepted.
 *
 * <p>Exceptions thrown by the listener are unwrapped from the reflective
 * {@code InvocationTargetException}, so both the interceptors and the caller see the
 * original exception instead of a wrapper.
 *
 * @param consumerGroup   consumer group reported to the interceptors
 * @param messageListener the listener to wrap
 * @return the proxy implementing the listener's interfaces
 */
private Object createProxy(String consumerGroup, MessageListener messageListener) {
    return Proxy.newProxyInstance(MessageListener.class.getClassLoader(), messageListener.getClass().getInterfaces(),
            (proxy, method, args) -> {
                if (!"consumeMessage".equals(method.getName())) {
                    return invokeTarget(messageListener, method, args);
                }
                @SuppressWarnings("unchecked")
                final List<MessageExt> messages = (List<MessageExt>) args[0];
                Exception exception = null;
                try {
                    for (MessageExt message : messages) {
                        if (!Boolean.TRUE.toString().equalsIgnoreCase(
                                message.getUserProperty(MqContants.NOT_INTERCEPT))) {
                            for (MqReceiveInterceptor mqReceiveInterceptor : mqReceiveInterceptors) {
                                mqReceiveInterceptor.preHandle(consumerGroup, message);
                            }
                        }
                    }
                    return invokeTarget(messageListener, method, args);
                } catch (Throwable t) {
                    // The after-hook accepts an Exception; wrap anything else so the
                    // interceptors still learn that consumption failed.
                    exception = (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
                    throw t;
                } finally {
                    for (MessageExt message : messages) {
                        if (!Boolean.TRUE.toString().equalsIgnoreCase(
                                message.getUserProperty(MqContants.NOT_INTERCEPT))) {
                            for (MqReceiveInterceptor mqReceiveInterceptor : mqReceiveInterceptors) {
                                mqReceiveInterceptor.afterHandle(consumerGroup, message, exception);
                            }
                        }
                    }
                }
            });
}

/**
 * Invokes a method on the real listener and rethrows the listener's own exception rather
 * than the reflective wrapper.
 */
private static Object invokeTarget(Object target, java.lang.reflect.Method method, Object[] args) throws Throwable {
    try {
        return method.invoke(target, args);
    } catch (java.lang.reflect.InvocationTargetException e) {
        throw e.getTargetException();
    }
}
}
4. 配置引入类
项目实际上是做了一个 spring-boot-starter 的 jar 包供其他服务引用,因此这里是通过自动装配将 RocketMQAutoConfig 引入,然后 RocketMQAutoConfig 通过 @Import 引入了 RocketmqConfig 类。
/**
 * Auto-configuration entry point of the starter; its only job is to import
 * {@code RocketmqConfig}.
 */
@Import(RocketmqConfig.class)
public class RocketMQAutoConfig {
}
其中 RocketmqConfig 的实现如下:
/**
 * Spring configuration for the RocketMQ extensions: the producer, the message converter,
 * the interceptor-aware template and the bean post-processor that proxies listeners.
 */
@Configuration
public class RocketmqConfig implements ApplicationContextAware {

    public static final String PRODUCER_BEAN_NAME = "defaultMQProducer";
    public static final String CONSUMER_BEAN_NAME = "defaultLitePullConsumer";

    private ApplicationContext applicationContext;

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.producer.group:default}")
    private String producerGroup;

    // Falls back to 3000 ms when the property is absent, so a missing optional setting
    // does not fail application start-up.
    @Value("${rocketmq.producer.sendMessageTimeout:3000}")
    private int sendMessageTimeout;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Registers the {@code DefaultMQProducer}.
     *
     * @return the producer configured from the {@code rocketmq.*} properties
     */
    @Bean(PRODUCER_BEAN_NAME)
    public DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setProducerGroup(producerGroup);
        defaultMQProducer.setNamesrvAddr(nameServer);
        defaultMQProducer.setVipChannelEnabled(false);
        defaultMQProducer.setSendMsgTimeout(sendMessageTimeout);
        return defaultMQProducer;
    }

    /**
     * Registers the MQ message converter when {@code oms.rocket.bak=true} and no other
     * converter bean exists.
     *
     * @return the message converter
     */
    @Bean
    @ConditionalOnProperty(prefix = "oms", name = "rocket.bak", havingValue = "true")
    @ConditionalOnMissingBean(RocketMQMessageConverter.class)
    public RocketMQMessageConverter rocketmqMessageConverter() {
        return new RocketMQMessageConverter();
    }

    /**
     * MQ extension: a template that supports interception before and after sending.
     * Modelled on
     * org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#rocketMQTemplate(org.apache.rocketmq.spring.support.RocketMQMessageConverter).
     *
     * @param rocketMQMessageConverter converter supplying the template's message converter
     * @return the interceptor-aware template
     */
    @Bean(destroyMethod = "destroy")
    @ConditionalOnProperty(prefix = "oms", name = "rocket.plus", havingValue = "true")
    public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
        RocketMQTemplate rocketMQTemplate = new RocketMQPlusTemplate();
        if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) {
            rocketMQTemplate.setProducer((DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME));
        }
        if (applicationContext.containsBean(CONSUMER_BEAN_NAME)) {
            rocketMQTemplate.setConsumer((DefaultLitePullConsumer) applicationContext.getBean(CONSUMER_BEAN_NAME));
        }
        rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        return rocketMQTemplate;
    }

    /**
     * MQ extension: a post-processor that supports interception before and after receiving.
     *
     * @return the bean post-processor
     */
    @Bean
    @ConditionalOnProperty(prefix = "oms", name = "rocket.plus", havingValue = "true")
    public RocketMqBeanPostProcessor rocketMqBeanPostProcessor() {
        return new RocketMqBeanPostProcessor();
    }
}
这里为 RocketMQ 收发增强功能做了一个配置项,如果需要开启增强功能,则需要在服务的配置项中增加如下配置:
# mq 扩展功能开启
oms.rocket.plus=true
5. 使用示例
-
编写两个MQ 日志打印拦截器
// 日志打印的两个拦截器 @Slf4j public class LogMqSendInterceptor implements MqSendInterceptor { @Override public void afterHandle(String destination, Message<?> payload, Exception e) { try { log.info("[{} 发送消息][payLoad = {}, exception = {}]", destination, JSON.toJSONString(payload.getPayload()), e); } catch (Exception ex) { log.debug("[{} 日志打印错误]", destination, ex); } } } @Slf4j public class LogMqReceiveInterceptor implements MqReceiveInterceptor { @Override public void preHandle(String conusmerGroup, MessageExt messageExt) { try { log.info("[{} 收到消息][conusmerGroup = {}, message = {}]", messageExt.getTopic(), conusmerGroup, new String(messageExt.getBody())); } catch (Exception e) { log.debug("[{} 日志打印错误]", messageExt.getTopic(), e); } } @Override public void afterHandle(String consumerGroup, MessageExt messageExt, Exception exception) { } } /******************************************/ // 日志追踪的两个拦截器 @Slf4j public class TraceMqSendInterceptor implements MqSendInterceptor { @Value("${spring.application.name:''}") private String applicationName; @Override public void preHandle(Message message) { String traceId = MDC.get(RpcConstants.MDC_TRACE_ID); if (StringUtils.isBlank(traceId)) { traceId = UUID.randomUUID().toString(); log.info("[发送到Mq 消息][traceId 为空,随机生成 traceId = {}]", traceId); } message.putUserProperty(RpcConstants.MDC_TRACE_ID, traceId); if (StringUtils.isNotBlank(applicationName)) { message.putUserProperty(RpcConstants.APPLICATION_NAME, applicationName); } } @Override public void afterHandle(String destination, org.springframework.messaging.Message<?> message, Exception e) { } } @Slf4j public class TraceMqReceiveInterceptor implements MqReceiveInterceptor { @Override public void preHandle(String conusmerGroup, MessageExt messageExt) { String traceId = messageExt.getUserProperty(RpcConstants.MDC_TRACE_ID); final String applicationName = messageExt.getUserProperty(RpcConstants.APPLICATION_NAME); log.info("[接收到Mq 消息][messageId = {}, traceId = {}, sendAppName = {}]", 
messageExt.getMsgId(), traceId, applicationName); if (StringUtils.isBlank(traceId)) { traceId = UUID.randomUUID().toString(); log.info("[接收到Mq 消息][traceId 为空, messageId = {}, 随机生成 traceId = {}]", messageExt.getMsgId(), traceId); } MDC.put(RpcConstants.MDC_TRACE_ID, traceId); } @Override public void afterHandle(String consumerGroup, MessageExt messageExt, Exception exception) { } }
-
创建MQ消息收发者
// 消费者 @Slf4j @Component @RocketMQMessageListener(topic = "topic01", consumerGroup = "cg01") public class DemoMQListener implements RocketMQListener<String> { @Autowired private ApplicationEventPublisher applicationEventPublisher; @Override public void onMessage(String message) { log.info("收到消息 {}", message); } } // 生产者 @Component public class DemoRunner implements ApplicationRunner { @Autowired private RocketMQTemplate rocketMQTemplate; @Override public void run(ApplicationArguments args) throws Exception { rocketMQTemplate.send("topic01", MessageBuilder.withPayload("hello world").build()); } }
-
启用增强功能:服务配置中增加该属性
oms.rocket.plus=true
-
MQ 收发日志如下(我懒得开两个服务,所以用一个服务进行收发):
可以看到:- LogMqSendInterceptor 和 LogMqReceiveInterceptor 对MQ 收发日志进行了打印
- TraceMqSendInterceptor 和 TraceMqReceiveInterceptor 进行了 TraceId 传递:生产者由于没有获取到 TraceId 直接随机生成一个TraceId 传递,消费者接收到 TraceId 后存入上下文,当前线程再进行日志打印时携带了TraceId。
四、思路扩展
1. 消费流程简介
消息发送方的原理很简单,这里不再赘述。主要看消息接收方的原理,RocketMQ 消费者启动消费的大体流程:
- 服务器启动时 ListenerContainerConfiguration#afterSingletonsInstantiated 方法中会为所有被 @RocketMQMessageListener 注解修饰的对象创建一个 DefaultRocketMQListenerContainer 实例注册到容器中,并调用 DefaultRocketMQListenerContainer#start 方法启动
- DefaultRocketMQListenerContainer实例创建时会调用 afterPropertiesSet 方法初始化DefaultRocketMQListenerContainer#DefaultMQPushConsumer对象。在初始化的过程中会赋值messageListener 属性为 DefaultMessageListenerOrderly 或 DefaultMessageListenerConcurrently 对象。
- 当消费者进行消息消费时会调用 DefaultMessageListenerOrderly 或 DefaultMessageListenerConcurrently 的 consumeMessage 方法,而在该方法中会去调用我们自己注入的被 @RocketMQMessageListener 注解修饰的类的 onMessage 方法。
其中具体代码如下:
/**
 * Collects every bean annotated with {@code @RocketMQMessageListener} (excluding
 * scoped-proxy targets) and registers a listener container for each of them.
 */
@Override
public void afterSingletonsInstantiated() {
// Scan the context for all beans annotated with RocketMQMessageListener
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Register a container for each consumer
beans.forEach(this::registerContainer);
}
// Registers a DefaultRocketMQListenerContainer bean for one annotated listener and starts
// it when it is not yet running. (Excerpt: "..." marks code omitted by the article.)
private void registerContainer(String beanName, Object bean) {
...
if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
}
// Read the annotation and resolve placeholders in its attributes
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
String topic = this.environment.resolvePlaceholders(annotation.topic());
...
// Validate the annotation
validate(annotation);
// Build the container's bean name; the incrementing counter keeps the name unique
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
// Create the DefaultRocketMQListenerContainer and register it in the context
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
// Fetch the container that was just registered and start it if it is not running
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
// Creates a DefaultRocketMQListenerContainer and configures it from the listener's
// annotation: name server, access channel, topic, selector expression and consumer group
// (placeholders resolved through the environment).
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
container.setRocketMQMessageListener(annotation);
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
// Fall back to the name server from the RocketMQ properties when the annotation gives none
nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
String tags = environment.resolvePlaceholders(annotation.selectorExpression());
if (!StringUtils.isEmpty(tags)) {
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
// Store the actual listener bean in the container's rocketMQListener or rocketMQReplyListener property
if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQListener((RocketMQListener) bean);
} else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQReplyListener((RocketMQReplyListener) bean);
}
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name);
return container;
}
DefaultRocketMQListenerContainer 实现了 InitializingBean 接口,因此在其实例创建的时候会调用 afterPropertiesSet 方法,在该方法中会初始化 DefaultRocketMQListenerContainer#DefaultMQPushConsumer 对象,这里我们主要关注 DefaultMessageListenerOrderly 和 DefaultMessageListenerConcurrently 两个类。
/**
 * Initializes the push consumer, then determines the listener's message type and
 * method parameter.
 */
@Override
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: {}", messageType);
}
// Initializes the consumer object. (Excerpt: "..." marks code omitted by the article.)
private void initRocketMQPushConsumer() throws MQClientException {
...
// Consume mode, orderly or concurrent: each mode installs a different MessageListener
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
...
}
而在 DefaultMessageListenerConcurrently 和 DefaultMessageListenerOrderly中消息的处理是交由 rocketMQListener#onMessage 来处理。
/**
 * Listener used in concurrent consume mode. It handles the messages one by one; on the
 * first failure it sets the retry delay level and asks for redelivery.
 */
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
// The exception is handled here and turned into a status; it does not reach the caller
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
/**
 * Listener used in orderly consume mode. It handles the messages one by one; on the
 * first failure it sets the suspend time and asks to suspend the current queue.
 */
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
@SuppressWarnings("unchecked")
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
// The exception is handled here and turned into a status; it does not reach the caller
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
// Dispatches one message to the user's listener. A plain listener just receives the
// converted message; a reply listener's return value is sent back as a reply message.
private void handleMessage(
MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
if (rocketMQListener != null) {
rocketMQListener.onMessage(doConvertMessage(messageExt));
} else if (rocketMQReplyListener != null) {
Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
Message<?> message = MessageBuilder.withPayload(replyContent).build();
org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
// Send the reply with a callback that only logs the outcome
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
} else {
log.debug("Consumer replies message success.");
}
}
@Override public void onException(Throwable e) {
log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
}
});
}
}