Spring Boot 整合 RocketMQ 之普通消息

前言:

在消息中间件领域中 RocketMQ 是一种非常常见的消息中间件了,并且是由阿⾥巴巴开源的消息中间件 ,本篇简单分享一下 Spring Boot 项目集成 RocketMQ 的过程。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 集成 RocketMQ 可以分为三大步,如下:

  • 在 proerties 或者 yml 文件中添加 RocketMQ 配置。
  • 项目 pom.xml 文件中引入 rocketmq-spring-boot-starter 依赖。
  • 注入 RocketMQTemplate 开始使用 RocketMQ ,其实这步以及算是使用了,不能算作集成了,但是集成了总归是要使用的,我把这里也算作一步了。

在 proerties 或者 yml 文件中添加 RabbitMQ 配置如下:

#RocketMQ 地址
rocketmq.name-server= xxx-xxx-rocketmq.xxxx.com:19876
#消费组
rocketmq.consumer.group= consumer-group
#一次拉取消息的最大数量 默认 10 条
rocketmq.consumer.pull-batch-size=10
#发送消息的组 同一类消息发送到同一个组中
rocketmq.producer.group= producer-group
#发送消息的超时时间 默认 3000 毫秒
rocketmq.producer.send-message-timeout=3000
#同步发送消息失败重试次数 默认2
rocketmq.producer.retry-times-when-send-failed=2
#异步发送消息失败重试次数 默认2
rocketmq.producer.retry-times-when-send-async-failed=2
#消息的大小 默认 4M
rocketmq.producer.max-message-size=4096

项目 pom.xml 文件中引入 rocketmq-spring-boot-starter 依赖如下:

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.2.3</version>
</dependency>

RocketMQ 使用

前文我们在分享 RocketMQ 核心概念的时候,我们知道了 RocketMQ 有同步消息、异步消息、顺序消息、延迟消息等,下面我们就根据消息的发送类型来演示 RocketMQ 的使用。

@RocketMQMessageListener 注解详解

我们在使用 RocketMQ 的时候有一个非常重要的注解 @RocketMQMessageListener,使用这个注解我们就可以轻松的完成 RocketMQ 消息的消费,这里对该注解的的属性进行解析,如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {

	// nameServer服务地址 多个用;隔开 可以直接在注解中指定也可以读取配置文件
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
	
    // ACL验证key 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
	
    // ACL验证密钥 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
	
    // 自定义的消息轨迹主题
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
	
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    // 消费者分组 不同消费者分组名称不能重复
    String consumerGroup();

    // topic名称
    String topic();

    // selectorType 消息选择器类型
    // 默认值 SelectorType.TAG 根据TAG选择
	// 仅支持表达式格式如:“tag1 || tag2 || tag3” 如果表达式为null或者“*”标识订阅所有消息
	// SelectorType.SQL92 根据SQL92表达式选择
    SelectorType selectorType() default SelectorType.TAG;


    String selectorExpression() default "*";

    // 消费模式 可以选择并发或有序接收消息 默认并发消费模式 ConsumeMode.CONCURRENTLY
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    // 控制消息模式 可以选择集群和广播 默认是集群 MessageModel.CLUSTERING
    // 集群模式: 消息只会被一个消费者消费 广播模式:消息被所有消费者都消费一次
    MessageModel messageModel() default MessageModel.CLUSTERING;

    // 消费者最大线程数 在5.x版本该参数已经不推荐使用 因为该实现方式底层线程使用 LinkedBlockingQueue 作为阻塞队列 队列长度使用Integer.MAX_VALUE。
    @Deprecated
    int consumeThreadMax() default 64;

    // 消费线程数 属于 rocketmq-spring-boot-starter 2.2.3新参数 推荐使用该版本
    int consumeThreadNumber() default 20;

    // 消费失败重试次数 在MessageModel.CLUSTERING模式中,-1表示16,消费失败后会重试16次
    int maxReconsumeTimes() default -1;

    // 最大消费时间 默认15分钟
    long consumeTimeout() default 15L;

    // 发送回复消息超时 默认3000毫秒
    int replyTimeout() default 3000;

    // ACL验证key 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    // ACL验证密钥 服务端开启了ACL时使用 可以直接在注解中指定也可以读取配置文件
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    // 切换消息跟踪的标志实例
    boolean enableMsgTrace() default false;

    // 自定义跟踪主题
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    // nameServer服务地址 可以直接在注解中指定也可以读取配置文件
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    // The property of "access-channel".
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
 
    // The property of "tlsEnable" default false.
    String tlsEnable() default "false";

    // 使用者的命名空间
    String namespace() default "";

    // 并发模式下的消息消耗重试策略 下次消费时的延迟级别
    int delayLevelWhenNextConsume() default 0;

    // 以有序模式暂停拉入的间隔 以毫秒为单位 最小值为10 最大值为30000 默认1000毫秒
    int suspendCurrentQueueTimeMillis() default 1000;

    // 关闭使用者时等待消息消耗的最长时间 以毫秒为单位  最小值为0 默认1000毫秒
    int awaitTerminationMillisWhenShutdown() default 1000;

    // 实例名称
    String instanceName() default "DEFAULT";
}

