(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器

1. 背景介绍

在先前的博客文章中,我们已经搭建了一个基于SRS的流媒体服务器。现在,我们希望通过Web接口来控制这个服务器的行为,特别是对于正在进行的 RTSP 转码任务的管理。这将使我们能够在不停止整个服务器的情况下,动态地启动或停止摄像头的转码过程。

Docker部署 SRS rtmp/flv流媒体服务器-CSDN博客文章浏览阅读360次,点赞7次,收藏5次。SRS(Simple Realtime Server)是一款开源的流媒体服务器,具有高性能、高可靠性、高灵活性的特点,能够支持直播、点播、转码等多种流媒体应用场景。SRS 不仅提供了流媒体服务器,还提供了适用于多种平台的客户端 SDK 和在线转码等辅助服务,是一款十分强大的流媒体解决方案。https://blog.csdn.net/m0_56659620/article/details/135400510?spm=1001.2014.3001.5501

2. 技术选择

在选择技术方案时,考虑到构建视频流转码服务的需求,我们将采用Python编程语言,并结合asyncio和aiohttp库。这一选择基于异步框架的优势,以下是对异步框架和同步框架在视频流转码场景中的优缺点的明确总结:

异步框架的优势:

  • 高并发处理: 异步框架通过非阻塞方式处理请求,能够高效处理大量并发请求,确保系统在高负载下保持稳定性。
  • 异步I/O: 支持异步I/O操作,允许在等待I/O操作完成的同时继续处理其他请求,提高整体效率。
  • 资源利用率高: 能够更有效地利用系统资源,同时处理多个请求,提高视频转码效率。
  • 事件驱动: 采用事件驱动模型,适应实时性要求高的视频流处理,能够立即响应新的转码请求。

同步框架的缺点:

  • 阻塞: 阻塞调用可能导致整个程序停滞,尤其在处理大文件或网络请求时可能引发性能问题,特别是在高并发场景下。
  • 低并发: 每个请求需要独立的线程或进程,可能导致系统资源耗尽,降低并发处理能力,对于需要同时处理多个视频流的情况可能不够高效。

考虑到处理大量并发请求、提高系统性能和响应性的需求,采用异步框架是更为合适的选择。异步框架的高并发处理能力、异步I/O支持、高资源利用率以及事件驱动的特性使其更适用于实时性要求较高的视频流转码服务。

3. 代码实现(必须在linux系统运行,4步骤为部署攻略)

3.1 导入必要的库

首先,我们导入所需的库,包括asyncio、aiohttp、aiohttp_cors和logging。

import asyncio
from aiohttp import web
import aiohttp_cors
import logging

3.2 设置日志

logging.basicConfig(level=logging.INFO)

3.3 配置并发控制和任务跟踪

设置最大同时运行的ffmpeg子进程数量,并使用Semaphore限制并发进程数量。同时,使用字典跟踪正在进行的转码任务。

MAX_CONCURRENT_PROCESSES = 5
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSES)
transcoding_tasks = {}

3.4 定义启动和停止转码任务的方法

定义启动和停止 RTSP 转码任务的方法

# 开始转码方法
async def perform_transcoding(ip, camera_id, rtmp_server):
    # 检查相同RTSP是否已有子进程在处理
    if camera_id in transcoding_tasks:
        return transcoding_tasks[camera_id]

    # 使用Semaphore限制并发进程数量
    async with semaphore:
        # 实际的转码操作,这里需要调用ffmpeg或其他工具
        ffmpeg_command = [
            'ffmpeg',
            '-rtsp_transport', 'tcp',
            '-i', ip,
            '-c:v', 'libx264',
            '-c:a', 'aac',
            '-f', 'flv',
            f'{rtmp_server}/live/livestream{camera_id}'
        ]

        # 创建异步子进程
        process = await asyncio.create_subprocess_exec(*ffmpeg_command)

        # 将任务添加到字典中
        transcoding_tasks[camera_id] = process

        # 等待子进程完成
        await process.communicate()

        # 从字典中移除已完成的任务
        transcoding_tasks.pop(camera_id, None)

