实战whisper第二天:直播语音转字幕(全部代码和详细部署步骤)

直播语音实时转字幕:

基于Whisper的实时直播语音转录或翻译是一项使用OpenAI的Whisper模型实现的技术,它能够实时将直播中的语音内容转录成文本,甚至翻译成另一种语言。这一过程大致分为三个步骤:捕获直播音频流、语音识别(转录)以及翻译(如果需要)。下面详细解释其原理和意义。

原理

  1. 捕获直播音频流: 首先,需要从直播源捕获音频流。这通常通过软件工具实现,如ffmpegstreamlink,它们可以接入直播平台(如Twitch、YouTube等)的直播流,并提取音频数据。

  2. 语音识别(转录): 捕获到的音频流被送入Whisper模型进行语音识别。Whisper是OpenAI开发的一款强大的语音识别模型,它能够准确地将语音转换成文本。该模型训练于多种语言的大量数据集上,因此具有高度的准确性和多语言识别能力。

  3. 翻译(可选): 如果需要将转录的文本翻译成另一种语言,可以进一步使用机器翻译模型(如OpenAI的GPT、Google Translate等)对转录文本进行翻译。

意义

  1. 提高可及性: 通过实时转录直播语音,听障人士和不懂直播原语言的观众也能够理解内容,大大提高了直播内容的可及性。

  2. 内容归档与搜索: 转录生成的文本可以作为直播内容的归档,便于未来搜索和回顾。相比视频数据,文本更容易被搜索引擎索引,从而提高内容的发现性。

  3. 多语言翻译: 实时翻译可以让不同语言的观众理解和享受直播内容,促进跨语言、跨文化的交流。

  4. 学习和教育: 对于教育直播,实时转录和翻译能够帮助学生更好地理解教学内容,尤其是对于非母语学习者。

  5. 内容审核: 转录文本还可以用于自动内容审核,帮助直播平台监控和管理不适宜的内容。 

一、部署 

下载stream-translator

GitHub - fortypercnt/stream-translator

实战whisper语音识别第一天,部署服务器,可远程访问,实时语音转文字(全部代码和详细部署步骤)-CSDN博客

如果在之前的文章,实战whisper语音识别第一天,部署服务器,配置过环境,可跳过下面安装。

git clone https://github.com/fortypercnt/stream-translator.git
pip install -r requirements.txt 

模型下载: 

large-v3模型:https://huggingface.co/Systran/faster-whisper-large-v3/tree/main
large-v2模型:https://huggingface.co/guillaumekln/faster-whisper-large-v2/tree/main
large-v2模型:https://huggingface.co/guillaumekln/faster-whisper-large-v1/tree/main
medium模型:https://huggingface.co/guillaumekln/faster-whisper-medium/tree/main
small模型:https://huggingface.co/guillaumekln/faster-whisper-small/tree/main
base模型:https://huggingface.co/guillaumekln/faster-whisper-base/tree/main
tiny模型:https://huggingface.co/guillaumekln/faster-whisper-tiny/tree/main

经测试large-v3模型需要10G显存以上。显存不够的可以用小模型。

使用方法:

python translator.py 直播链接

这个translator.py是进行实时翻译,不想翻译可运行下面代码

二、代码

translator1.py:

import argparse
import sys
import signal
from datetime import datetime

import ffmpeg
import numpy as np
import whisper
from whisper.audio import SAMPLE_RATE


class RingBuffer:
    def __init__(self, size):
        self.size = size
        self.data = []
        self.full = False
        self.cur = 0

    def append(self, x):
        if self.size <= 0:
            return
        if self.full:
            self.data[self.cur] = x
            self.cur = (self.cur + 1) % self.size
        else:
            self.data.append(x)
            if len(self.data) == self.size:
                self.full = True

    def get_all(self):
        all_data = []
        for i in range(len(self.data)):
            idx = (i + self.cur) % self.size
            all_data.append(self.data[idx])
        return all_data

    def clear(self):
        self.data = []
        self.full = False
        self.cur = 0


