文章目录
- 场景说明
- 方案设计
- 数据结构
- Redis使用方案
- 排行榜实现
- 更新用户活跃积分
- 幂等策略
- 榜单评分更新
- 触发活跃度更新
- 排行榜查询
技术派项目源码地址 :
- Gitee :技术派 - https://gitee.com/itwanger/paicoding
- Github :技术派 - https://github.com/itwanger/paicoding
效果如图 :
场景说明
技术派中,提供了一个用户的活跃排行榜,当然作为一个博客社区,更应该实现的是作者排行榜;出于让大家更有参与感的目的,我们以用户活跃度来设计一个排行榜,区分日/月两个榜单
用户活跃度计算方式:
- 用户每访问一个新的页面 +1分
- 对于一篇文章,点赞、收藏 +2分;取消点赞、取消收藏,将之前的活跃分收回
- 文章评论 +3分
- 发布一篇审核通过的文章 +10分
方案设计
数据结构
排行榜,一般而言都是连续的,借此我们可以联想到一个合适的数据结构LinkedList,
好处在于排名变动时,不需要数组的拷贝
Redis使用方案
这里主要使用的是redis的ZSET数据结构,带权重的集合,下面分析一下可能性
-
set: 集合确保里面元素的唯一性
-
权重:这个可以看做我们的score,这样每个元素都有一个score;
-
zset:根据score进行排序的集合
-
从zset的特性来看,我们每个用户的积分,丢到zset中,就是一个带权重的元素,
-
而且是已经排好序的了,只需要获取元素对应的index,就是我们预期的排名
排行榜实现
更新用户活跃积分
接下来我们先思考一下,这个具体的应该怎么实现,先梳理实现的业务流程
- 根据业务实体,计算需要增加/减少的活跃度
- 对于增加活跃度时:
- 做一个幂等,防止重复添加,因此需要判断下之前有没有重复添加过相关的活跃度
- 若幂等了,则直接返回;否则,执行更新,并做好幂等保存
- 对于减少活跃度时:
- 判断之前有没有加过活跃度,防止扣减为负数
- 之前没有扣减过,则直接返回;否则,执行扣减,并移除幂等判定
上面的业务逻辑清晰之后,在看一下我们实现的关键要素
- 怎么做幂等?
- 如何更新榜单的评分?
幂等策略
放了防止重复加活跃度,怎么做幂等呢?
一个简单的方案就是将用户的每个加分项,都直接记录下来,在执行具体加分时,基于此来做幂等判定
基于上面这个思路,很容易想到的一个方案就是
每个用户维护一个活跃更新操作历史记录表,我们先尽量设计得轻量级一点
直接将用户的历史日志,保存在redis的hash数据结构中,每天一个记录
- key: activity_rank_{user_id}_{年月日}
- field: 活跃度更新key
- value: 添加的活跃度
榜单评分更新
- 对有序集合中的某个成员的分数进行增加操作,并返回增加后的总分值
- 具体实现代码如下
public void addActivityScore(Long userId, ActivityScoreBo activityScore) {
if (userId == null) {
return;
}
// 1. 计算活跃度(正为加活跃,负为减活跃)
String field;
int score = 0;
if (activityScore.getPath() != null) {
field = "path_" + activityScore.getPath();
score = 1;
} else if (activityScore.getArticleId() != null) {
field = activityScore.getArticleId() + "_";
if (activityScore.getPraise() != null) {
field += "praise";
score = BooleanUtils.isTrue(activityScore.getPraise()) ? 2 : -2;
} else if (activityScore.getCollect() != null) {
field += "collect";
score = BooleanUtils.isTrue(activityScore.getCollect()) ? 2 : -2;
} else if (activityScore.getRate() != null) {
// 评论回复
field += "rate";
score = BooleanUtils.isTrue(activityScore.getRate()) ? 3 : -3;
} else if (BooleanUtils.isTrue(activityScore.getPublishArticle())) {
// 发布文章
field += "publish";
score += 10;
}
} else if (activityScore.getFollowedUserId() != null) {
field = activityScore.getFollowedUserId() + "_follow";
score = BooleanUtils.isTrue(activityScore.getFollow()) ? 2 : -2;
} else {
return;
}
final String todayRankKey = todayRankKey();
final String monthRankKey = monthRankKey();
// 2. 幂等,判断之前是否有更新过相关的活跃度信息
final String userActionKey = ACTIVITY_SCORE_KEY + userId + DateUtil.format(DateTimeFormatter.ofPattern("yyyyMMdd"), System.currentTimeMillis());
Integer ans = RedisClient.hGet(userActionKey, field, Integer.class);
if (ans == null) {
// 2.1 之前没有加分记录,执行具体的加分
if (score > 0) {
// 记录加分记录
RedisClient.hSet(userActionKey, field, score);
// 个人用户的操作记录,保存一个月的有效期,方便用户查询自己最近31天的活跃情况
RedisClient.expire(userActionKey, 31 * DateUtil.ONE_DAY_SECONDS);
// 更新当天和当月的活跃度排行榜
Double newAns = RedisClient.zIncrBy(todayRankKey, String.valueOf(userId), score);
RedisClient.zIncrBy(monthRankKey, String.valueOf(userId), score);
if (log.isDebugEnabled()) {
log.info("活跃度更新加分! key#field = {}#{}, add = {}, newScore = {}", todayRankKey, userId, score, newAns);
}
if (newAns <= score) {
// 日活跃榜单,保存31天;月活跃榜单,保存1年
RedisClient.expire(todayRankKey, 31 * DateUtil.ONE_DAY_SECONDS);
RedisClient.expire(monthRankKey, 12 * DateUtil.ONE_MONTH_SECONDS);
}
}
} else if (ans > 0) {
// 2.2 之前已经加过分,因此这次减分可以执行
if (score < 0) {
Boolean oldHave = RedisClient.hDel(userActionKey, field);
if (BooleanUtils.isTrue(oldHave)) {
Double newAns = RedisClient.zIncrBy(todayRankKey, String.valueOf(userId), score);
RedisClient.zIncrBy(monthRankKey, String.valueOf(userId), score);
if (log.isDebugEnabled()) {
log.info("活跃度更新减分! key#field = {}#{}, add = {}, newScore = {}", todayRankKey, userId, score, newAns);
}
}
}
}
}
触发活跃度更新
- 文章/用户的相关操作事件监听,并更新对应的活跃度
- 添加了@Async注解, 作为异步处理, 不参与原本的业务逻辑当中
/**
* 用户操作行为,增加对应的积分
*
* @param msgEvent
*/
@EventListener(classes = NotifyMsgEvent.class)
@Async
public void notifyMsgListener(NotifyMsgEvent msgEvent) {
switch (msgEvent.getNotifyType()) {
case COMMENT:
case REPLY:
CommentDO comment = (CommentDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setRate(true).setArticleId(comment.getArticleId()));
break;
case COLLECT:
UserFootDO foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setCollect(true).setArticleId(foot.getDocumentId()));
break;
case CANCEL_COLLECT:
foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setCollect(false).setArticleId(foot.getDocumentId()));
break;
case PRAISE:
foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPraise(true).setArticleId(foot.getDocumentId()));
break;
case CANCEL_PRAISE:
foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPraise(false).setArticleId(foot.getDocumentId()));
break;
case FOLLOW:
UserRelationDO relation = (UserRelationDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setFollow(true).setArticleId(relation.getUserId()));
break;
case CANCEL_FOLLOW:
relation = (UserRelationDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setFollow(false).setArticleId(relation.getUserId()));
break;
default:
}
}
- 发布文章事件
/**
* 发布文章,更新对应的积分
*
* @param event
*/
@Async
@EventListener(ArticleMsgEvent.class)
public void publishArticleListener(ArticleMsgEvent<ArticleDO> event) {
ArticleEventEnum type = event.getType();
if (type == ArticleEventEnum.ONLINE) {
userActivityRankService.addActivityScore(
ReqInfoContext.getReqInfo().getUserId(),
new ActivityScoreBo().setPublishArticle(true).setArticleId(event.getContent().getId()));
}
- 基于用户浏览行为的活跃度更新,这个就可以再Filte/Inteceptor层来实现了
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (handler instanceof HandlerMethod) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
Permission permission = handlerMethod.getMethod().getAnnotation(Permission.class);
if (permission == null) {
permission = handlerMethod.getBeanType().getAnnotation(Permission.class);
}
if (permission == null || permission.role() == UserRole.ALL) {
if (ReqInfoContext.getReqInfo() != null) {
// 用户活跃度更新
SpringUtil.getBean(UserActivityRankService.class).addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPath(ReqInfoContext.getReqInfo().getPath()));
}
return true;
}
if (ReqInfoContext.getReqInfo() == null || ReqInfoContext.getReqInfo().getUserId() == null) {
if (handlerMethod.getMethod().getAnnotation(ResponseBody.class) != null
|| handlerMethod.getMethod().getDeclaringClass().getAnnotation(RestController.class) != null) {
// 访问需要登录的rest接口
response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
response.getWriter().println(JsonUtil.toStr(ResVo.fail(StatusEnum.FORBID_NOTLOGIN)));
response.getWriter().flush();
return false;
} else if (request.getRequestURI().startsWith("/api/admin/") || request.getRequestURI().startsWith("/admin/")) {
response.sendRedirect("/admin");
} else {
// 访问需要登录的页面时,直接跳转到登录界面
response.sendRedirect("/");
}
return false;
}
if (permission.role() == UserRole.ADMIN && !UserRole.ADMIN.name().equalsIgnoreCase(ReqInfoContext.getReqInfo().getUser().getRole())) {
// 设置为无权限
response.setStatus(HttpStatus.FORBIDDEN.value());
return false;
}
}
return true;
}
排行榜查询
- 从redis中获取topN的用户+评分
- 查询用户的信息
- 根据用户评分进行排序,并更新每个用户的排名
@Override
public List<RankItemDTO> queryRankList(ActivityRankTimeEnum time, int size) {
String rankKey = time == ActivityRankTimeEnum.DAY ? todayRankKey() : monthRankKey();
// 1. 获取topN的活跃用户
List<ImmutablePair<String, Double>> rankList = RedisClient.zTopNScore(rankKey, size);
if (CollectionUtils.isEmpty(rankList)) {
return Collections.emptyList();
}
// 2. 查询用户对应的基本信息
// 构建userId -> 活跃评分的map映射,用于补齐用户信息
Map<Long, Integer> userScoreMap = rankList.stream().collect(Collectors.toMap(s -> Long.valueOf(s.getLeft()), s -> s.getRight().intValue()));
List<SimpleUserInfoDTO> users = userService.batchQuerySimpleUserInfo(userScoreMap.keySet());
// 3. 根据评分进行排序
List<RankItemDTO> rank = users.stream()
.map(user -> new RankItemDTO().setUser(user).setScore(userScoreMap.getOrDefault(user.getUserId(), 0)))
.sorted((o1, o2) -> Integer.compare(o2.getScore(), o1.getScore()))
.collect(Collectors.toList());
// 4. 补齐每个用户的排名
IntStream.range(0, rank.size()).forEach(i -> rank.get(i).setRank(i + 1));
return rank;
}
- 核心的实现如下, 基于 zRangeWithScores 获取指定排名的用户+对应分数,其中topN的写法如下
/**
* 找出排名靠前的n个
*
* @param key
* @param n
* @return
*/
public static List<ImmutablePair<String, Double>> zTopNScore(String key, int n) {
return template.execute(new RedisCallback<List<ImmutablePair<String, Double>>>() {
@Override
public List<ImmutablePair<String, Double>> doInRedis(RedisConnection connection) throws DataAccessException {
Set<RedisZSetCommands.Tuple> set = connection.zRangeWithScores(keyBytes(key), -n, -1);
if (set == null) {
return Collections.emptyList();
}
return set.stream()
.map(tuple -> ImmutablePair.of(toObj(tuple.getValue(), String.class), tuple.getScore()))
.sorted((o1, o2) -> Double.compare(o2.getRight(), o1.getRight())).collect(Collectors.toList());
}
});
}