PyQt下载M3U8文件

下载模型

from pathlib import Path
from typing import Any, Union
from pydantic import Field, BaseModel

class DownloadUrlModel(BaseModel):
    title: str = Field(..., description="文件名")
    save_path: Union[Path, str] = Field(..., description="文件路径")
    url: str = Field(..., description="m3u8文件路径")
    isM3u8: bool = Field(True, description="是否为m3u8文件")

M3U8解密

pip install pycryptodome

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad


class DecodeByte:
    # 解密
    @staticmethod
    def do_decode(key, iv, data, method="AES-128") -> bytes:
        if isinstance(key, str):
            key = key.encode('utf-8')
        if isinstance(iv, str):
            iv = iv.encode('utf-8')
        if "AES-128" == method:
            aes = AES.new(key, AES.MODE_CBC, iv)
            if data and (len(data) % 16) != 0:
                data = pad(data, 16)
            return aes.decrypt(data)
        else:
            return None

下载线程

FFMPEG_EXE_PATHffmpeg.exe的绝对路径,若没有在官网上下载
ffmpeg下载M3U8没有进度显示,并且它是同步下载,会比较慢,使用httpx异步比较快

# coding = utf-8
import asyncio
import os
import re
import shutil
from pathlib import Path
from typing import Union

import httpx
from PySide6.QtCore import QThread, Signal

from ..models import DownloadUrlModel
from ..utils import HTTPRequest, DecodeByte
from ..config import FFMPEG_EXE_PATH


