在前面的文章中,我们说过解决消息重复消费的方式中,有一个方式是幂等,那么幂等是怎么实现呢?
面试官:对于MQ中的消息重复消费说说的你的理解
一、定义
首先我们先来了解一下幂等的定义,它指的是同一个操作的重复执行不会产生额外的影响,也就是多次执行与一次执行的结果效果相同。
二、影响
当方法不是幂等的时候,对于我们的系统会产生很多的影响,例如:
- 重复调用造成资源浪费。
- 数据不一致。
- 业务逻辑发生错误。
所以我们在写接口时,一定要注意接口的幂等,保障接口幂等,相当于保住自己的饭碗😂(尤其涉及到 money 的系统)。
三、自定义注解实现幂等
在使用注解实现幂等之前,先说一下大概思路。
这个思路与分布式锁大体相同,所以理解起来会相对容易点,需要注意的就是释放的时机。
- AOP 拦截需要做幂等的方法。
- 获取 key 的解析器。
- 通过 key 解析器解析出来判断幂等的条件(也就是什么条件下才算是重复的请求)。
- 在 Redis 中判断该 key 是否存在。
- 如果存在,说明已经有在执行的请求,直接拒绝请求,响应结束。
- 如果不存在,说明当前线程是首次请求,放行请求,开始执行方法。
上述流程很简单吧,如果你看懂了就跟我一起来实战一下。
需要注意的是,判断幂等的条件不是唯一的,不同的业务场景可以使用不同的幂等条件,所以这个地方需要支持自定义幂等 key。
3.1、自定义注解 Idempotent
java
复制代码
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface Idempotent { int timeout() default 1; TimeUnit timeUnit() default TimeUnit.SECONDS; String message() default "重复请求,请稍后重试"; Class<? extends IdempotentKeyResolver> keyResolver() default DefaultIdempotentKeyResolver.class; String keyArg() default ""; boolean deleteKeyWhenException() default true; }
timeout
指定幂等操作的超时时间,默认是1
秒。timeUnit
指定时间单位,默认SECONDS
。message
正在执行时的提示信息。keyResolver
也就是我们所说的自定义key
的解析器。keyArg
使用Spring EL
表达式解析器解析`key
使用。deleteKeyWhenException
当发生异常的时候是否删除 key。发生异常的时候删除key是为了避免下次请求无法正常执行。当请求正常的时候不需要,如果删除的话,不就和开头一样了吗,分布式锁?
3.2、自定义 key 解析器
定义 key
解析器IdempotentKeyResolver
。
java
复制代码
public interface IdempotentKeyResolver { /** * 解析一个 Key * * @param idempotent 幂等注解 * @param joinPoint AOP 切面 * @return Key */ String resolver(JoinPoint joinPoint, Idempotent idempotent); }
3.2.1、默认的 key 解析器
默认解析我们使用方法名加参数
生成一个 key
,因为参数可能过长,所以我们使用MD5
压缩一下。
typescript
复制代码
public class DefaultIdempotentKeyResolver implements IdempotentKeyResolver { @Override public String resolver(JoinPoint joinPoint, Idempotent idempotent) { String methodName = joinPoint.getSignature().toString(); String argsStr = StrUtil.join(",", joinPoint.getArgs()); return SecureUtil.md5(methodName + argsStr); } }
3.2.2、使用用户信息做 key
我们使用方法名、参数、用户ID、用户类型
生成 key
,同样使用 MD5
压缩。
用户ID
与用户类型
取决于我们自己怎么获取,可以读取session
也可以读取数据库,具体取决自己的业务系统,此处就不再演示。
ini
复制代码
public class UserIdempotentKeyResolver implements IdempotentKeyResolver { @Override public String resolver(JoinPoint joinPoint, Idempotent idempotent) { String methodName = joinPoint.getSignature().toString(); String argsStr = StrUtil.join(",", joinPoint.getArgs()); Long userId = ""; Integer userType = ""; return SecureUtil.md5(methodName + argsStr + userId + userType); } }
3.2.3、Spring EL 表达式解析 key
使用Spring EL
表达式解析,在使用中通过 EL
表达式解析参数,最后生成一个key
。
scss
复制代码
public class ExpressionIdempotentKeyResolver implements IdempotentKeyResolver { private final ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer(); private final ExpressionParser expressionParser = new SpelExpressionParser(); @Override public String resolver(JoinPoint joinPoint, Idempotent idempotent) { // 获得被拦截方法参数名列表 Method method = getMethod(joinPoint); Object[] args = joinPoint.getArgs(); String[] parameterNames = this.parameterNameDiscoverer.getParameterNames(method); // 准备 Spring EL 表达式解析的上下文 StandardEvaluationContext evaluationContext = new StandardEvaluationContext(); if (ArrayUtil.isNotEmpty(parameterNames)) { for (int i = 0; i < parameterNames.length; i++) { evaluationContext.setVariable(parameterNames[i], args[i]); } } // 解析参数 Expression expression = expressionParser.parseExpression(idempotent.keyArg()); return expression.getValue(evaluationContext, String.class); } private static Method getMethod(JoinPoint point) { // 处理,声明在类上的情况 MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); if (!method.getDeclaringClass().isInterface()) { return method; } // 处理,声明在接口上的情况 try { return point.getTarget().getClass().getDeclaredMethod( point.getSignature().getName(), method.getParameterTypes()); } catch (NoSuchMethodException e) { throw new RuntimeException(e); } } }
3.3、幂等注解逻辑处理类
拦截添加了注解的方法,实现对应的幂等操作。
java
复制代码
@Aspect @Slf4j public class IdempotentAspect { /** * IdempotentKeyResolver 集合 */ private final Map<Class<? extends IdempotentKeyResolver>, IdempotentKeyResolver> keyResolvers; private final IdempotentRedisDAO idempotentRedisDAO; public IdempotentAspect(List<IdempotentKeyResolver> keyResolvers, IdempotentRedisDAO idempotentRedisDAO) { this.keyResolvers = CollectionUtils.convertMap(keyResolvers, IdempotentKeyResolver::getClass); this.idempotentRedisDAO = idempotentRedisDAO; } @Around(value = "@annotation(idempotent)") public Object aroundPointCut(ProceedingJoinPoint joinPoint, Idempotent idempotent) throws Throwable { // 获得 IdempotentKeyResolver IdempotentKeyResolver keyResolver = keyResolvers.get(idempotent.keyResolver()); Assert.notNull(keyResolver, "找不到对应的 IdempotentKeyResolver"); // 解析 Key String key = keyResolver.resolver(joinPoint, idempotent); // 1. 锁定 Key boolean success = idempotentRedisDAO.setIfAbsent(key, idempotent.timeout(), idempotent.timeUnit()); // 锁定失败,抛出异常 if (!success) { log.info("[aroundPointCut][方法({}) 参数({}) 存在重复请求]", joinPoint.getSignature().toString(), joinPoint.getArgs()); throw new ServiceException(GlobalErrorCodeConstants.REPEATED_REQUESTS.getCode(), idempotent.message()); } // 2. 执行逻辑 try { return joinPoint.proceed(); } catch (Throwable throwable) { // 3. 异常时,删除 Key if (idempotent.deleteKeyWhenException()) { idempotentRedisDAO.delete(key); } throw throwable; } } }
3.4、封装Redis操作
对于 key
的缓存,我们放在 Redis中,所以我们此处封装一个 Redis操作类。
typescript
复制代码
@AllArgsConstructor public class IdempotentRedisDAO { /** * 幂等操作 * * KEY 格式:idempotent:%s // 参数为 uuid * VALUE 格式:String * 过期时间:不固定 */ private static final String IDEMPOTENT = "idempotent:%s"; private final StringRedisTemplate redisTemplate; public Boolean setIfAbsent(String key, long timeout, TimeUnit timeUnit) { String redisKey = formatKey(key); return redisTemplate.opsForValue().setIfAbsent(redisKey, "", timeout, timeUnit); } public void delete(String key) { String redisKey = formatKey(key); redisTemplate.delete(redisKey); } private static String formatKey(String key) { return String.format(IDEMPOTENT, key); } }
四、使用注解 Idempotent
需要引入注解,切面,以及解析配置类,让其被Spring
管理起来,然后在需要使用的接口上增加注解。
less
复制代码
@Idempotent(idempotent = true,expireTime = 3,timeUnit = TimeUnit.SECONDS,info = "请勿重复更新用户密码",delKey = false) @PutMapping(value = "updatePassword") public String updatePassword(User user){ userServiceImpl.updatePassword(user); return "更新成功"; }
总结
总结一下设计思路以及需要注意的地方。
AOP
拦截请求,方法处理之前先存入Redis
中key
、value
以及过期时间。- 过期时间必须设置,防止一个请求阻塞,自动过期时间必须是超过业务逻辑处理时间。
- 该方案是接口请求层面的幂等,如果业务方面的,还需要业务单独开发自己本身的幂等逻辑。
- 前端请求做遮罩层,防止在过期时间小于业务处理时间时的多次触发,造成业务的不一致。
- 对于业务的幂等数据库层面可以创建唯一索引,先查询在添加。
- 这种方式与分布式锁逻辑类似,但是不可用于分布锁,并发压测下会有问题。但是做幂等就可以,因为实际的情况就是同一个用户不会在短短的3、5秒内完成50-100个以上的重复请求。
- 对于
key
的生成还可以加上请求IP
做限制。
好了,接口的幂等方案到这就结束了,文中的代码参考的是yudao-cloud
的幂等设计,感兴趣的可以看一下。如有错误也欢迎指出,大家一起评论区交流学习。