学成在线 - 第3章任务补偿机制实现 + 分块文件清理

7.9 额外实现

7.9.1 任务补偿机制

问题:如果有线程抢占了某个视频的处理任务,如果线程处理过程中挂掉了,该视频的状态将会一直是处理中,其它线程将无法处理,这个问题需要用补偿机制。

单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务,将任务的状态改为执行失败。

任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。

大家思考这个sql该如何实现?

大家尝试自己实现此任务补偿机制。

数据库表结构

media_process:

在这里插入图片描述

根据status字段判断视频文件是否正在处理,如果status值为4(正在处理),且当前时间减去create_date超过30分钟,就把status的值修改为3(处理失败),并更新其他字段的值。

具体实现流程
  1. 编写检查超时的sql

    SELECT * FROM `media_process_history`
    WHERE TIMESTAMPDIFF(MINUTE,create_date,finish_date) < 30
    
  2. 在media_process的mapper接口中添加相应的接口

    MediaProcessMapper.java

    /**
     * 查询是否有执行超过30分钟的视频处理任务
     * @param nowDate 当前任务执行的时间
     * @return List<MediaProcess>
     */
    @Select("SELECT * FROM media_process_history WHERE TIMESTAMPDIFF(MINUTE,create_date,#{date}) > 30")
    List<MediaProcess> selectTimeoutProcess(@Param("date")Date nowDate);
    
  3. 实现service代码

    /**
     * 超时任务列表
     * @param shardIndex 分片序号
     * @param shardTotal 分片总数
     * @param count 获取记录数(cpu核心数)
     * @return List<MediaProcess>
     */
    @Override
    public List<MediaProcess> getTimeoutMediaProcessList(int shardIndex, int shardTotal, int count) {
        List<MediaProcess> mediaTimeoutProcessList = mediaProcessMapper.selectTimeoutProcess(LocalDateTime.now(), shardIndex, shardTotal, count);
        return mediaTimeoutProcessList;
    }
    
  4. 业务逻辑在task中实现

    查询相应超时记录,把status和errMsg字段更新后保存到media_process表里。

    /**
     * 处理视频超时任务
     * @throws Exception
     */
    @XxlJob("videoTimeoutJobHandler")
    public void videoTimeoutJobHandler() throws Exception {
        // 分片参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        List<MediaProcess> mediaTimeoutProcessList = null;
        int size = 0;
        try {
            // 取出cpu核心数作为一次检查超时的记录条数
            int availableProcessors = Runtime.getRuntime().availableProcessors();
            mediaTimeoutProcessList = mediaFileProcessService.getTimeoutMediaProcessList(LocalDateTime.now(), shardIndex, shardTotal, availableProcessors);
            size = mediaTimeoutProcessList.size();
            log.debug("取出的超时任务数量是 {} 条", size);
            if (size <= 0) {
                return ;
            }
        } catch (Exception e) {
            log.error("取出的超时任务超时", size);
            e.printStackTrace();
        }
        // 启动size个线程的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(size);
        // 定义size个插销
        CountDownLatch countDownLatch = new CountDownLatch(size);
        mediaTimeoutProcessList.forEach(mediaProcess -> {
            // 将任务加入线程池
            fixedThreadPool.execute(() -> {
                try {
                    // 把status值设置为3,并把errMsg更新
                    Long taskId = mediaProcess.getId();
                    String fileId = mediaProcess.getFileId();
                    mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "处理视频超时(30分钟)");
                    log.debug("删除超时任务");
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("删除超时任务失败,失败原因: {}", e.getMessage());
                } finally {
                    countDownLatch.countDown();
                }
            });
        });
        // 等待,给一个充裕的超时事件,防止无限等待,到达超时时间还没有处理完成则结束任务
        countDownLatch.await(30, TimeUnit.MINUTES);
    }
    

7.9.2 达到最大失败次数(暂时未实现)

问题:需要找到前端对应的接口。

当任务达到最大失败次数时一般就说明程序处理此视频存在问题,这种情况就需要人工处理,在页面上会提示失败的信息,人工可手动执行该视频进行处理,或通过其它转码工具进行视频转码,转码后直接上传mp4视频。

7.9.3 分块文件清理问题

上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?

1、在数据库中有一张文件表记录minio中存储的文件信息。

2、文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。(实现过程中并没有往文件表中插入正在上传的文件记录,只是在media_minio_files临时保存了分块信息)

3、当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。

实现思路

