作用
1. 限流削峰
2. 异步解耦
组成
Producer:消息的发送者,生产者;举例:发件人
Consumer:消息接收者,消费者;举例:收件人
Broker:暂存和传输消息的通道;举例:快递
NameServer:管理Broker;举例:各个快递公司的管理机构 相当于broker的注册中心,保留了broker的信息
Queue:队列,消息存放的位置,一个Broker中可以有多个队列
Topic:主题,消息的分类
快速入门
生产者
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体等
5.发送消息
6.关闭生产者producer
消费者
1.创建消费者consumer,制定消费者组名
2.指定Nameserver地址
3.创建监听订阅主题Topic和Tag等
4.处理消息
5.启动消费者consumer
搭建pom依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
编写生产者
@Test
void simpleProducer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
//创建一个生产者,指定组名
DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
//连接namesrv
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
//启动
producer.start();
//创建一个消息
Message message = new Message("testTopic", "hello world".getBytes());
//发送消息
SendResult send = producer.send(message);
System.out.println(send.getSendStatus());
//关闭生产者
producer.shutdown();
}
编写消费者
@Test
void simpleConsumer() throws Exception{
//创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
//连接
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
//订阅一个主题 *表示所有的消息,后期会有消息过滤
consumer.subscribe("testTopic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
//业务处理
System.out.println("我是消费者");
System.out.println("消费:"+new String(list.get(0).getBody()));
System.out.println("消费上下文"+consumeConcurrentlyContext);
//返回值
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//启动
consumer.start();
System.in.read();
}
消费模式
Push是服务端【MQ】主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式