文章目录
- Redis
- spring整合redis
- 实现点赞
- 帖子的赞
- 用户的赞
- 关注功能
- 热帖排行
- redis存储验证码、登录凭证、用户信息
- kafka
- 阻塞队列
- kafka![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/d35be55986344b548710985cd8ecbd87.png)
- 触发事件
- 处理事件
- Redis高级
- 网站数据统计
- 实现搜索功能
- 实现的功能
- 1. elasticsearch基本设置
- 2. 实现数据同步
- 3. es搜索并分页、高亮的功能
- 4. 实现搜索功能
- Quarzt
- 任务调度
Redis
redis-cli
select 0
# 加值减值
incr count decr count
# 查询库中所有的key
keys *
# 以test开头的所有的key
keys test*
# key的类型, 名为test:user的key
type test:user
# key是否存在
exists test:user
del test:user# 删除
#过期时间, 10秒过期
expire test:user 10
spring整合redis
实现点赞
帖子的赞
like:entity:entityType:entityId
实体的赞中存的是用户id, 可以更好的适应各种需求–>set(userId)–无序唯一
- 点一次是点赞,再点一次是取消赞
- 先看集合中有没有userId,
operations.opsForSet().isMember(entityLikeKey, userId)
如果存在,就删除。不存在就添加
public void like(int userId,int entityType,int entityId,int entityUserId){
// 事务
redisTemplate.execute(new SessionCallback() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
String entityLikeKey= RedisKeyUtil.getEntityLikeKey(entityType,entityId);
// 用户该帖子获得的赞
String userLikeKey=RedisKeyUtil.getUserLikeKey(entityUserId);
Boolean isMember = operations.opsForSet().isMember(entityLikeKey, userId);
operations.multi();//开启事务
if(isMember){
operations.opsForSet().remove(entityLikeKey,userId);
operations.opsForValue().decrement(userLikeKey);
}else {
operations.opsForSet().add(entityLikeKey,userId);
operations.opsForValue().increment(userLikeKey);
}
return operations.exec();//执行事务
}
});
}
- 查询某实体(帖子)的点赞数量
- 查询用户是否给实体点赞
- 查询某个用户获得的赞
// 查询某实体点赞的数量
public long findEntityLikeCount(int entityType,int entityId){
String entityLikeKey=RedisKeyUtil.getEntityLikeKey(entityType,entityId);
return redisTemplate.opsForSet().size(entityLikeKey);
}
//查询某人对某实体的点赞状态
public int findEntityLikeStatus(int userId,int entityType,int entityId){
String entityLikeKey=RedisKeyUtil.getEntityLikeKey(entityType,entityId);
return redisTemplate.opsForSet().isMember(entityLikeKey,userId)?1:0;
//查询某个用户的赞的数量
public int findUserLikeCount(int userId){
String userLikeKey = RedisKeyUtil.getUserLikeKey(userId);
Integer count = (Integer) redisTemplate.opsForValue().get(userLikeKey);
return count==null?0:count.intValue();
}
}
用户的赞
like:user:userId
关注功能
- 某个用户关注的所有实体
followee:userId:entityType ->Zset(entityId,now)
按时间排序 - 实体的粉丝
follower:entityType:entityId
->Zset(userId,now)
关注取关的功能实现
public void follow(int userId,int entityType,int entityId){
redisTemplate.execute(new SessionCallback() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
String followeeKey= RedisKeyUtil.getFolloweeKey(userId,entityType);
String followerKey= RedisKeyUtil.getFollowerKey(entityId,entityType);
operations.multi();
operations.opsForZSet().add(followeeKey,entityId,System.currentTimeMillis());
operations.opsForZSet().add(followerKey,userId,System.currentTimeMillis());
return operations.exec();
}
});
}
public void unfollow(int userId,int entityType,int entityId){
redisTemplate.execute(new SessionCallback() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
String followeeKey= RedisKeyUtil.getFolloweeKey(userId,entityType);
String followerKey= RedisKeyUtil.getFollowerKey(entityId,entityType);
operations.multi();
operations.opsForZSet().remove(followeeKey,entityId);
operations.opsForZSet().remove(followerKey,userId);
return operations.exec();
}
});
}
- 查询关注实体的数量
- 查询实体粉丝的数量
- 查询当前用户是否已关注这个实体
粉丝列表、关注列表 - 查询某用户关注的人
- 查询某用户的粉丝
//查询某用户关注的人
public List<Map<String,Object>> findFollowees(int userId,int offset,int limit){
String followeeKey= RedisKeyUtil.getFolloweeKey(userId,ENTITY_TYPE_USER);
Set<Integer> targetIds=redisTemplate.opsForZSet().reverseRange(followeeKey,offset,offset+limit-1);
if(targetIds==null){
return null;
}
List<Map<String,Object>> list=new ArrayList<>();
for(Integer targetId:targetIds){
Map<String,Object> map=new HashMap<>();
User user = userService.findUserById(targetId);
map.put("user",user);
Double score=redisTemplate.opsForZSet().score(followeeKey,targetId);
map.put("followTime",new Date(score.longValue()));
list.add(map);
}
return list;
}
//查询某用户的粉丝
public List<Map<String,Object>> findFollowers(int userId,int offset,int limit){
String followerKey= RedisKeyUtil.getFollowerKey(userId,ENTITY_TYPE_USER);
Set<Integer> targetIds=redisTemplate.opsForZSet().reverseRange(followerKey,offset,offset+limit-1);
if(targetIds==null){
return null;
}
List<Map<String,Object>> list=new ArrayList<>();
for(Integer targetId:targetIds){
Map<String,Object> map=new HashMap<>();
User user = userService.findUserById(targetId);
map.put("user",user);
Double score=redisTemplate.opsForZSet().score(followerKey,targetId);
map.put("followTime",new Date(score.longValue()));
list.add(map);
}
return list;
}
热帖排行
// 帖子分数
public static String getPostScoreKey() {
return PREFIX_POST + SPLIT + "score";
}
- 评论、加精、点赞、收藏时,对帖子算分数
- 将需要重新计算分数的帖子Id存入Redis中
// 保存需要计算帖子分数的id
String redisKey = RedisKeyUtil.getPostScoreKey();
redisTemplate.opsForSet().add(redisKey, discussPostId);
- 定时刷新分数,五分钟一次
- quartz对帖子分数进行计算
private void refresh(int postId) {
DiscussPost post = discussPostService.findDiscussPostById(postId);
if(post == null) {
logger.error("该帖子不存在:id=" + postId);
return;
}
// 是否精华
boolean wonderful = post.getStatus() == 1;
// 评论数量
int commentCount = post.getCommentCount();
// 点赞数量
long likeCount = likeService.findEntityLikeCount(ENTITY_TYPE_POST, postId);
// 计算权重
double w = (wonderful ? 75 : 0) + commentCount * 10 + likeCount * 2;
// 分数 = 帖子权重 + 距离天数
double score = Math.log10(Math.max(w, 1)) + (post.getCreateTime().getTime()-epoch.getTime()) / (1000 * 3600 * 24);
// 更新帖子分数
discussPostService.updateScore(postId, score);
// 同步搜索的数据
post.setScore(score);
elasticSearchService.saveDiscussPost(post);
}
配置quartz
// 刷新帖子分数任务
@Bean
public JobDetailFactoryBean postScoreRefreshJobDetail(){
JobDetailFactoryBean factoryBean = new JobDetailFactoryBean();
factoryBean.setJobClass(PostScoreRefreshJob.class);
factoryBean.setName("postScoreRefreshJob");
factoryBean.setGroup("communityJobJobGroup");
factoryBean.setDurability(true); // 持久化
factoryBean.setRequestsRecovery(true);
return factoryBean;
}
@Bean
public SimpleTriggerFactoryBean postScoreRefreshTrigger(JobDetail postScoreRefreshJobDetail){
SimpleTriggerFactoryBean factoryBean = new SimpleTriggerFactoryBean();
factoryBean.setJobDetail(postScoreRefreshJobDetail);
factoryBean.setName("postScoreRefreshTrigger");
factoryBean.setGroup("communityTriggerGroup");
factoryBean.setRepeatInterval(1000 * 60 * 5); // 频率
factoryBean.setJobDataMap(new JobDataMap()); // 存储job的状态
return factoryBean;
}
redis存储验证码、登录凭证、用户信息
- 存储验证码时,可以给用户生成一个临时的随机字符串,用于标识用户。
- 登录凭证,退出后,将状态改变,再存入
- 缓存用户信息
- 优先从缓存中取值
- 取不到数据时,初始化缓存数据
- 数据变更时,清除缓存数据
//1.优先从缓存中读取数据
public User getCache(int userId){
String redisKey = RedisKeyUtil.getUserKey(userId);
return (User) redisTemplate.opsForValue().get(redisKey);
}
//2.在缓存中读不到数据时,初始化缓存数据
public User initCache(int userId){
User user = userMapper.selectById(userId);
String redisKey = RedisKeyUtil.getUserKey(userId);
redisTemplate.opsForValue().set(redisKey,user,3600, TimeUnit.SECONDS);
return user;
}
//3.数据变更时清除缓存
private void clearCache(int userId){
String redisKey = RedisKeyUtil.getUserKey(userId);
redisTemplate.delete(redisKey);
}
kafka
阻塞队列
kafka
数据存到硬盘上,主副本,从副本
触发事件
将消息放入队列中
封装成事件,
- 评论后,发布通知
触发发帖事件
Event event=new Event()
.setTopic(TOPIC_COMMENT)
.setUserId(hostHolder.getUser().getId())
.setEntityType(comment.getEntityType())
.setEntityId(comment.getEntityId())
.setData("postId", discussPostId);
if(comment.getEntityType()==ENTITY_TYPE_POST){
DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){
Comment target=commentService.findCommentsById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}
eventProducer.fireEvent(event);
- 点赞后, 发布通知
- 关注后,发布通知
public class EventProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
//处理事件
public void fireEvent(Event event){
//将事件发布到指定的主题
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
}
}
处理事件
消费事件
//发送站内消息
Message message = new Message();
message.setFromId(SYSTEM_USER_ID);
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(new Date());
Map<String,Object> content=new HashMap<>();
content.put("userId",event.getUserId());
content.put("entityType",event.getEntityType());
content.put("entityId",event.getEntityId());
if(!event.getData().isEmpty()){
for (Map.Entry<String ,Object> entry:event.getData().entrySet()) {
content.put(entry.getKey(),entry.getValue());
}
}
message.setContent(JSONObject.toJSONString(content));
messageService.addMessage(message);
- 封装事件对象
- 开发事件的生产者
- 开发事件的消费者
Redis高级
网站数据统计
- 设定redis键值
用时间来当做rediskey
// 单日uv
public static String getUVKey(String date){
return PREFIX_UV + SPLIT + date;
}
// 区间uv
public static String getUVKey(String startDate, String endDate){
return PREFIX_UV + SPLIT + startDate + SPLIT + endDate;
}
// 单日活跃用户
public static String getDAUKey(String date){
return PREFIX_DAU + SPLIT + date;
}
// 区间活跃用户
public static String getDAUKey(String startDate, String endDate){
return PREFIX_DAU + SPLIT + startDate + SPLIT + endDate;
}
- service层
// 将指定的IP计入UV
public void recordUV(String ip){
// 访问的日期作为rediskey
String redisKey = RedisKeyUtil.getUVKey(df.format(new Date()));
// ip地址作为值存入redis
redisTemplate.opsForHyperLogLog().add(redisKey, ip);
}
// 指定统计日期范围内的UV
public long calculateUV(Date start, Date end){
if(start == null || end == null){
throw new IllegalArgumentException("参数不能为空!");
}
// 整理该日期范围内的key
List<String> keyList = new ArrayList<>();
// 实例化,获取当前时间
Calendar calendar = Calendar.getInstance();
calendar.setTime(start);
while(!calendar.getTime().after(end)){
String key = RedisKeyUtil.getUVKey(df.format(calendar.getTime()));
keyList.add(key);
// 日期往后增加一天
calendar.add(Calendar.DATE, 1);
}
// 合并数据
String redisKey = RedisKeyUtil.getUVKey(df.format(start), df.format(end));
redisTemplate.opsForHyperLogLog().union(redisKey, keyList.toArray());
// 返回统计的结果
return redisTemplate.opsForHyperLogLog().size(redisKey);
}
// 将指定用户计入DAU
public void recordDAU(int userId){
String redisKey = RedisKeyUtil.getDAUKey(df.format(new Date()));
redisTemplate.opsForValue().setBit(redisKey, userId, true);
}
// 统计指定日期范围内的DAU
public long calculateDAU(Date start, Date end){
if(start == null || end == null){
throw new IllegalArgumentException("参数不能为空!");
}
// 整理该日期范围内的key
List<byte[]> keyList = new ArrayList<>();
// 实例化
Calendar calendar = Calendar.getInstance();
calendar.setTime(start);
while(!calendar.getTime().after(end)){
String key = RedisKeyUtil.getDAUKey(df.format(calendar.getTime()));
keyList.add(key.getBytes());
calendar.add(Calendar.DATE, 1);
}
// 进行OR运算
return (long) redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
String redisKey = RedisKeyUtil
.getDAUKey(df.format(start),df.format(end));
connection.bitOp(RedisStringCommands.BitOperation.OR, redisKey.getBytes(),keyList.toArray(new byte[0][0]));
return connection.bitCount(redisKey.getBytes());
}
});
}
- 在prehandle中实现调用
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 统计UV
String ip = request.getRemoteHost();
dataService.recordUV(ip);
// 统计DAU
User user = hostHolder.getUser();
if(user != null){
dataService.recordDAU(user.getId());
}
return true;
}
- controller层
实现搜索功能
实现的功能
- 搜索服务:将帖子保存到elasticsearch服务器;从es服务器搜索、删除帖子;
- 发布服务:发布帖子时,将帖子异步提交到es服务器;增加评论时,将帖子异步提交到es服务器;·在消费组件中增加一个方法,消费帖子发布事件;
- 显示结果:在控制器中处理搜索请求,在HTML上显示搜索结果。
1. elasticsearch基本设置
底层是基于netty的
// 管理bean的生命周期的,初始化的方法
// 被它修饰的方法在构造器调用完以后执行
@PostConstruct
public void init(){
//解决netty启动冲突问题
System.setProperty("es.set.netty.runtime.available.processors","false");
}
要考虑mysql中的表和es中的索引的对应关系.数据库中的字段和es的字段的对应关系
// 索引的名字,类型,分片,副本
@Document(indexName = "discusspost",type = "doc",shards=6,replicas=3)
public class DiscussPost {
@Id
private int id;// 主键
@Field(type = FieldType.Integer)
private int userId;
// 存储的分词器和搜索的分词器
@Field(type = FieldType.Text,analyzer = "ik_max_word",searchAnalyzer = "ik_smart")
private String title;
}
设置Repository接口
// 定义处理的实体类是DiscussPost,实体的主键类型Integer
@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost,Integer> {
}
2. 实现数据同步
通过生产者消费者的方式同步数据的变化,将数据保存在es服务器中
触发发帖事件。
//触发发帖事件
Event event=new Event()
.setTopic(TOPIC_PUBLISH)
.setUserId(user.getId())
.setEntityType(ENTITY_TYPE_POST)
.setEntityId(discussPost.getId())
;
eventProducer.fireEvent(event);
消费发帖事件
Event event=JSONObject.parseObject(record.value().toString(),Event.class);
// 查找新增的帖子,并保存到es
DiscussPost post= discussPostService.findDiscussPostById(event.getEntityId());
elasticSearchService.saveDiscussPost(post);
3. es搜索并分页、高亮的功能
public Page<DiscussPost> searchDiscussPost(String keyword,int current,int limit){
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery(keyword, "title", "content"))
.withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(current, limit))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
).build();
return elasticsearchTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {
@Override
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> aClass, Pageable pageable) {
SearchHits hits = response.getHits();
if (hits.getTotalHits() <= 0) {
return null;
}
List<DiscussPost> list = new ArrayList<>();
for (SearchHit hit : hits) {
DiscussPost post = new DiscussPost();
String id = hit.getSourceAsMap().get("id").toString();
post.setId(Integer.valueOf(id));
String userId = hit.getSourceAsMap().get("userId").toString();
post.setUserId(Integer.valueOf(userId));
String title = hit.getSourceAsMap().get("title").toString();
post.setTitle(title);
String content = hit.getSourceAsMap().get("content").toString();
post.setContent(content);
String status = hit.getSourceAsMap().get("status").toString();
post.setStatus(Integer.valueOf(status));
String createTime = hit.getSourceAsMap().get("createTime").toString();
post.setCreateTime(new Date(Long.valueOf(createTime)));
String commentCount = hit.getSourceAsMap().get("commentCount").toString();
post.setCommentCount(Integer.valueOf(commentCount));
// 处理高亮显示的结果
HighlightField titleField = hit.getHighlightFields().get("title");
if (titleField != null) {
post.setTitle(titleField.getFragments()[0].toString());
}
HighlightField contentField = hit.getHighlightFields().get("content");
if (contentField != null) {
post.setContent(contentField.getFragments()[0].toString());
}
list.add(post);
}
return new AggregatedPageImpl(list, pageable,
hits.getTotalHits(), response.getAggregations(), response.getScrollId(), hits.getMaxScore());
}
});
4. 实现搜索功能
org.springframework.data.domain.Page<DiscussPost> searchResult=
elasticSearchService.searchDiscussPost(keyword, page.getCurrent()-1, page.getLimit());
// 聚合数据发送到前端
List<Map<String,Object>> discussPosts=new ArrayList<>();
...
Quarzt
任务调度
存到数据库中
配置