目录
问题背景
技术方向
方案确认
消息队列(√)
分布式锁(×)
方案实现
监控方向
业务方向
问题背景
公司邮件服务token有 分钟内超200封的熔断机制,当前token被熔断后,系统发邮件操作会被忽略,所以邮件服务也没有重试操作
人工发现token被熔断后,需要联系邮件群中值班人,将token恢复
分货业务依赖邮件来查看分货通知以及结果,并且分货层层依赖,如果不能及时收到邮件会影响业务的分货时效等,所以通过三个方面去解决这个问题
技术方向
系统内发邮件收口做限流
方案确认
方向:限流发邮件方法1分钟内最大200次
实现:改造系统发邮件底层方法,1分钟内最多发200个
消息队列(√)
面临问题:多出来的怎么处理?消息队列(需要持久化)
实现:新建一个topic,调用发邮件方法的请求全部扔到MQ中,自己消费,通过设置消费者的拉取间隔以及最大拉取数量限制,分钟内消费消息条数不超过200条
面临问题:多分区多消费者?
实现:默认拉取数量为32,目前MQ服务端设置,限制最大拉取数量为32
(可行)设置1个分区,一个消费者组,目前有2个实例(此时其中一个实例不会消费),设置拉取间隔为10s
(不可行,有自动加实例机制)设置2个分区,一个消费者组,目前有2个实例,设置拉取间隔为10s,最大拉取条数为16;系统在流量激增的情况下会增加实例来分摊流量
最终实现方式
topic设置1个分区,一个消费者组,使用默认负载均衡策略:平均分配
//平均分配负载均衡核心逻辑
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for(int i = 0; i < range; ++i) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
解析两个实例负载均衡过程
//第一个实例起来,触发负载均衡
//index = 0
int index = cidAll.indexOf(currentCID);
//mod = 1%2 = 1
int mod = mqAll.size() % cidAll.size();
//averageSize = 1
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
//startIndex = 0 * 1 = 0
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
//range = min(1, 1 - 0) = 1
int range = Math.min(averageSize, mqAll.size() - startIndex);
for(int i = 0; i < range; ++i) {
//(0+0)%1 = 0,所以将第一个分区分给当前实例
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
//第二个实例起来,触发负载均衡
//index = 1
int index = cidAll.indexOf(currentCID);
//mod = 1%2 = 1
int mod = mqAll.size() % cidAll.size();
//averageSize = 1
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
//startIndex = 1 * 1 + 1 = 2
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
//range = min(1, 1 - 2) = -1
int range = Math.min(averageSize, mqAll.size() - startIndex);
//不会进入循环分配分区
for(int i = 0; i < range; ++i) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
所以只会有一个实例去消费当前这个分区,在集群消费模式下,一个分区只会被消费组内的一个消费者消费,rmq默认拉取数量为32,设置拉取间隔为10s,所以每分钟内消费:32*6 = 192
分布式锁(×)
当前场景的点,在于需要将超出1分钟两百条的那些邮件持久化存储,等到下一个一分钟去发送,而分布式锁只能实现控制接口的流量,没法保证超出流量那部分的存储,所以没法解决当前问题
方案实现
最终采用消息队列,RocketMQ解决该问题
实现代码,使用Java SDK,设置拉取间隔为10s即可
public void run(String... args) throws Exception {
Properties properties = new Properties();
properties.setProperty(ConfigKey.CONSUMER_GROUP, emailNotifyMqProperties.getConsumerGroup());
properties.setProperty(ConfigKey.ACCESS_KEY, rocketMqProperties.getAccessKey());
properties.setProperty(ConfigKey.SECRET_KEY, rocketMqProperties.getSecretKey());
properties.setProperty(ConfigKey.NAME_SERVER_ADDR, rocketMqProperties.getServer());
properties.setProperty(ConfigKey.ENABLE_MSG_TRACE, "true");
//消费限流:解决发邮件分钟内超过200封会被熔断的问题
properties.setProperty(ConfigKey.PULL_INTERVAL, "10000");
NormalConsumer consumer = ClientFactory.createNormalConsumer(properties, this::consumeMessage);
consumer.subscribe(emailNotifyMqProperties.getTopic(), null);
consumer.start();
}
监控方向
1. 系统日志报警,配置邮件发送失败报警
2. 关注token熔断消息通知
业务方向
梳理当前系统中邮件通知的场景,分析报警内容,从以下方向减少邮件次数发送
1. 用户是否需要关注(用户长时间使用下来,部分通知发现自己并不关注的,比如节点报错可重试成功的
2. 是否可以批量发送(多条通知集合到一条邮件发送:多用户,多单据等)