RocketMQ 发送单向消息

单向消息是指生产者 Producer 向 Broker 发送消息,执行发送消息的 API 后直接返回,不关注 Broker 的结果,简单说就负责发送消息不关注消息是否发送成功,这种模式的优点是发生消息耗时非常低,一般在微妙级别,通常用在消息可靠性要求不高的场景,例如记录日志等场景,下面我们来演示一下 RocketMQ 单向消息的发送。

单向消息生产者代码如下:

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class OneWayMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    //单向消息发送
    public void sendOneWayMessage(String message){
        rocketMqTemplate.sendOneWay("one-way-topic", MessageBuilder.withPayload(message).build());
    }

}

单向消息消费者代码如下:

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "one-way-group", topic = "one-way-topic")
public class OneWayMessageCousumer implements RocketMQListener<String> {


    @Override
    public void onMessage(String message) {
        log.info("单向消息消费成功:{}", message);
    }
}

触发单向消息发送代码如下:

@GetMapping("/send-one-way-message")
public String sendOneWayMessage(@RequestParam String message){
	oneWayMessageProducer.sendOneWayMessage(message);
	return "success";
}

单向消息发送测试结果如下:

2024-10-10 19:51:47.144  INFO 15172 --- [MessageThread_1] c.o.s.r.consumer.OneWayMessageCousumer   : 单向消息消费成功:send-one-way-message

RocketMQ 发送同步消息

发送同步消息是指生产者 Producer 向 Broker 发送消息,执行发送消息的 API 后同步等待, 直到 Broker 返回发送结
果,因为有等待动作,很明显发送同步消息会阻塞线程,因此性能相对会差一些,但是同步消息的可靠性高,因此这种方式得到广泛使用,例如短信通知,邮件通知,站内消息等场景。

同步消息生产者代码如下:

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Component
public class SyncMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * @param message:
     * @date 2024/10/10 17:47
     * @description 同步消息发送
     */
    public void sendSyncMessage(String message) {
        rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build());
    }

    /**
     * @param message:
     * @date 2024/10/10 17:47
     * @description 批量发送同步消息
     */
    public void sendSyncMessageBatch(String message) {
        Message<String> build = MessageBuilder.withPayload(message).build();
        List<Message<String>> msgList = new ArrayList<>();
        msgList.add(build);
        msgList.add(build);
        msgList.add(build);
        msgList.add(build);
        rocketMqTemplate.syncSend("sync-topic", msgList);
    }

    /**
     * @param message:
     * @date 2024/10/10 17:47
     * @description 发送同步消息设置超时时间 超时时间 1毫秒
     */
    public void sendSyncMessageTimeout(String message) {
        //超时时间为 1 毫秒
        rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build(), 200);
    }

    /**
     * @param message:
     * @date 2024/10/10 17:47
     * @description 批量发送同步消息 超时时间 1毫秒
     */
    public void sendSyncMessageBatchTimeout(String message) {
        Message<String> build = MessageBuilder.withPayload(message).build();
        List<Message<String>> msgList = new ArrayList<>();
        msgList.add(build);
        msgList.add(build);
        msgList.add(build);
        msgList.add(build);
        rocketMqTemplate.syncSend("sync-topic", msgList, 200);
    }

}

