Spaces:
Running
on
Zero
Running
on
Zero
from nemo.collections.asr.models import ASRModel | |
import torch | |
import gradio as gr | |
import spaces | |
import gc | |
import shutil | |
from pathlib import Path | |
from pydub import AudioSegment | |
import numpy as np | |
import os | |
import gradio.themes as gr_themes | |
import csv | |
import json | |
from typing import List, Tuple | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
MODEL_NAME="nvidia/parakeet-tdt-0.6b-v2" | |
model = ASRModel.from_pretrained(model_name=MODEL_NAME) | |
model.eval() | |
def start_session(request: gr.Request): | |
session_hash = request.session_hash | |
# プロジェクトディレクトリ内のoutputsフォルダを使用 | |
base_dir = Path(__file__).parent | |
session_dir = base_dir / "outputs" / session_hash | |
session_dir.mkdir(parents=True, exist_ok=True) | |
print(f"Session with hash {session_hash} started in {session_dir}") | |
return session_dir.as_posix() | |
def end_session(request: gr.Request): | |
session_hash = request.session_hash | |
base_dir = Path(__file__).parent | |
session_dir = base_dir / "outputs" / session_hash | |
if session_dir.exists(): | |
print(f"Session directory {session_dir} will be preserved.") | |
# 削除しないように変更 | |
# shutil.rmtree(session_dir) | |
print(f"Session with hash {session_hash} ended.") | |
def get_audio_segment(audio_path, start_second, end_second): | |
if not audio_path or not Path(audio_path).exists(): | |
print(f"Warning: Audio path '{audio_path}' not found or invalid for clipping.") | |
return None | |
try: | |
start_ms = int(start_second * 1000) | |
end_ms = int(end_second * 1000) | |
start_ms = max(0, start_ms) | |
if end_ms <= start_ms: | |
print(f"Warning: End time ({end_second}s) is not after start time ({start_second}s). Adjusting end time.") | |
end_ms = start_ms + 100 | |
audio = AudioSegment.from_file(audio_path) | |
clipped_audio = audio[start_ms:end_ms] | |
samples = np.array(clipped_audio.get_array_of_samples()) | |
if clipped_audio.channels == 2: | |
samples = samples.reshape((-1, 2)).mean(axis=1).astype(samples.dtype) | |
frame_rate = clipped_audio.frame_rate | |
if frame_rate <= 0: | |
print(f"Warning: Invalid frame rate ({frame_rate}) detected for clipped audio.") | |
frame_rate = audio.frame_rate | |
if samples.size == 0: | |
print(f"Warning: Clipped audio resulted in empty samples array ({start_second}s to {end_second}s).") | |
return None | |
return (frame_rate, samples) | |
except FileNotFoundError: | |
print(f"Error: Audio file not found at path: {audio_path}") | |
return None | |
except Exception as e: | |
print(f"Error clipping audio {audio_path} from {start_second}s to {end_second}s: {e}") | |
return None | |
def preprocess_audio(audio_path, session_dir): | |
""" | |
オーディオファイルの前処理(リサンプリング、モノラル変換)を行う。 | |
Args: | |
audio_path (str): 入力オーディオファイルのパス。 | |
session_dir (str): セッションディレクトリのパス。 | |
Returns: | |
tuple: (processed_path, info_path_name, duration_sec) のタプル、または None(処理に失敗した場合)。 | |
""" | |
try: | |
original_path_name = Path(audio_path).name | |
audio_name = Path(audio_path).stem | |
try: | |
gr.Info(f"Loading audio: {original_path_name}", duration=2) | |
audio = AudioSegment.from_file(audio_path) | |
duration_sec = audio.duration_seconds | |
except Exception as load_e: | |
gr.Error(f"Failed to load audio file {original_path_name}: {load_e}", duration=None) | |
return None, None, None | |
resampled = False | |
mono = False | |
target_sr = 16000 | |
if audio.frame_rate != target_sr: | |
try: | |
audio = audio.set_frame_rate(target_sr) | |
resampled = True | |
except Exception as resample_e: | |
gr.Error(f"Failed to resample audio: {resample_e}", duration=None) | |
return None, None, None | |
if audio.channels == 2: | |
try: | |
audio = audio.set_channels(1) | |
mono = True | |
except Exception as mono_e: | |
gr.Error(f"Failed to convert audio to mono: {mono_e}", duration=None) | |
return None, None, None | |
elif audio.channels > 2: | |
gr.Error(f"Audio has {audio.channels} channels. Only mono (1) or stereo (2) supported.", duration=None) | |
return None, None, None | |
processed_audio_path = None | |
if resampled or mono: | |
try: | |
processed_audio_path = Path(session_dir, f"{audio_name}_resampled.wav") | |
audio.export(processed_audio_path, format="wav") | |
transcribe_path = processed_audio_path.as_posix() | |
info_path_name = f"{original_path_name} (processed)" | |
except Exception as export_e: | |
gr.Error(f"Failed to export processed audio: {export_e}", duration=None) | |
if processed_audio_path and os.path.exists(processed_audio_path): | |
os.remove(processed_audio_path) | |
return None, None, None | |
else: | |
transcribe_path = audio_path | |
info_path_name = original_path_name | |
return transcribe_path, info_path_name, duration_sec | |
except Exception as e: | |
gr.Error(f"Audio preprocessing failed: {e}", duration=None) | |
return None, None, None | |
def transcribe_audio(transcribe_path, model, duration_sec, device): | |
""" | |
オーディオファイルを文字起こしし、タイムスタンプを取得する。 | |
Args: | |
transcribe_path (str): 入力オーディオファイルのパス。 | |
model (ASRModel): 使用するASRモデル。 | |
duration_sec (float): オーディオファイルの長さ(秒)。 | |
device (str): 使用するデバイス('cuda' or 'cpu')。 | |
Returns: | |
tuple: (vis_data, raw_times_data, word_vis_data) のタプル、または None(処理に失敗した場合)。 | |
""" | |
long_audio_settings_applied = False | |
try: | |
# CUDA使用前にメモリをクリア | |
if device == 'cuda': | |
torch.cuda.empty_cache() | |
gc.collect() | |
model.to(device) | |
model.to(torch.float32) | |
gr.Info(f"Transcribing on {device}...", duration=2) | |
if duration_sec > 480: | |
try: | |
gr.Info("Audio longer than 8 minutes. Applying optimized settings for long transcription.", duration=3) | |
print("Applying long audio settings: Local Attention and Chunking.") | |
model.change_attention_model("rel_pos_local_attn", [256,256]) | |
model.change_subsampling_conv_chunking_factor(1) | |
# メモリ効率を改善するための設定 | |
torch.cuda.empty_cache() | |
gc.collect() | |
long_audio_settings_applied = True | |
except Exception as setting_e: | |
gr.Warning(f"Could not apply long audio settings: {setting_e}", duration=5) | |
print(f"Warning: Failed to apply long audio settings: {setting_e}") | |
# より効率的なメモリ使用のためにbfloat16を使用 | |
model.to(torch.bfloat16) | |
# メモリ使用状況をログに出力 | |
if device == 'cuda': | |
print(f"CUDA Memory before transcription: {torch.cuda.memory_allocated() / 1024**2:.2f} MB") | |
output = model.transcribe([transcribe_path], timestamps=True) | |
if not output or not isinstance(output, list) or not output[0] or not hasattr(output[0], 'timestamp') or not output[0].timestamp or 'segment' not in output[0].timestamp: | |
gr.Error("Transcription failed or produced unexpected output format.", duration=None) | |
return None, None, None | |
# 結果を処理する前にメモリを解放 | |
if device == 'cuda': | |
model.cpu() | |
torch.cuda.empty_cache() | |
gc.collect() | |
segment_timestamps = output[0].timestamp['segment'] | |
vis_data = [[f"{ts['start']:.2f}", f"{ts['end']:.2f}", ts['segment']] for ts in segment_timestamps] | |
raw_times_data = [[ts['start'], ts['end']] for ts in segment_timestamps] | |
word_timestamps_raw = output[0].timestamp.get("word", []) | |
word_vis_data = [ | |
[f"{w['start']:.2f}", f"{w['end']:.2f}", w["word"]] | |
for w in word_timestamps_raw if isinstance(w, dict) and 'start' in w and 'end' in w and 'word' in w | |
] | |
gr.Info("Transcription complete.", duration=2) | |
return vis_data, raw_times_data, word_vis_data | |
except torch.cuda.OutOfMemoryError as e: | |
error_msg = 'CUDA out of memory. Please try a shorter audio or reduce GPU load.' | |
print(f"CUDA OutOfMemoryError: {e}") | |
gr.Error(error_msg, duration=None) | |
# メモリエラー時に強制的にクリーンアップ | |
if device == 'cuda': | |
torch.cuda.empty_cache() | |
gc.collect() | |
return None, None, None | |
except Exception as e: | |
error_msg = f"Transcription failed: {e}" | |
print(f"Error during transcription processing: {e}") | |
gr.Error(error_msg, duration=None) | |
return None, None, None | |
finally: | |
try: | |
if long_audio_settings_applied: | |
try: | |
print("Reverting long audio settings.") | |
model.change_attention_model("rel_pos") | |
model.change_subsampling_conv_chunking_factor(-1) | |
except Exception as revert_e: | |
print(f"Warning: Failed to revert long audio settings: {revert_e}") | |
gr.Warning(f"Issue reverting model settings after long transcription: {revert_e}", duration=5) | |
if device == 'cuda': | |
model.cpu() | |
torch.cuda.empty_cache() | |
gc.collect() | |
except Exception as cleanup_e: | |
print(f"Error during model cleanup: {cleanup_e}") | |
gr.Warning(f"Issue during model cleanup: {cleanup_e}", duration=5) | |
def save_transcripts(session_dir, audio_name, vis_data, word_vis_data): | |
""" | |
文字起こし結果を各種ファイル形式(CSV、SRT、VTT、JSON、LRC)で保存する。 | |
Args: | |
session_dir (str): セッションディレクトリのパス。 | |
audio_name (str): オーディオファイルの名前。 | |
vis_data (list): 表示用の文字起こし結果のリスト。 | |
word_vis_data (list): 単語レベルのタイムスタンプのリスト。 | |
Returns: | |
tuple: 各ファイルのダウンロードボタンの更新情報を含むタプル。 | |
""" | |
try: | |
csv_headers = ["Start (s)", "End (s)", "Segment"] | |
csv_file_path = Path(session_dir, f"transcription_{audio_name}.csv") | |
with open(csv_file_path, 'w', newline='', encoding='utf-8') as f: | |
writer = csv.writer(f) | |
writer.writerow(csv_headers) | |
writer.writerows(vis_data) | |
print(f"CSV transcript saved to temporary file: {csv_file_path}") | |
srt_file_path = Path(session_dir, f"transcription_{audio_name}.srt") | |
vtt_file_path = Path(session_dir, f"transcription_{audio_name}.vtt") | |
json_file_path = Path(session_dir, f"transcription_{audio_name}.json") | |
write_srt(vis_data, srt_file_path) | |
write_vtt(vis_data, word_vis_data, vtt_file_path) | |
write_json(vis_data, word_vis_data, json_file_path) | |
print(f"SRT, VTT, JSON transcript saved to temporary files: {srt_file_path}, {vtt_file_path}, {json_file_path}") | |
lrc_file_path = Path(session_dir, f"transcription_{audio_name}.lrc") | |
write_lrc(vis_data, lrc_file_path) | |
print(f"LRC transcript saved to temporary file: {lrc_file_path}") | |
return ( | |
gr.DownloadButton(value=csv_file_path.as_posix(), visible=True), | |
gr.DownloadButton(value=srt_file_path.as_posix(), visible=True), | |
gr.DownloadButton(value=vtt_file_path.as_posix(), visible=True), | |
gr.DownloadButton(value=json_file_path.as_posix(), visible=True), | |
gr.DownloadButton(value=lrc_file_path.as_posix(), visible=True) | |
) | |
except Exception as e: | |
gr.Error(f"Failed to create transcript files: {e}", duration=None) | |
print(f"Error writing transcript files: {e}") | |
return tuple([gr.DownloadButton(visible=False)] * 5) | |
def split_audio_with_overlap(audio_path: str, session_dir: str, chunk_length_sec: int = 3600, overlap_sec: int = 30) -> List[str]: | |
""" | |
音声ファイルをchunk_length_secごとにoverlap_secのオーバーラップ付きで分割し、 | |
分割ファイルのパスリストを返す。 | |
""" | |
audio = AudioSegment.from_file(audio_path) | |
duration = audio.duration_seconds | |
chunk_paths = [] | |
start = 0 | |
chunk_idx = 0 | |
while start < duration: | |
end = min(start + chunk_length_sec, duration) | |
# オーバーラップを考慮 | |
chunk_start = max(0, start - (overlap_sec if start > 0 else 0)) | |
chunk_end = min(end + (overlap_sec if end < duration else 0), duration) | |
chunk = audio[chunk_start * 1000:chunk_end * 1000] | |
chunk_path = Path(session_dir, f"chunk_{chunk_idx:03d}.wav").as_posix() | |
chunk.export(chunk_path, format="wav") | |
chunk_paths.append(chunk_path) | |
start += chunk_length_sec | |
chunk_idx += 1 | |
return chunk_paths | |
def get_transcripts_and_raw_times(audio_path, session_dir, progress=gr.Progress(track_tqdm=True)): | |
""" | |
オーディオファイルを処理し、文字起こし結果を生成する。 | |
3時間を超える場合は60分ごとに分割し、オーバーラップ付きでASRを実行してマージする。 | |
""" | |
if not audio_path: | |
gr.Error("No audio file path provided for transcription.", duration=None) | |
return [], [], [], None, gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False) | |
audio_name = Path(audio_path).stem | |
processed_audio_path = None | |
temp_chunk_paths = [] | |
try: | |
# オーディオの前処理 | |
transcribe_path, info_path_name, duration_sec = preprocess_audio(audio_path, session_dir) | |
if not transcribe_path or not duration_sec: | |
return [], [], [], audio_path, gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False) | |
processed_audio_path = transcribe_path if transcribe_path != audio_path else None # 3時間超の場合は分割して逐次ASR | |
if duration_sec > 10800: | |
gr.Info("Audio is longer than 3 hours. Splitting into 1-hour chunks with overlap for transcription.", duration=5) | |
chunk_paths = split_audio_with_overlap(transcribe_path, session_dir, chunk_length_sec=3600, overlap_sec=30) | |
temp_chunk_paths = chunk_paths.copy() | |
all_vis_data = [] | |
all_raw_times_data = [] | |
all_word_vis_data = [] | |
offset = 0.0 | |
prev_end = 0.0 | |
for i, chunk_path in enumerate(progress.tqdm(chunk_paths, desc="Processing audio chunks")): | |
chunk_audio = AudioSegment.from_file(chunk_path) | |
chunk_duration = chunk_audio.duration_seconds | |
# ASR実行 | |
result = transcribe_audio(chunk_path, model, chunk_duration, device) | |
if not result: | |
continue | |
vis_data, raw_times_data, word_vis_data = result | |
# タイムスタンプを全体のオフセットに合わせて補正 | |
vis_data_offset = [] | |
raw_times_data_offset = [] | |
word_vis_data_offset = [] | |
for row in vis_data: | |
s, e, seg = float(row[0]), float(row[1]), row[2] | |
vis_data_offset.append([f"{s+offset:.2f}", f"{e+offset:.2f}", seg]) | |
for row in raw_times_data: | |
s, e = float(row[0]), float(row[1]) | |
raw_times_data_offset.append([s+offset, e+offset]) | |
for row in word_vis_data: | |
s, e, w = float(row[0]), float(row[1]), row[2] | |
word_vis_data_offset.append([f"{s+offset:.2f}", f"{e+offset:.2f}", w]) | |
# オーバーラップ部分の重複除去(単純に前回のend以降のみ追加) | |
vis_data_offset = [row for row in vis_data_offset if float(row[0]) >= prev_end] | |
raw_times_data_offset = [row for row in raw_times_data_offset if row[0] >= prev_end] | |
word_vis_data_offset = [row for row in word_vis_data_offset if float(row[0]) >= prev_end] | |
if vis_data_offset: | |
prev_end = float(vis_data_offset[-1][1]) | |
all_vis_data.extend(vis_data_offset) | |
all_raw_times_data.extend(raw_times_data_offset) | |
all_word_vis_data.extend(word_vis_data_offset) | |
offset += chunk_duration - (30 if i < len(chunk_paths)-1 else 0) | |
# ファイルの保存 | |
button_updates = save_transcripts(session_dir, audio_name, all_vis_data, all_word_vis_data) | |
# 一時分割ファイル削除 | |
for p in temp_chunk_paths: | |
try: | |
os.remove(p) | |
except Exception: | |
pass | |
return ( | |
all_vis_data, | |
all_raw_times_data, | |
all_word_vis_data, | |
audio_path, | |
*button_updates | |
) | |
else: | |
# 3時間以内は従来通り | |
result = transcribe_audio(transcribe_path, model, duration_sec, device) | |
if not result: | |
return [], [], [], audio_path, gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False), gr.DownloadButton(visible=False) | |
vis_data, raw_times_data, word_vis_data = result | |
button_updates = save_transcripts(session_dir, audio_name, vis_data, word_vis_data) | |
return ( | |
vis_data, | |
raw_times_data, | |
word_vis_data, | |
audio_path, | |
*button_updates | |
) | |
finally: | |
if processed_audio_path and os.path.exists(processed_audio_path): | |
try: | |
os.remove(processed_audio_path) | |
print(f"Temporary audio file {processed_audio_path} removed.") | |
except Exception as e: | |
print(f"Error removing temporary audio file {processed_audio_path}: {e}") | |
# 分割ファイルの掃除 | |
for p in temp_chunk_paths: | |
if os.path.exists(p): | |
try: | |
os.remove(p) | |
except Exception: | |
pass | |
def play_segment(evt: gr.SelectData, raw_ts_list, current_audio_path): | |
if not isinstance(raw_ts_list, list): | |
print(f"Warning: raw_ts_list is not a list ({type(raw_ts_list)}). Cannot play segment.") | |
return gr.Audio(value=None, label="Selected Segment") | |
if not current_audio_path: | |
print("No audio path available to play segment from.") | |
return gr.Audio(value=None, label="Selected Segment") | |
selected_index = evt.index[0] | |
if selected_index < 0 or selected_index >= len(raw_ts_list): | |
print(f"Invalid index {selected_index} selected for list of length {len(raw_ts_list)}.") | |
return gr.Audio(value=None, label="Selected Segment") | |
if not isinstance(raw_ts_list[selected_index], (list, tuple)) or len(raw_ts_list[selected_index]) != 2: | |
print(f"Warning: Data at index {selected_index} is not in the expected format [start, end].") | |
return gr.Audio(value=None, label="Selected Segment") | |
start_time_s, end_time_s = raw_ts_list[selected_index] | |
print(f"Attempting to play segment: {current_audio_path} from {start_time_s:.2f}s to {end_time_s:.2f}s") | |
segment_data = get_audio_segment(current_audio_path, start_time_s, end_time_s) | |
if segment_data: | |
print("Segment data retrieved successfully.") | |
return gr.Audio(value=segment_data, autoplay=True, label=f"Segment: {start_time_s:.2f}s - {end_time_s:.2f}s", interactive=False) | |
else: | |
print("Failed to get audio segment data.") | |
return gr.Audio(value=None, label="Selected Segment") | |
def write_srt(segments, path): | |
def sec2srt(t): | |
h, rem = divmod(int(float(t)), 3600) | |
m, s = divmod(rem, 60) | |
ms = int((float(t) - int(float(t))) * 1000) | |
return f"{h:02}:{m:02}:{s:02},{ms:03}" | |
with open(path, "w", encoding="utf-8") as f: | |
for i, seg in enumerate(segments, 1): | |
f.write(f"{i}\n{sec2srt(seg[0])} --> {sec2srt(seg[1])}\n{seg[2]}\n\n") | |
def write_vtt(segments, words, path): | |
def sec2vtt(t): | |
h, rem = divmod(int(float(t)), 3600) | |
m, s = divmod(rem, 60) | |
ms = int((float(t) - int(float(t))) * 1000) | |
return f"{h:02}:{m:02}:{s:02}.{ms:03}" | |
with open(path, "w", encoding="utf-8") as f: | |
f.write("WEBVTT\n\n") | |
word_idx = 0 | |
for seg_idx, seg in enumerate(segments): # segmentにもインデックスが必要な場合に備えてenumerateする | |
s_start = float(seg[0]) | |
s_end = float(seg[1]) | |
# s_text = seg[2] # s_textはこの関数内では直接VTT出力に使われていない模様 | |
segment_words = [] | |
temp_word_idx = word_idx # 現在のword_idxから探索を開始 | |
while temp_word_idx < len(words): | |
w = words[temp_word_idx] | |
w_start_val = float(w[0]) | |
w_end_val = float(w[1]) | |
# 単語が現在のセグメントに完全に含まれるか、一部でも重なっていれば含める | |
# ここでは元のロジックを踏襲し、セグメント内に開始・終了がある単語を対象とする | |
if w_start_val >= s_start and w_end_val <= s_end: | |
segment_words.append(w) | |
if temp_word_idx == word_idx: # segment_words に追加された最初の単語なら word_idx を進める | |
word_idx = temp_word_idx + 1 | |
temp_word_idx += 1 | |
elif w_start_val < s_start and w_end_val > s_start: # 単語がセグメント開始をまたぐ場合 | |
# 必要であれば、このようなケースの単語も segment_words に含める処理を追加 | |
temp_word_idx += 1 | |
elif w_start_val > s_end: # 単語の開始がセグメントの終了より後なら、このセグメントの単語は終わり | |
break | |
else: # 上記以外 (単語がセグメントより完全に前など) | |
if temp_word_idx == word_idx: # word_idx が進まない場合を避ける | |
word_idx = temp_word_idx + 1 | |
temp_word_idx += 1 | |
# 各単語ごとにタイムスタンプを生成 | |
for i, word_data in enumerate(segment_words): | |
w_start = float(word_data[0]) | |
w_end = float(word_data[1]) | |
# 現在の単語を強調表示し、他の単語は通常表示 | |
colored_text = "" | |
for j, other_word_data in enumerate(segment_words): | |
if j == i: # 現在の単語 (i番目) を強調 | |
colored_text += f"<c.yellow><b>{other_word_data[2]}</b></c> " | |
else: | |
colored_text += f"{other_word_data[2]} " | |
f.write(f"{sec2vtt(w_start)} --> {sec2vtt(w_end)}\n{colored_text.strip()}\n\n") | |
def write_json(segments, words, path): | |
result = {"segments": []} | |
word_idx = 0 | |
for s in segments: | |
s_start = float(s[0]) | |
s_end = float(s[1]) | |
s_text = s[2] | |
word_list = [] | |
while word_idx < len(words): | |
w = words[word_idx] | |
w_start = float(w[0]) | |
w_end = float(w[1]) | |
if w_start >= s_start and w_end <= s_end: | |
word_list.append({"start": w_start, "end": w_end, "word": w[2]}) | |
word_idx += 1 | |
elif w_end < s_start: | |
word_idx += 1 | |
else: | |
break | |
result["segments"].append({ | |
"start": s_start, | |
"end": s_end, | |
"text": s_text, | |
"words": word_list | |
}) | |
with open(path, "w", encoding="utf-8") as f: | |
json.dump(result, f, ensure_ascii=False, indent=2) | |
def write_lrc(segments, path): | |
def sec2lrc(t): | |
m, s = divmod(float(t), 60) | |
return f"[{int(m):02}:{s:05.2f}]" | |
with open(path, "w", encoding="utf-8") as f: | |
for seg in segments: | |
f.write(f"{sec2lrc(seg[0])}{seg[2]}\n") | |
article = ( | |
"<p style='font-size: 1.1em;'>" | |
"このデモは <code><a href='https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2' target='_blank'>parakeet-tdt-0.6b-v2</a></code> " | |
"(約6億パラメータ)を用いた高精度な英語音声文字起こしを実演します。" | |
"</p>" | |
"<p><strong style='color: red; font-size: 1.2em;'>主な特長:</strong></p>" | |
"<ul style='font-size: 1.1em;'>" " <li>自動句読点・大文字化</li>" | |
" <li>単語レベルのタイムスタンプ(下表クリックで該当区間を再生)</li>" | |
" <li>文字レベルのタイムスタンプ表示にも対応</li>" | |
" <li>自動チャンク処理による <strong>長時間音声</strong> の効率的な文字起こし(数時間以上の音声にも対応)</li>" | |
" <li>数字や歌詞など発話の多様なケースに高いロバスト性</li>" | |
"</ul>" | |
"<p style='font-size: 1.1em;'>" | |
"商用・非商用ともに <strong>ライセンス制限なく利用可能</strong> です。" | |
"</p>" | |
"<p style='text-align: center;'>" | |
"<a href='https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2' target='_blank'>🎙️ モデル詳細</a> | " | |
"<a href='https://arxiv.org/abs/2305.05084' target='_blank'>📄 Fast Conformer 論文</a> | " | |
"<a href='https://arxiv.org/abs/2304.06795' target='_blank'>📚 TDT 論文</a> | " | |
"<a href='https://github.com/NVIDIA/NeMo' target='_blank'>🧑💻 NeMo リポジトリ</a>" | |
"</p>" | |
) | |
examples = [ | |
["data/example-yt_saTD1u8PorI.mp3"], | |
] | |
nvidia_theme = gr_themes.Default( | |
primary_hue=gr_themes.Color( | |
c50="#E6F1D9", c100="#CEE3B3", c200="#B5D58C", c300="#9CC766", | |
c400="#84B940", c500="#76B900", c600="#68A600", c700="#5A9200", | |
c800="#4C7E00", c900="#3E6A00", c950="#2F5600" | |
), | |
neutral_hue="gray", | |
font=[gr_themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"], | |
).set() | |
with gr.Blocks(theme=nvidia_theme) as demo: | |
model_display_name = MODEL_NAME.split('/')[-1] if '/' in MODEL_NAME else MODEL_NAME | |
gr.Markdown(f"<h1 style='text-align: center; margin: 0 auto;'>長時間対応 音声文字起こし ({model_display_name})</h1>") | |
gr.HTML(article) | |
current_audio_path_state = gr.State(None) | |
raw_timestamps_list_state = gr.State([]) | |
session_dir_state = gr.State() | |
demo.load(start_session, outputs=[session_dir_state]) | |
with gr.Tabs(): | |
with gr.TabItem("Audio File"): | |
file_input = gr.Audio(sources=["upload"], type="filepath", label="Upload Audio File") | |
gr.Examples(examples=examples, inputs=[file_input], label="Example Audio Files (Click to Load)") | |
file_transcribe_btn = gr.Button("Transcribe Uploaded File", variant="primary") | |
with gr.TabItem("Microphone"): | |
mic_input = gr.Audio(sources=["microphone"], type="filepath", label="Record Audio") | |
mic_transcribe_btn = gr.Button("Transcribe Microphone Input", variant="primary") | |
gr.Markdown("---") | |
gr.Markdown("<p><strong style='color: #FF0000; font-size: 1.2em;'>Transcription Results</strong></p>") | |
download_btn = gr.DownloadButton(label="Download Segment Transcript (CSV)", visible=False) | |
srt_btn = gr.DownloadButton(label="Download SRT", visible=False) | |
vtt_btn = gr.DownloadButton(label="Download VTT", visible=False) | |
json_btn = gr.DownloadButton(label="Download JSON", visible=False) | |
lrc_btn = gr.DownloadButton(label="Download LRC", visible=False) | |
with gr.Tabs(): | |
with gr.TabItem("Segment View (Click row to play segment)"): | |
vis_timestamps_df = gr.DataFrame( | |
headers=["Start (s)", "End (s)", "Segment"], | |
datatype=["number", "number", "str"], | |
wrap=True, | |
) | |
selected_segment_player = gr.Audio(label="Selected Segment", interactive=False) | |
with gr.TabItem("Word View"): | |
word_vis_df = gr.DataFrame( | |
headers=["Start (s)", "End (s)", "Word"], | |
datatype=["number", "number", "str"], | |
wrap=False, | |
) | |
mic_transcribe_btn.click( | |
fn=get_transcripts_and_raw_times, | |
inputs=[mic_input, session_dir_state], | |
outputs=[vis_timestamps_df, raw_timestamps_list_state, word_vis_df, current_audio_path_state, download_btn, srt_btn, vtt_btn, json_btn, lrc_btn], | |
api_name="transcribe_mic" | |
) | |
file_transcribe_btn.click( | |
fn=get_transcripts_and_raw_times, | |
inputs=[file_input, session_dir_state], | |
outputs=[vis_timestamps_df, raw_timestamps_list_state, word_vis_df, current_audio_path_state, download_btn, srt_btn, vtt_btn, json_btn, lrc_btn], | |
api_name="transcribe_file" | |
) | |
vis_timestamps_df.select( | |
fn=play_segment, | |
inputs=[raw_timestamps_list_state, current_audio_path_state], | |
outputs=[selected_segment_player], | |
) | |
demo.unload(end_session) | |
if __name__ == "__main__": | |
print("Launching Gradio Demo...") | |
demo.queue( | |
max_size=5, | |
default_concurrency_limit=1 # イベントリスナーのデフォルト同時実行数を1に設定 | |
) | |
demo.launch( | |
server_name="127.0.0.1", | |
server_port=7860, | |
share=False, | |
max_threads=1 # サーバー全体の同時処理スレッド数を1に設定 | |
) |