目录
1.POM文件添加依赖及yml配置
2.RocketmqUtil
3.生产者(异步发送示例)
4.消费者
5.测试
1.POM文件添加依赖及yml配置
<!-- Apache RocketMQ Spring Boot starter, pinned to version 2.2.2. -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
# RocketMQ client settings read by rocketmq-spring-boot-starter.
# The nesting below is required: the producer.* keys must sit under "rocketmq".
rocketmq:
  # Address of the RocketMQ NameServer (host:port).
  name-server: 127.0.0.1:9876
  producer:
    # Producer group name.
    group: My_Group
    # Send timeout in milliseconds.
    send-message-timeout: 3000
    # Retry count after a failed synchronous send.
    retry-times-when-send-failed: 3
    # Retry count after a failed asynchronous send.
    retry-times-when-send-async-failed: 3
2.RocketmqUtil
package com.kaying.marketing.platform.common.util.rocketMq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @Description: RocketMQ消息的生产者
* @Author: hwk
*/
@Component
@Slf4j
public class RocketMqUtil {
@Autowired
private RocketMQTemplate rocketMqTemplate;
public void sendMsg(String topic,String data) {
rocketMqTemplate.convertAndSend(topic,data);
log.info("【RocketMQ】发送同步消息:{}", data);
}
public void asyncSend(String topic, String tag, String data,Integer messageDelayLevel) {
rocketMqTemplate.asyncSend(topic + ":" + tag, MessageBuilder.withPayload(data).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息发送成功
log.error("消息发送成功"+sendResult);
}
@Override
public void onException(Throwable throwable) {
// 消息发送异常
log.error("异步发送消息异常。topic:" + topic + ";tag:" + tag + ";mqMsg" + data, throwable);
}
},
3000L,
// messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
// messageDelayLevel = 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
messageDelayLevel);
}
/**
* 发送同步消息:消息响应后发送下一条消息
*
* @param topic 消息主题
* @param tag 消息tag
* @param key 业务号
* @param data 消息内容
*/
public void sendSyncMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
SendResult sendResult = rocketMqTemplate.syncSend(destination, message);
log.info("【RocketMQ】发送同步消息:{}", sendResult);
}
/**
* 发送异步消息:异步回调通知消息发送的状况
*
* @param topic 消息主题
* @param tag 消息tag
* @param key 业务号
* @param data 消息内容
*/
public void sendAsyncMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
rocketMqTemplate.asyncSend(destination, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("【RocketMQ】发送异步消息:{}", sendResult);
}
@Override
public void onException(Throwable e) {
log.info("【RocketMQ】发送异步消息异常:{}", e.getMessage());
}
});
}
/**
* 发送单向消息:消息发送后无响应,可靠性差,效率高
*
* @param topic 消息主题
* @param tag 消息tag
* @param key 业务号
* @param data 消息内容
*/
public void sendOneWayMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
rocketMqTemplate.sendOneWay(destination, message);
}
/**
* 同步延迟消息
*
* @param topic 主题
* @param tag 标签
* @param key 业务号
* @param data 消息体
* @param timeout 发送消息的过期时间
* @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*
*/
public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
// messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
// messageDelayLevel = 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
SendResult sendResult = rocketMqTemplate.syncSend(destination, message, timeout, delayLevel);
log.info("【RocketMQ】发送同步延迟消息:{}", sendResult);
}
/**
* 异步延迟消息
*
* @param topic 主题
* @param tag 标签
* @param key 业务号
* @param data 消息体
* @param timeout 发送消息的过期时间
* @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
rocketMqTemplate.asyncSend(destination, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("【RocketMQ】发送异步延迟消息:{}", sendResult);
}
@Override
public void onException(Throwable e) {
log.info("【RocketMQ】发送异步延迟消息异常:{}", e.getMessage());
}
}, timeout, delayLevel);
}
/**
* 同步顺序消息
*
* @param topic 主题
* @param tag 标签
* @param key 业务号
* @param data 消息体
*/
public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
SendResult sendResult = rocketMqTemplate.syncSendOrderly(destination, message, key);
log.info("【RocketMQ】发送同步顺序消息:{}", sendResult);
}
/**
* 异步顺序消息
*
* @param topic 主题
* @param tag 标签
* @param key 业务号
* @param data 消息体
*/
public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
rocketMqTemplate.asyncSendOrderly(destination, message, key, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("【RocketMQ】发送异步顺序消息:{}", sendResult);
}
@Override
public void onException(Throwable e) {
log.info("【RocketMQ】发送异步顺序消息异常:{}", e.getMessage());
}
});
}
}
3.生产者(异步发送示例)
//异步发送消息代码示例
rocketMqUtil.sendAsyncMsg(RocketConstant.TEST_TOPIC1, RocketConstant.TEST_TAG1, UUID.randomUUID().toString(), "测试消息一");
4.消费者
简单的负载均衡消费的示例(指定topic和tag,相同的组即为负载均衡消费)
也可以指定不同的topic和不同的tag进行消息区分
注意:如果本地环境与线上环境连接同一个MQ,并且使用相同的消费者组、topic和tag,本地实例也会参与负载均衡并分走一部分消息,导致线上服务收不到这些消息(表现为线上消息“丢失”)。
/**
 * Listener for {@code RocketConstant.TEST_TOPIC1} messages matching
 * {@code RocketConstant.TEST_TAG1}, registered in consumer group {@code "1"}.
 * Each received payload is written to the log.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet - presumably it
 * is supplied by Lombok's {@code @Slf4j} on the real class; confirm.
 */
@RocketMQMessageListener(
        consumerGroup = "1",
        topic = RocketConstant.TEST_TOPIC1,
        selectorExpression = RocketConstant.TEST_TAG1)
@Service
public class RocketConsumerTag1 implements RocketMQListener<String> {

    /**
     * Logs the received message body.
     *
     * @param message message payload (the example treats it as an order number)
     */
    @Override
    public void onMessage(String message) {
        log.info("tag1,接收:{}", message);
    }
}
/**
 * Second listener for {@code RocketConstant.TEST_TOPIC1}, registered in the
 * same consumer group {@code "1"}. Each received payload is written to the log.
 *
 * <p>NOTE(review): despite the class name, the selector is
 * {@code RocketConstant.TEST_TAG1}; per the surrounding article this is
 * presumably intentional (same group, topic and tag to demonstrate
 * load-balanced consumption) - confirm.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet - presumably it
 * is supplied by Lombok's {@code @Slf4j} on the real class; confirm.
 */
@RocketMQMessageListener(
        consumerGroup = "1",
        topic = RocketConstant.TEST_TOPIC1,
        selectorExpression = RocketConstant.TEST_TAG1)
@Service
public class RocketConsumerTag2 implements RocketMQListener<String> {

    /**
     * Logs the received message body.
     *
     * @param message message payload (the example treats it as an order number)
     */
    @Override
    public void onMessage(String message) {
        log.info("tag2,接收:{}", message);
    }
}