class DownloadM3U8Thread(QThread):
    loggerSignal = Signal(str)
    progressSignal = Signal(int)
    stopSignal = Signal(bool)

    def __init__(self, parent=None):
        super().__init__(parent)
        self.__is_stop = False
        self.__model: DownloadUrlModel = None
        self.finished_file: Path = None
        self.failed_file: Path = None
        self.temp_path: Path = None

        self.num = 0
        self.list_length = 0
        self.retry_max = 7
        # 解密信息
        self.cry = {
            "key": "",
            "iv": "",
            "method": "",
        }

        self.started.connect(self.startedSlot)

    def sendRequest(self, url: str, *, method='GET', **kwargs):
        response = httpx.request(method, url, verify=False, headers=HTTPRequest.headers, timeout=HTTPRequest.timeout,
                                 **kwargs)
        response.raise_for_status()
        return response

    async def aiohttp_send(self, client: httpx.AsyncClient, url: str, index: int, *, retry_count=0):
        ts_name = f'{str(index).zfill(6)}.ts'
        try:
            response = await client.get(url)
            self.save_data_file(response.content, ts_name)
            progress = round(self.num / self.list_length * 100, 3)
            self.loggerSignal.emit(f'下载进度: {progress} %')
            self.progressSignal.emit(int(progress))
        except httpx.ConnectError as e:
            await asyncio.sleep(4)
            if retry_count < self.retry_max:
                retry_count += 1
                await self.aiohttp_send(client, url, index, retry_count=retry_count)
            else:
                self.save_failed_file(f'{url}_{ts_name}')

    async def aiohttp_download(self, urls):
        async with httpx.AsyncClient(headers=HTTPRequest.headers, timeout=HTTPRequest.timeout) as client:
            tasks = []
            self.list_length = len(urls)
            for index, url in enumerate(urls, 1):
                if self.__is_stop:
                    self.wait()
                    self.quit()
                    break
                ts_name = f'{str(index).zfill(6)}.ts'
                if ts_name in self.open_finished_file():
                    continue
                task = asyncio.ensure_future(self.aiohttp_send(client, url, index))
                tasks.append(task)
            await asyncio.gather(*tasks)

    def get_full_ts_url(self, url: str, ts_name: str) -> str:
        """
        获取完整的ts文件url
        :param url: 原始url
        :param ts_name: ts文件名
        :return: str
        """
        if ts_name.startswith('http'):
            return ts_name
        tl = ts_name.split('/')
        new_url = []
        # 循环url,去掉ts name中重复的部分
        for s in url.split('/')[:-1]:
            if s in tl:
                tl.remove(s)
            new_url.append(s)
        # 拼接ts name
        new_url.extend(tl)
        result = '/'.join(new_url)
        return result

    def setCryInfo(self, text, url):
        # 获取加密参数
        x_key = re.findall('#EXT-X-KEY:(.*?)\n', text)
        cry_obj = dict()
        if len(x_key) > 0:
            # 提取
            for item in x_key[0].split(','):
                key = item.split('=')[0]
                value = item.replace(key, '')[1:].replace('"', '')
                cry_obj[key] = value
            # format
            if cry_obj.get('URI') and not cry_obj['URI'].startswith('http'):
                cry_obj['URI'] = self.get_full_ts_url(url, cry_obj['URI'])
            elif not cry_obj.get('URI'):
                cry_obj['URI'] = ''
            # 获取key
            res = self.sendRequest(cry_obj['URI'])
            self.cry['key'] = res.content
            # 加密方式
            self.cry['method'] = cry_obj.get('METHOD')
            # iv值
            if cry_obj.get('IV'):
                self.cry['iv'] = cry_obj['IV'][2:18]
        else:
            pass

    def save_data_file(self, data: bytes, ts_name: str):
        # 如果有加密,需要data解密后再存储
        if self.cry.get('key'):
            # 如果源文件有iv就读取,如果没有就用文件名
            iv = self.cry["iv"] if self.cry.get("iv") else ts_name.split('.')[0].zfill(16)
            data = DecodeByte.do_decode(self.cry["key"], iv, data, self.cry["method"])
            if not data:
                raise Exception('解密失败')
        # 保存
        with open(self.temp_path / ts_name, 'wb') as f:
            f.write(data)
            self.save_finished_file(ts_name)
            self.num += 1

    def open_failed_file(self) -> list:
        return self.failed_file.read_text(encoding='utf-8').split('\n')

    def save_failed_file(self, ts_name: str):
        with self.failed_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def open_finished_file(self) -> list:
        return self.finished_file.read_text(encoding='utf-8').split('\n')

    def save_finished_file(self, ts_name: str):
        with self.finished_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def save_ffmpeg_file(self, file_list: list):
        # 保存ffmpeg合并文件
        ffmpeg_file = self.temp_path / 'ffmpeg.txt'
        ffmpeg_file.touch(exist_ok=True)
        with ffmpeg_file.open('a+', encoding='utf-8') as f:
            for file in file_list:
                f.write(f"file '{file}'\n")
        return ffmpeg_file

    def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
        # 获取所有缓存文件
        file_list = [str(file.name) for file in source_path.glob('**/*.ts')]
        if not file_list:
            return
        # 名称排序
        file_list.sort(key=lambda s: s.split('.')[0])
        ffmpeg_txt = self.save_ffmpeg_file(file_list)
        # 文件总数
        length = len(file_list)
        # 开始合并文件
        cmd = f'{FFMPEG_EXE_PATH} -f concat -safe 0 -i  {ffmpeg_txt}  -c  copy {dest_file}'
        os.system(cmd)
        # with open(dest_file, 'ab') as f:
        #     # 循环文件列表
        #     for i, file in enumerate(file_list, 1):
        #         # 读取每个文件
        #         with open(os.path.join(source_path, file), 'rb') as rf:
        #             # 把每个文件的内容 追加到同一个文件
        #             data = rf.read()
        #             f.write(data)
        #         # 打印进度
        #         self.loggerSignal.emit('合并中: {:3.2f}%'.format(i / length * 100))
        # # 移除缓存文件夹
        self.sleep(2)
        try:
            shutil.rmtree(source_path.parent)
        except Exception as e:
            self.loggerSignal.emit(f'删除文件夹错误:{e}')

    def run(self):
        url = self.__model.url
        save_path = Path(self.__model.save_path)
        video_path = save_path / self.__model.title
        self.temp_path = video_path / 'temp'
        self.finished_file = self.temp_path / 'finished.txt'
        self.failed_file = self.temp_path / 'failed.txt'

        self.temp_path.mkdir(parents=True, exist_ok=True)
        self.finished_file.touch(exist_ok=True)
        self.failed_file.touch(exist_ok=True)

        try:
            self.loggerSignal.emit(f"开始加载m3u8文件")
            m3u8_text = self.sendRequest(url).text
            self.setCryInfo(m3u8_text, url)
            new_urls = []
            self.loggerSignal.emit("解析m3u8文件")
            for line in m3u8_text.split('\n'):
                if not line.startswith('#'):
                    new_urls.append(self.get_full_ts_url(url, line))
            self.loggerSignal.emit('开始下载视频...')
            asyncio.run(self.aiohttp_download(new_urls))
            self.loggerSignal.emit('开始合并文件...')
            self.combine_ts(self.temp_path, str(save_path / f'{self.__model.title}.mp4'))
            self.loggerSignal.emit(f"视频下载完成")
        except Exception as e:
            self.loggerSignal.emit(f"视频下载失败")
            return

    def stop(self):
        self.__is_stop = True
        self.stopSignal.emit(True)
        self.loggerSignal.emit('下载已停止')

    def startedSlot(self):
        self.__is_stop = False
        self.stopSignal.emit(False)

    def setDownloadTask(self, model: DownloadUrlModel):
        self.__model = model

