Spaces:
Running
Running
#!/usr/bin/env python3 | |
# | |
# Copyright (c) 2023 Xiaomi Corporation | |
""" | |
This file demonstrates how to use sherpa-onnx Python APIs to generate | |
subtitles. | |
Supported file formats are those supported by ffmpeg; for instance, | |
*.mov, *.mp4, *.wav, etc. | |
Note that you need a non-streaming model for this script. | |
Please visit | |
https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/silero_vad.onnx | |
to download silero_vad.onnx | |
For instance, | |
wget https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/silero_vad.onnx | |
(1) For paraformer | |
./python-api-examples/generate-subtitles.py \ | |
--silero-vad-model=/path/to/silero_vad.onnx \ | |
--tokens=/path/to/tokens.txt \ | |
--paraformer=/path/to/paraformer.onnx \ | |
--num-threads=2 \ | |
--decoding-method=greedy_search \ | |
--debug=false \ | |
--sample-rate=16000 \ | |
--feature-dim=80 \ | |
/path/to/test.mp4 | |
(2) For transducer models from icefall | |
./python-api-examples/generate-subtitles.py \ | |
--silero-vad-model=/path/to/silero_vad.onnx \ | |
--tokens=/path/to/tokens.txt \ | |
--encoder=/path/to/encoder.onnx \ | |
--decoder=/path/to/decoder.onnx \ | |
--joiner=/path/to/joiner.onnx \ | |
--num-threads=2 \ | |
--decoding-method=greedy_search \ | |
--debug=false \ | |
--sample-rate=16000 \ | |
--feature-dim=80 \ | |
/path/to/test.mp4 | |
(3) For Moonshine models | |
./python-api-examples/generate-subtitles.py \ | |
--silero-vad-model=/path/to/silero_vad.onnx \ | |
--moonshine-preprocessor=./sherpa-onnx-moonshine-tiny-en-int8/preprocess.onnx \ | |
--moonshine-encoder=./sherpa-onnx-moonshine-tiny-en-int8/encode.int8.onnx \ | |
--moonshine-uncached-decoder=./sherpa-onnx-moonshine-tiny-en-int8/uncached_decode.int8.onnx \ | |
--moonshine-cached-decoder=./sherpa-onnx-moonshine-tiny-en-int8/cached_decode.int8.onnx \ | |
--tokens=./sherpa-onnx-moonshine-tiny-en-int8/tokens.txt \ | |
--num-threads=2 \ | |
/path/to/test.mp4 | |
(4) For Whisper models | |
./python-api-examples/generate-subtitles.py \ | |
--silero-vad-model=/path/to/silero_vad.onnx \ | |
--whisper-encoder=./sherpa-onnx-whisper-base.en/base.en-encoder.int8.onnx \ | |
--whisper-decoder=./sherpa-onnx-whisper-base.en/base.en-decoder.int8.onnx \ | |
--tokens=./sherpa-onnx-whisper-base.en/base.en-tokens.txt \ | |
--whisper-task=transcribe \ | |
--num-threads=2 \ | |
/path/to/test.mp4 | |
(5) For SenseVoice CTC models | |
./python-api-examples/generate-subtitles.py \ | |
--silero-vad-model=/path/to/silero_vad.onnx \ | |
--sense-voice=./sherpa-onnx-sense-voice-zh-en-ja-ko-yue-2024-07-17/model.onnx \ | |
--tokens=./sherpa-onnx-sense-voice-zh-en-ja-ko-yue-2024-07-17/tokens.txt \ | |
--num-threads=2 \ | |
/path/to/test.mp4 | |
(6) For WeNet CTC models | |
./python-api-examples/generate-subtitles.py \ | |
--silero-vad-model=/path/to/silero_vad.onnx \ | |
--wenet-ctc=./sherpa-onnx-zh-wenet-wenetspeech/model.onnx \ | |
--tokens=./sherpa-onnx-zh-wenet-wenetspeech/tokens.txt \ | |
--num-threads=2 \ | |
/path/to/test.mp4 | |
Please refer to | |
https://k2-fsa.github.io/sherpa/onnx/index.html | |
to install sherpa-onnx and to download non-streaming pre-trained models | |
used in this file. | |
""" | |
import argparse | |
import datetime as dt | |
import shutil | |
import subprocess | |
import sys | |
from dataclasses import dataclass | |
from datetime import timedelta | |
from pathlib import Path | |
import numpy as np | |
import sherpa_onnx | |
def get_args():
    """Parse the command-line arguments of the subtitle generator.

    Model path options default to the empty string; ``create_recognizer()``
    picks the model family from whichever of them is non-empty.

    Returns:
      The ``argparse.Namespace`` produced by parsing ``sys.argv``.
    """

    def str2bool(value: str) -> bool:
        """Convert a command-line string into a bool.

        ``type=bool`` must not be used with argparse: ``bool("false")`` is
        ``True`` because any non-empty string is truthy, so ``--debug=false``
        would silently enable debugging.
        """
        lowered = value.strip().lower()
        if lowered in ("1", "true", "t", "yes", "y", "on"):
            return True
        # The empty string stays False, as it was with ``type=bool``.
        if lowered in ("", "0", "false", "f", "no", "n", "off"):
            return False
        raise argparse.ArgumentTypeError(
            f"Expected a boolean value (e.g., true or false), got: {value!r}"
        )

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "--silero-vad-model",
        type=str,
        required=True,
        help="Path to silero_vad.onnx",
    )

    # main() unconditionally checks that this file exists, so report a
    # missing option at parse time instead of failing later on ``None``.
    parser.add_argument(
        "--tokens",
        type=str,
        required=True,
        help="Path to tokens.txt",
    )

    parser.add_argument(
        "--encoder",
        default="",
        type=str,
        help="Path to the transducer encoder model",
    )

    parser.add_argument(
        "--decoder",
        default="",
        type=str,
        help="Path to the transducer decoder model",
    )

    parser.add_argument(
        "--joiner",
        default="",
        type=str,
        help="Path to the transducer joiner model",
    )

    parser.add_argument(
        "--paraformer",
        default="",
        type=str,
        help="Path to the model.onnx from Paraformer",
    )

    parser.add_argument(
        "--sense-voice",
        default="",
        type=str,
        help="Path to the model.onnx from SenseVoice",
    )

    parser.add_argument(
        "--wenet-ctc",
        default="",
        type=str,
        help="Path to the CTC model.onnx from WeNet",
    )

    parser.add_argument(
        "--num-threads",
        type=int,
        default=2,
        help="Number of threads for neural network computation",
    )

    parser.add_argument(
        "--whisper-encoder",
        default="",
        type=str,
        help="Path to whisper encoder model",
    )

    parser.add_argument(
        "--whisper-decoder",
        default="",
        type=str,
        help="Path to whisper decoder model",
    )

    parser.add_argument(
        "--whisper-language",
        default="",
        type=str,
        help="""It specifies the spoken language in the input file.
        Example values: en, fr, de, zh, jp.
        Available languages for multilingual models can be found at
        https://github.com/openai/whisper/blob/main/whisper/tokenizer.py#L10
        If not specified, we infer the language from the input audio file.
        """,
    )

    parser.add_argument(
        "--whisper-task",
        default="transcribe",
        choices=["transcribe", "translate"],
        type=str,
        help="""For multilingual models, if you specify translate, the output
        will be in English.
        """,
    )

    parser.add_argument(
        "--whisper-tail-paddings",
        default=-1,
        type=int,
        help="""Number of tail padding frames.
        We have removed the 30-second constraint from whisper, so you need to
        choose the amount of tail padding frames by yourself.
        Use -1 to use a default value for tail padding.
        """,
    )

    parser.add_argument(
        "--moonshine-preprocessor",
        default="",
        type=str,
        help="Path to moonshine preprocessor model",
    )

    parser.add_argument(
        "--moonshine-encoder",
        default="",
        type=str,
        help="Path to moonshine encoder model",
    )

    parser.add_argument(
        "--moonshine-uncached-decoder",
        default="",
        type=str,
        help="Path to moonshine uncached decoder model",
    )

    parser.add_argument(
        "--moonshine-cached-decoder",
        default="",
        type=str,
        help="Path to moonshine cached decoder model",
    )

    parser.add_argument(
        "--decoding-method",
        type=str,
        default="greedy_search",
        help="""Valid values are greedy_search and modified_beam_search.
        modified_beam_search is valid only for transducer models.
        """,
    )

    parser.add_argument(
        "--debug",
        type=str2bool,
        default=False,
        help="True to show debug messages when loading modes.",
    )

    parser.add_argument(
        "--sample-rate",
        type=int,
        default=16000,
        help="""Sample rate of the feature extractor. Must match the one
        expected by the model. Note: The input sound files can have a
        different sample rate from this argument.""",
    )

    parser.add_argument(
        "--feature-dim",
        type=int,
        default=80,
        help="Feature dimension. Must match the one expected by the model",
    )

    parser.add_argument(
        "sound_file",
        type=str,
        help="The input sound file to generate subtitles ",
    )

    return parser.parse_args()