在上面的同步消息发送代码中一共有四个方法,分别实现了同步消息发送、同步消息批量发送、带超时时间的同步消息发送、带超时时间的同步消息批量发送。

同步消息消息消费者代码如下:

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "sync-group", topic = "sync-topic")
public class SyncMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("同步消息消费成功:{}", message);
    }
}

触发单向消息发送代码如下:

@GetMapping("/send-sync-message")
public String sendSyncMessage(@RequestParam String message){
	syncMessageProducer.sendSyncMessage(message);
	return "success";
}

@GetMapping("/send-sync-message-batch")
public String sendSyncMessageBatch(@RequestParam String message){
	syncMessageProducer.sendSyncMessageBatch(message);
	return "success";
}

@GetMapping("/send-sync-message-timeout")
public String sendSyncMessageTimeout(@RequestParam String message){
	syncMessageProducer.sendSyncMessageTimeout(message);
	return "success";
}

@GetMapping("/send-sync-message-batch-timeout")
public String sendSyncMessageBatchTimeout(@RequestParam String message){
	syncMessageProducer.sendSyncMessageBatchTimeout(message);
	return "success";
}

同步消息发送测试结果如下:

2024-10-14 14:37:22.346  INFO 26640 --- [MessageThread_1] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message

结果验证符合预期。

同步消息批量发送测试结果如下:

2024-10-14 14:38:04.120  INFO 26640 --- [MessageThread_4] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.120  INFO 26640 --- [MessageThread_3] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.120  INFO 26640 --- [MessageThread_5] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.122  INFO 26640 --- [MessageThread_6] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch

结果验证符合预期。

带超时时间同步消息发送测试结果如下:

2024-10-14 14:46:07.889  INFO 16760 --- [MessageThread_1] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-timeout

结果验证符合预期,如果想要验证超时效果,直接把超时时间设置的小一点即可,后面我会统一演示超时效果。

带超时时间同步消息批量发送测试结果如下:

2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_3] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_4] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_5] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_2] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout

结果验证符合预期,如果想要验证超时效果,直接把超时时间设置的小一点即可,后面我会统一演示超时效果。

我们来演示一下超时效果,我们把超时时间修改为 10 毫秒时候,带超时时间同步消息发送测试结果如下:

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:667) ~[rocketmq-client-4.8.0.jar:4.8.0]
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343) ~[rocketmq-client-4.8.0.jar:4.8.0]
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:344) ~[rocketmq-client-4.8.0.jar:4.8.0]
	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:555) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]
	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:484) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]
	at com.order.service.rocketmq.producer.SyncMessageProducer.sendSyncMessageTimeout(SyncMessageProducer.java:57) ~[classes/:na]
	at com.order.service.controller.RocketMqController.sendSyncMessageTimeout(RocketMqController.java:47) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.6.jar:5.3.6]
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.6.jar:5.3.6]
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.6.jar:5.3.6]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.6.jar:5.3.6]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) ~[spring-boot-actuator-2.4.5.jar:2.4.5]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.6.jar:5.3.6]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.45.jar:9.0.45]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

很明显提示超时了,超时测试依赖服务器的性能,因此如果想测试到理想的超时结果,建议将超时时间往小了设置。

RocketMQ 发送异步消息