难点:怎么判断文件上传了一半不再上传了?上传分块文件只在上传视频时有用,上传分块文件时不会往media_files表里添加记录,只有所有分块上传完成,并合并成功后,才会往media_files里插入数据。所以要建立一个media_minio_files表,每上传一个分块就往media_minio_files表里插入一条分块信息,记录包括分块的上传时间。如果超过30分钟分块记录还没有被删除,说明上传到一半不传了,把minio里的分块目录删除,并删除对应的数据库里的记录。在文件合并成功后,数据库里的分块文件上传记录也要删除。

media_minio_files表结构

在这里插入图片描述

具体实现流程
  1. 上传文件块的时候,上传一个文件块完毕要把这个块的信息保存到表里。

    /**
     * 把文件分块信息入库
     * @param fileMd5 文件md5
     * @param fileName 文件名称
     * @param bucket minio桶
     * @param objectName minio中块的存储路径
     * @param chunkSize 块大小
     * @return MediaMinioFiles
     */
    @Transactional
    @Override
    public MediaMinioFiles addMediaChunkToDb(String fileMd5, String fileName, String bucket, String objectName, Long chunkSize) {
        MediaMinioFiles mediaMinioFile = new MediaMinioFiles();
        mediaMinioFile.setFileId(fileMd5);
        mediaMinioFile.setFilename(fileName);
        mediaMinioFile.setBucket(bucket);
        mediaMinioFile.setFilePath(objectName);
        mediaMinioFile.setFileSize(chunkSize);
        mediaMinioFile.setStatus("4");  // 上传中
        int insert = mediaMinioFilesMapper.insert(mediaMinioFile);
        if (insert < 0) {
            log.error("保存分块文件信息到数据库失败,{}", mediaMinioFile.toString());
            XueChengPlusException.cast("保存分块文件信息失败");
        }
        log.debug("保存分块文件信息到数据库成功,{}", mediaMinioFile.toString());
        return mediaMinioFile;
    }
    
    /**
     * 上传分块
     * @param fileMd5 文件md5
     * @param chunk 分块序号
     * @param localChunkFilePath 分块文件本地路径
     * @return RestResponse
     */
    @Override
    public RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {
        // 得到分块文件的目录路径
        String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
        // 得到分块文件的路径
        String chunkFilePath = chunkFileFolderPath + chunk;
        // mimeType
        String mimeType = getMimeType(null);
        // 将文件存储到 minio
        boolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucketVideoFiles, chunkFilePath);
        // 将分块信息存到数据库中
        currentProxy.addMediaChunkToDb(fileMd5, null, bucketVideoFiles, chunkFilePath, 1024 * 5L);
        if (!b) {
            log.debug("上传分块文件失败: {}", chunkFilePath);
            return RestResponse.validfail(false, "上传分块失败");
        }
        log.debug("上传分块文件成功: {}", chunkFilePath);
        return RestResponse.success(true);
    }
    
  2. 如果没有上传失败,在成功合并块文件之后,要把上面数据库中记录删除。

    /**
     * 删除数据库中的分块文件上传记录
     * @param fileMd5 文件md5
     */
    @Transactional
    public void clearChunkFromDb(String fileMd5) {
        LambdaQueryWrapper<MediaMinioFiles> deleteQueryWrapper = new LambdaQueryWrapper<>();
        deleteQueryWrapper.eq(MediaMinioFiles::getFileId, fileMd5);
        try {
            mediaMinioFilesMapper.delete(deleteQueryWrapper);
            log.debug("删除分块上传记录成功");
        } catch (Exception e) {
            e.printStackTrace();
            log.error("删除分块上传记录失败");
        }
    }
    

    在合并分块的最后几行:

    // 文件入库
    currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucketVideoFiles, mergeFilePath);
    // 清除文件分块
    clearChunkFiles(chunkFileFolderPath, chunkTotal);
    // 清除数据库中文件分块上传信息
    currentProxy.clearChunkFromDb(fileMd5);
    return RestResponse.success(true);
    
  3. 如果上传文件一半失败,要根据数据库中media_minio_files表中存的块的路径删除所有超时块。

    首先查询所有的超时块,得到上传的超时块列表:

    /**
     * 拿到数据库中所有上传超时的文件分块信息
     * @param time 当前任务执行时间
     * @return List<MediaMinioFiles>
     */
    @Override
    public List<MediaMinioFiles> getChunkTimeoutFiles(LocalDateTime time) {
        List<MediaMinioFiles> mediaMinioFiles = mediaMinioFilesMapper.selectTimeoutChunks(DateUtil.toDateTime(time));
        return mediaMinioFiles;
    }
    

    删除minio中所有超时块,并且删除所有超时记录

    @XxlJob("videoChunkTimeoutJobHandler")
    public void videoChunkTimeoutJobHandler() {
        log.debug(">>>>>>>>>> 开始执行检查上传超时块任务");
        // 拿到所有的超时分块任务
        List<MediaMinioFiles> chunkTimeoutFiles = mediaFileService.getChunkTimeoutFiles(LocalDateTime.now());
        if (chunkTimeoutFiles == null) {  // 没有超时任务
            log.debug("没有超时任务");
            return ;
        }
        // 根据记录中的file_path,删除minio中的所有文件
        chunkTimeoutFiles.forEach(chunk -> {
            String filePath = chunk.getFilePath();
            mediaFileService.clearSingleChunkFile(filePath);
        });
        // 删除数据库中对应的记录
        mediaFileService.clearChunkFromDb(null, LocalDateTime.now());
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/604552.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Layer1 公链竞争破局者:Sui 生态的全面创新之路

随着 Sui 生态逐渐在全球范围内树立起声望&#xff0c;并通过与 Revolut 等前沿金融科技平台合作&#xff0c;推广区块链教育与应用&#xff0c;Sui 生态的未来发展方向已成为业界瞩目的焦点。如今&#xff0c;Sui 的总锁定价值已攀升至 5.93 亿美元&#xff0c;充分展示了其在…

分布式架构的演技进过程

最近看了一篇文章,觉得讲的挺不错,就借机给大家分享一下。 早期应用:早期的应用比较简单,访问人数有限,大部分的开发单机就能完成。 分离模型:在业务发展后,用户数量逐步上升,服务器的性能出现瓶颈;就需要将应用和数据分开存储,避免相互抢占资源。 缓存模式:随着系…

历代著名画家作品赏析-东晋顾恺之

中国历史朝代顺序为&#xff1a;夏朝、商朝、西周、东周、秦朝、西楚、西汉、新朝、玄汉、东汉、三国、曹魏、蜀汉、孙吴、西晋、东晋、十六国、南朝、刘宋、南齐、南梁、南陈、北朝、北魏、东魏、北齐、西魏、北周、隋&#xff0c;唐宋元明清&#xff0c;近代。 一、东晋著名…

现身说法暑期三下乡社会实践团一个好的投稿方法胜似千军万马

作为一名在校大学生,去年夏天我有幸参与了学院组织的暑期大学生三下乡社会实践活动,这段经历不仅让我深入基层,体验了不一样的生活,更是在新闻投稿的实践中,经历了一次从传统到智能的跨越。回忆起那段时光,从最初的邮箱投稿困境,到后来智慧软文发布系统的高效运用,每一步都刻印…

顺序栈的操作

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd;既然选择了远方&#xff0c;当不负青春…

什么是DDoS攻击?DDoS攻击的原理是什么?

一、DDoS攻击概念 DDoS攻击又叫“分布式拒绝服务”(Distributed DenialofService)攻击&#xff0c;它是一种通过控制大量计算机、物联网终端或网络僵尸&#xff08;Zombie&#xff09;来向目标网站发送大量请求&#xff0c;从而耗尽其服务器资源&#xff0c;导致正常用户无法访…

WEB基础--JDBC操作数据库

使用JDBC操作数据库 使用JDBC查询数据 五部曲&#xff1a;建立驱动&#xff0c;建立连接&#xff0c;获取SQL语句&#xff0c;执行SQL语句&#xff0c;释放资源 建立驱动 //1.加载驱动Class.forName("com.mysql.cj.jdbc.Driver"); 建立连接 //2.连接数据库 Stri…

【3dmax笔记】026:挤出和壳修改器的使用

文章目录 一、修改器二、挤出三、壳 一、修改器 3ds Max中的修改器是一种强大的工具&#xff0c;用于创建和修改复杂的几何形状。这些修改器可以改变对象的形状、大小、方向和位置&#xff0c;以生成所需的效果。以下是一些常见的3ds Max修改器及其功能&#xff1a; 挤出修改…

Google Earth Engine谷歌地球引擎计算遥感影像在每个8天间隔内的多年平均值

本文介绍在谷歌地球引擎&#xff08;Google Earth Engine&#xff0c;GEE&#xff09;中&#xff0c;求取多年时间中&#xff0c;遥感影像在每1个8天时间间隔内的多年平均值的方法。 本文是谷歌地球引擎&#xff08;Google Earth Engine&#xff0c;GEE&#xff09;系列教学文章…

神经网络案例实战

&#x1f50e;我们通过一个案例详细使用PyTorch实战 &#xff0c;案例背景&#xff1a;你创办了一家手机公司&#xff0c;不知道如何估算手机产品的价格。为了解决这个问题&#xff0c;收集了多家公司的手机销售数据&#xff1a;这些数据维度可以包括RAM、存储容量、屏幕尺寸、…

JavaScript数字分隔符

● 如果现在我们用一个很大的数字&#xff0c;例如2300000000&#xff0c;这样真的不便于我们进行阅读&#xff0c;我们希望用千位分隔符来隔开它&#xff0c;例如230,000,000; ● 下面我们使用_当作分隔符来尝试一下 const diameter 287_266_000_000; console.log(diameter)…

论文分享[cvpr2018]Non-local Neural Networks非局部神经网络

论文 https://arxiv.org/abs/1711.07971 代码https://github.com/facebookresearch/video-nonlocal-net 非局部神经网络 motivation:受计算机视觉中经典的非局部均值方法[4]的启发&#xff0c;非局部操作将位置的响应计算为所有位置的特征的加权和。 非局部均值方法 NLM&#…

Python实现Chiikawa

写在前面 哈&#xff1f;呀哈&#xff01;本期小编给大家素描版Chiikawa&#xff01; 主人公当然是我们可爱的吉伊、小八以及乌萨奇啦~ Chiikawa小小可爱 《Chiikawa》是一部来自日本的超萌治愈系漫画与动画作品&#xff0c;由作者秋田祯信创作。"Chiikawa"这个名字…

【Kolmogorov-Arnold网络 替代多层感知机MLPs】KAN: Kolmogorov-Arnold Networks

KAN: Kolmogorov-Arnold Networks 论文地址 代码地址 知乎上的讨论&#xff08;看一下评论区更正&#xff09; Abstract Inspired by the Kolmogorov-Arnold representation theorem, we propose Kolmogorov-Arnold Networks (KANs) as promising alternatives to Multi-Layer…

支持LLM的Markdown笔记;ComfyUI-HiDiffusion图片生成和对图像进行高质量编辑

✨ 1: ComfyUI-HiDiffusion ComfyUI-HiDiffusion是一个为HiDiffusion技术使用而定制的节点。HiDiffusion技术是专门用于在计算机视觉和图像处理中生成和改进图片质量的先进算法。该技术通常应用于图像的超分辨率、去噪、风格转换等方面。 ComfyUI-HiDiffusion的主要特点包含提…

Julia 语言环境安装与使用

1、Julia 语言环境安装 安装教程&#xff1a;https://www.runoob.com/julia/julia-environment.html Julia 安装包下载地址为&#xff1a;https://julialang.org/downloads/。 安装步骤&#xff1a;注意&#xff08;勾选 Add Julia To PATH 自动将 Julia 添加到环境变量&…

(五)JSP教程——response对象

response对象主要用于动态响应客户端请求&#xff08;request&#xff09;&#xff0c;然后将JSP处理后的结果返回给客户端浏览器。JSP容器根据客户端的请求建立一个默认的response对象&#xff0c;然后使用response对象动态地创建Web页面、改变HTTP标头、返回服务器端地状态码…

C++string续

一.find_first_of与find 相同&#xff1a;都是从string里面找字符&#xff0c;传参格式一样(都可以从某个位置开始找) 不同&#xff1a;find_first_of只能找字符&#xff0c;find可以找字符串 find_first_of参数里面的string与char*是每个字符的集合&#xff0c;指找出string…

ETL工具中JSON格式的转换方式

JSON的用处 JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;其设计初衷是为了提升网络应用中数据的传输效率及简化数据结构的解析过程。自其诞生以来&#xff0c;JSON 已成为Web开发乃至众多软件开发领域中不可或缺的一部分&a…

【大模型认识】警惕AI幻觉,利用插件+微调来增强GPT模型

文章目录 一. 大模型的局限1. 大模型不会计算2. 甚至明目张胆的欺骗 二. 使用插件和微调来增强GPT模型1. 模型的局限性2. 插件来增强大模型的能力3. 微调技术-提高特定任务的准确性 一. 大模型的局限 1. 大模型不会计算 LLM根据给定的输入提示词逐个预测下一个词&#xff08;…