当前位置:AIGC资讯 > AIGC > 正文

实战whisper第二天:直播语音转字幕(全部代码和详细部署步骤)

直播语音实时转字幕:

基于Whisper的实时直播语音转录或翻译是一项使用OpenAI的Whisper模型实现的技术,它能够实时将直播中的语音内容转录成文本,甚至翻译成另一种语言。这一过程大致分为三个步骤:捕获直播音频流、语音识别(转录)以及翻译(如果需要)。下面详细解释其原理和意义。

原理

捕获直播音频流: 首先,需要从直播源捕获音频流。这通常通过软件工具实现,如ffmpegstreamlink,它们可以接入直播平台(如Twitch、YouTube等)的直播流,并提取音频数据。

语音识别(转录): 捕获到的音频流被送入Whisper模型进行语音识别。Whisper是OpenAI开发的一款强大的语音识别模型,它能够准确地将语音转换成文本。该模型训练于多种语言的大量数据集上,因此具有高度的准确性和多语言识别能力。

翻译(可选): 如果需要将转录的文本翻译成另一种语言,可以进一步使用机器翻译模型(如OpenAI的GPT、Google Translate等)对转录文本进行翻译。

意义

提高可及性: 通过实时转录直播语音,听障人士和不懂直播原语言的观众也能够理解内容,大大提高了直播内容的可及性。

内容归档与搜索: 转录生成的文本可以作为直播内容的归档,便于未来搜索和回顾。相比视频数据,文本更容易被搜索引擎索引,从而提高内容的发现性。

多语言翻译: 实时翻译可以让不同语言的观众理解和享受直播内容,促进跨语言、跨文化的交流。

学习和教育: 对于教育直播,实时转录和翻译能够帮助学生更好地理解教学内容,尤其是对于非母语学习者。

内容审核: 转录文本还可以用于自动内容审核,帮助直播平台监控和管理不适宜的内容。 

一、部署 

下载stream-translator

GitHub - fortypercnt/stream-translator

实战whisper语音识别第一天,部署服务器,可远程访问,实时语音转文字(全部代码和详细部署步骤)-CSDN博客

如果在之前的文章,实战whisper语音识别第一天,部署服务器,配置过环境,可跳过下面安装。

git clone https://github.com/fortypercnt/stream-translator.git
pip install -r requirements.txt 

模型下载: 

large-v3模型:https://huggingface.co/Systran/faster-whisper-large-v3/tree/main
large-v2模型:https://huggingface.co/guillaumekln/faster-whisper-large-v2/tree/main
large-v2模型:https://huggingface.co/guillaumekln/faster-whisper-large-v1/tree/main
medium模型:https://huggingface.co/guillaumekln/faster-whisper-medium/tree/main
small模型:https://huggingface.co/guillaumekln/faster-whisper-small/tree/main
base模型:https://huggingface.co/guillaumekln/faster-whisper-base/tree/main
tiny模型:https://huggingface.co/guillaumekln/faster-whisper-tiny/tree/main

经测试large-v3模型需要10G显存以上。显存不够的可以用小模型。

使用方法:

python translator.py 直播链接

这个translator.py是进行实时翻译,不想翻译可运行下面代码

二、代码

translator1.py:

import argparse
import sys
import signal
from datetime import datetime

import ffmpeg
import numpy as np
import whisper
from whisper.audio import SAMPLE_RATE


class RingBuffer:
    def __init__(self, size):
        self.size = size
        self.data = []
        self.full = False
        self.cur = 0

    def append(self, x):
        if self.size <= 0:
            return
        if self.full:
            self.data[self.cur] = x
            self.cur = (self.cur + 1) % self.size
        else:
            self.data.append(x)
            if len(self.data) == self.size:
                self.full = True

    def get_all(self):
        all_data = []
        for i in range(len(self.data)):
            idx = (i + self.cur) % self.size
            all_data.append(self.data[idx])
        return all_data

    def clear(self):
        self.data = []
        self.full = False
        self.cur = 0