def open_stream(stream, direct_url, preferred_quality):
    if direct_url:
        try:
            process = (
                ffmpeg.input(stream, loglevel="panic")
                .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
                .run_async(pipe_stdout=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

        return process, None

    import streamlink
    import subprocess
    import threading
    stream_options = streamlink.streams(stream)
    if not stream_options:
        print("No playable streams found on this URL:", stream)
        sys.exit(0)

    option = None
    for quality in [preferred_quality, 'audio_only', 'audio_mp4a', 'audio_opus', 'best']:
        if quality in stream_options:
            option = quality
            break
    if option is None:
        # Fallback
        option = next(iter(stream_options.values()))

    def writer(streamlink_proc, ffmpeg_proc):
        while (not streamlink_proc.poll()) and (not ffmpeg_proc.poll()):
            try:
                chunk = streamlink_proc.stdout.read(1024)
                ffmpeg_proc.stdin.write(chunk)
            except (BrokenPipeError, OSError):
                pass

    cmd = ['streamlink', stream, option, "-O"]
    streamlink_process = subprocess.Popen(cmd, stdout=subprocess.PIPE)

    try:
        ffmpeg_process = (
            ffmpeg.input("pipe:", loglevel="panic")
            .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
            .run_async(pipe_stdin=True, pipe_stdout=True)
        )
    except ffmpeg.Error as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

    thread = threading.Thread(target=writer, args=(streamlink_process, ffmpeg_process))
    thread.start()
    return ffmpeg_process, streamlink_process


def main(url, model="large-v3", interval=5, preferred_quality="audio_only", direct_url=False, **decode_options):
    print("Loading model...")
    model = whisper.load_model(model)
    
    print("Opening stream...")
    ffmpeg_process, _ = open_stream(url, direct_url, preferred_quality)
    
    def handler(signum, frame):
        ffmpeg_process.kill()
        sys.exit(0)
        
    signal.signal(signal.SIGINT, handler)

    n_bytes = interval * SAMPLE_RATE * 2  # Factor 2 comes from reading the int16 stream as bytes
    audio_buffer = RingBuffer(1)  # No need for a history buffer since we're just doing real-time transcription

    try:
        while True:
            in_bytes = ffmpeg_process.stdout.read(n_bytes)
            if not in_bytes:
                break

            audio = np.frombuffer(in_bytes, np.int16).flatten().astype(np.float32) / 32768.0
            audio_buffer.append(audio)

            result = model.transcribe(np.concatenate(audio_buffer.get_all()), **decode_options)
            print(f'{datetime.now().strftime("%H:%M:%S")} {result["text"]}')

            audio_buffer.clear()  # Clear the buffer after each transcription

    finally:
        ffmpeg_process.kill()

def cli():
    parser = argparse.ArgumentParser(description="Real-time audio transcription from streams.")
    parser.add_argument('URL', type=str, help='Stream website and channel name, e.g. twitch.tv/forsen')
    parser.add_argument('--model', type=str, default='large-v3', help='Whisper model for transcription.')
    parser.add_argument('--interval', type=int, default=5, help='Interval between transcription in seconds.')
    parser.add_argument('--preferred_quality', type=str, default='audio_only', help='Preferred stream quality.')
    parser.add_argument('--direct_url', action='store_true', help='Pass the URL directly to ffmpeg.')

    args = parser.parse_args().__dict__
    url = args.pop("URL")
    main(url, **args)

if __name__ == '__main__':
    cli()
python translator1.py https://www.huya.com/kpl

虎牙kpl的直播,文字转录:

还有繁体字,修改代码,繁体转简体:

pip install opencc-python-reimplemented

 translator2.py:

import argparse
import sys
import signal
from datetime import datetime

import ffmpeg
import numpy as np
import whisper
from whisper.audio import SAMPLE_RATE
import opencc



class RingBuffer:
    def __init__(self, size):
        self.size = size
        self.data = []
        self.full = False
        self.cur = 0

    def append(self, x):
        if self.size <= 0:
            return
        if self.full:
            self.data[self.cur] = x
            self.cur = (self.cur + 1) % self.size
        else:
            self.data.append(x)
            if len(self.data) == self.size:
                self.full = True

    def get_all(self):
        all_data = []
        for i in range(len(self.data)):
            idx = (i + self.cur) % self.size
            all_data.append(self.data[idx])
        return all_data

    def clear(self):
        self.data = []
        self.full = False
        self.cur = 0


def open_stream(stream, direct_url, preferred_quality):
    if direct_url:
        try:
            process = (
                ffmpeg.input(stream, loglevel="panic")
                .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
                .run_async(pipe_stdout=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

        return process, None

    import streamlink
    import subprocess
    import threading
    stream_options = streamlink.streams(stream)
    if not stream_options:
        print("No playable streams found on this URL:", stream)
        sys.exit(0)

    option = None
    for quality in [preferred_quality, 'audio_only', 'audio_mp4a', 'audio_opus', 'best']:
        if quality in stream_options:
            option = quality
            break
    if option is None:
        # Fallback
        option = next(iter(stream_options.values()))

    def writer(streamlink_proc, ffmpeg_proc):
        while (not streamlink_proc.poll()) and (not ffmpeg_proc.poll()):
            try:
                chunk = streamlink_proc.stdout.read(1024)
                ffmpeg_proc.stdin.write(chunk)
            except (BrokenPipeError, OSError):
                pass

    cmd = ['streamlink', stream, option, "-O"]
    streamlink_process = subprocess.Popen(cmd, stdout=subprocess.PIPE)

    try:
        ffmpeg_process = (
            ffmpeg.input("pipe:", loglevel="panic")
            .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
            .run_async(pipe_stdin=True, pipe_stdout=True)
        )
    except ffmpeg.Error as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

    thread = threading.Thread(target=writer, args=(streamlink_process, ffmpeg_process))
    thread.start()
    return ffmpeg_process, streamlink_process


def main(url, model="large-v3", interval=5, preferred_quality="audio_only", direct_url=False, **decode_options):
    print("Loading model...")
    model = whisper.load_model(model)
    
    print("Opening stream...")
    ffmpeg_process, _ = open_stream(url, direct_url, preferred_quality)
    
    converter = opencc.OpenCC('t2s')  # 创建繁体转简体的转换器
    
    def handler(signum, frame):
        ffmpeg_process.kill()
        sys.exit(0)
        
    signal.signal(signal.SIGINT, handler)

    n_bytes = interval * SAMPLE_RATE * 2  # Factor 2 comes from reading the int16 stream as bytes
    audio_buffer = RingBuffer(1)

    try:
        while True:
            in_bytes = ffmpeg_process.stdout.read(n_bytes)
            if not in_bytes:
                break

            audio = np.frombuffer(in_bytes, np.int16).flatten().astype(np.float32) / 32768.0
            audio_buffer.append(audio)

            result = model.transcribe(np.concatenate(audio_buffer.get_all()), **decode_options)
            result_text = converter.convert(result["text"])  # 将繁体转换为简体
            print(f'{datetime.now().strftime("%H:%M:%S")} {result_text}')

            audio_buffer.clear()

    finally:
        ffmpeg_process.kill()


def cli():
    parser = argparse.ArgumentParser(description="Real-time audio transcription from streams.")
    parser.add_argument('URL', type=str, help='Stream website and channel name, e.g. twitch.tv/forsen')
    parser.add_argument('--model', type=str, default='large-v3', help='Whisper model for transcription.')
    parser.add_argument('--interval', type=int, default=5, help='Interval between transcription in seconds.')
    parser.add_argument('--preferred_quality', type=str, default='audio_only', help='Preferred stream quality.')
    parser.add_argument('--direct_url', action='store_true', help='Pass the URL directly to ffmpeg.')

    args = parser.parse_args().__dict__
    url = args.pop("URL")
    main(url, **args)

if __name__ == '__main__':
    cli()
python translator2.py https://www.huya.com/kpl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/476560.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【嵌入式——QT】QT Charts

【嵌入式——QT】QT Charts 概述Qt提供的坐标轴类QChartQLineSeriesQValueAxis常见图表及用到的序列类图示代码示例 概述 QT Charts模块是一组易于使用的图表组件&#xff0c;它基于Qt的Graphics View架构&#xff0c;其核心组件是QChartView和QChart&#xff0c;QChartView父…

前端开发经验分享:写页面时总是有预期之外的滚动条怎么办?

问题描述&#xff1a; 在制作一个页面时常常会出现一些预期之外的滚动条&#xff0c;一般有以下原因&#xff1a;1.内容过多&#xff1a;当容器内的内容&#xff08;如文本、图片等&#xff09;的总高度或总宽度超过容器的可视区域时&#xff0c;滚动条就会出现。2.样式设置&a…

一维前缀和一维差分(下篇讲解二维前缀和二维差分)(超详细,python版,其他语言也很轻松能看懂)

本篇博客讲解一维前缀和&#xff0c;一维差分&#xff0c;还会给出一维差分的模板题&#xff0c;下篇博客讲解 二维前缀和&二维差分。 一维前缀和&#xff1a; 接触过算法的小伙伴应该都了解前缀和&#xff0c;前缀和在算法中应用很广&#xff0c;不了解也没有关系&#…

基于SSM的中国旅游网站管理系统+数据库+数据库表结构文档+免费远程调试

项目介绍&#xff1a; Javaee项目&#xff0c;采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring SpringMvc Mybatis JspBootstrap来实现。MySQL数据库作为系统数据储存平台&#xff…

阻止默认行为 e.preventDefault()搭配passive:false才有效

正确情况 如果想阻止默认行为,那么 e.preventDefault()搭配passive:false才是正解 document.addEventListener(touchmove,(e)>{ e.preventDefault() console.log(123,123);},{passive:false}) 如果搭配 passive:false,则会报警告 e.preventDefault()搭配passive:true会报…

php 对接Vungle海外广告平台收益接口Reporting API

今天对接的是Vungle广告reporting api接口&#xff0c;拉取广告收益回来自己做统计。记录分享给大家 首先是文档地址,进入到Vungle后台就能看到文档地址以及参数&#xff1a; 文档地址&#xff1a;https://support.vungle.com/hc/en-us/articles/211365828-Publisher-Reporting…

线程池实现“线程复用”的原理

线程池实现“线程复用”的原理 学习线程复用的原理&#xff0c;以及对线程池的 execute 这个非常重要的方法进行源码解析。 线程复用原理 我们知道线程池会使用固定数量或可变数量的线程来执行任务&#xff0c;但无论是固定数量或可变数量的线程&#xff0c;其线程数量都远远…

3.3网安学习第三阶段第三周回顾(个人学习记录使用)

本周重点 ①渗透测试介绍 ②sqlmap注入扫描工具 ③XSS脚本注入 本周主要内容 ①渗透测试介绍 一、渗透测试 通过模拟黑客对系统进行攻击的手段或技术&#xff0c;在被测系统中发现漏洞的行为。除了提供漏洞之外&#xff0c;还需提供安全意见。 与黑站不同&#xff0c;渗…

Vue响应式原理全解析

前言 大家好&#xff0c;我是程序员蒿里行。浅浅记录一下面试中的高频问题&#xff0c;请你谈一下Vue响应式原理。 必备前置知识&#xff0c;​​Vue2​​官方文档中​​深入响应式原理​​​及​​Vue3​​官方文档中​​深入响应式系统​​。 什么是响应式 响应式本质是当…

RabbitMQ集群部署

集群部署 我们看看如何安装RabbitMQ的集群。 1.集群分类 在RabbitMQ的官方文档中&#xff0c;讲述了两种集群的配置方式&#xff1a; 普通模式&#xff1a;普通模式集群不进行数据同步&#xff0c;每个MQ都有自己的队列、数据信息&#xff08;其它元数据信息如交换机等会同…

基于Android的Appium+Python自动化脚本编写

1.Appium Appium是一个开源测试自动化框架&#xff0c;可用于原生&#xff0c;混合和移动Web应用程序测试&#xff0c; 它使用WebDriver协议驱动iOS&#xff0c;Android和Windows应用程序。 通过Appium&#xff0c;我们可以模拟点击和屏幕的滑动&#xff0c;可以获取元素的id…

动态规划(算法竞赛、蓝桥杯)--单调队列优化DP琪露诺

1、B站视频链接&#xff1a;E46 单调队列优化DP 琪露诺_哔哩哔哩_bilibili 题目链接&#xff1a;琪露诺 - 洛谷

自定义类型--结构体、联合体、枚举类型

**Ladies and gentlemen**&#xff0c;今天&#xff0c;我们将来进行对自定义类型的学习&#xff01; 目录 1.结构的特殊声明2. 结构体内存对齐2.1 对齐规则2.1.12.1.22.1.32.1.4 2.2 为什么存在内存对齐?1. 平台原因 (移植原因)&#xff1a;2. 性能原因&#xff1a; 2.3 修改…

这个简单的生活方式,为你带来满满的幸福感

在今天文章的开头&#xff0c;我想请你思考一个问题&#xff1a;影响幸福感的最大因素是什么&#xff1f; 不妨先想一想&#xff0c;再往下拉&#xff0c;继续阅读。 可能不少朋友的回答&#xff0c;会是财富、事业、理想、生活环境、社会地位…… 这些因素当然对幸福感都非常重…

Python正则表达式之模式修正符,你get了吗?

​大家好&#xff0c;今天我要和大家分享一个Python编程中的神秘武器——正则表达式模式修正符&#xff01;正则表达式&#xff0c;对于很多编程新手来说&#xff0c;可能是一个头疼的问题。但别担心&#xff0c;模式修正符就像是你手中的魔法棒&#xff0c;让你的正则表达式更…

3.21系统栈、数据结构栈、栈的基本操作、队列、队列的基本操作------------》

栈 先进后出、后进先出 一、系统栈 大小&#xff1a;8MB 1、局部变量 2、未经初始化为随机值 3、代码执行到变量定义时为变量开辟空间 4、当变量的作用域结束时回收空间 5、函数的形参和返回值 6、函数的调用关系、保护现场和恢复现场 7、栈的增长方向&#xff0c;自高…

C语言例:设 int x; 则表达式 (x=4*5,x*5),x+25 的值

代码如下&#xff1a; #include<stdio.h> int main(void) {int x,m;m ((x4*5,x*5),x25);printf("(x4*5,x*5),x25 %d\n",m);//x4*520//x*5100//x2545return 0; } 结果如下&#xff1a;

必学干货!使用Python正则表达式匹配多个字符

1.匹配多个字符 今天我们来聊一聊正则表达式中一个很强大的功能&#xff1a;匹配多个字符&#xff01;正则表达式是一个非常强大的工具&#xff0c;可以帮助我们轻松地处理和匹配字符串。通过使用不同的符号和技巧&#xff0c;我们可以匹配多个字符&#xff0c;从而更加灵活地…

【智能算法】海洋捕食者算法(MPA)原理及实现

目录 1.背景2.算法原理2.1算法思想2.2算法过程 3.结果展示4.参考文献 1.背景 2020年&#xff0c;Afshin Faramarzi 等人受到海洋生物适者生存启发&#xff0c;提出了海洋捕食者算法(Marine Predators Algorithm&#xff0c;MPA)。 2.算法原理 2.1算法思想 MPA根据模拟自然界…

WSL2的安装步骤

WSL2&#xff08;Windows Subsystem for Linux 2&#xff09;是微软公司开发的一项创新性技术&#xff0c;它在Windows操作系统上提供了一个完整的Linux内核&#xff0c;并允许用户在Windows环境中运行Linux发行版。之前想在Windows上使用Linux系统必须先安装VirtualBox或VMWar…