1. 引入依赖
<!-- Aliyun ONS client: convenient access to the Aliyun message queue cloud service -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.4.Final</version>
</dependency>
2. 配置
配置注意事项:
- nameSrvAddr:我这里使用的是 4.0 版本的实例,支持 HTTP 接入点;5.0 版本的实例不支持 HTTP 接入点。
- 一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
- 订阅关系参考官方文档: 订阅关系一致
- 此处我配置了多组 GroupId、Tag、Topic(order、market、vehicle);如果不需要,只配置一组即可,同时基本配置类需要相应增减对应的属性。
# Keys must be nested under aliyun -> rocketmq so that they bind to the
# "aliyun.rocketmq" prefix used by the RocketMqConfig properties class.
aliyun:
  rocketmq:
    # Credentials and endpoint (placeholder values; replace with your own)
    accessKey: LTAI5txxxxxxx
    secretKey: Afq06tBxrdBxxxxxxxx
    nameSrvAddr: http://MQ_INST_xxxxxxxxxx_BYkZuJCq.cn-beijing.mq.aliyuncs.com:80
    # "order" group / tag / topic
    orderGroupId: GID_xxxxxx_test
    orderTag: 'order'
    orderTopic: vehicle-order-test
    # "market" group / tag / topic
    marketGroupId: GID_xxxxxx2_test
    marketTag: 'market'
    marketTopic: vehicle-market-test
    # "vehicle" group / tag / topic
    vehicleGroupId: GID_xxxxxx3_test
    vehicleTag: 'vehicle'
    vehicleTopic: vehicle-order-test
3. 配置类
3.1 基本配置类
package com.vehicle.manager.core.config;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
 * RocketMQ settings bound from configuration entries under the
 * {@code aliyun.rocketmq} prefix.
 *
 * <p>Lombok's {@code @Data} supplies the accessors for every field below.
 *
 * @author zr 2024/3/1
 */
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq")
@Data
public class RocketMqConfig {

    // Credentials and name server endpoint.
    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;

    // "market" settings.
    private String marketGroupId;
    private String marketTopic;
    private String marketTag;

    // "order" settings.
    private String orderTopic;
    private String orderGroupId;
    private String orderTag;

    // "vehicle" settings.
    private String vehicleTopic;
    private String vehicleGroupId;
    private String vehicleTag;

    /**
     * Creates a fresh {@link Properties} object carrying the access key, the
     * secret key and the name server address.
     *
     * @return a new, independent {@code Properties} instance on every call
     */
    public Properties getMqPropertie() {
        final Properties connection = new Properties();
        connection.setProperty(PropertyKeyConst.AccessKey, accessKey);
        connection.setProperty(PropertyKeyConst.SecretKey, secretKey);
        connection.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        return connection;
    }
}
3.2 生产者配置
package com.vehicle.manager.core.config;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers the ONS {@link ProducerBean}.
 *
 * @author zr 2024/3/1
 */
@Configuration
public class ProducerConfig {

    /** Source of the connection properties handed to the producer. */
    @Autowired
    private RocketMqConfig mqConfig;

    /**
     * Builds the producer bean from the shared MQ properties.
     *
     * <p>The bean declaration names {@code start} as its init method and
     * {@code shutdown} as its destroy method.
     *
     * @return the configured producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        final ProducerBean bean = new ProducerBean();
        bean.setProperties(mqConfig.getMqPropertie());
        return bean;
    }
}
3.3 消费者配置
package com.vehicle.manager.core.config;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.vehicle.manager.core.listener.VehicleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Spring configuration for the RocketMQ consumer of the vehicle topic.
 *
 * @author zr 2024/3/1
 */
@Configuration
public class VehicleConsumerConfig {

    @Autowired
    private RocketMqConfig mqConfig;

    @Autowired
    private VehicleListener vehicleListener;

