RocketMQ—RocketMQ快速入门
RocketMQ提供了发送多种发送消息的模式,例如同步消息,异步消息,顺序消息,延迟消息,事务消息等。
消息发送和监听的流程
消息生产者
- 创建消息生产者producer,并制定生产者组名
- 指定Nameserver地址
- 启动producer
- 创建消息对象,指定主题Topic、Tag和消息体等
- 发送消息
- 关闭生产者producer
消息消费者
- 创建消费者consumer,制定消费者组名
- 指定Nameserver地址
- 创建监听订阅主题Topic和Tag等
- 处理消息
- 启动消费者consumer
demo程序
简单程序
新建一个springboot项目。
引入以下依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
生产者简单案例如下:
/**
* 生产者
*/
@Test
public void simpleProducer() throws Exception {
//创建一个生产者,并指定一个组名
DefaultMQProducer producer = new DefaultMQProducer("test-1-producer-group");
//连接namesrv,参数是namesrv的ip地址:端口号
producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
//启动
producer.start();
//指定topic,创建一个消息
Message message = new Message("testTopic1", "一条简单消息".getBytes());
//发送消息,并获得状态
SendResult sendResult = producer.send(message);
System.out.println("发送消息的状态为:"+sendResult);
//关闭生产者
producer.shutdown();
}
运行程序,可以看到运行结果如下:
可以看到出现了新的topic
消费者简单案例如下:
/**
* 接受消息
*/
@Test
public void simpleConsumer() throws Exception {
//创建一个消费者,并指定一个组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-1-consumer-group");
//连接namesrv,参数是namesrv的ip地址:端口号
consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
//订阅一个主题 *号表示订阅这个主题中所有的消息
consumer.subscribe("testTopic1","*");
//设置一个监听器(一直监听,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//这里就是消费的方式 做业务处理
System.out.println("消费者");
System.out.println(list.get(0).toString());
System.out.println("消息内容:"+new String(list.get(0).getBody()));
System.out.println("消费上下文:"+consumeConcurrentlyContext);
//返回消费的状态 ,CONSUME_SUCCESS表示成功,消息会从队列中出队
//如果返回RECONSUME_LATER/报错/null,表示消费失败,消息会重新回到队列,给当前消费者或者其他消费者消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
//挂起当前jvm,防止主线程结束,让监听器一直监听
System.in.read();
}
运行截图如下: