微服务项目收获和总结---第5天(定时发布)

延迟任务

目录

  • 延迟任务
  • 技术对比: 
  •  Redis实现定时任务:​编辑
  • 新增任务:
  • 取消任务:
  • 拉取任务:
  • Zset定时刷新数据到List中:
  • 分布式锁实现定时任务只刷新一次:

技术对比: 

 

 Redis实现定时任务:

 问题1:为什么任务需要存储在数据库中?

问题2:为什么使用两种Redis的数据结构(List和Zset)?

 问题3:添加Zset数据时,为什么需要预加载?

        

新增任务:

添加任务到数据库进行持久化----->添加任务到Redis队列中------>如果任务的执行时间小于等于当前时间,则添加到当前任务List队列中------>如果任务的执行时间大于当前时间并且小于预设时间,就把其放到未来队列Zset中

@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {

    /**
     * 添加延迟任务
     *
     * @param task
     * @return
     */
    @Override
    public long addTask(Task task) {
        //1.添加任务到数据库中
        boolean success = addTaskToDb(task);
        if (success) {
            //2.添加任务到redis
            addTaskToCache(task);
        }
        return task.getTaskId();
    }
 

    @Autowired
    private CacheService cacheService;
    /**
     * 把任务添加到redis中
     *
     * @param task
     */
    private void addTaskToCache(Task task) {
        String key = task.getTaskType() + "_" + task.getPriority();
        //获取5分钟之后的时间  毫秒值
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, 5);
        long nextScheduleTime = calendar.getTimeInMillis();
 
        //2.1 如果任务的执行时间小于等于当前时间,存入list
        if (task.getExecuteTime() <= System.currentTimeMillis()) {
            cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
        } else if (task.getExecuteTime() <= nextScheduleTime) {
            //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
            cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
        }
    }
 
    @Autowired
    private TaskinfoMapper taskinfoMapper;
    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;
 
    /**
     * 添加任务到数据库中
     *
     * @param task
     * @return
     */
    private boolean addTaskToDb(Task task) {
        boolean flag = false;
        try {
            //保存任务表
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task, taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);
 
            //设置taskID
            task.setTaskId(taskinfo.getTaskId());
 
            //保存任务日志数据
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo, taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskinfoLogsMapper.insert(taskinfoLogs);
 
            flag = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }
}

取消任务:

把数据库的任务删除----->更新任务的日志------>删除redis队列中的数据

/**
     * 取消任务
     * @param taskId
     * @return
     */
@Override
public boolean cancelTask(long taskId) {
    boolean flag = false;
    //删除任务,更新日志
    Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
    //删除redis的数据
    if(task != null){
        removeTaskFromCache(task);
        flag = true;
    }
    return false;
}
 
/**
     * 删除redis中的任务数据
     * @param task
     */
private void removeTaskFromCache(Task task) {
    String key = task.getTaskType()+"_"+task.getPriority();
    if(task.getExecuteTime()<=System.currentTimeMillis()){
        cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
    }else {
        cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
    }
}
 
/**
     * 删除任务,更新任务日志状态
     * @param taskId
     * @param status
     * @return
     */
private Task updateDb(long taskId, int status) {
    Task task = null;
    try {
        //删除任务
        taskinfoMapper.deleteById(taskId);
        //更新日志
        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
        taskinfoLogs.setStatus(status);
        taskinfoLogsMapper.updateById(taskinfoLogs);
 
        task = new Task();
        BeanUtils.copyProperties(taskinfoLogs,task);
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
    }catch (Exception e){
        log.error("task cancel exception taskid={}",taskId);
    }
    return task;
}

 

拉取任务:

 /**
     * 按照类型和优先级拉取任务
     * @return
     */
@Override
public Task poll(int type,int priority) {
    Task task = null;
    try {
        String key = type+"_"+priority;
        //从List中取出任务
        String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if(StringUtils.isNotBlank(task_json)){
            task = JSON.parseObject(task_json, Task.class);
            //更新数据库信息,删除任务并且更新日志
            updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
        }
    }catch (Exception e){
        e.printStackTrace();
        log.error("poll task exception");
    }
 
    return task;
}

