目录
一、引入依赖
二、配置文件
三、生产者
四、消费者
五、结果
一、引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
二、配置文件
#Rocketmq配置
rocketmq.name-server=192.168.11.99:9876
# 必须指定生产者组
rocketmq.producer.group=group01
# 消息发送超时时长,单位为毫秒,默认3000(即3s)
rocketmq.producer.send-message-timeout=3000
# 同步发送消息失败重试次数,默认2
rocketmq.producer.retry-times-when-send-failed=3
# 异步发送消息失败重试次数,默认2
rocketmq.producer.retry-times-when-send-async-failed=3
三、生产者
package com.beiyou.rocket.provider;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
/**
 * Producer service that publishes text messages to the {@code topic_01} topic
 * through the injected {@link RocketMQTemplate}.
 */
@Service
@Slf4j
public class Provider1 {

    /** Destination topic used by {@link #send(String)}. */
    private static final String TOPIC = "topic_01";

    /**
     * Template used to publish messages; injected by Spring.
     */
    @Autowired
    private RocketMQTemplate rocketTemplate;

    /**
     * Wraps the given text in a Spring {@link Message} and publishes it to
     * {@code topic_01}.
     *
     * @param msg the text payload to publish
     */
    public void send(String msg) {
        Message<String> build = MessageBuilder.withPayload(msg).build();
        // Use send(...) for an already-built Message. convertAndSend(...) treats its
        // second argument as a raw payload and runs it through the message converter,
        // so passing a Message there would wrap it a second time and the consumer
        // would receive the serialized wrapper instead of the original text.
        rocketTemplate.send(TOPIC, build);
    }
}
四、消费者
package com.beiyou.rocket.consumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Message listener registered for topic {@code topic_01} under consumer group
 * {@code group_205}. Each received message body is printed to standard output.
 */
@Component
@RocketMQMessageListener(consumerGroup = "group_205", topic = "topic_01")
public class Consumer1 implements RocketMQListener<String> {

    /**
     * Prints the received message text, prefixed with a fixed label, to stdout.
     *
     * @param message the message body delivered by the listener container
     */
    @Override
    public void onMessage(String message) {
        final String line = "消费者收到了消息: " + message;
        System.out.println(line);
    }
}