DefaultMQPushConsumer 配置
package repayment.config;
import cn.itcast.wanxinp2p.repayment.message.diy.DefaultMessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes a RocketMQ push consumer as a bean.
 */
@Configuration
public class RocketMQConsumerConfig {

    /** Consumer group name the push consumer is created with. */
    private static final String CONSUMER_GROUP = "DEFAULT_CONSUMER_GROUP";

    /** Address of the RocketMQ NameServer. */
    private static final String NAME_SERVER_ADDRESS = "localhost:9876";

    /** Topic the consumer subscribes to. */
    private static final String TOPIC = "TEST_TOPIC";

    /** Tag filter expression; "*" subscribes to messages of every tag. */
    private static final String ALL_TAGS = "*";

    /**
     * Builds the push consumer bean. The bean definition names {@code start} as the init
     * method and {@code shutdown} as the destroy method.
     *
     * <p>If more than one {@code DefaultMQPushConsumer} bean is defined, pay attention to
     * the name of the method parameter.
     *
     * @param messageListener callback registered to handle the messages received from the broker
     * @return the configured consumer
     * @throws MQClientException if the subscription is rejected by the client
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer defaultMQPushConsumer(DefaultMessageListenerConcurrently messageListener) throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        pushConsumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        pushConsumer.subscribe(TOPIC, ALL_TAGS);
        pushConsumer.registerMessageListener(messageListener);
        return pushConsumer;
    }
}
自定义 MessageListener
需要特别注意:MessageListener 通过 @Autowired 注入所有 MessageHandler 类型的 Bean(List<MessageHandler>)。
对每条消息,依次使用 MessageHandler 的 getELFilter()(通过 SpEL 计算得出结果)和 test()
判断该 MessageExt 是否符合条件。
对于第一个符合条件的 MessageHandler,先通过 convertMessage() 提取 MessageExt 的 Body,再执行具体处理消息的逻辑 onMessage()。
package repayment.message;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.SimpleEvaluationContext;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
/**
 * Concurrent RocketMQ listener that hands each received message to the first
 * {@link MessageHandler} that accepts it.
 *
 * <p>A handler accepts a message when its SpEL filter (if any) evaluates to {@code true}
 * with the message bound to the variable {@code #messageExt}, and its
 * {@link MessageHandler#test(MessageExt)} method also returns {@code true}.
 */
@Slf4j
@Component
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

    /**
     * Parser for the handlers' filter expressions. Built once rather than once per message;
     * SpelExpressionParser instances are documented as reusable and thread-safe.
     */
    private static final ExpressionParser PARSER = new SpelExpressionParser();

    /** Every MessageHandler bean known to the Spring context, in injection order. */
    @Autowired
    private List<MessageHandler> rocketMQListenerList;

    /**
     * Processes the batch message by message. The first failure (including "no handler
     * matched") stops processing and asks the broker to redeliver the batch.
     *
     * @param msgs    messages delivered by the broker
     * @param context consume context supplied by the client (not used here)
     * @return {@code CONSUME_SUCCESS} when every message was handled,
     *         {@code RECONSUME_LATER} as soon as one message fails
     */
    @SuppressWarnings("unchecked")
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt messageExt : msgs) {
            log.debug("received msg: {}", messageExt);
            try {
                long now = System.currentTimeMillis();
                // A fresh context per message, because it carries the message as #messageExt.
                EvaluationContext cont = SimpleEvaluationContext.forReadWriteDataBinding().build();
                cont.setVariable("messageExt", messageExt);
                // Deliberately fail when nothing matches, so the message is retried instead of
                // being silently dropped; the exception now says which message had no handler.
                MessageHandler rocketMQListener = rocketMQListenerList.stream()
                        .filter(candidate -> matchesElFilter(candidate, cont))
                        .filter(candidate -> candidate.test(messageExt))
                        .findFirst()
                        .orElseThrow(() -> new IllegalStateException(
                                "no MessageHandler matched message " + messageExt.getMsgId()));
                // Convert the raw message to the handler's payload type, then run the handler.
                rocketMQListener.onMessage(rocketMQListener.convertMessage(messageExt));
                long costTime = System.currentTimeMillis() - now;
                log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
            } catch (Exception e) {
                log.warn("consume message failed. messageExt:{}", messageExt, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Evaluates a handler's SpEL filter against the given context.
     *
     * @return {@code true} when the handler declares no filter (blank expression) or the
     *         expression evaluates to {@code true}; an expression that evaluates to
     *         {@code null} counts as "no match" instead of failing on auto-unboxing
     */
    private static boolean matchesElFilter(MessageHandler handler, EvaluationContext cont) {
        String elFilter = handler.getELFilter();
        if (StringUtils.isBlank(elFilter)) {
            return true;
        }
        return Boolean.TRUE.equals(PARSER.parseExpression(elFilter).getValue(cont, Boolean.class));
    }
}
MessageHandler 接口定义
package repayment.message;
import org.apache.rocketmq.common.message.MessageExt;
/**
 * Helper contract for consuming RocketMQ messages.
 *
 * @param <T> type of the message body handed to {@link #onMessage(Object)}; unless
 *            {@link #convertMessage(MessageExt)} is overridden, {@code T} should be {@code String}
 */
public interface MessageHandler<T> {

    /**
     * Selects this handler by means of a SpEL expression. The evaluation context exposes
     * the message as the variable {@code #messageExt}.
     *
     * @return the SpEL expression to evaluate later (default: empty string, meaning no filtering)
     */
    default String getELFilter() {
        return "";
    }

    /**
     * Selects this handler by inspecting the message in plain Java.
     * The default implementation accepts every message.
     *
     * @param messageExt the received message
     * @return {@code true} to keep the message for this handler, {@code false} to reject it
     */
    default boolean test(MessageExt messageExt) {
        return true;
    }

    /**
     * Converts the raw message into the payload passed to {@link #onMessage(Object)}.
     * The default implementation decodes the body as a UTF-8 string, so that the result
     * does not depend on the platform default charset.
     *
     * @param messageExt the received message
     * @return the body as a string by default
     */
    @SuppressWarnings("unchecked") // safe only while T is String; other types must override
    default T convertMessage(MessageExt messageExt) {
        return (T) new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Business logic that handles one message.
     *
     * @param message the converted message body
     */
    void onMessage(T message);
}
自定义的 HelloMessageHandler
用于解析 topic = TEST_TOPIC, Tags.contains("tag0") 的消息
package repayment.message.handler;
import cn.itcast.wanxinp2p.repayment.ann.MQSelect;
import org.apache.rocketmq.common.message.MessageExt;
/**
 * Sample handler for messages whose topic is TEST_TOPIC and whose tags contain "tag0".
 * It prints the message body to standard output.
 */
@Component
public class HelloMessageHandler implements MessageHandler<String> {

    /**
     * @return SpEL expression that accepts only messages of topic TEST_TOPIC
     */
    @Override
    public String getELFilter() {
        return "#messageExt.topic == 'TEST_TOPIC'";
    }

    /**
     * @param messageExt the received message
     * @return {@code true} when the message carries tags containing "tag0";
     *         a message without tags is rejected instead of raising a NullPointerException
     */
    @Override
    public boolean test(MessageExt messageExt) {
        String tags = messageExt.getTags();
        return tags != null && tags.contains("tag0");
    }

    /**
     * Prints the message body.
     *
     * @param message the message body as a string
     */
    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}