Zset定时刷新数据到List中+分布式锁实现定时任务只刷新一次:

如何获取Zset中所有的key。方案一:keys()获取,不推荐。方案二:scan()获取。

通过定时任务,使用redis管道,把Zset中的数据往List中传输 

 
/**
 * 未来数据定时刷新
 */
@Scheduled(cron = "0 */1 * * * ?")
public void refresh(){
    //获取锁
    String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
    if(StringUtils.isNotBlank(token)){
        log.info("未来数据定时刷新---定时任务");
 
        //获取所有未来数据的集合key
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
        for (String futureKey : futureKeys) {//future_100_50
 
            //获取当前数据的key  topic
            String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
 
            //按照key和分值查询符合条件的数据
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
 
            //同步数据
            if(!tasks.isEmpty()){
                cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
                log.info("成功的将"+futureKey+"刷新到了"+topicKey);
            }
        }
    }
}

数据库同步到Redis中:

 清除缓存----->查询出小于未来5分钟的所有任务------>新增任务到Redis中

 
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
    clearCache();
    log.info("数据库数据同步到缓存");
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.MINUTE, 5);
 
    //查看小于未来5分钟的所有任务
    List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
    if(allTasks != null && allTasks.size() > 0){
        for (Taskinfo taskinfo : allTasks) {
            Task task = new Task();
            BeanUtils.copyProperties(taskinfo,task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());
            addTaskToCache(task);
        }
    }
}
 
private void clearCache(){
    // 删除缓存中未来数据集合和当前消费者队列的所有key
    Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
    Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
    cacheService.delete(futurekeys);
    cacheService.delete(topickeys);
}

其他微服务通过Fegin远程调用Schedule服务 :
@FeignClient("leadnews-schedule")
public interface IScheduleClient {
 
    /**
     * 添加任务
     * @param task   任务对象
     * @return       任务id
     */
    @PostMapping("/api/v1/task/add")
    public ResponseResult  addTask(@RequestBody Task task);
 
    /**
     * 取消任务
     * @param taskId        任务id
     * @return              取消结果
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
 
    /**
     * 按照类型和优先级来拉取任务
     * @param type
     * @param priority
     * @return
     */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority")  int priority);
}

文章发布和审核:

 发布:
 
@Autowired
private WmNewsTaskService wmNewsTaskService;
 
/**
     * 发布修改文章或保存为草稿
     * @param dto
     * @return
     */
@Override
public ResponseResult submitNews(WmNewsDto dto) {
 
    //0.条件判断
    if(dto == null || dto.getContent() == null){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
 
    //1.保存或修改文章
 
    WmNews wmNews = new WmNews();
    //属性拷贝 属性名词和类型相同才能拷贝
    BeanUtils.copyProperties(dto,wmNews);
    //封面图片  list---> string
    if(dto.getImages() != null && dto.getImages().size() > 0){
        //[1dddfsd.jpg,sdlfjldk.jpg]-->   1dddfsd.jpg,sdlfjldk.jpg
        String imageStr = StringUtils.join(dto.getImages(), ",");
        wmNews.setImages(imageStr);
    }
    //如果当前封面类型为自动 -1
    if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
        wmNews.setType(null);
    }
 
    saveOrUpdateWmNews(wmNews);
 
    //2.判断是否为草稿  如果为草稿结束当前方法
    if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
 
    //3.不是草稿,保存文章内容图片与素材的关系
    //获取到文章内容中的图片信息
    List<String> materials =  ectractUrlInfo(dto.getContent());
    saveRelativeInfoForContent(materials,wmNews.getId());
 
    //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
    saveRelativeInfoForCover(dto,wmNews,materials);
 
    //审核文章
    //        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
    wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
 
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
 
}
 消费:
@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
 
/**
     * 消费延迟队列数据
     */
@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {
 
    log.info("文章审核---消费任务执行---begin---");
 
    ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
    if(responseResult.getCode().equals(200) && responseResult.getData() != null){
        String json_str = JSON.toJSONString(responseResult.getData());
        Task task = JSON.parseObject(json_str, Task.class);
        byte[] parameters = task.getParameters();
        WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
        System.out.println(wmNews.getId()+"-----------");
        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
    }
    log.info("文章审核---消费任务执行---end---");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/655382.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

万界星空科技定制化MES系统帮助实现数字化生产

由于不同企业的生产流程、需求和目标各异&#xff0c;MES管理系统的个性化和定制化需求也不同。有些企业需要将MES管理系统与ERP等其他管理系统进行集成&#xff0c;以实现全面的信息共享和协同工作。有些企业需要将MES管理系统与SCADA等控制系统进行集成&#xff0c;以实现实时…

极验3逆向 JS逆向最新点选验证码 逆向分析详解

目录 声明&#xff01; 一、请求流程分析 二、w参数生成位置 三、主要问题 四、结果展示 原创文章&#xff0c;请勿转载&#xff01; 本文内容仅限于安全研究&#xff0c;不公开具体源码。维护网络安全&#xff0c;人人有责。 声明&#xff01; 本文章中所有内容仅供学习交流…

K8s种的service配置

什么是service 官方的解释是:   k8s中最小的管理单元是pod&#xff1b;而service是 将运行在一个或一组 Pod 上的网络应用程序公开为网络服务的方法;   Kubernetes 中 Service 的一个关键目标是让你无需修改现有应用以使用某种服务发现机制。 你可以在 Pod 集合中运行代码…

MySQL(三)查询

1、单表和多表查询 1.1 算术运算符、比较运算符及特殊运算符 1)MySQL的算术运算符 select 0.1+0.3333,0.1-0.3333,0.1*0.3333,1/2,1%2; select 1/0,100%0; select 3%2,mod(3,2); 2)MySQL的比较运算符 select 1=0,1=1,null=null; select 1<>0,1<>1,null<&…

音乐系统java在线音乐网站基于springboot+vue的音乐系统带万字文档

文章目录 音乐系统一、项目演示二、项目介绍三、万字项目文档四、部分功能截图五、部分代码展示六、底部获取项目源码和万字论文参考&#xff08;9.9&#xffe5;带走&#xff09; 音乐系统 一、项目演示 在线音乐系统 二、项目介绍 基于springbootvue的前后端分离在线音乐系…

英特尔LLM技术挑战记录

英特尔技术介绍&#xff1a; Flash Attention Flash Attention 是一种高效的注意力机制实现&#xff0c;旨在优化大规模 Transformer 模型中的自注意力计算。在深度学习和自然语言处理领域&#xff0c;自注意力是 Transformer 架构的核心组件&#xff0c;用于模型中不同输入元…

Java 文件操作和输入输出流

在 Java 编程中&#xff0c;文件操作和输入输出流是非常常见和重要的任务&#xff0c;它们允许你读取和写入文件、处理数据流等。 文件操作概述 文件操作是指对文件进行创建、读取、写入、删除等操作的过程。在 Java 中&#xff0c;文件操作通常涉及到使用文件对象、输入输出…

聚合网卡和Wondershaper限速的一些问题(速度减半问题)

首先我们来了解一下聚合网卡&#xff1a; 聚合网卡&#xff0c;又称为链路聚合组&#xff08;LAG, Link Aggregation Group&#xff09;、端口汇聚&#xff08;Port Trunking&#xff09;、以太通道&#xff08;Ethernet Bonding&#xff09;等&#xff0c;是一种网络技术&…

python基础知识:py文件转换为jupyter文件

搜索了很多&#xff0c;都没什么用&#xff0c;会出现一些json错误&#xff0c;最终直接新建文件成功: 在自己电脑安装Anaconda&#xff0c;安装jupyter notebook&#xff0c;输入命令打开jupyter notebook&#xff1a; 在Anoconda命令行中cd到自己要转换文件的地址&#xff0…