发送异步消息是指生产者 Producer 向 Broker 发送消息,发送消息时指定消息发送成功及发送异常的回调方法,执行发送消息的 API 后立即返回,Producer 发送消息线程无需等待、不阻塞,对比同步消息,很明显异步消息的性能会更高,可靠性会略差,适用于对响应时间要求高的场景。

异步消息生产者代码如下:

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;


@Slf4j
@Component
public class AsyncMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * @param message:
     * @date 2024/200/200 17:47
     * @description 异步消息发送
     */
    public void sendAsyncMessage(String message) {
        rocketMqTemplate.asyncSend("async-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("普通异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("普通异步消息发送失败");
            }
        });
    }

    /**
     * @param message:
     * @date 2024/200/200 17:47
     * @description 批量发送异步消息
     */
    public void sendAsyncMessageBatch(String message) {
        Message<String> build = MessageBuilder.withPayload(message).build();
        List<Message<String>> msgList = new ArrayList<>();
        msgList.add(build);
        msgList.add(build);
        msgList.add(build);
        msgList.add(build);
        rocketMqTemplate.asyncSend("async-topic", msgList, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("批量异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("批量异步消息发送失败");
            }
        });
    }

    /**
     * @param message:
     * @date 2024/200/200 17:47
     * @description 发送异步消息设置超时时间 超时时间 1毫秒
     */
    public void sendAsyncMessageTimeout(String message) {
        //超时时间为 1 毫秒
        rocketMqTemplate.asyncSend("async-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("普通异步带超时消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("普通异步带超时消息发送失败");
            }
        }, 200);
    }

    /**
     * @param message:
     * @date 2024/200/200 17:47
     * @description 批量发送异步消息 超时时间 1毫秒
     */
    public void sendAsyncMessageBatchTimeout(String message) {
        Message<String> build = MessageBuilder.withPayload(message).build();
        List<Message<String>> msgList = new ArrayList<>();
        msgList.add(build);
        msgList.add(build);
        msgList.add(build);
        msgList.add(build);
        rocketMqTemplate.asyncSend("async-topic", msgList, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("批量异步带超时消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("批量异步带超时消息发送失败");
            }
        }, 200);
    }

}

异步消息消费者代码如下:

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "async-group", topic = "async-topic")
public class AsyncMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("异步消息消费成功:{}", message);
    }
}

异步消息发送测试结果如下:

2024-10-14 15:15:08.215  INFO 16760 --- [ublicExecutor_1] c.o.s.r.producer.AsyncMessageProducer    : 普通异步消息发送成功
2024-10-14 15:15:08.222  INFO 16760 --- [MessageThread_1] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:send-async-message

结果验证符合预期。

批量异步消息发送测试结果如下:

2024-10-14 15:15:39.681  INFO 16760 --- [ublicExecutor_2] c.o.s.r.producer.AsyncMessageProducer    : 批量异步消息发送成功
2024-10-14 15:15:39.682  INFO 16760 --- [MessageThread_2] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:[{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}}]

结果验证符合预期。

异步带超时消息发送测试结果如下:

2024-10-14 15:16:20.643  INFO 16760 --- [ublicExecutor_3] c.o.s.r.producer.AsyncMessageProducer    : 普通异步带超时消息发送成功
2024-10-14 15:16:20.650  INFO 16760 --- [MessageThread_3] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:send-async-message-timeout

结果验证符合预期。

批量异步带超时消息发送测试结果如下:

2024-10-14 15:16:43.326  INFO 16760 --- [ublicExecutor_4] c.o.s.r.producer.AsyncMessageProducer    : 批量异步带超时消息发送成功
2024-10-14 15:16:43.327  INFO 16760 --- [MessageThread_4] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:[{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}}]

结果验证符合预期。

超时场景这里不再测试了,如果想验证超时效果,只需要将超时时间设置的尽可能小一点即可。