combine_ts

使用open方法写的合并函数,会出现格式错误的问题,ffmpeg没有此问题

    def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
        # 获取所有缓存文件
        pass

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/949879.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

用ResNet50+Qwen2-VL-2B-Instruct+LoRA模仿Diffusion-VLA的论文思路,在3090显卡上训练和测试成功

想一步步的实现Diffusion VLA论文的思路&#xff0c;不过论文的图像的输入用DINOv2进行特征提取的&#xff0c;我先把这个部分换成ResNet50。 老铁们&#xff0c;直接上代码&#xff1a; from PIL import Image import torch import torchvision.models as models from torch…

Spring Boot 项目自定义加解密实现配置文件的加密

在Spring Boot项目中&#xff0c; 可以结合Jasypt 快速实现对配置文件中的部分属性进行加密。 完整的介绍参照&#xff1a; Spring Boot Jasypt 实现application.yml 属性加密的快速示例 但是作为一个技术强迫症&#xff0c;总是想着从底层开始实现属性的加解密&#xff0c;…

A/B实验之置信检验(一):如何避免误判 (I类) 和漏报 (II类)

假设检验的依据&#xff1a;如何避免误判和漏报 A/B实验系列相关文章&#xff08;置顶&#xff09; 1.A/B实验之置信检验&#xff08;一&#xff09;&#xff1a;如何避免误判和漏报 2.A/B实验之置信检验&#xff08;二&#xff09;&#xff1a;置信检验精要 引言 在数据驱动…

每日一题:链表中环的入口结点

文章目录 判断链表环的入口节点描述数据范围&#xff1a;复杂度要求&#xff1a;输入输出 示例代码实现思路解析注意事项&#xff1a; 判断链表环的入口节点 描述 给定一个链表&#xff0c;判断该链表是否存在环。如果存在环&#xff0c;返回环的入口节点&#xff1b;如果不存…

深度学习blog-Meanshift均值漂移算法-最大熵模型

均值漂移&#xff08;Mean Shift&#xff09;是一种无监督的聚类算法&#xff0c;广泛应用于数据挖掘和计算机视觉任务。它通过移动样本点到其近邻的均值位置来寻找数据的高密度区域&#xff0c;最终形成聚类。 均值漂移算法原理 均值漂移算法的核心思想是通过滑动窗口&#…

51c自动驾驶~合集45

我自己的原文哦~ https://blog.51cto.com/whaosoft/13020031 #运动控制和规划控制需要掌握的技术栈~ 各大垃圾家电造车厂又要开始了~~~​ 1、ROS的通信方式 李是Lyapunov的李&#xff1a;谈谈ROS的通信机制 话题通信和服务通信&#xff0c;其中话题通信是通过发布和订阅…

Python基于jieba和wordcloud绘制词云图

【Cesium】自定义材质&#xff0c;添加带有方向的滚动路线 &#x1f356; 前言&#x1f3b6;一、实现过程✨二、代码展示&#x1f3c0;三、运行结果&#x1f3c6;四、知识点提示 &#x1f356; 前言 Python基于jieba和wordcloud绘制词云图 &#x1f3b6;一、实现过程 读取文本…

计算机网络与服务器

目录 架构体系及相关知识 三层架构&#xff1a; 四层架构&#xff1a; 常见的应用的模式&#xff1a; OSI模型 分层 数据链路层 TCP/IP模型 TCP和UDP都是传输层的协议 TCP三次握手、四次次分手 URL&HTTP协议详解 网址URL 结构化 报文行 报文头 空行 报文体…

Cursor实现go项目配置并实现仓库Gin项目运行