def open_stream(stream, direct_url, preferred_quality):
    if direct_url:
        try:
            process = (
                ffmpeg.input(stream, loglevel="panic")
                .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
                .run_async(pipe_stdout=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

        return process, None

    import streamlink
    import subprocess
    import threading
    stream_options = streamlink.streams(stream)
    if not stream_options:
        print("No playable streams found on this URL:", stream)
        sys.exit(0)

    option = None
    for quality in [preferred_quality, 'audio_only', 'audio_mp4a', 'audio_opus', 'best']:
        if quality in stream_options:
            option = quality
            break
    if option is None:
        # Fallback
        option = next(iter(stream_options.values()))

    def writer(streamlink_proc, ffmpeg_proc):
        while (not streamlink_proc.poll()) and (not ffmpeg_proc.poll()):
            try:
                chunk = streamlink_proc.stdout.read(1024)
                ffmpeg_proc.stdin.write(chunk)
            except (BrokenPipeError, OSError):
                pass

    cmd = ['streamlink', stream, option, "-O"]
    streamlink_process = subprocess.Popen(cmd, stdout=subprocess.PIPE)

    try:
        ffmpeg_process = (
            ffmpeg.input("pipe:", loglevel="panic")
            .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
            .run_async(pipe_stdin=True, pipe_stdout=True)
        )
    except ffmpeg.Error as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

    thread = threading.Thread(target=writer, args=(streamlink_process, ffmpeg_process))
    thread.start()
    return ffmpeg_process, streamlink_process


def main(url, model="large-v3", interval=5, preferred_quality="audio_only", direct_url=False, **decode_options):
    print("Loading model...")
    model = whisper.load_model(model)
    
    print("Opening stream...")
    ffmpeg_process, _ = open_stream(url, direct_url, preferred_quality)
    
    def handler(signum, frame):
        ffmpeg_process.kill()
        sys.exit(0)
        
    signal.signal(signal.SIGINT, handler)

    n_bytes = interval * SAMPLE_RATE * 2  # Factor 2 comes from reading the int16 stream as bytes
    audio_buffer = RingBuffer(1)  # No need for a history buffer since we're just doing real-time transcription

    try:
        while True:
            in_bytes = ffmpeg_process.stdout.read(n_bytes)
            if not in_bytes:
                break

            audio = np.frombuffer(in_bytes, np.int16).flatten().astype(np.float32) / 32768.0
            audio_buffer.append(audio)

            result = model.transcribe(np.concatenate(audio_buffer.get_all()), **decode_options)
            print(f'{datetime.now().strftime("%H:%M:%S")} {result["text"]}')

            audio_buffer.clear()  # Clear the buffer after each transcription

    finally:
        ffmpeg_process.kill()

def cli():
    parser = argparse.ArgumentParser(description="Real-time audio transcription from streams.")
    parser.add_argument('URL', type=str, help='Stream website and channel name, e.g. twitch.tv/forsen')
    parser.add_argument('--model', type=str, default='large-v3', help='Whisper model for transcription.')
    parser.add_argument('--interval', type=int, default=5, help='Interval between transcription in seconds.')
    parser.add_argument('--preferred_quality', type=str, default='audio_only', help='Preferred stream quality.')
    parser.add_argument('--direct_url', action='store_true', help='Pass the URL directly to ffmpeg.')

    args = parser.parse_args().__dict__
    url = args.pop("URL")
    main(url, **args)

if __name__ == '__main__':
    cli()
python translator1.py https://www.huya.com/kpl

虎牙kpl的直播,文字转录:

还有繁体字,修改代码,繁体转简体:

pip install opencc-python-reimplemented

 translator2.py:

import argparse
import sys
import signal
from datetime import datetime

import ffmpeg
import numpy as np
import whisper
from whisper.audio import SAMPLE_RATE
import opencc



class RingBuffer:
    def __init__(self, size):
        self.size = size
        self.data = []
        self.full = False
        self.cur = 0

    def append(self, x):
        if self.size <= 0:
            return
        if self.full:
            self.data[self.cur] = x
            self.cur = (self.cur + 1) % self.size
        else:
            self.data.append(x)
            if len(self.data) == self.size:
                self.full = True

    def get_all(self):
        all_data = []
        for i in range(len(self.data)):
            idx = (i + self.cur) % self.size
            all_data.append(self.data[idx])
        return all_data

    def clear(self):
        self.data = []
        self.full = False
        self.cur = 0


def open_stream(stream, direct_url, preferred_quality):
    if direct_url:
        try:
            process = (
                ffmpeg.input(stream, loglevel="panic")
                .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
                .run_async(pipe_stdout=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

        return process, None

    import streamlink
    import subprocess
    import threading
    stream_options = streamlink.streams(stream)
    if not stream_options:
        print("No playable streams found on this URL:", stream)
        sys.exit(0)

    option = None
    for quality in [preferred_quality, 'audio_only', 'audio_mp4a', 'audio_opus', 'best']:
        if quality in stream_options:
            option = quality
            break
    if option is None:
        # Fallback
        option = next(iter(stream_options.values()))

    def writer(streamlink_proc, ffmpeg_proc):
        while (not streamlink_proc.poll()) and (not ffmpeg_proc.poll()):
            try:
                chunk = streamlink_proc.stdout.read(1024)
                ffmpeg_proc.stdin.write(chunk)
            except (BrokenPipeError, OSError):
                pass

    cmd = ['streamlink', stream, option, "-O"]
    streamlink_process = subprocess.Popen(cmd, stdout=subprocess.PIPE)

    try:
        ffmpeg_process = (
            ffmpeg.input("pipe:", loglevel="panic")
            .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE)
            .run_async(pipe_stdin=True, pipe_stdout=True)
        )
    except ffmpeg.Error as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

    thread = threading.Thread(target=writer, args=(streamlink_process, ffmpeg_process))
    thread.start()
    return ffmpeg_process, streamlink_process


def main(url, model="large-v3", interval=5, preferred_quality="audio_only", direct_url=False, **decode_options):
    print("Loading model...")
    model = whisper.load_model(model)
    
    print("Opening stream...")
    ffmpeg_process, _ = open_stream(url, direct_url, preferred_quality)
    
    converter = opencc.OpenCC('t2s')  # 创建繁体转简体的转换器
    
    def handler(signum, frame):
        ffmpeg_process.kill()
        sys.exit(0)
        
    signal.signal(signal.SIGINT, handler)

    n_bytes = interval * SAMPLE_RATE * 2  # Factor 2 comes from reading the int16 stream as bytes
    audio_buffer = RingBuffer(1)

    try:
        while True:
            in_bytes = ffmpeg_process.stdout.read(n_bytes)
            if not in_bytes:
                break

            audio = np.frombuffer(in_bytes, np.int16).flatten().astype(np.float32) / 32768.0
            audio_buffer.append(audio)

            result = model.transcribe(np.concatenate(audio_buffer.get_all()), **decode_options)
            result_text = converter.convert(result["text"])  # 将繁体转换为简体
            print(f'{datetime.now().strftime("%H:%M:%S")} {result_text}')

            audio_buffer.clear()

    finally:
        ffmpeg_process.kill()


def cli():
    parser = argparse.ArgumentParser(description="Real-time audio transcription from streams.")
    parser.add_argument('URL', type=str, help='Stream website and channel name, e.g. twitch.tv/forsen')
    parser.add_argument('--model', type=str, default='large-v3', help='Whisper model for transcription.')
    parser.add_argument('--interval', type=int, default=5, help='Interval between transcription in seconds.')
    parser.add_argument('--preferred_quality', type=str, default='audio_only', help='Preferred stream quality.')
    parser.add_argument('--direct_url', action='store_true', help='Pass the URL directly to ffmpeg.')

    args = parser.parse_args().__dict__
    url = args.pop("URL")
    main(url, **args)

if __name__ == '__main__':
    cli()
python translator2.py https://www.huya.com/kpl

更新时间 2024-05-25