# 停止转码方法
async def stop_transcoding(camera_id):
    # 停止转码任务
    if camera_id in transcoding_tasks:
        process = transcoding_tasks[camera_id]
        process.terminate()  # 发送终止信号
        await process.wait()  # 等待进程结束

        # 从字典中移除已停止的任务
        transcoding_tasks.pop(camera_id, None)

3.5 定义Web接口路由

定义Web接口路由,包括启动摄像头转码、停止摄像头转码和停止所有摄像头转码的路由。

# 开始转码任务
async def play_camera(request):
    data = await request.post()

    # 从表单数据中获取摄像头的ID和rtsp流
    camera_id = data.get('id')
    rtsp = data.get('ip')

    # 这里设置你的 RTMP 服务器地址
    rtmp_server = 'rtmp://192.168.14.93:1935'

    # 执行实际的转码操作
    task = await perform_transcoding(rtsp, camera_id, rtmp_server)

    # 返回包含转码后的RTMP URL的JSON响应
    rtmp_url = f'http://192.168.14.93:8080/live/livestream{camera_id}.flv'
    return web.json_response({'message': '转码启动成功', 'flv_data': rtmp_url})


# 停止转码任务
async def stop_camera(request):
    data = await request.post()
    camera_id = data.get('id')

    # 停止指定摄像头的转码任务
    await stop_transcoding(camera_id)

    return web.json_response({'code':200,'message': '转码停止成功'})


# 如果页面进行刷新或者关闭停止全部转码任务
async def stop_all_camera(request):

    # 获取所有正在运行的任务的列表
    tasks = [stop_transcoding(camera_id) for camera_id in transcoding_tasks.keys()]

    # 并发停止所有任务
    await asyncio.gather(*tasks)

    # 清空字典,表示所有任务都已停止
    transcoding_tasks.clear()

    return web.json_response({'code':200,'message': '转码停止成功'})

3.6 创建Web应用和配置CORS

创建Web应用,配置CORS(跨域资源共享)中间件,以确保接口可以被跨域访问。

app = web.Application()

# CORS配置
cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_headers="*",
    )
})

3.7 添加Web接口路由

添加Web接口路由,包括启动摄像头转码、停止摄像头转码和停止所有摄像头转码的路由。

app.router.add_route('POST', '/play_camera', play_camera)  # 开始转码任务路由
app.router.add_route('POST', '/stop_camera', stop_camera)  # 停止转码任务路由
app.router.add_route('POST', '/stop_all_camera', stop_all_camera)  # 停止全部转码任务路由

3.8 添加CORS中间件

添加CORS中间件,确保接口可以被跨域访问。

# 添加 CORS 中间件
for route in list(app.router.routes()):
    cors.add(route)

3.9 运行Web应用

运行Web应用,监听指定的主机和端口。

if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=7000,access_log=logging.getLogger())

 3.10 完整代码

import asyncio
from aiohttp import web
import aiohttp_cors
import logging

# 设置日志级别
logging.basicConfig(level=logging.INFO)

# 最大同时运行的ffmpeg子进程数量
MAX_CONCURRENT_PROCESSES = 5

# 使用Semaphore限制并发进程数量
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSES)

# 字典用于跟踪正在进行的转码任务
transcoding_tasks = {}

