这里需要创建2.x版本的springboot项目
导入依赖
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
定义配置文件
server:
port: 3000
rocketmq:
name-server: xxx.xxx.xxx.xxx:9876 # NameServer 地址
producer:
group: rocketmq-4x-service_common-message-execute_pg # 全局发送者组定义
生产者定义
这里的生产者有两个,一个是普通的,一个是延时。
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.messaging.support.MessageBuilder;
@Component
@Slf4j
public class GeneralMessageDemoProduce {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent) {
SendResult sendResult;
try{
StringBuilder destinationBuilder = StrUtil.builder().append(topic);
if(StrUtil.isNotBlank(tag)){
destinationBuilder.append(":").append(tag);
}
Message<?> message = MessageBuilder
.withPayload(messageSendEvent)
.setHeader(MessageConst.PROPERTY_KEYS,keys)
.setHeader(MessageConst.PROPERTY_TAGS, tag)
.build();
// 设置消息的延时级别
sendResult=rocketMQTemplate.syncSend(
destinationBuilder.toString(),
message,
2000L
);
log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);
}catch(Throwable ex){
log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);
throw ex;
}
return sendResult;
}
}
延时的
@Component
@Slf4j
public class ScheduleProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent ) {
SendResult sendResult;
try {
StringBuilder destinationBuilder = StrUtil.builder().append(topic);
if(StrUtil.isNotBlank(tag)){
destinationBuilder.append(":").append(tag);
}
Message<?> message = MessageBuilder
.withPayload(messageSendEvent)
.setHeader(MessageConst.PROPERTY_KEYS,keys)
.setHeader(MessageConst.PROPERTY_TAGS, tag)
.build();
// 设置消息的延时级别
sendResult=rocketMQTemplate.syncSend(
destinationBuilder.toString(),
message,
2000L,
6
);
log.info("[延时消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);
}catch(Throwable ex){
log.error("[延时消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);
throw ex;
}
return sendResult;
}
}
消费者定义
这里也是两个消费者,普通的和延时的不在同一个主题的内
@Slf4j
@Component
@RocketMQMessageListener(
topic = "rocketmq-yhy_topic",
selectorExpression = "general",
consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume implements RocketMQListener<MessageEvent> {
@Override
public void onMessage(MessageEvent message) {
log.info("接到RocketMQ消息,消息体:{}", JSON.toJSONString(message));
}
}
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "Delay",
selectorExpression = "general",
consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume_Delay implements RocketMQListener<MessageEvent> {
@Override
public void onMessage(MessageEvent message) {
log.info("接到RocketMQ的延时消息,消息体:{}", JSON.toJSONString(message));
}
}
发送消息
这里直接在启动类发送。
@SpringBootApplication
@RestController
public class RocketMQDemoApplication {
@Autowired
private GeneralMessageDemoProduce generalMessageDemoProduce;
@Autowired
private ScheduleProducer scheduleProducer;
@PostMapping("/test/send/general-message")
public String sendGeneralMessage() {
String keys= UUID.randomUUID().toString();
MessageEvent messageEvent=new MessageEvent("消息具体内容——yhy",keys);
SendResult sendResult=generalMessageDemoProduce.sendMessage(
"rocketmq-yhy_topic",
"general",
keys,
messageEvent
);
SendResult sendResult2=scheduleProducer.sendMessage(
"Delay",
"general",
keys,
messageEvent
);
System.out.println(sendResult.getSendStatus().name() );
System.out.println(sendResult2.getSendStatus().name());
return sendResult.getSendStatus().name();
}
public static void main(String[] args) {
SpringApplication.run(RocketMQDemoApplication.class, args);
}
}