def assert_file_exists(filename: str):
    """Check that ``filename`` refers to an existing file.

    The error is raised explicitly rather than with an ``assert`` statement
    so that the check is still performed when Python runs with ``-O``.

    Args:
      filename: Path of the file to check. An empty or ``None`` value is
        treated as a missing file.

    Raises:
      AssertionError: If ``Path(filename).is_file()`` is false.
    """
    if filename and Path(filename).is_file():
        return

    raise AssertionError(
        f"{filename} does not exist!\n"
        "Please refer to "
        "https://k2-fsa.github.io/sherpa/onnx/pretrained_models/index.html to download it"
    )
def create_recognizer(args) -> sherpa_onnx.OfflineRecognizer:
    """Build the offline recognizer selected by the model path options.

    The model families are tested in a fixed order (transducer, Paraformer,
    SenseVoice, WeNet CTC, Whisper, Moonshine) and the first one whose
    primary option is non-empty is used. Each branch asserts that the
    options of the families tested after it are empty.

    Args:
      args: The namespace returned by ``get_args()``.

    Returns:
      The recognizer created by the matching ``OfflineRecognizer.from_*``
      factory.

    Raises:
      ValueError: If no model option was given.
      AssertionError: If options of a later family are also set, or a
        required model file does not exist.
    """
    # Options of the families tested after the transducer branch, in the
    # order in which they are checked.
    later_options = (
        "paraformer",
        "sense_voice",
        "wenet_ctc",
        "whisper_encoder",
        "whisper_decoder",
        "moonshine_preprocessor",
        "moonshine_encoder",
        "moonshine_uncached_decoder",
        "moonshine_cached_decoder",
    )

    def require_unset(option_names):
        # On failure the offending value is the assertion message.
        for option_name in option_names:
            value = getattr(args, option_name)
            assert len(value) == 0, value

    if args.encoder:
        require_unset(later_options)

        for model_path in (args.encoder, args.decoder, args.joiner):
            assert_file_exists(model_path)

        return sherpa_onnx.OfflineRecognizer.from_transducer(
            encoder=args.encoder,
            decoder=args.decoder,
            joiner=args.joiner,
            tokens=args.tokens,
            num_threads=args.num_threads,
            sample_rate=args.sample_rate,
            feature_dim=args.feature_dim,
            decoding_method=args.decoding_method,
            debug=args.debug,
        )

    if args.paraformer:
        require_unset(later_options[1:])

        assert_file_exists(args.paraformer)

        return sherpa_onnx.OfflineRecognizer.from_paraformer(
            paraformer=args.paraformer,
            tokens=args.tokens,
            num_threads=args.num_threads,
            sample_rate=args.sample_rate,
            feature_dim=args.feature_dim,
            decoding_method=args.decoding_method,
            debug=args.debug,
        )

    if args.sense_voice:
        require_unset(later_options[2:])

        assert_file_exists(args.sense_voice)

        return sherpa_onnx.OfflineRecognizer.from_sense_voice(
            model=args.sense_voice,
            tokens=args.tokens,
            num_threads=args.num_threads,
            use_itn=True,
            debug=args.debug,
        )

    if args.wenet_ctc:
        require_unset(later_options[3:])

        assert_file_exists(args.wenet_ctc)

        return sherpa_onnx.OfflineRecognizer.from_wenet_ctc(
            model=args.wenet_ctc,
            tokens=args.tokens,
            num_threads=args.num_threads,
            sample_rate=args.sample_rate,
            feature_dim=args.feature_dim,
            decoding_method=args.decoding_method,
            debug=args.debug,
        )

    if args.whisper_encoder:
        # In this branch the model files are checked before the Moonshine
        # options.
        assert_file_exists(args.whisper_encoder)
        assert_file_exists(args.whisper_decoder)

        require_unset(later_options[5:])

        return sherpa_onnx.OfflineRecognizer.from_whisper(
            encoder=args.whisper_encoder,
            decoder=args.whisper_decoder,
            tokens=args.tokens,
            num_threads=args.num_threads,
            decoding_method=args.decoding_method,
            debug=args.debug,
            language=args.whisper_language,
            task=args.whisper_task,
            tail_paddings=args.whisper_tail_paddings,
        )

    if args.moonshine_preprocessor:
        for model_path in (
            args.moonshine_preprocessor,
            args.moonshine_encoder,
            args.moonshine_uncached_decoder,
            args.moonshine_cached_decoder,
        ):
            assert_file_exists(model_path)

        return sherpa_onnx.OfflineRecognizer.from_moonshine(
            preprocessor=args.moonshine_preprocessor,
            encoder=args.moonshine_encoder,
            uncached_decoder=args.moonshine_uncached_decoder,
            cached_decoder=args.moonshine_cached_decoder,
            tokens=args.tokens,
            num_threads=args.num_threads,
            decoding_method=args.decoding_method,
            debug=args.debug,
        )

    raise ValueError("Please specify at least one model")