总结:本篇简单分享了 Spring 整合 RocketMQ,并完成单向消息、同步消息、异步消息的案例演示,在实际业务中只需要对案例代码进行丰富填充业务逻辑即可,希望可以帮助到大家,后面会持续分享延时消息、顺序消息、事务消息的使用案例。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/899916.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java实现redis的消息发送和消费,类似kafka功能

确保在 pom.xml 中添加了 Spring Data Redis 和 Jedis 的依赖。如下所示&#xff1a;<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency&g…

C数组与字符串

1.数组 数组是一组有序的、类型相同的数据的集合&#xff0c;这些数据被称为数组的元素。 每个数组都有一个名字&#xff0c;我们称之为数组名。 数组名代表数组的起始地址。 数组元素由索引或下标标识&#xff0c;索引或下标从0开始 数组的特性必须在使用前定义&#xff1…

Mycat 详细介绍及入门实战,解决数据库性能问题

一、基本原理 1、数据分片 &#xff08;1&#xff09;、水平分片 Mycat 将一个大表的数据按照一定的规则拆分成多个小表&#xff0c;分布在不同的数据库节点上。例如&#xff0c;可以根据某个字段的值进行哈希取模&#xff0c;将数据均匀的分布到不同的节点上。 这样做的好处…

OpenIPC开源FPV之Ardupilot配置

OpenIPC开源FPV之Ardupilot配置 1. 源由2. 问题3. 分析3.1 MAVLINK_MSG_ID_RAW_IMU3.2 MAVLINK_MSG_ID_SYS_STATUS3.3 MAVLINK_MSG_ID_BATTERY_STATUS3.4 MAVLINK_MSG_ID_RC_CHANNELS_RAW3.5 MAVLINK_MSG_ID_GPS_RAW_INT3.6 MAVLINK_MSG_ID_VFR_HUD3.7 MAVLINK_MSG_ID_GLOBAL_P…

ActiveMQ消息模式Queue和Topic机制讲解

Docker安装ActiveMQ镜像以及通过Java生产消费activemq示例_docker activemq-CSDN博客 背景 周末由于服务器异常宕机&#xff0c;导致业务系统重启后出现ActiveMQ中的数据没有被正常消费&#xff0c;运维认为是消息积压&#xff0c;便联系博主排查。 最终发现并不存在消息积压…

GIS常见前端开发框架

#1024程序员节&#xff5c;征文# 伴随GIS的发展&#xff0c;陆续出现了众多开源地图框架&#xff0c;这些地图框架与众多行业应用融合&#xff0c;极大地拓展了GIS的生命力&#xff0c;这里介绍几个常见的GIS前端开发框架&#xff0c;排名不分先后。 1.Leaflet https://leafl…

Spring--1

spring是一个轻量级的&#xff0c;采用IOC与AOP编程思想的java后端开发框架&#xff0c;简化了企业级的应用开发。 Spring体系 数据访问层&#xff0c;Web层&#xff0c;配置中心&#xff0c;测试区 IOC 控制反转&#xff0c;将创建对象的控制权交由Spring框架&#xff0c;需…

Tongweb7049m4+THS6010-6012版本 传真实ip到后端(by yjm+lwq)

遇到客户需要通过ths传真实ip到后端也就是部署到tongweb的需求&#xff0c;在ths的httpserver.conf里的location块配置了以下内容&#xff1a; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwar…

Redis技术解析(基于Redis的项目实战)

本项目源码请从作者仓库中拉取 Redis复盘: 本项目将通过实战讲解Redis的应用&#xff0c;包括使用Redis共享session实现短信登录、处理商户查询缓存问题、进行优惠券秒杀活动、基于GEOHash定位附近商户、实现UV统计、管理用户签到、构建好友关注系统&#xff0c;以及使用List和…

数字后端实现静态时序分析STA Timing Signoff之min period violation

今天给大家分享一个在高性能数字IC后端实现timing signoff阶段经常遇到的min period violation。大部分时候出现memory min period问题基本上都是需要返工重新生成memory的。这是非常致命的错误&#xff0c;希望大家在做静态时序分析时一定要查看min period violation。 什么是…