    /**
     * Builds the consumer bean: shared connection properties plus the vehicle
     * group id, and a single subscription (vehicle topic + vehicle tag) that
     * is routed to {@link VehicleListener}.
     *
     * @return the configured consumer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildVehicleBuyerConsumer() {
        // Connection settings extended with the consumer-specific entries.
        Properties consumerProps = mqConfig.getMqPropertie();
        consumerProps.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getVehicleGroupId());
        // Fix the number of consumer threads at 20 (noted by the author as the default value).
        consumerProps.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");

        // Subscription: which topic and which tag expression to receive.
        Subscription vehicleSubscription = new Subscription();
        vehicleSubscription.setTopic(mqConfig.getVehicleTopic());
        vehicleSubscription.setExpression(mqConfig.getVehicleTag());

        // To subscribe to more topics, add further entries in the same way.
        Map<Subscription, MessageListener> listenersBySubscription = new HashMap<>();
        listenersBySubscription.put(vehicleSubscription, vehicleListener);

        ConsumerBean consumer = new ConsumerBean();
        consumer.setProperties(consumerProps);
        consumer.setSubscriptionTable(listenersBySubscription);
        return consumer;
    }
}
4. 生产者工具类
- MessageRecord为记录消息发送的对象,可以自行根据字段进行设计调整
- 参数说明:
- topic – 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
- tag – 消息标签, 请使用合法标识符, 尽量简短且见名知意
- key – 业务主键
- body – 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
package com.vehicle.manager.core.util;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.vehicle.manager.core.config.RocketMqConfig;
import com.vehicle.manager.core.mapper.MessageRecordMapper;
import com.vehicle.manager.core.model.entity.MessageRecord;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
/**
* RocketMessageProducer rocketMQ消息生产者
* @author zr 2024/3/1
*/
@Component
@Slf4j
public class RocketMessageProducer {
private static ProducerBean producer;
private static RocketMqConfig mqConfig;
private static MessageRecordMapper messageRecordMapper;
@Autowired
private MessageRecordMapper messageRecordMapperInstance;
@PostConstruct
public void init() {
RocketMessageProducer.messageRecordMapper = messageRecordMapperInstance;
}
public RocketMessageProducer(ProducerBean producer, RocketMqConfig mqConfig) {
this.producer = producer;
this.mqConfig = mqConfig;
}
/**
* 生产车辆服务普通消息
* @param tag
* @param key
* @param body
*/
public static void producerVehicleMsg(String tag, String key, String body) {
Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());
long time = System.currentTimeMillis();
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
log.info(time
+ " Send mq message success.Topic is:" + msg.getTopic()
+ " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()+" body is:"+new String(msg.getBody())
+ " msgId is:" + sendResult.getMessageId());
MessageRecord messageRecord = new MessageRecord();
messageRecord.setPlatformType("mq");
messageRecord.setMessageType("order");
messageRecord.setMqMessageTopic(msg.getTopic());
messageRecord.setMqMessageTag(msg.getTag());
messageRecord.setMqMessageKey(msg.getKey());
messageRecord.setMqMessageId(sendResult.getMessageId());
messageRecord.setCreatedTime(LocalDateTime.now());
messageRecord.setMessageContent(new String(msg.getBody()));
messageRecordMapper.insert(messageRecord);
} catch (ONSClientException e) {
e.printStackTrace();
log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());
}
}
/**
* 生产车辆服务延时普通消息
* @param tag order:订单服务 vehicle:主要用于本服务的超时回应
* @param key
* @param body
* @param delay 延迟秒
*/
public static void producerVehicleDelayMsg(String tag, String key, String body,Integer delay) {
Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());
long time = System.currentTimeMillis();
msg.setStartDeliverTime(time+ delay*1000);
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
log.info(time
+ " 发送消息成功.Topic is:" + msg.getTopic()
+ " Tag 为:" + msg.getTag() + " Key 为:" + msg.getKey()+" body 为:"+new String(msg.getBody())
+ " msgId 为:" + sendResult.getMessageId());
} catch (ONSClientException e) {
e.printStackTrace();
log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());
}
}
}
5. 消费者监听
package com.vehicle.manager.core.listener;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.vehicle.manager.core.model.dto.req.VehicleMQMessageDTO;
import com.vehicle.manager.core.service.HlCarService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author zr 2024/3/1
*/
@Component
@Slf4j
public class VehicleListener implements MessageListener {
@Autowired
private HlCarService hlCarService;
@Override
public Action consume(Message message, ConsumeContext context) {
log.info("VehicleReceive 消息: " + message);
try {
byte[] body = message.getBody();
String s = new String(body);
log.info(s);
// VehicleMQMessageDTO需要自行根据业务封装
VehicleMQMessageDTO vehicleMQMessageDTO = JSON.parseObject(s, VehicleMQMessageDTO.class);
log.info(vehicleMQMessageDTO.toString());
// 以下做你的业务处理
// .........
return Action.CommitMessage;//进行消息的确认
} catch (Exception e) {
log.info(e.getMessage());
//消费失败
return Action.ReconsumeLater;
}
}
}
6. 测试
6.1 发送消息
package com.vehicle.manager.core;
import com.alibaba.fastjson.JSON;
import com.vehicle.manager.api.StartApplication;
import com.vehicle.manager.core.util.RocketMessageProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that sends one message through {@link RocketMessageProducer}.
 *
 * @author zr 2024/3/1
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class MqTest {

    /**
     * Sends a normal message with tag "vehicle" and key "test"; the body is the
     * JSON serialization of the string "testBody".
     */
    @Test
    public void producerMsg() {
        // A string literal is enough; wrapping it in new String(...) only creates a redundant copy.
        String body = JSON.toJSONString("testBody");
        RocketMessageProducer.producerVehicleMsg("vehicle", "test", body);
    }
}
6.2 接收消息
7. 延时消息
如果需要使用延时消息,可以参考 RocketMessageProducer 中的延时消息方法 producerVehicleDelayMsg。