部署服务
参考RocketMq入门介绍
示例
引入maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
完整依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.9</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>rocketMqDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketMqDemo</name>
<description>rocketMqDemo</description>
<properties>
<java.version>8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
修改application.properties文件
配置文件如下:
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.consumer.topic=test-topic
所有的配置参考RocketMQProperties源码中配置。
rocketmq.name-server:服务地址
rocketmq.producer.group:生产者的组名称
rocketmq.consumer.topic:消费者的主题名称
定义生产者
生产者是 Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。
生产者通常被集成在业务系统中,将业务消息按照要求封装成 Apache RocketMQ 的消息(Message)并发送至服务端。
生产者和主题的关系为多对多关系,即同一个生产者可以向多个主题发送消息,对于平台类场景如果需要发送消息到多个主题,并不需要创建多个生产者;同一个主题也可以接收多个生产者的消息,以此可以实现生产者性能的水平扩展和容灾。
代码示例如下:
@Component
public class RocketMqProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 发送同步消息
* @param msg
*/
public void sendSyncMsg(String msg){
rocketMQTemplate.convertAndSend("test-topic-1", msg);
}
/**
* 发送Spring消息
* @param msg
*/
public void sendSpringMsg(String msg){
rocketMQTemplate.send("test-topic-1"
, MessageBuilder.withPayload(msg).build());
}
/**
* 发送异步消息
* @param msg
*/
public void sendAsyncMsg(String msg){
rocketMQTemplate.asyncSend("test-topic-1", new MsgBean(msg), new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
System.out.printf("async onSucess SendResult=%s %n", var1);
}
@Override
public void onException(Throwable var1) {
System.out.printf("async onException Throwable=%s %n", var1);
}
});
}
/**
* 发送有序消息
* @param msg
*/
public void sendOrderlyMsg(String msg){
rocketMQTemplate.syncSendOrderly("test-topic-1",MessageBuilder.withPayload(msg).build(),"hashkey");
}
}
定义消费者
消费者是 Apache RocketMQ 中用来接收并处理消息的运行实体。 消费者通常被集成在业务系统中,从 Apache RocketMQ 服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
在消息消费端,可以定义如下传输行为:
-
消费者身份:消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消费状态。
-
消费者类型:Apache RocketMQ 面向不同的开发场景提供了多样的消费者类型,包括PushConsumer类型、SimpleConsumer类型、PullConsumer类型(仅推荐流处理场景使用)等。具体信息,请参见消费者分类。
-
消费者本地运行配置:消费者根据不同的消费者类型,控制消费者客户端本地的运行配置。例如消费者客户端的线程数,消费并发度等,实现不同的传输效果。
代码示例如下:
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-group")
@Component
public class RocketMqConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
System.out.println("received message: "+ JSON.toJSONString(message));
}
}
定义Controller调用消费者
代码示例如下:
@Controller
public class RestController {
@Autowired
RocketMqProducer producer;
@RequestMapping(value = "/sendSyncMsg")
@ResponseBody
public String sendSyncMsg(){
producer.sendSyncMsg("hello word");
return "ok";
}
}