文章目录
- 1.背景
- 2.通用方法改造
- 2.1添加maven依赖
- 2.2 RocketMq基础配置
- 2.3 配置类
- 2.5 消息传输的对象和结果
- 2.4 消息生产者
- 2.5 消息消费者
- 2.6 功能测试
1.背景
在第二章:《RocketMq详解:二、SpringBoot集成RocketMq》中我们已经实现了消费基本生产和消费的实现,但是在真实的开发环境中如果按照这种方式去实现,冗余代码较多,且通过实现RocketMQListener中onMessage的方法去完成消息消费无返回结果,在后期的流程中不易维护,因此,本章将对这些问题进行二次改造和优化。
为了防止新同学从头开始学,本章将如何配置和实现简单在复述一下,至于具体怎么安装RocketMq,本文提供两种安装方法:
- 《MacOS环境下RocketMQ安装及部署 RocketMQ Dashboard 可视化》
- 《docker安装rocketMq》
2.通用方法改造
2.1添加maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2.2 RocketMq基础配置
接着,在application.yml或application.properties文件中配置RocketMQ的相关参数,如NameServer地址、生产者组、消费者组等:
rocketmq:
name-server: 127.0.0.1:9876
# 生产者
producer:
group: boot_group_1
# 消息发送超时时间
send-message-timeout: 3000
# 消息最大长度4M
max-message-size: 4096
# 消息发送失败重试次数
retry-times-when-send-failed: 3
# 异步消息发送失败重试次数
retry-times-when-send-async-failed: 2
# 消费者
consumer:
group: boot_group_1
# 每次提取的最大消息数
pull-batch-size: 5
上面的配置如果是在分布式环境下也可以配置在Apollo或nacos等配置中心里进行动态配置
2.3 配置类
在配置类中主要定义两个Bean的加载,即RocketMQTemplate和DefaultMQProducer,主要是提供消息发送的能力,即生产消息;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author ninesun
* @ClassName RocketMqConfig
* @description: 消息中间件配置类
* @date 2024年05月19日
* @version: 1.0
*/
@Configuration
public class RocketMqConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.producer.send-message-timeout}")
private Integer sendMsgTimeout;
@Value("${rocketmq.producer.max-message-size}")
private Integer maxMessageSize;
@Value("${rocketmq.producer.retry-times-when-send-failed}")
private Integer retryTimesWhenSendFailed;
@Value("${rocketmq.producer.retry-times-when-send-async-failed}")
private Integer retryTimesWhenSendAsyncFailed;
@Bean
public RocketMQTemplate rocketMqTemplate() {
RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
rocketMqTemplate.setProducer(defaultMqProducer());
return rocketMqTemplate;
}
@Bean
public DefaultMQProducer defaultMqProducer() {
DefaultMQProducer producer = new DefaultMQProducer();
producer.setNamesrvAddr(this.nameServer);
producer.setProducerGroup(this.producerGroup);
producer.setSendMsgTimeout(this.sendMsgTimeout);
producer.setMaxMessageSize(this.maxMessageSize);
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
return producer;
}
}
在编写消费者和生产者之前,我们先统一一下消息传输的对象,以及消费的结果
2.5 消息传输的对象和结果
- 基本传输对象
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageDTO<T> {
private T data;
private Integer delayTime;
@NotBlank(message = "Topic must not be blank")
private String topic;
@NotBlank(message = "Key must not be blank")
private String key;
private List<String> tags;
private String messageType;
}
- 消费结果枚举
public enum MsgRetryStatus {
SUCCEED(-1), FAILURE(0), RETRY(0), RETRY_1S(1), RETRY_5S(2),
RETRY_10S(3), RETRY_30S(4), RETRY_1M(5), RETRY_2M(6), RETRY_3M(7),
RETRY_4M(8), RETRY_5M(9), RETRY_6M(10), RETRY_7M(11), RETRY_8M(12),
RETRY_9M(13), RETRY_10M(14), RETRY_20M(15), RETRY_30M(16), RETRY_1H(17), RETRY_2H(18),
RETRY_1D(19), RETRY_3D(20), RETRY_7D(21), RETRY_14D(22), RETRY_21D(23), RETRY_28D(24),
RETRY_35D(25);
int level;
MsgRetryStatus(int level) {
this.level = level;
}
public int getLevel() {
return level;
}
}
2.4 消息生产者
@Component
public class MessageProduct {
@Resource
private RocketMQTemplate rocketMqTemplate;
public SendResult SendMessage(MessageDTO data) {
// 创建一个Message对象
org.springframework.messaging.Message<?> message = MessageBuilder.withPayload(JSON.toJSONString(data))
.setHeader(RocketMQHeaders.KEYS,data.getKey())
.setHeader(RocketMQHeaders.TAGS,data.getTags())
.build();
return rocketMqTemplate.syncSendDelayTimeSeconds(data.getTopic(), message, data.getDelayTime());
}
}
2.5 消息消费者
在原始的实现中,我们通过实现RocketMQListener接口中的onMessage方法来完成消息的消费。
然而,这种方式存在一个问题:如果消息消费失败,我们无法获取到返回结果,也不便于进行错误处理和重试。
为了优化这个问题,我们可以使用@RocketMQMessageListener注解的returnTopic属性来指定一个返回主题,当消息消费失败时,将消息发送到这个返回主题中。
同时,我们可以创建一个专门的处理失败消息的消费者来处理这些返回的消息。
另外,我们还可以在onMessage方法中添加异常处理逻辑,以便在消费失败时进行错误处理和记录日志。
新增一个抽象类CommonConsumer去实现RocketMQListener,并提供一个有返回值的doConsumerProcess方法,去实现具体的消费逻辑,具体实现如下:
@Slf4j
@Component
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {
public void onMessage(MessageDTO message) {
log.info("收到延迟消息成功,消息体:{}", message);
doConsumerProcess(message);
}
public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);
}
以boot-mq-topic为例,我们实现具体的消费,新增一个对象BootMqConsumer继承我们的CommonConsumer,来实现具体的消费逻辑
@Slf4j
@Component
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {
public void onMessage(MessageDTO message) {
try {
// 处理消息的逻辑
log.info("收到延迟消息成功,消息体:{}", message);
MsgRetryStatus msgRetryStatus = doConsumerProcess(message);
if (MsgRetryStatus.RETRY.equals(msgRetryStatus)
|| MsgRetryStatus.FAILURE.equals(msgRetryStatus)) {
//TODO 消费失败或重试 则发送重试Topic
}
} catch (Exception e) {
// 记录错误日志
e.printStackTrace();
// 可以选择将失败消息发送到指定Topic
this.sendReturnMessgae();
}
}
public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);
private void doRetrytConsumerProcess() {
//TODO:待实现,后面补上
}
}
2.6 功能测试
package com.example.demo.controller;
import com.alibaba.fastjson.JSON;
import com.example.demo.annoation.Idempotent;
import com.example.demo.mq.producer.MessageProduct;
import com.example.demo.po.MessageDTO;
import com.example.demo.po.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.UUID;
@RestController
@Slf4j
@Validated
public class TestController01 {
@Resource
private RocketMQTemplate rocketMqTemplate;
@Resource
private DefaultMQProducer defaultMqProducer;
@Resource
private MessageProduct messageProduct;
@GetMapping("/send/msg4")
public String sendMsg4() {
try {
User user = User.builder()
.id(1)
.name("ninesun")
.build();
MessageDTO<User> messageDTO = MessageDTO.<User>builder()
.data(user)
.delayTime(3)
.topic("boot-mq-topic")
.key(String.valueOf(UUID.randomUUID()))
.build();
SendResult sendResult = messageProduct.SendMessage(messageDTO);
log.info("msgId:{},sendStatus:{},data:{}", sendResult.getMsgId(), sendResult.getSendStatus(), JSON.toJSONString(messageDTO));
} catch (Exception e) {
e.printStackTrace();
}
return "OK";
}
}
附:User对象
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
private Integer id;
private String name;
private String desc;
}
访问:/send/msg4接口,结果如下:
至此,我们便可以在真实环境中很容易的去集成消息队列实现功能的解耦,流量的削峰。