一、概述
同步消息的特征是消息发出后会有一个返回值,即RocketMQ服务器收到消息后的一个确认,这种方式非常安全,但是性能上却没有那么高,而且在集群模式下,也是要等到所有的从机都复制了消息以后才会返回,适用于重要的消息传递,例如:短信通知
二、案例代码
2.1、pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.star</groupId>
<artifactId>rocketmq-example</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rocketmq-example</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.25</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!-- 普通maven项目中使用Sl4j注解 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.10</version>
</dependency>
</dependencies>
<!-- 锁定Java编译的版本 -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2.2、RocketMQConstant
package org.star.constants;
/**
* @Author: 一叶浮萍归大海
* @Date: 2023/7/27 16:42
* @Description:
*/
public class RocketMQConstant {
/**
* 配置RocketMQ NameSrv的地址
*/
public static final String NAME_SERVER_ADDR = "192.168.173.219:9876";
}
2.3、SimpleConsumer
package org.star.simple.consumer;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;
import java.util.List;
/**
* @Author: 一叶浮萍归大海
* @Date: 2023/8/25 10:20
* @Description: 简单消息消费者
*/
@Slf4j
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
/**
* 消费消息分两种
* (1)拉模式:消费者主动去Broker上拉消息
* (2)推模式:消费者等待Broker把消息推送过来
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleMessageGroup");
consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
consumer.subscribe("SimpleTopic", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isNotEmpty(list)) {
String body = StrUtil.utf8Str(list.get(0).getBody());
log.info("收到消息 body:{}",body);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("consumer started!");
}
}
2.4、SyncProducer
package org.star.simple.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;
import java.nio.charset.StandardCharsets;
/**
* @Author: 一叶浮萍归大海
* @Date: 2023/8/25 10:12
* @Description: 同步发送:等待消息返回后再继续执行下面的操作
*/
@Slf4j
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducerGroup");
producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
producer.start();
for (int i = 0; i < 3; i++) {
Message message = new Message("SimpleTopic", ("我是第[" + i + "]个简单消息").getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(message);
log.info("第[" + i + "]个简单消息发送成功 sendStatus:{},msgId:{},topic:{}", sendResult.getSendStatus(),sendResult.getMsgId(),sendResult.getMessageQueue().getTopic());
}
producer.shutdown();
}
}
2.5、控制台打印结果
# 生产者端
12:14:32.339 [main] INFO org.star.simple.producer.SyncProducer - 第[0]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9850000,topic:SimpleTopic
12:14:32.343 [main] INFO org.star.simple.producer.SyncProducer - 第[1]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9940001,topic:SimpleTopic
12:14:32.348 [main] INFO org.star.simple.producer.SyncProducer - 第[2]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9970002,topic:SimpleTopic
# 消费者端
12:14:32.337 [ConsumeMessageThread_10] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[0]个简单消息
12:14:32.345 [ConsumeMessageThread_11] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[1]个简单消息
12:14:32.349 [ConsumeMessageThread_12] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[2]个简单消息