Oracle 常见索引扫描方式概述,哪种索引扫描最快!

一.常见的索引扫描方式 INDEX RANGE SCANINDEX FAST FULL SCANINDEX FULL SCAN(MIN/MAX)INDEX FULL SCAN 二.分别模拟使用这些索引的场景 1.INDEX RANGE SCAN create table t1 as select rownum as id, rownum/2 as id2 from dual connect by level<500000; create inde…

Unity RPG梦幻场景素材(附下载链接)

Unity RPG梦幻场景素材 点击下载资源 效果图&#xff1a; 资源链接

CORS预检请求配置流程图 srpingboot和uniapp

首先要会判断预检请求 还是简单请求 简单请求 预检请求 #mermaid-svg-1R9nYRa7P9Pll4AK {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-1R9nYRa7P9Pll4AK .error-icon{fill:#552222;}#mermaid-svg-1R9nYRa7P9Pll4…

geoserver解析元数据获取图层相关参数

需求&#xff1a; 1、通过geoserver地址获取所有图层名称&#xff1b; 2、加载wms服务&#xff0c;实现自动定位。 获取图层名和范围视图有两种思路&#xff1a; 1、调取geoserver的rest接口。缺点就是需要验证登录。 rest接口官方文档&#xff1a;GeoServer API Docs 2、…

C++(标准输入输出流、命名空间、string字符串、引用)

C特点及优势 &#xff08;1&#xff09;实现了面向对象&#xff0c;在高级语言中&#xff0c;处理运行速度是最快&#xff1b; &#xff08;2&#xff09;非常灵活&#xff0c;功能非常强大&#xff0c;相对于C的指针优势&#xff0c;C的优势为性能和类层次结构&#x…

书生营 L0G4000 玩转HF/魔搭/魔乐社区

模型下载 在codespace上给环境装包&#xff0c;按照教材即可 运行后下载成功 建立下载json文件 新建下载internlm2_5-chat-1_8b的json文件 运行结果 基本上没啥问题&#xff0c;照着教程来就行 模型上传&#xff08;可选&#xff09; push的时候需要先认证token 最后的…

人工智能+医学

医学影响的内型&#xff1a;(X光片、计算机断层扫描、磁共振成像、超声波&#xff09; ITK snap医学图像读取 医学影像领域常见任务: 图像分类、语义分割、疾病预测、目标检测、图像配准、图像生成(应用少)、图像增强、生成放射学报告。 需要有很强的可解释…

Xshell上Linux的基础指令

目录 1、Xshell的使用 2、Linux的常用命令 2.1 位置跳转命令 1、ls 2、cd 3、pwd 2.2 文件操作 1、touch 2、cat 3、echo 4、vim 2.3 目录操作 1、mkdir 2、rm 2.4 移动操作 1、mv 2、cp 2.5 命令手册 2.6 查找操作 2.7 进程展示 2.8 网络信息 3、搭建w…

JS | 详解图片懒加载的6种实现方案

目录 一、什么是懒加载&#xff1f; 二、为什么要懒加载&#xff1f; 三、图片懒加载的实现原理 四、图片懒加载实现方式 3.1 方案一&#xff1a;设置 img 标签属性 loading“lazy” 3.2 方案二&#xff1a;利用JS监听scroll滚动事件 3.3 方案三&#xff1a;利用元素的…

Aatrox-Bert-VITS2部署指南

一、模型介绍 【AI 剑魔 ①】在线语音合成&#xff08;Bert-Vits2&#xff09;&#xff0c;将输入文字转化成暗裔剑魔亚托克斯音色的音频输出。 作者&#xff1a;Xz 乔希 https://space.bilibili.com/5859321 声音归属&#xff1a;Riot Games《英雄联盟》暗裔剑魔亚托克斯 …