# 开始转码方法
async def perform_transcoding(ip, camera_id, rtmp_server):
    # 检查相同RTSP是否已有子进程在处理
    if camera_id in transcoding_tasks:
        return transcoding_tasks[camera_id]

    # 使用Semaphore限制并发进程数量
    async with semaphore:
        # 实际的转码操作,这里需要调用ffmpeg或其他工具
        ffmpeg_command = [
            'ffmpeg',
            '-rtsp_transport', 'tcp',
            '-i', ip,
            '-c:v', 'libx264',
            '-c:a', 'aac',
            '-f', 'flv',
            f'{rtmp_server}/live/livestream{camera_id}'
        ]

        # 创建异步子进程
        process = await asyncio.create_subprocess_exec(*ffmpeg_command)

        # 将任务添加到字典中
        transcoding_tasks[camera_id] = process

        # 等待子进程完成
        await process.communicate()

        # 从字典中移除已完成的任务
        transcoding_tasks.pop(camera_id, None)

# 停止转码方法
async def stop_transcoding(camera_id):
    # 停止转码任务
    if camera_id in transcoding_tasks:
        process = transcoding_tasks[camera_id]
        process.terminate()  # 发送终止信号
        await process.wait()  # 等待进程结束

        # 从字典中移除已停止的任务
        transcoding_tasks.pop(camera_id, None)


# 开始转码任务
async def play_camera(request):
    data = await request.post()

    # 从表单数据中获取摄像头的ID和rtsp流
    camera_id = data.get('id')
    rtsp = data.get('ip')

    # 这里设置你的 RTMP 服务器地址
    rtmp_server = 'rtmp://192.168.14.93:1935'

    # 执行实际的转码操作
    task = await perform_transcoding(rtsp, camera_id, rtmp_server)

    # 返回包含转码后的RTMP URL的JSON响应
    rtmp_url = f'http://192.168.14.93:8080/live/livestream{camera_id}.flv'
    return web.json_response({'message': '转码启动成功', 'flv_data': rtmp_url})


# 停止转码任务
async def stop_camera(request):
    data = await request.post()
    camera_id = data.get('id')

    # 停止指定摄像头的转码任务
    await stop_transcoding(camera_id)

    return web.json_response({'code':200,'message': '转码停止成功'})


# 如果页面进行刷新或者关闭停止全部转码任务
async def stop_all_camera(request):

    # 获取所有正在运行的任务的列表
    tasks = [stop_transcoding(camera_id) for camera_id in transcoding_tasks.keys()]

    # 并发停止所有任务
    await asyncio.gather(*tasks)

    # 清空字典,表示所有任务都已停止
    transcoding_tasks.clear()

    return web.json_response({'code':200,'message': '转码停止成功'})


app = web.Application()

# CORS配置
cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_headers="*",
    )
})

app.router.add_route('POST', '/play_camera', play_camera)  # 开始转码任务路由
app.router.add_route('POST', '/stop_camera', stop_camera)  # 停止转码任务路由
app.router.add_route('POST', '/stop_all_camera', stop_all_camera)  # 停止全部转码任务路由

# 添加 CORS 中间件
for route in list(app.router.routes()):
    cors.add(route)


if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=7000,access_log=logging.getLogger())

4. 部署(Docker环境)

部署所需Dockerfile文件代码如下

FROM python:3.7-slim

WORKDIR /app

COPY requirements.txt .

