1.Docker安装
1.1创建docker-compose.yml文件
version: '3.5'
services:
rmqnamesrv:
image: foxiswho/rocketmq:server
container_name: rmqnamesrv
ports:
- 9876:9876
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:broker
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPTS: " -Duser.home=/opt"
JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
command: mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 8090:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
name: rmq
driver: bridge
1.2.新建broker.conf文件
# 集群名称
brokerClusterName = DefaultCluster
# 节点名称
brokerName = broker-a
# broker id节点ID, 0 表示 master, 其他的正整数表示 slave,不能小于0
brokerId = 0
# 在每天的什么时间删除已经超过文件保留时间的 commit log,默认值04
deleteWhen = 04
# 以小时计算的文件保留时间 默认值72小时
fileReservedTime = 72
# Broker角色
brokerRole = ASYNC_MASTER
# 刷盘方式
flushDiskType = ASYNC_FLUSH
# Broker服务地址,内部使用填内网ip,如果是需要给外部使用填公网ip,自行更改
brokerIP1 = 192.168.11.99
1.3. 执行docker-compose up -d
即可成功启动
1.4.访问控制台
2.SpringBoot集成RockerMQ
2.1.引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
2.2.配置服务器地址
#Rocketmq配置
rocketmq.name-server=192.168.11.99:9876
# 必须指定生产者组
rocketmq.producer.group=group01
# 消息发送超时时长,默认3s
rocketmq.producer.send-message-timeout=3000
# 同步发送消息失败重试次数,默认2
rocketmq.producer.retry-times-when-send-failed=3
# 异步发送消息失败重试次数,默认2
rocketmq.producer.retry-times-when-send-async-failed=3
2.3创建生产者
@Service
@Slf4j
public class Producer1 {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMassage(String msg){
Message<String> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.send("topic_01",message);
}
//rocketMQ的延迟队列
public void sendMassage2(String msg){
Message<String> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.syncSend("topic_01",message,3000,4);
log.debug("生产者发送消息成功:{}"+msg);
}
//死信队列
public void sendMassage3(String msg){
// Message<String> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.convertAndSend("topic_01",msg);
log.debug("生产者发送消息成功:{}"+msg);
}
}
2.4创建消费者
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_01",consumerGroup ="group_205" )
public class Consumer1 implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(String massage) {
System.out.println("消费者1接收到消息:"+massage);
int a =1/0;
log.debug("消费者1接收消息成功:{}"+massage);
}
//死信队列需要重写RocketMQPushConsumerLifecycleListener方法
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
//设置最大尝试次数2次
defaultMQPushConsumer.setMaxReconsumeTimes(2);
//设置最大抓取次数
defaultMQPushConsumer.setPullBatchSize(5);
}
4.延迟消息
RocketMQ支持指定级别的延迟消息,即只能设置预设的几个时间等级的延迟,而不是任意时间延迟。目前RocketMQ社区版并不支持任意时间的精确延迟,RocketMQ在4.x版本只能够支持18种内置的延迟消息(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),具体实现方式是在发送消息时设置消息的延迟等级。
5.死信队列
代码如上