一、基本概念
- 生产者(Producer):也称为消息发布者,是RocketMQ中用来构建并传输消息到服务端的运行实体,举例:发信者
- 主题(Topic):Topic是RocketMQ中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息;Topic是一个逻辑概念,并不是实际的消息容器;
- 消息队列(MessageQueue):队列是RocketMQ中消息存储和传输的实际容器,也是消息的最小存储单元,相当于是Topic的分区;用于并行发送和接收消息;
- 消费者(Consumer):也称为消息订阅者,是RocketMQ中用来接收并处理消息的运行实体,举例:收信者
- 消费者组(ConsumerGroup):消费者组是RocketMQ中承载多个消费行为一致的消费者负载均衡分组(不同的消费者组,可以消费同一个topic的消息)。和消费者不同,消费者组是一个逻辑概念。
- NameServer:可以理解成注册中心,负责更新和发现Broker服务。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的,它是无状态的,举例:各个邮局的管理机构
- Broker:可以理解为消息中转角色,负责消息的存储和转发,接收生产者产生的消息并持久化消息;当用户发送的消息被发送到Broker时,Broker会将消息转发到与之关联的Topic中,以便让更多的接收者进行处理,举例:邮局
1.1.消息模型
1.2.部署模型
1.3 RocketMQ下载地址
RocketMQ的官网地址:https://rocketmq.apache.org/
Github地址:https://github.com/apache/rocketmq
下载地址:https://rocketmq.apache.org/zh/download/
二、实战
我们需要先搭建一个基于Maven的springboot项目,只需要加入以下依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.0</version>
</dependency>
2.1 基本样例
生产者生产消息
消息生产者分别通过三种方式发送消息:
- 同步发送:等待消息返回后再继续进行下面的操作。
- 异步发送:不等待消息返回直接进入后续流程。broker将结果返回后调用callback函数,并使用CountDownLatch计数。
- 单向发送:只负责发送,不管消息是否发送成功。
同步发送
package Simple;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* 同步发送
* Created by BaiLi
*/
public class SyncProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("192.168.43.137:9876");
producer.start();
for (int i = 0; i < 2; i++) {
Message msg = new Message("Simple", //主题
"TagA", //设置消息Tag,用于消费端根据指定Tag过滤消息。
"Simple-Sync".getBytes(StandardCharsets.UTF_8) //消息体。
);
SendResult send = producer.send(msg);
System.out.printf(i + ".发送消息成功:%s%n", send);
}
producer.shutdown();
}
}
异步发送
package Simple;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 异步发送
* Created by BaiLi
*/
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("192.168.43.137:9876");
producer.start();
CountDownLatch countDownLatch = new CountDownLatch(100);//计数
for (int i = 0; i < 100; i++) {
Message message = new Message("Simple", "TagA", "Simple-Async".getBytes(StandardCharsets.UTF_8));
final int index = i;
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%d 消息发送成功%s%n", index, sendResult);
}
@Override
public void onException(Throwable throwable) {
countDownLatch.countDown();
System.out.printf("%d 消息失败%s%n", index, throwable);
throwable.printStackTrace();
}
}
);
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
单向发送
package Simple;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* 单向发送
* Created by BaiLi
*/
public class OnewayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("192.168.43.137:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("Simple","TagA", "Simple-Oneway".getBytes(StandardCharsets.UTF_8));
producer.sendOneway(message);
System.out.printf("%d 消息发送完成 %n" , i);
}
Thread.sleep(5000);
producer.shutdown();
}
}
消费者消费消息
消费者消费消息分两种:
- 拉模式:消费者主动去Broker上拉取消息。
- 推模式:消费者等待Broker把消息推送过来。
拉模式
package Simple;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.HashSet;
import java.util.Set;
/**
* 拉模式
* Created by BaiLi
*/
public class PullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("SimplePullConsumer");
pullConsumer.setNamesrvAddr("192.168.43.137:9876");//执行nameserver地址
Set<String> topics = new HashSet<>();
topics.add("Simple");//添加Topic
topics.add("TopicTest");
pullConsumer.setRegisterTopics(topics);
pullConsumer.start();
while (true) { //循环拉取消息
pullConsumer.getRegisterTopics().forEach(n -> {
try {
Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues(n);//获取主题中的Queue
messageQueues.forEach(l -> {
try {
//获取Queue中的偏移量
long offset = pullConsumer.getOffsetStore().readOffset(l, ReadOffsetType.READ_FROM_MEMORY);
if (offset < 0) {
offset = pullConsumer.getOffsetStore().readOffset(l, ReadOffsetType.READ_FROM_STORE);
}
if (offset < 0) {
offset = pullConsumer.maxOffset(l);
}
if (offset < 0) {
offset = 0;
}
//拉取Queue中的消息。每次获取32条
PullResult pullResult = pullConsumer.pull(l, "*", offset, 32);
System.out.printf("循环拉取消息ing %s%n",pullResult);
switch (pullResult.getPullStatus()) {
case FOUND:
pullResult.getMsgFoundList().forEach(p -> {
System.out.printf("拉取消息成功%s%n", p);
});
//更新偏移量
pullConsumer.updateConsumeOffset(l, pullResult.getNextBeginOffset());
}
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} catch (MQClientException e) {
e.printStackTrace();
}
});
}
}
}
推模式
package Simple;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 推模式
* Created by BaiLi
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
pushConsumer.setNamesrvAddr("192.168.43.137:9876");
pushConsumer.subscribe("Simple","*");
pushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach( n->{
System.out.printf("收到消息: %s%n" , n);
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pushConsumer.start();
System.out.printf("Consumer Started.%n");
}
}