✅作者简介&#xff1a;大家好&#xff0c;我是 Meteors., 向往着更加简洁高效的代码写法与编程方式&#xff0c;持续分享Java技术内容。 &#x1f34e;个人主页&#xff1a;Meteors.的博客 &#x1f49e;当前专栏&#xff1a;知识备份 ✨特色专栏&#xff1a;知识分享 &#x…

141.环形链表 142.环形链表II

141.环形链表 & 142.环形链表II 141.环形链表 思路&#xff1a;快慢指针 or 哈希表 快慢指针代码&#xff1a; class Solution { public:bool hasCycle(ListNode *head) {if(headnullptr||head->nextnullptr)return false;ListNode *fasthead->next; //不能设置成…

信用租赁系统助力企业实现免押金租赁新模式

内容概要 在现代商业环境中&#xff0c;信用租赁正在迅速崛起。通过结合大数据与区块链技术&#xff0c;信用租赁系统彻底改变了传统的租赁流程。什么是信用租赁呢&#xff1f;简单说&#xff0c;就是不需要押金&#xff0c;你也能够租到你想要的物品&#xff0c;这对企业和消…

el-select下拉框在弹框里面错位

问题出现 Element Plus 是一个基于 Vue 3 的组件库&#xff0c;el-select 是其中一个用于选择器的组件。在 el-select 组件中&#xff0c;teleported 属性用于控制下拉菜单的渲染位置。 解决方法 teleported 属性「element-plus」 popper-append-to-body属性「element」 ‌…

IO进程day1

一、思维导图

力扣-21-合并两个有序链表

思路&#xff1a; 因为是升序的两个链表&#xff0c;我们可以进行数据域比大小&#xff0c;然后把p3(自己创建的)的指针域指向小的那个 注&#xff1a;一定要先判断两个指针为0的情况

人工智能的发展领域之GPU加速计算的应用概述、架构介绍与教学过程

文章目录 一、架构介绍GPU算力平台概述优势与特点 二、注册与登录账号注册流程GPU服务器类型配置选择指南内存和存储容量网络带宽CPU配置 三、创建实例实例创建步骤镜像选择与设置 四、连接实例SSH连接方法远程桌面配置 一、架构介绍 GPU算力平台概述 一个专注于GPU加速计算的…

QT实现 端口扫描暂停和继续功能 3

上篇QT给端口扫描工程增加线程2-CSDN博客 为按钮pushButton_Stop添加clicked事件&#xff0c;功能为暂停扫描&#xff0c;并在暂停后显示继续按钮&#xff0c;点击继续按钮之后继续扫描 1.更新UI 添加继续按钮 点击转到槽则会自动声明 2. 更新 MainWindow.h 需要新增的部分…

汽车微处理器安全机制以及测试介绍

本文介绍了三类汽车微处理器安全机制&#xff1a;硬件类、软件类和混合类&#xff0c;旨在提高系统的可靠性和安全性。硬件类安全机制包括逻辑内建自测试&#xff08;Logic-BIST&#xff09;、三重模块冗余&#xff08;TMR&#xff09;、内存内建自测试&#xff08;Memory-BIST…

【Azure Redis 缓存】Azure Redis 遇见的连接不上问题和数据丢失的情况解答

问题描述 PHP应用再连接Azure Redis服务时&#xff0c;出现Connection Timed out。当通过升级提高Azure Redis的性能时候&#xff0c;发现之前的数据丢失了。 image.png 问题解答 当Redis服务出现Timeout的情况时&#xff0c;可以从Redis服务的指标(Metrics)开始查看&#xff0…

python学习笔记—15—数据容器之列表

1. 数据容器 列表(list)、元组(tuple)、字符串(str)、集合(set)、字典(dict) 2. 列表 (1) 定义 tmp_list ["super", "carry", "doinb"] print(f"tmp_list {tmp_list}, tmp_list type is {type(tmp_list)}") tmp_list1 ["doi…

记录一次面试中被问到的问题 (HR面)

文章目录 一、你对公司的了解多少二、为什么对这个岗位感兴趣三、不能说的离职原因四、离职原因高情商回复五、你的核心优势是什么六、你认为你比其他面试候选人的优势是什么七、不要提及情感 一、你对公司的了解多少 准备要点&#xff1a; 在面试前&#xff0c;对公司进行充分…