前言:
在消息中间件领域中 RocketMQ 是一种非常常见的消息中间件了,并且是由阿⾥巴巴开源的消息中间件 ,本篇简单分享一下 Spring Boot 项目集成 RocketMQ 的过程。
RocketMQ 系列文章传送门
RocketMQ 的介绍及核心概念讲解
Spring Boot 集成 RocketMQ 可以分为三大步,如下:
- 在 proerties 或者 yml 文件中添加 RocketMQ 配置。
- 项目 pom.xml 文件中引入 rocketmq-spring-boot-starter 依赖。
- 注入 RocketMQTemplate 开始使用 RocketMQ ,其实这步以及算是使用了,不能算作集成了,但是集成了总归是要使用的,我把这里也算作一步了。
在 proerties 或者 yml 文件中添加 RabbitMQ 配置如下:
#RocketMQ 地址
rocketmq.name-server= xxx-xxx-rocketmq.xxxx.com:19876
#消费组
rocketmq.consumer.group= consumer-group
#一次拉取消息的最大数量 默认 10 条
rocketmq.consumer.pull-batch-size=10
#发送消息的组 同一类消息发送到同一个组中
rocketmq.producer.group= producer-group
#发送消息的超时时间 默认 3000 毫秒
rocketmq.producer.send-message-timeout=3000
#同步发送消息失败重试次数 默认2
rocketmq.producer.retry-times-when-send-failed=2
#异步发送消息失败重试次数 默认2
rocketmq.producer.retry-times-when-send-async-failed=2
#消息的大小 默认 4M
rocketmq.producer.max-message-size=4096
项目 pom.xml 文件中引入 rocketmq-spring-boot-starter 依赖如下:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
RocketMQ 使用
前文我们在分享 RocketMQ 核心概念的时候,我们知道了 RocketMQ 有同步消息、异步消息、顺序消息、延迟消息等,下面我们就根据消息的发送类型来演示 RocketMQ 的使用。
@RocketMQMessageListener 注解详解
我们在使用 RocketMQ 的时候有一个非常重要的注解 @RocketMQMessageListener,使用这个注解我们就可以轻松的完成 RocketMQ 消息的消费,这里对该注解的的属性进行解析,如下:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
// nameServer服务地址 多个用;隔开 可以直接在注解中指定也可以读取配置文件
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
// ACL验证key 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
// ACL验证密钥 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
// 自定义的消息轨迹主题
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
// 消费者分组 不同消费者分组名称不能重复
String consumerGroup();
// topic名称
String topic();
// selectorType 消息选择器类型
// 默认值 SelectorType.TAG 根据TAG选择
// 仅支持表达式格式如:“tag1 || tag2 || tag3” 如果表达式为null或者“*”标识订阅所有消息
// SelectorType.SQL92 根据SQL92表达式选择
SelectorType selectorType() default SelectorType.TAG;
String selectorExpression() default "*";
// 消费模式 可以选择并发或有序接收消息 默认并发消费模式 ConsumeMode.CONCURRENTLY
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
// 控制消息模式 可以选择集群和广播 默认是集群 MessageModel.CLUSTERING
// 集群模式: 消息只会被一个消费者消费 广播模式:消息被所有消费者都消费一次
MessageModel messageModel() default MessageModel.CLUSTERING;
// 消费者最大线程数 在5.x版本该参数已经不推荐使用 因为该实现方式底层线程使用 LinkedBlockingQueue 作为阻塞队列 队列长度使用Integer.MAX_VALUE。
@Deprecated
int consumeThreadMax() default 64;
// 消费线程数 属于 rocketmq-spring-boot-starter 2.2.3新参数 推荐使用该版本
int consumeThreadNumber() default 20;
// 消费失败重试次数 在MessageModel.CLUSTERING模式中,-1表示16,消费失败后会重试16次
int maxReconsumeTimes() default -1;
// 最大消费时间 默认15分钟
long consumeTimeout() default 15L;
// 发送回复消息超时 默认3000毫秒
int replyTimeout() default 3000;
// ACL验证key 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件
String accessKey() default ACCESS_KEY_PLACEHOLDER;
// ACL验证密钥 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件
String secretKey() default SECRET_KEY_PLACEHOLDER;
// 切换消息跟踪的标志实例
boolean enableMsgTrace() default false;
// 自定义跟踪主题
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
// nameServer服务地址 可以直接在注解中指定也可以读取配置文件
String nameServer() default NAME_SERVER_PLACEHOLDER;
// The property of "access-channel".
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
// The property of "tlsEnable" default false.
String tlsEnable() default "false";
// 使用者的命名空间
String namespace() default "";
// 并发模式下的消息消耗重试策略 下次消费时的延迟级别
int delayLevelWhenNextConsume() default 0;
// 以有序模式暂停拉入的间隔 以毫秒为单位 最小值为10 最大值为30000 默认1000毫秒
int suspendCurrentQueueTimeMillis() default 1000;
// 关闭使用者时等待消息消耗的最长时间 以毫秒为单位 最小值为0 默认1000毫秒
int awaitTerminationMillisWhenShutdown() default 1000;
// 实例名称
String instanceName() default "DEFAULT";
}
RocketMQ 发送单向消息
单向消息是指生产者 Producer 向 Broker 发送消息,执行发送消息的 API 后直接返回,不关注 Broker 的结果,简单说就负责发送消息不关注消息是否发送成功,这种模式的优点是发生消息耗时非常低,一般在微妙级别,通常用在消息可靠性要求不高的场景,例如记录日志等场景,下面我们来演示一下 RocketMQ 单向消息的发送。
单向消息生产者代码如下:
package com.order.service.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class OneWayMessageProducer {
@Autowired
private RocketMQTemplate rocketMqTemplate;
//单向消息发送
public void sendOneWayMessage(String message){
rocketMqTemplate.sendOneWay("one-way-topic", MessageBuilder.withPayload(message).build());
}
}
单向消息消费者代码如下:
package com.order.service.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "one-way-group", topic = "one-way-topic")
public class OneWayMessageCousumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("单向消息消费成功:{}", message);
}
}
触发单向消息发送代码如下:
@GetMapping("/send-one-way-message")
public String sendOneWayMessage(@RequestParam String message){
oneWayMessageProducer.sendOneWayMessage(message);
return "success";
}
单向消息发送测试结果如下:
2024-10-10 19:51:47.144 INFO 15172 --- [MessageThread_1] c.o.s.r.consumer.OneWayMessageCousumer : 单向消息消费成功:send-one-way-message
RocketMQ 发送同步消息
发送同步消息是指生产者 Producer 向 Broker 发送消息,执行发送消息的 API 后同步等待, 直到 Broker 返回发送结
果,因为有等待动作,很明显发送同步消息会阻塞线程,因此性能相对会差一些,但是同步消息的可靠性高,因此这种方式得到广泛使用,例如短信通知,邮件通知,站内消息等场景。
同步消息生产者代码如下:
package com.order.service.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Component
public class SyncMessageProducer {
@Autowired
private RocketMQTemplate rocketMqTemplate;
/**
* @param message:
* @date 2024/10/10 17:47
* @description 同步消息发送
*/
public void sendSyncMessage(String message) {
rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build());
}
/**
* @param message:
* @date 2024/10/10 17:47
* @description 批量发送同步消息
*/
public void sendSyncMessageBatch(String message) {
Message<String> build = MessageBuilder.withPayload(message).build();
List<Message<String>> msgList = new ArrayList<>();
msgList.add(build);
msgList.add(build);
msgList.add(build);
msgList.add(build);
rocketMqTemplate.syncSend("sync-topic", msgList);
}
/**
* @param message:
* @date 2024/10/10 17:47
* @description 发送同步消息设置超时时间 超时时间 1毫秒
*/
public void sendSyncMessageTimeout(String message) {
//超时时间为 1 毫秒
rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build(), 200);
}
/**
* @param message:
* @date 2024/10/10 17:47
* @description 批量发送同步消息 超时时间 1毫秒
*/
public void sendSyncMessageBatchTimeout(String message) {
Message<String> build = MessageBuilder.withPayload(message).build();
List<Message<String>> msgList = new ArrayList<>();
msgList.add(build);
msgList.add(build);
msgList.add(build);
msgList.add(build);
rocketMqTemplate.syncSend("sync-topic", msgList, 200);
}
}
在上面的同步消息发送代码中一共有四个方法,分别实现了同步消息发送、同步消息批量发送、带超时时间的同步消息发送、带超时时间的同步消息批量发送。
同步消息消息消费者代码如下:
package com.order.service.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "sync-group", topic = "sync-topic")
public class SyncMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("同步消息消费成功:{}", message);
}
}
触发单向消息发送代码如下:
@GetMapping("/send-sync-message")
public String sendSyncMessage(@RequestParam String message){
syncMessageProducer.sendSyncMessage(message);
return "success";
}
@GetMapping("/send-sync-message-batch")
public String sendSyncMessageBatch(@RequestParam String message){
syncMessageProducer.sendSyncMessageBatch(message);
return "success";
}
@GetMapping("/send-sync-message-timeout")
public String sendSyncMessageTimeout(@RequestParam String message){
syncMessageProducer.sendSyncMessageTimeout(message);
return "success";
}
@GetMapping("/send-sync-message-batch-timeout")
public String sendSyncMessageBatchTimeout(@RequestParam String message){
syncMessageProducer.sendSyncMessageBatchTimeout(message);
return "success";
}
同步消息发送测试结果如下:
2024-10-14 14:37:22.346 INFO 26640 --- [MessageThread_1] c.o.s.r.consumer.SyncMessageConsumer : 同步消息消费成功:send-sync-message
结果验证符合预期。
同步消息批量发送测试结果如下:
2024-10-14 14:38:04.120 INFO 26640 --- [MessageThread_4] c.o.s.r.consumer.SyncMessageConsumer : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.120 INFO 26640 --- [MessageThread_3] c.o.s.r.consumer.SyncMessageConsumer : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.120 INFO 26640 --- [MessageThread_5] c.o.s.r.consumer.SyncMessageConsumer : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.122 INFO 26640 --- [MessageThread_6] c.o.s.r.consumer.SyncMessageConsumer : 同步消息消费成功:send-sync-message-batch
结果验证符合预期。
带超时时间同步消息发送测试结果如下:
2024-10-14 14:46:07.889 INFO 16760 --- [MessageThread_1] c.o.s.r.consumer.SyncMessageConsumer : 同步消息消费成功:send-sync-message-timeout
结果验证符合预期,如果想要验证超时效果,直接把超时时间设置的小一点即可,后面我会统一演示超时效果。
带超时时间同步消息批量发送测试结果如下:
2024-10-14 14:47:05.539 INFO 16760 --- [MessageThread_3] c.o.s.r.consumer.SyncMessageConsumer : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539 INFO 16760 --- [MessageThread_4] c.o.s.r.consumer.SyncMessageConsumer : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539 INFO 16760 --- [MessageThread_5] c.o.s.r.consumer.SyncMessageConsumer : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539 INFO 16760 --- [MessageThread_2] c.o.s.r.consumer.SyncMessageConsumer : 同步消息消费成功:send-sync-message-batch-timeout
结果验证符合预期,如果想要验证超时效果,直接把超时时间设置的小一点即可,后面我会统一演示超时效果。
我们来演示一下超时效果,我们把超时时间修改为 10 毫秒时候,带超时时间同步消息发送测试结果如下:
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:667) ~[rocketmq-client-4.8.0.jar:4.8.0]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343) ~[rocketmq-client-4.8.0.jar:4.8.0]
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:344) ~[rocketmq-client-4.8.0.jar:4.8.0]
at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:555) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]
at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:484) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]
at com.order.service.rocketmq.producer.SyncMessageProducer.sendSyncMessageTimeout(SyncMessageProducer.java:57) ~[classes/:na]
at com.order.service.controller.RocketMqController.sendSyncMessageTimeout(RocketMqController.java:47) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.6.jar:5.3.6]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.6.jar:5.3.6]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.6.jar:5.3.6]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.6.jar:5.3.6]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.6.jar:5.3.6]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.6.jar:5.3.6]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060) ~[spring-webmvc-5.3.6.jar:5.3.6]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962) ~[spring-webmvc-5.3.6.jar:5.3.6]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.6.jar:5.3.6]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.6.jar:5.3.6]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.45.jar:9.0.45]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.6.jar:5.3.6]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.6.jar:5.3.6]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) ~[spring-boot-actuator-2.4.5.jar:2.4.5]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.6.jar:5.3.6]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.45.jar:9.0.45]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.45.jar:9.0.45]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.45.jar:9.0.45]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
很明显提示超时了,超时测试依赖服务器的性能,因此如果想测试到理想的超时结果,建议将超时时间往小了设置。
RocketMQ 发送异步消息
发送异步消息是指生产者 Producer 向 Broker 发送消息,发送消息时指定消息发送成功及发送异常的回调方法,执行发送消息的 API 后立即返回,Producer 发送消息线程无需等待、不阻塞,对比同步消息,很明显异步消息的性能会更高,可靠性会略差,适用于对响应时间要求高的场景。
异步消息生产者代码如下:
package com.order.service.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Component
public class AsyncMessageProducer {
@Autowired
private RocketMQTemplate rocketMqTemplate;
/**
* @param message:
* @date 2024/200/200 17:47
* @description 异步消息发送
*/
public void sendAsyncMessage(String message) {
rocketMqTemplate.asyncSend("async-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("普通异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.info("普通异步消息发送失败");
}
});
}
/**
* @param message:
* @date 2024/200/200 17:47
* @description 批量发送异步消息
*/
public void sendAsyncMessageBatch(String message) {
Message<String> build = MessageBuilder.withPayload(message).build();
List<Message<String>> msgList = new ArrayList<>();
msgList.add(build);
msgList.add(build);
msgList.add(build);
msgList.add(build);
rocketMqTemplate.asyncSend("async-topic", msgList, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("批量异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.info("批量异步消息发送失败");
}
});
}
/**
* @param message:
* @date 2024/200/200 17:47
* @description 发送异步消息设置超时时间 超时时间 1毫秒
*/
public void sendAsyncMessageTimeout(String message) {
//超时时间为 1 毫秒
rocketMqTemplate.asyncSend("async-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("普通异步带超时消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.info("普通异步带超时消息发送失败");
}
}, 200);
}
/**
* @param message:
* @date 2024/200/200 17:47
* @description 批量发送异步消息 超时时间 1毫秒
*/
public void sendAsyncMessageBatchTimeout(String message) {
Message<String> build = MessageBuilder.withPayload(message).build();
List<Message<String>> msgList = new ArrayList<>();
msgList.add(build);
msgList.add(build);
msgList.add(build);
msgList.add(build);
rocketMqTemplate.asyncSend("async-topic", msgList, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("批量异步带超时消息发送成功");
}
@Override
public void onException(Throwable throwable) {
log.info("批量异步带超时消息发送失败");
}
}, 200);
}
}
异步消息消费者代码如下:
package com.order.service.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "async-group", topic = "async-topic")
public class AsyncMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("异步消息消费成功:{}", message);
}
}
异步消息发送测试结果如下:
2024-10-14 15:15:08.215 INFO 16760 --- [ublicExecutor_1] c.o.s.r.producer.AsyncMessageProducer : 普通异步消息发送成功
2024-10-14 15:15:08.222 INFO 16760 --- [MessageThread_1] c.o.s.r.consumer.AsyncMessageConsumer : 异步消息消费成功:send-async-message
结果验证符合预期。
批量异步消息发送测试结果如下:
2024-10-14 15:15:39.681 INFO 16760 --- [ublicExecutor_2] c.o.s.r.producer.AsyncMessageProducer : 批量异步消息发送成功
2024-10-14 15:15:39.682 INFO 16760 --- [MessageThread_2] c.o.s.r.consumer.AsyncMessageConsumer : 异步消息消费成功:[{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}}]
结果验证符合预期。
异步带超时消息发送测试结果如下:
2024-10-14 15:16:20.643 INFO 16760 --- [ublicExecutor_3] c.o.s.r.producer.AsyncMessageProducer : 普通异步带超时消息发送成功
2024-10-14 15:16:20.650 INFO 16760 --- [MessageThread_3] c.o.s.r.consumer.AsyncMessageConsumer : 异步消息消费成功:send-async-message-timeout
结果验证符合预期。
批量异步带超时消息发送测试结果如下:
2024-10-14 15:16:43.326 INFO 16760 --- [ublicExecutor_4] c.o.s.r.producer.AsyncMessageProducer : 批量异步带超时消息发送成功
2024-10-14 15:16:43.327 INFO 16760 --- [MessageThread_4] c.o.s.r.consumer.AsyncMessageConsumer : 异步消息消费成功:[{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}}]
结果验证符合预期。
超时场景这里不再测试了,如果想验证超时效果,只需要将超时时间设置的尽可能小一点即可。
总结:本篇简单分享了 Spring 整合 RocketMQ,并完成单向消息、同步消息、异步消息的案例演示,在实际业务中只需要对案例代码进行丰富填充业务逻辑即可,希望可以帮助到大家,后面会持续分享延时消息、顺序消息、事务消息的使用案例。
如有不正确的地方欢迎各位指出纠正。