RUN apt-get update \
    && apt-get install -y ffmpeg \
    && rm -rf /var/lib/apt/lists/* \
    && pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "async_io_io.py"]

部署所需requirements.txt如下

aiohttp
aiohttp-cors
ffmpeg

根目录进行打包及启动

请求接口实现转码

5. 总结

通过以上的步骤,我们成功构建了一个流媒体服务器控制接口,可以通过Web接口实现对摄像头的 RTSP 转码任务的动态管理。这个接口可以集成到现有的流媒体服务器中,提供更多控制和管理的可能性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/302139.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【leetcode】力扣算法之旋转图像【难度中等】

题目描述 给定一个 n n 的二维矩阵 matrix 表示一个图像。请你将图像顺时针旋转 90 度。 你必须在 原地 旋转图像,这意味着你需要直接修改输入的二维矩阵。请不要 使用另一个矩阵来旋转图像。 用例 输入: matrix [[1,2,3],[4,5,6],[7,8,9]] 输出&…

为什么广西桉木建筑模板被广泛用于中高层建筑施工?

在中高层建筑施工中,选择合适的建筑模板至关重要。广西桉木建筑模板因其独特的性能优势,在市场上占据了重要地位。专业生产厂家如能强优品木业,更是以其优质的桉木模板,成为广西地区的佼佼者。 高强度和稳定性 桉木以其高密度和优…

apk反编译修改教程系列---修改apk包名等信息 让一个应用拥有无限分身 手机电脑同步演示【九】

往期教程: apk反编译修改教程系列-----修改apk应用名称 任意修改名称 签名【一】 apk反编译修改教程系列-----任意修改apk版本号 版本名 防止自动更新【二】 apk反编译修改教程系列-----修改apk中的图片 任意更换apk桌面图片【三】 apk反编译修改教程系列---简单…

80套经典精美网页设计模板html模板打包分享/国内外优秀网页模板/html5网页静态模板

我收集的80套经典网页设计模板html模板,Bootstrap扁平化网站模版,并且无密打包分享。里面还有国内外优秀网页模板,可以直接简单的修改就可以作为自己的主页。内容是大气漂亮的htmlcss网站模板。 不同种类,不同行业、不同风格的网…

在做题中学习(45):最大连续1的个数III

1004. 最大连续1的个数 III - 力扣&#xff08;LeetCode&#xff09; 解法&#xff1a;同向双指针————“滑动窗口” 思路&#xff1a;因为要返回数组中连续的数&#xff0c;就相当于一个子数组&#xff0c;而要的是一个可以翻转 < k个0的子数组使它可以变为全1的子数…

springboot git配置文件自动刷新失败问题排查

http://{ip}:{port}/refresh 说明&#xff1a;springBoot版本是1.5.9&#xff0c;接口路径与2.x&#xff0c;不同 路径区别&#xff1a;/refresh VS /actuator/refresh 用postman调用refresh接口刷新git配置&#xff0c;报错如下&#xff0c;没有权限 在服务本地启动&#…

数字化妆,销量爆灯:美妆个护行业的直播营销新姿势

“ 直播电商走进全域营销驱动增长的时代 ” 文&#xff5c;欣桐&凯丰 编辑 | 靳淇 出品&#xff5c;极新 过去几年&#xff0c;美妆个护是直播电商平台中冲锋最猛的行业之一。李佳琦、薇雅等头部主播的“疯狂带货”下&#xff0c;美妆个护品牌脱颖而出&#xff0c;花…

听GPT 讲Rust源代码--compiler(38)

File: rust/compiler/rustc_parse/src/parser/expr.rs 在Rust的源代码中&#xff0c;rust/compiler/rustc_parse/src/parser/expr.rs这个文件扮演了解析表达式的角色。表达式是Rust中的一种语法结构&#xff0c;用于表示程序中的计算、操作和值。 该文件定义了一个名为ExprPa…

CVE-2023-36025 Windows SmartScreen 安全功能绕过漏洞

CVE-2023-36025是微软于11月补丁日发布的安全更新中修复Windows SmartScreen安全功能绕过漏洞。攻击者可以通过诱导用户单击特制的URL来利用该漏洞&#xff0c;对目标系统进行攻击。成功利用该漏洞的攻击者能够绕过Windows Defender SmartScreen检查及其相关提示。该漏洞的攻击…

视频监控系统EasyCVR平台可视化模式设备列表搜索及八分屏播放模式定制开发

国标GB28181协议EasyCVR安防平台可以提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联、磁盘阵列存储、视频集中存储、云存储等丰富的视频能力&#xff0c;平台支持7*24小时实时高清视频监控&#xff0c;能同时播放多路监控视频流&#xf…

DC-DC升压/降压 隔离电源解决方案PCB和原理图

DC-DC隔离电源模块是一种基于变换原理而设计的模块,可以将一种电压转变为另一种电压,同时实现电气信号的隔离和滤波作用。其工作原理基于电感和电容的原理,一般由输入电路、输出电路、开关电路和控制电路四部分组成。 DC-DC电源模块的众多优点是大家众所周知的&#xff0c;DC-…

高效构建Java应用:Maven入门和进阶(二)

高效构建Java应用&#xff1a;Maven入门和进阶&#xff08;二&#xff09; 二.基于IDEA的Maven的工程创建2.1 梳理Maven工程GAVP属性2.2 Idea构建Maven JavaSE工程2.3 Idea构建Maven JavaEE工程2.4 Maven工程项目结构说明 二.基于IDEA的Maven的工程创建 2.1 梳理Maven工程GAVP…

最实用!2023年7款免费App设计工具,让你的应用界面更上一层楼

即时设计 即时设计是一种高效的在线原型设计工具&#xff0c;支持移动终端、网络终端和网页终端的产品原型设计。无需下载&#xff0c;可通过浏览器操作&#xff0c;支持软件下载&#xff0c;随时随地设计和模拟。学习难度低&#xff0c;强大的材料库和简单的设计界面可以帮助…

STM32 基础知识(探索者开发板)--146讲 IIC

IIC特点&#xff1a; 同步串行半双工通信总线 IIC有一个弱上拉电阻&#xff0c;在主机和从机都没有传输数据下拉时&#xff0c;总线会自动上拉 SCL在低电平期间&#xff0c;改变SDA的值来上传数据&#xff0c;方便SCL电平上升时进行数据读取 SCL在高电平期间&#xff0c;不能…

微信getAccessToken限制问题

微信getAccessToken限制问题 错误代码&#xff1a;45009&#xff0c;错误信息&#xff1a;调用分钟频率受限 https://developers.weixin.qq.com/miniprogram/dev/OpenApiDoc/mp-access-token/getAccessToken.html GET https://api.weixin.qq.com/cgi-bin/token 接口重复访问会…

英诺赛科PCB layout 设计案列分享1----高压单管

Layout设计中的几个关键步骤是布局、走线、铺铜、散热&#xff0c;英诺赛科高压单管GaN的Layout设计也不例外。 反激拓扑是高压单管GaN的典型应用&#xff0c;快充场合常用。该拓扑在地线的处理上都需特别注意&#xff0c;如下图所示&#xff0c;Layout时辅助绕组地、IC信号地功…

前端JS加密对抗由浅入深-2

前言&#xff1a; 本文主要讲解&#xff0c;针对前端非对称、多段加密数据传输站点&#xff0c;如何进行动态调试&#xff0c;如何进行安全测试。本次讲解不涉及任何漏洞方面&#xff0c;仅为学习探讨&#xff0c;该站点现已经更改加密方式&#xff0c;严禁非法测试&#xff0…

Uncaught (in promise) ReferenceError: require is not defined

在 Vue3 中加载项目路径下的资源图片,起初按照之前 vue 的写法 require 但浏览器却抛出了异常 Uncaught (in promise) ReferenceError: require is not defined 因为 require 采用的 webpack 加载方式,而 vue3 中通过 vite 的方式,两者存在差异,所以才产生了刚开始的一目; vu…

python基础教程八(循环1)

1. while循环 为避免多次重复的代码&#xff0c;我们会用到循环 while (condition): 执行语句 while循环的结构非常简单只要条件满足就一直循环直到&#xff0c;条件不满足为止。 例子如下&#xff1a; x1 while x<100:print(x)x1结果就是最简单的输出1-100的数字 while…

MySQL常用连接工具

Navicat 缺点&#xff1a;老版本不支持mysql8数据库连接&#xff0c;而且样式没有新版的好看 优点&#xff1a;新版支持多种数据连接&#xff0c;而且样式比较好看 HeidiSQL 官网&#xff1a;Download HeidiSQL 有一个非常好的优点就是不用输入表名称&#xff1a;双击就会自…