@dataclass
class Segment:
    """A speech segment that renders as the body of an SRT cue.

    ``str(segment)`` yields the timing line
    ``HH:MM:SS,mmm --> HH:MM:SS,mmm`` followed by a newline and the text.

    Attributes:
      start: Start time of the segment, in seconds.
      duration: Length of the segment, in seconds.
      text: Text shown for the segment.
    """

    start: float
    duration: float
    text: str = ""

    @property
    def end(self) -> float:
        """End time of the segment, in seconds."""
        return self.start + self.duration

    @staticmethod
    def _format_timestamp(seconds: float) -> str:
        """Format ``seconds`` as an SRT timestamp ``HH:MM:SS,mmm``.

        The fields are computed arithmetically instead of slicing
        ``str(timedelta)``: that string has no fractional part for whole
        seconds, so cutting its last three characters corrupts the time.
        Sub-millisecond precision is truncated.
        """
        total_ms = timedelta(seconds=seconds) // timedelta(milliseconds=1)
        hours, remainder = divmod(total_ms, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        secs, millis = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

    def __str__(self):
        begin = self._format_timestamp(self.start)
        finish = self._format_timestamp(self.end)
        return f"{begin} --> {finish}\n{self.text}"
def main():
    """Generate an SRT subtitle file for the sound file given on the command line.

    The input is decoded by an ``ffmpeg`` subprocess into mono 16-bit PCM at
    ``args.sample_rate``, split into speech segments by the silero VAD, and
    each segment is decoded with the recognizer built by
    ``create_recognizer()``. The cues are written next to the input file,
    using the same name with a ``.srt`` suffix, and timing statistics are
    printed.

    Raises:
      ValueError: If the sound file does not exist, or if no audio samples
        could be read from ffmpeg.
      AssertionError: If a model file is missing or an argument is invalid.
    """
    args = get_args()
    assert_file_exists(args.tokens)
    assert_file_exists(args.silero_vad_model)

    assert args.num_threads > 0, args.num_threads

    if not Path(args.sound_file).is_file():
        raise ValueError(f"{args.sound_file} does not exist")

    assert (
        args.sample_rate == 16000
    ), f"Only sample rate 16000 is supported.Given: {args.sample_rate}"

    recognizer = create_recognizer(args)

    # Ask ffmpeg to write raw mono 16-bit little-endian PCM to its stdout.
    ffmpeg_cmd = [
        "ffmpeg",
        "-i",
        args.sound_file,
        "-f",
        "s16le",
        "-acodec",
        "pcm_s16le",
        "-ac",
        "1",
        "-ar",
        str(args.sample_rate),
        "-",
    ]

    frames_per_read = int(args.sample_rate * 100)  # 100 second

    config = sherpa_onnx.VadModelConfig()
    config.silero_vad.model = args.silero_vad_model
    config.silero_vad.threshold = 0.5
    config.silero_vad.min_silence_duration = 0.25  # seconds
    config.silero_vad.min_speech_duration = 0.25  # seconds

    # If the current segment is larger than this value, then it increases
    # the threshold to 0.9 internally. After detecting this segment,
    # it resets the threshold to its original value.
    config.silero_vad.max_speech_duration = 5  # seconds
    config.sample_rate = args.sample_rate

    window_size = config.silero_vad.window_size

    # Samples read from ffmpeg that have not been passed to the VAD yet.
    buffer = []
    vad = sherpa_onnx.VoiceActivityDetector(config, buffer_size_in_seconds=100)

    segment_list = []
    num_processed_samples = 0

    # The context manager closes the pipe and waits for ffmpeg on every exit
    # path, so neither the file descriptor nor the child process is leaked.
    with subprocess.Popen(
        ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
    ) as process:
        print("Started!")
        start_t = dt.datetime.now()

        is_eof = False
        # TODO(fangjun): Support multithreads
        while not is_eof:
            # *2 because int16_t has two bytes
            data = process.stdout.read(frames_per_read * 2)
            if not data:
                vad.flush()
                is_eof = True
            else:
                samples = np.frombuffer(data, dtype=np.int16)
                samples = samples.astype(np.float32) / 32768

                num_processed_samples += samples.shape[0]

                buffer = np.concatenate([buffer, samples])
                # NOTE(review): samples still left in ``buffer`` at EOF (at
                # most ``window_size`` of them) are never passed to the VAD.
                while len(buffer) > window_size:
                    vad.accept_waveform(buffer[:window_size])
                    buffer = buffer[window_size:]

            # Drain every segment the VAD has produced so far; this also
            # runs on the EOF iteration, after vad.flush().
            streams = []
            segments = []
            while not vad.empty():
                segment = Segment(
                    start=vad.front.start / args.sample_rate,
                    duration=len(vad.front.samples) / args.sample_rate,
                )
                segments.append(segment)

                stream = recognizer.create_stream()
                stream.accept_waveform(args.sample_rate, vad.front.samples)

                streams.append(stream)

                vad.pop()

            for s in streams:
                recognizer.decode_stream(s)

            for seg, stream in zip(segments, streams):
                seg.text = stream.result.text
                segment_list.append(seg)

        end_t = dt.datetime.now()

    if num_processed_samples == 0:
        # ffmpeg's stderr is discarded, so without this check the failure
        # would only show up as a ZeroDivisionError in the RTF computation.
        raise ValueError(
            f"No audio samples could be decoded from {args.sound_file} "
            f"(ffmpeg exit code: {process.returncode})"
        )

    elapsed_seconds = (end_t - start_t).total_seconds()
    duration = num_processed_samples / args.sample_rate
    rtf = elapsed_seconds / duration

    srt_filename = Path(args.sound_file).with_suffix(".srt")
    with open(srt_filename, "w", encoding="utf-8") as f:
        for i, seg in enumerate(segment_list):
            print(i + 1, file=f)
            print(seg, file=f)
            print("", file=f)

    print(f"Saved to {srt_filename}")
    print(f"Audio duration:\t{duration:.3f} s")
    print(f"Elapsed:\t{elapsed_seconds:.3f} s")
    print(f"RTF = {elapsed_seconds:.3f}/{duration:.3f} = {rtf:.3f}")
    print("Done!")
if __name__ == "__main__":
    # main() launches ffmpeg as a subprocess, so it must be on PATH.
    ffmpeg_path = shutil.which("ffmpeg")
    if ffmpeg_path is not None:
        main()
    else:
        sys.exit("Please install ffmpeg first!")