【css3】01-css3新特性样式篇

目录 1 背景 1.1 设置背景图片的定位 1.2 背景裁切-规定背景的绘制区域 1.3 设置背景图片尺寸 2 边框 2.1 盒子阴影box-shadow 2.2 边框图片border-image 3 文本 -文字阴影text-shadow 1 背景 1.1 设置背景图片的定位 background-origin&#xff1a;规定背景图片的定位…

大型央企国企信创化与数字化转型规划实施方案(71页PPT)

方案介绍&#xff1a; 随着全球信息技术的迅猛发展&#xff0c;数字化转型已成为企业提升竞争力、实现可持续发展的必经之路。作为国家经济的重要支柱&#xff0c;大型央企国企在信创化与数字化转型方面承载着重要的责任和使命。本方案旨在通过系统性的规划和实施&#xff0c;…

OrangePi AIpro测评:智能与创新的完美结合

OrangePi AIpro上手指南 简介 香橙派与华为合作发布的香橙派AiPro为Ai主力&#xff0c;为边缘设备的Ai计算提供了可能。 集成图形处理器&#xff0c;拥有8GB/16GB LPDDR4X&#xff08;我这个是8G内存版本的&#xff09;&#xff0c;可以外接32GB/64GB/128GB/256GB eMMC模块&a…

Nacos 2.x 系列【9】配置中心

文章目录 1. 概述1.1 配置1.2 配置中心 2. 案例演示2.1 环境搭建2.2 自定义参数配置2.2 服务配置 1. 概述 1.1 配置 在系统开发过程中&#xff0c;开发者通常会将一些需要变更的参数、变量等从代码中分离出来独立管理&#xff0c;以独立的配置文件的形式存在。 在实际开发中…

华为OD机试【计算最接近的数】(java)(100分)

1、题目描述 给定一个数组X和正整数K&#xff0c;请找出使表达式X[i] - X[i1] … - X[i K 1]&#xff0c;结果最接近于数组中位数的下标i&#xff0c;如果有多个i满足条件&#xff0c;请返回最大的i。 其中&#xff0c;数组中位数&#xff1a;长度为N的数组&#xff0c;按照元…

922. 按奇偶排序数组 II - 力扣

1. 题目 给定一个非负整数数组 nums&#xff0c; nums 中一半整数是 奇数 &#xff0c;一半整数是 偶数 。 对数组进行排序&#xff0c;以便当 nums[i] 为奇数时&#xff0c;i 也是 奇数 &#xff1b;当 nums[i] 为偶数时&#xff0c; i 也是 偶数 。 你可以返回 任何满足上述…

FreeRtos进阶——消息队列的操作逻辑

消息队列&#xff08;queue&#xff09; 在不同的任务之间&#xff0c;如果我们需要互相之间通信&#xff0c;使用全局变量进行通信&#xff0c;是一种不安全的通信的方式。为保证线程安全&#xff0c;我们需要引入消息队列的通信方式。 粗暴的消息队列 为保证线程的安全&am…

生成验证码的奥秘:从列表到字符串的魔法转换

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、引言&#xff1a;验证码生成的背景与需求 二、生成验证码的方法一&#xff1a;列表生成…

大模型时代的具身智能系列专题(四)

google deepmind团队 谷歌旗下最大的两个 AI 研究机构——地处伦敦 DeepMind 与位于硅谷的 Google Brain 合并成立新部门 Google DeepMind。其将机器学习和系统神经科学的最先进技术结合起来&#xff0c;建立强大的通用学习算法。代表作有AlphaGo&#xff0c;AlphaStar&#x…

基于语音识别的智能电子病历(三)之 Soniox

Soniox成立于2020年&#xff0c;目前总部位于美国加州福斯特城&#xff0c;该公司开发了市场上最好的语音识别引擎之一。该公司目前提供市面上领先的云转录引擎之一——这也是audioXpress成功用于采访和一般语音转文本转换的引擎。 专注于语音AI的Soniox在2021年推出了世界上第…