Spaces:
Running
on
Zero
Running
on
Zero
# coding: utf-8 | |
import torch | |
import gc | |
from pathlib import Path | |
from pydub import AudioSegment | |
# numpy は直接は使用されていませんが、pydubやNeMoの依存関係で間接的に必要になる可能性があります。 | |
# import numpy as np | |
import os | |
import csv | |
import json | |
from typing import List, Tuple, Optional, Set # Python 3.9+ では Optional, Set は typing から不要な場合あり | |
import argparse | |
import time # ★処理時間計測のために追加 | |
import sys # ★コマンドライン引数チェックのために追加 | |
from nemo.collections.asr.models import ASRModel # NeMo ASRモデル | |
import subprocess | |
import shutil | |
# --- グローバル設定 --- | |
MODEL_NAME = "nvidia/parakeet-tdt-0.6b-v2" | |
TARGET_SAMPLE_RATE = 16000 | |
# 音声の長さに関する閾値 (秒) | |
LONG_AUDIO_THRESHOLD_SECONDS = 480 # 8分 | |
VERY_LONG_AUDIO_THRESHOLD_SECONDS = 10800 # 3時間 | |
# チャンク分割時の設定 | |
CHUNK_LENGTH_SECONDS = 1800 # 30分 | |
CHUNK_OVERLAP_SECONDS = 60 # 1分 | |
# セグメント処理の設定 | |
MAX_SEGMENT_LENGTH_SECONDS = 15 # 最大セグメント長(秒)を15秒に短縮 | |
MAX_SEGMENT_CHARS = 100 # 最大セグメント文字数を100文字に短縮 | |
MIN_SEGMENT_GAP_SECONDS = 0.3 # 最小セグメント間隔(秒) | |
# VTTファイルの最大サイズ(バイト) | |
MAX_VTT_SIZE_BYTES = 10 * 1024 * 1024 # 10MB | |
# 文の区切り文字 | |
SENTENCE_ENDINGS = ['.', '!', '?', '。', '!', '?'] | |
SENTENCE_PAUSES = [',', '、', ';', ';', ':', ':'] | |
# ★ 入力ファイルの優先順位付き拡張子リスト | |
INPUT_PRIORITY_EXTENSIONS: List[str] = ['.wav', '.mp3', '.mp4'] | |
# ★ デフォルトで出力するフォーマットリスト | |
DEFAULT_OUTPUT_FORMATS: List[str] = ["csv", "srt", "vtt", "json", "lrc"] | |
# --- 音声前処理関数 --- | |
def preprocess_audio_cli(audio_path_str: str, output_dir_for_temp_files: str) -> Tuple[Optional[str], Optional[str], Optional[float]]: | |
""" | |
オーディオファイルの前処理(リサンプリング、モノラル変換)を行います。 | |
成功した場合、(処理済みファイルパス, 表示用名, 音声長) を返します。 | |
失敗した場合、(None, None, None) を返します。 | |
""" | |
try: | |
audio_file_path = Path(audio_path_str) | |
original_path_name = audio_file_path.name | |
audio_name_stem = audio_file_path.stem | |
print(f" 音声ファイルをロード中: {original_path_name}") | |
# まずffprobeで音声長を取得(4GB制限なし) | |
duration_sec = get_audio_duration_with_ffprobe(audio_path_str) | |
if duration_sec is None: | |
print("エラー: ffprobeで音声長の取得に失敗しました") | |
return None, None, None | |
print(f" 音声長: {duration_sec:.2f} 秒") | |
# ファイルサイズをチェック | |
file_size = Path(audio_path_str).stat().st_size | |
file_size_gb = file_size / (1024**3) | |
print(f" ファイルサイズ: {file_size_gb:.2f} GB") | |
# 4GB以上またはVERY_LONG_AUDIO_THRESHOLD_SECONDS以上の場合は直接ffmpegでチャンク分割 | |
if file_size > 4 * 1024**3 or duration_sec > VERY_LONG_AUDIO_THRESHOLD_SECONDS: | |
print(f" 大容量ファイル({file_size_gb:.2f}GB, {duration_sec/3600:.2f}時間)のため、ffmpegで直接チャンク分割処理を行います。") | |
# 大容量ファイルの場合もモノラル変換を行う | |
temp_mono_path = Path(output_dir_for_temp_files) / f"{audio_name_stem}_mono_temp.wav" | |
try: | |
cmd = [ | |
'ffmpeg', '-y', '-i', audio_path_str, | |
'-ac', '1', # モノラルに変換 | |
'-ar', str(TARGET_SAMPLE_RATE), # サンプルレートを設定 | |
str(temp_mono_path) | |
] | |
subprocess.run(cmd, capture_output=True, check=True) | |
return temp_mono_path.as_posix(), f"{original_path_name} (大容量・モノラル)", duration_sec | |
except subprocess.CalledProcessError as e: | |
print(f" ffmpegでのモノラル変換に失敗: {e}") | |
return audio_path_str, f"{original_path_name} (大容量)", duration_sec | |
# 4GB未満の場合は従来のpydub処理 | |
try: | |
audio = AudioSegment.from_file(audio_path_str) | |
except Exception as pydub_e: | |
if "4GB" in str(pydub_e) or "Unable to process" in str(pydub_e): | |
print(f" pydubで4GB制限エラー。ffmpegで処理します: {pydub_e}") | |
return audio_path_str, f"{original_path_name} (大容量)", duration_sec | |
else: | |
raise pydub_e | |
resampled = False | |
mono_converted = False | |
# リサンプリング処理 | |
if audio.frame_rate != TARGET_SAMPLE_RATE: | |
try: | |
print(f" リサンプリング中: {audio.frame_rate}Hz -> {TARGET_SAMPLE_RATE}Hz") | |
audio = audio.set_frame_rate(TARGET_SAMPLE_RATE) | |
resampled = True | |
except Exception as resample_e: | |
print(f"エラー: 音声のリサンプリングに失敗しました: {resample_e}") | |
return None, None, None | |
# モノラル変換処理 | |
if audio.channels > 1: | |
try: | |
print(f" モノラルに変換中 ({audio.channels}ch -> 1ch)") | |
audio = audio.set_channels(1) | |
mono_converted = True | |
except Exception as mono_e: | |
print(f"エラー: 音声のモノラル変換に失敗しました: {mono_e}") | |
return None, None, None | |
elif audio.channels == 1: | |
print(" 音声は既にモノラルです。") | |
processed_temp_file_path_obj = None | |
# 前処理が行われた場合、一時ファイルに保存 | |
if resampled or mono_converted: | |
try: | |
# ファイル名から特殊文字を除去してより安全な名前を生成 | |
import re | |
safe_stem = re.sub(r'[^\w\-_\.]', '_', audio_name_stem) | |
temp_suffix = "_preprocessed_temp.wav" | |
processed_temp_file_path_obj = Path(output_dir_for_temp_files, f"{safe_stem}{temp_suffix}") | |
print(f" 前処理済み音声の一時保存先: {processed_temp_file_path_obj.name}") | |
audio.export(processed_temp_file_path_obj, format="wav") | |
path_for_transcription = processed_temp_file_path_obj.as_posix() | |
display_name_for_info = f"{original_path_name} (前処理済み)" | |
except Exception as export_e: | |
print(f"エラー: 前処理済み音声のエクスポートに失敗しました: {export_e}") | |
if processed_temp_file_path_obj and processed_temp_file_path_obj.exists(): | |
try: | |
os.remove(processed_temp_file_path_obj) | |
except OSError: | |
pass | |
return None, None, None | |
else: | |
# 前処理が不要だった場合 | |
print(" 前処理は不要でした。元のファイルを使用します。") | |
path_for_transcription = audio_path_str | |
display_name_for_info = original_path_name | |
return path_for_transcription, display_name_for_info, duration_sec | |
except FileNotFoundError: | |
print(f"エラー: 音声ファイルが見つかりません: {audio_path_str}") | |
return None, None, None | |
except Exception as load_e: | |
print(f"エラー: 音声ファイル '{original_path_name}' のロード/デコードに失敗しました: {load_e}") | |
return None, None, None | |
def get_audio_duration_with_ffprobe(audio_path_str: str) -> Optional[float]: | |
"""ffprobeを使用して音声ファイルの長さを取得(4GB制限なし)""" | |
try: | |
# ffprobeが利用可能かチェック | |
if not shutil.which('ffprobe'): | |
print("警告: ffprobeが見つかりません。pydubでの処理を試行します。") | |
return None | |
cmd = [ | |
'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', | |
'-of', 'csv=p=0', audio_path_str | |
] | |
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) | |
if result.returncode == 0 and result.stdout.strip(): | |
duration = float(result.stdout.strip()) | |
return duration | |
else: | |
print(f"ffprobeエラー: {result.stderr}") | |
return None | |
except subprocess.TimeoutExpired: | |
print("ffprobeがタイムアウトしました") | |
return None | |
except Exception as e: | |
print(f"ffprobeでの音声長取得エラー: {e}") | |
return None | |
# --- 文字起こしコア関数 --- | |
def find_natural_break_point(text: str, max_length: int) -> int: | |
"""テキスト内で自然な区切り点を探す""" | |
if len(text) <= max_length: | |
return len(text) | |
# 文末で区切る | |
for i in range(max_length, 0, -1): | |
if i < len(text) and text[i] in SENTENCE_ENDINGS: | |
return i + 1 | |
# 文の区切りで区切る | |
for i in range(max_length, 0, -1): | |
if i < len(text) and text[i] in SENTENCE_PAUSES: | |
return i + 1 | |
# スペースで区切る | |
for i in range(max_length, 0, -1): | |
if i < len(text) and text[i].isspace(): | |
return i + 1 | |
# それでも見つからない場合は最大長で区切る | |
return max_length | |
def split_segment(segment: dict, max_length_seconds: float, max_chars: int) -> List[dict]: | |
"""セグメントを自然な区切りで分割する""" | |
if (segment['end'] - segment['start']) <= max_length_seconds and len(segment['segment']) <= max_chars: | |
return [segment] | |
result = [] | |
current_text = segment['segment'] | |
current_start = segment['start'] | |
total_duration = segment['end'] - segment['start'] | |
while current_text: | |
# 文字数に基づく分割点を探す | |
break_point = find_natural_break_point(current_text, max_chars) | |
# 時間に基づく分割点を計算 | |
text_ratio = break_point / len(segment['segment']) | |
segment_duration = total_duration * text_ratio | |
# 分割点が最大長を超えないように調整 | |
if segment_duration > max_length_seconds: | |
time_ratio = max_length_seconds / total_duration | |
break_point = int(len(segment['segment']) * time_ratio) | |
break_point = find_natural_break_point(current_text, break_point) | |
segment_duration = max_length_seconds | |
# 新しいセグメントを作成 | |
new_segment = { | |
'start': current_start, | |
'end': current_start + segment_duration, | |
'segment': current_text[:break_point].strip() | |
} | |
result.append(new_segment) | |
# 残りのテキストと開始時間を更新 | |
current_text = current_text[break_point:].strip() | |
current_start = new_segment['end'] | |
return result | |
def transcribe_audio_cli( | |
transcribe_path_str: str, | |
model: ASRModel, | |
duration_sec: float, | |
device: str | |
) -> Tuple[Optional[List], Optional[List], Optional[List]]: | |
long_audio_settings_applied = False | |
original_model_dtype = model.dtype | |
try: | |
if device == 'cuda': | |
torch.cuda.empty_cache() | |
gc.collect() | |
model.to(device) | |
# 音声長に応じてモデル設定を変更 | |
if duration_sec > LONG_AUDIO_THRESHOLD_SECONDS: | |
try: | |
print(f" 情報: 音声長 ({duration_sec:.0f}s) が閾値 ({LONG_AUDIO_THRESHOLD_SECONDS}s) を超えるため、長尺音声向け設定を適用します。") | |
model.change_attention_model( | |
self_attention_model="rel_pos_local_attn", | |
att_context_size=[128, 128] | |
) | |
model.change_subsampling_conv_chunking_factor(1) | |
long_audio_settings_applied = True | |
if device == 'cuda': | |
torch.cuda.empty_cache() | |
gc.collect() | |
except Exception as setting_e: | |
print(f" 警告: 長尺音声向け設定の適用に失敗しました: {setting_e}。デフォルト設定で続行します。") | |
if device == 'cuda' and torch.cuda.is_bf16_supported(): | |
print(" 情報: モデルを bfloat16 に変換して推論を実行します。") | |
model.to(torch.bfloat16) | |
elif model.dtype != original_model_dtype: | |
model.to(original_model_dtype) | |
print(f" 文字起こしを実行中 (デバイス: {device}, モデルdtype: {model.dtype})...") | |
output = model.transcribe( | |
[transcribe_path_str], | |
timestamps=True, | |
batch_size=2 | |
) | |
if not output or not isinstance(output, list) or not output[0] or \ | |
not hasattr(output[0], 'timestamp') or not output[0].timestamp or \ | |
'segment' not in output[0].timestamp: | |
print(" エラー: 文字起こしに失敗したか、予期しない出力形式です。") | |
return None, None, None | |
segment_timestamps = output[0].timestamp['segment'] | |
# セグメントの前処理:より適切なセグメント分割 | |
processed_segments = [] | |
current_segment = None | |
for ts in segment_timestamps: | |
if current_segment is None: | |
current_segment = ts | |
else: | |
# セグメント結合の条件を厳格化 | |
time_gap = ts['start'] - current_segment['end'] | |
current_text = current_segment['segment'] | |
next_text = ts['segment'] | |
# 結合条件のチェック | |
should_merge = ( | |
time_gap < MIN_SEGMENT_GAP_SECONDS and # 時間間隔が短い | |
len(current_text) + len(next_text) < MAX_SEGMENT_CHARS and # 文字数制限 | |
(current_segment['end'] - current_segment['start']) < MAX_SEGMENT_LENGTH_SECONDS and # 現在のセグメントが短い | |
(ts['end'] - ts['start']) < MAX_SEGMENT_LENGTH_SECONDS and # 次のセグメントが短い | |
not any(current_text.strip().endswith(p) for p in SENTENCE_ENDINGS) # 文の区切りでない | |
) | |
if should_merge: | |
current_segment['end'] = ts['end'] | |
current_segment['segment'] += ' ' + ts['segment'] | |
else: | |
# 現在のセグメントを分割 | |
split_segments = split_segment(current_segment, MAX_SEGMENT_LENGTH_SECONDS, MAX_SEGMENT_CHARS) | |
processed_segments.extend(split_segments) | |
current_segment = ts | |
if current_segment is not None: | |
# 最後のセグメントも分割 | |
split_segments = split_segment(current_segment, MAX_SEGMENT_LENGTH_SECONDS, MAX_SEGMENT_CHARS) | |
processed_segments.extend(split_segments) | |
# 処理済みセグメントからデータを生成 | |
vis_data = [[f"{ts['start']:.2f}", f"{ts['end']:.2f}", ts['segment']] for ts in processed_segments] | |
raw_times_data = [[ts['start'], ts['end']] for ts in processed_segments] | |
# 単語タイムスタンプの処理を改善 | |
word_timestamps_raw = output[0].timestamp.get("word", []) | |
word_vis_data = [] | |
for w in word_timestamps_raw: | |
if not isinstance(w, dict) or not all(k in w for k in ['start', 'end', 'word']): | |
continue | |
# 単語のタイムスタンプを最も近いセグメントに割り当て | |
word_start = float(w['start']) | |
word_end = float(w['end']) | |
# 単語が完全に含まれるセグメントを探す | |
for seg in processed_segments: | |
if word_start >= seg['start'] - 0.05 and word_end <= seg['end'] + 0.05: | |
word_vis_data.append([f"{word_start:.2f}", f"{word_end:.2f}", w["word"]]) | |
break | |
print(" 文字起こし完了。") | |
return vis_data, raw_times_data, word_vis_data | |
except torch.cuda.OutOfMemoryError as oom_e: | |
print(f" 致命的エラー: CUDAメモリ不足です。 {oom_e}") | |
print(" バッチサイズを小さくする、他のGPU利用アプリを終了するなどの対策を試みてください。") | |
return None, None, None | |
except Exception as e: | |
print(f" エラー: 文字起こし処理中に予期せぬエラーが発生しました: {e}") | |
import traceback | |
traceback.print_exc() | |
return None, None, None | |
finally: | |
if long_audio_settings_applied: | |
try: | |
print(" 長尺音声向け設定を元に戻しています。") | |
model.change_attention_model(self_attention_model="rel_pos") | |
model.change_subsampling_conv_chunking_factor(-1) | |
except Exception as revert_e: | |
print(f" 警告: 長尺音声設定の復元に失敗: {revert_e}") | |
model.to(original_model_dtype) | |
if model.device.type != 'cpu': | |
model.cpu() | |
if device == 'cuda': | |
torch.cuda.empty_cache() | |
gc.collect() | |
# --- 結果保存関数 --- | |
def save_transcripts_cli(output_dir_str: str, audio_file_stem: str, | |
vis_data: List, word_vis_data: List, formats: Optional[List[str]] = None): | |
if formats is None: | |
formats_to_save = DEFAULT_OUTPUT_FORMATS | |
else: | |
formats_to_save = formats | |
output_dir_path = Path(output_dir_str) | |
output_dir_path.mkdir(parents=True, exist_ok=True) | |
saved_files_count = 0 | |
print(f" 結果を保存中 (対象形式: {', '.join(formats_to_save)})...") | |
try: | |
if "csv" in formats_to_save: | |
csv_file_path = output_dir_path / f"{audio_file_stem}.csv" | |
csv_headers = ["Start (s)", "End (s)", "Segment"] | |
with open(csv_file_path, 'w', newline='', encoding='utf-8') as f: | |
writer = csv.writer(f); writer.writerow(csv_headers); writer.writerows(vis_data) | |
print(f" CSVファイルを保存: {csv_file_path.name}"); saved_files_count +=1 | |
if "srt" in formats_to_save: | |
srt_file_path = output_dir_path / f"{audio_file_stem}.srt" | |
write_srt(vis_data, srt_file_path) | |
print(f" SRTファイルを保存: {srt_file_path.name}"); saved_files_count +=1 | |
if "vtt" in formats_to_save: | |
vtt_file_path = output_dir_path / f"{audio_file_stem}.vtt" | |
try: | |
write_vtt(vis_data, word_vis_data, vtt_file_path) | |
print(f" VTTファイルを保存: {vtt_file_path.name}"); saved_files_count +=1 | |
except ValueError as e: | |
if "VTTファイルサイズが制限を超えました" in str(e): | |
print(f" エラー: {e}") | |
# 既に作成されたVTTファイルを削除 | |
if vtt_file_path.exists(): | |
vtt_file_path.unlink() | |
raise # エラーを上位に伝播 | |
if "json" in formats_to_save: | |
json_file_path = output_dir_path / f"{audio_file_stem}.json" | |
write_json(vis_data, word_vis_data, json_file_path) | |
print(f" JSONファイルを保存: {json_file_path.name}"); saved_files_count +=1 | |
if "lrc" in formats_to_save: | |
lrc_file_path = output_dir_path / f"{audio_file_stem}.lrc" | |
write_lrc(vis_data, lrc_file_path) | |
print(f" LRCファイルを保存: {lrc_file_path.name}"); saved_files_count +=1 | |
if saved_files_count == 0 and formats_to_save: | |
print(f" 警告: 指定されたフォーマット {formats_to_save} でのファイルの保存は行われませんでした。") | |
except Exception as e: | |
print(f" エラー: 文字起こしファイルの保存中にエラーが発生しました: {e}") | |
raise # エラーを上位に伝播 | |
# --- 書き出しヘルパー関数群 (SRT, VTT, JSON, LRC) --- | |
def write_srt(segments: List, path: Path): | |
def sec2srt(t_float: float) -> str: | |
h, rem = divmod(int(t_float), 3600); m, s = divmod(rem, 60) | |
ms = int((t_float - int(t_float)) * 1000) | |
return f"{h:02}:{m:02}:{s:02},{ms:03}" | |
with open(path, "w", encoding="utf-8") as f: | |
for i, seg_list in enumerate(segments, 1): | |
f.write(f"{i}\n{sec2srt(float(seg_list[0]))} --> {sec2srt(float(seg_list[1]))}\n{seg_list[2]}\n\n") | |
def write_vtt(segments: List, words: List, path: Path): | |
def sec2vtt(t_float: float) -> str: | |
h, rem = divmod(int(t_float), 3600) | |
m, s = divmod(rem, 60) | |
ms = int((t_float - int(t_float)) * 1000) | |
return f"{h:02}:{m:02}:{s:02}.{ms:03}" | |
with open(path, "w", encoding="utf-8") as f: | |
f.write("WEBVTT\n\n") | |
f.write("STYLE\n") | |
f.write("::cue(.current) { color: #ffff00; font-weight: bold; }\n") | |
f.write("::cue(.past) { color: #888888; }\n") | |
f.write("::cue(.future) { color: #ffffff; }\n") | |
f.write("::cue(.line) { background: rgba(0,0,0,0.7); padding: 4px; }\n\n") | |
if not words: | |
# 単語タイムスタンプがない場合は、セグメント単位で出力 | |
for i, seg_list in enumerate(segments, 1): | |
f.write(f"NOTE Segment {i}\n") | |
f.write(f"{sec2vtt(float(seg_list[0]))} --> {sec2vtt(float(seg_list[1]))}\n{seg_list[2]}\n\n") | |
# ファイルサイズをチェック | |
current_size = f.tell() | |
if current_size > MAX_VTT_SIZE_BYTES: | |
print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。") | |
raise ValueError("VTTファイルサイズが制限を超えました") | |
return | |
# セグメント単位で処理 | |
for seg_data in segments: | |
seg_start = float(seg_data[0]) | |
seg_end = float(seg_data[1]) | |
# このセグメントに含まれる単語を特定 | |
segment_words = [] | |
for word_idx, word_data in enumerate(words): | |
word_start = float(word_data[0]) | |
word_end = float(word_data[1]) | |
if word_start >= seg_start - 0.1 and word_end <= seg_end + 0.1: | |
segment_words.append((word_idx, word_data)) | |
if not segment_words: | |
continue | |
# セグメント内の全単語のテキストを一度だけ生成 | |
all_words = [w_data[2] for _, w_data in segment_words] | |
# セグメント開始から最初の単語まで | |
first_word_start = float(segment_words[0][1][0]) | |
if seg_start < first_word_start - 0.05: | |
f.write(f"{sec2vtt(seg_start)} --> {sec2vtt(first_word_start)}\n") | |
f.write(f'<c.line>{" ".join(f"<c.future>{w}</c>" for w in all_words)}</c>\n\n') | |
# ファイルサイズをチェック | |
current_size = f.tell() | |
if current_size > MAX_VTT_SIZE_BYTES: | |
print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。") | |
raise ValueError("VTTファイルサイズが制限を超えました") | |
# 各単語の処理 | |
for local_idx, (_, word_data) in enumerate(segment_words): | |
w_start = float(word_data[0]) | |
w_end = float(word_data[1]) | |
# 単語の表示時間を出力 | |
f.write(f"{sec2vtt(w_start)} --> {sec2vtt(w_end)}\n") | |
# 現在の単語をハイライトしたテキストを生成 | |
line_parts = [] | |
for i, w in enumerate(all_words): | |
if i == local_idx: | |
line_parts.append(f'<c.current>{w}</c>') | |
elif i < local_idx: | |
line_parts.append(f'<c.past>{w}</c>') | |
else: | |
line_parts.append(f'<c.future>{w}</c>') | |
f.write(f'<c.line>{" ".join(line_parts)}</c>\n\n') | |
# ファイルサイズをチェック | |
current_size = f.tell() | |
if current_size > MAX_VTT_SIZE_BYTES: | |
print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。") | |
raise ValueError("VTTファイルサイズが制限を超えました") | |
# 単語間の無音期間の処理 | |
if local_idx < len(segment_words) - 1: | |
next_word_start = float(segment_words[local_idx + 1][1][0]) | |
gap_duration = next_word_start - w_end | |
if gap_duration > 0.05: # 50ms以上の無音期間がある場合 | |
f.write(f"{sec2vtt(w_end)} --> {sec2vtt(next_word_start)}\n") | |
f.write(f'<c.line>{" ".join(f"<c.past>{w}</c>" if i <= local_idx else f"<c.future>{w}</c>" for i, w in enumerate(all_words))}</c>\n\n') | |
# ファイルサイズをチェック | |
current_size = f.tell() | |
if current_size > MAX_VTT_SIZE_BYTES: | |
print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。") | |
raise ValueError("VTTファイルサイズが制限を超えました") | |
# 最後の単語からセグメント終了まで | |
last_word_end = float(segment_words[-1][1][1]) | |
if last_word_end < seg_end - 0.05: | |
f.write(f"{sec2vtt(last_word_end)} --> {sec2vtt(seg_end)}\n") | |
f.write(f'<c.line>{" ".join(f"<c.past>{w}</c>" for w in all_words)}</c>\n\n') | |
# ファイルサイズをチェック | |
current_size = f.tell() | |
if current_size > MAX_VTT_SIZE_BYTES: | |
print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。") | |
raise ValueError("VTTファイルサイズが制限を超えました") | |
def write_json(segments: List, words: List, path: Path): | |
result = {"segments": []}; word_idx = 0 | |
for seg_data in segments: | |
s_start_time = float(seg_data[0]); s_end_time = float(seg_data[1]); s_text = seg_data[2] | |
segment_words_list: List[dict] = []; temp_current_word_idx = word_idx | |
if words: | |
while temp_current_word_idx < len(words): | |
w_data = words[temp_current_word_idx]; w_start_time = float(w_data[0]); w_end_time = float(w_data[1]) | |
if w_start_time >= s_start_time and w_end_time <= s_end_time + 0.1: | |
segment_words_list.append({"start": w_start_time, "end": w_end_time, "word": w_data[2]}) | |
temp_current_word_idx += 1 | |
elif w_start_time < s_start_time : | |
temp_current_word_idx += 1 | |
elif w_start_time > s_end_time: | |
break | |
else: | |
temp_current_word_idx += 1 | |
word_idx = temp_current_word_idx | |
result["segments"].append({"start": s_start_time, "end": s_end_time, "text": s_text, "words": segment_words_list}) | |
with open(path, "w", encoding="utf-8") as f: | |
json.dump(result, f, ensure_ascii=False, indent=2) | |
def write_lrc(segments: List, path: Path): | |
def sec2lrc(t_float: float) -> str: | |
m, s = divmod(float(t_float), 60) | |
return f"[{int(m):02d}:{s:05.2f}]" | |
with open(path, "w", encoding="utf-8") as f: | |
for seg_list in segments: | |
f.write(f"{sec2lrc(float(seg_list[0]))}{seg_list[2]}\n") | |
# --- 音声分割関数 --- | |
def split_audio_with_overlap_cli( | |
audio_path_str: str, | |
output_dir_for_chunks: str, | |
chunk_length_sec: int = CHUNK_LENGTH_SECONDS, | |
overlap_sec: int = CHUNK_OVERLAP_SECONDS | |
) -> List[str]: | |
print(f" 音声分割中: 基本チャンク長 {chunk_length_sec}s, オーバーラップ {overlap_sec}s") | |
# ファイルサイズをチェックして処理方法を決定 | |
file_size = Path(audio_path_str).stat().st_size | |
file_size_gb = file_size / (1024**3) | |
# 4GB以上の場合はffmpegを使用 | |
if file_size > 4 * 1024**3: | |
print(f" 大容量ファイル({file_size_gb:.2f}GB)のため、ffmpegで分割処理を実行します。") | |
return split_audio_with_ffmpeg(audio_path_str, output_dir_for_chunks, chunk_length_sec, overlap_sec) | |
# 4GB未満の場合は従来のpydub処理 | |
try: | |
audio = AudioSegment.from_file(audio_path_str) | |
except Exception as e: | |
if "4GB" in str(e) or "Unable to process" in str(e): | |
print(f" pydubで4GB制限エラー。ffmpegで処理します: {e}") | |
return split_audio_with_ffmpeg(audio_path_str, output_dir_for_chunks, chunk_length_sec, overlap_sec) | |
else: | |
print(f" エラー: 音声ファイル '{Path(audio_path_str).name}' のロード中にエラー(分割処理): {e}") | |
return [] | |
# 以下は既存のpydub処理... | |
duration_ms = len(audio); chunk_length_ms = chunk_length_sec * 1000; overlap_ms = overlap_sec * 1000 | |
chunk_paths_list: List[str] = []; start_ms = 0; chunk_idx = 0 | |
audio_file_stem = Path(audio_path_str).stem | |
while start_ms < duration_ms: | |
actual_chunk_start_ms = max(0, start_ms - (overlap_ms if start_ms > 0 else 0) ) | |
base_chunk_end_ms = start_ms + chunk_length_ms | |
actual_chunk_end_ms = min(base_chunk_end_ms + (overlap_ms if base_chunk_end_ms < duration_ms else 0), duration_ms) | |
if actual_chunk_start_ms >= actual_chunk_end_ms : | |
if start_ms >= duration_ms: break | |
print(f" 警告: チャンク計算で予期せぬ状態。スキップします。") | |
start_ms += chunk_length_ms; continue | |
chunk_segment = audio[actual_chunk_start_ms:actual_chunk_end_ms] | |
chunk_file_name = f"{audio_file_stem}_chunk_{chunk_idx:03d}_temp.wav" | |
chunk_file_path_obj = Path(output_dir_for_chunks, chunk_file_name) | |
try: | |
chunk_segment.export(chunk_file_path_obj, format="wav") | |
chunk_paths_list.append(chunk_file_path_obj.as_posix()) | |
except Exception as export_chunk_e: | |
print(f" エラー: 一時チャンクファイル {chunk_file_name} のエクスポートに失敗: {export_chunk_e}") | |
start_ms += chunk_length_ms; chunk_idx += 1 | |
print(f" 音声を {len(chunk_paths_list)} 個のチャンクに分割しました。") | |
return chunk_paths_list | |
def split_audio_with_ffmpeg( | |
audio_path_str: str, | |
output_dir_for_chunks: str, | |
chunk_length_sec: int, | |
overlap_sec: int | |
) -> List[str]: | |
"""ffmpegを使用して大容量ファイルを分割""" | |
try: | |
if not shutil.which('ffmpeg'): | |
print("エラー: ffmpegが見つかりません。4GB以上のファイルを処理するにはffmpegが必要です。") | |
return [] | |
# 音声長を取得 | |
duration_sec = get_audio_duration_with_ffprobe(audio_path_str) | |
if duration_sec is None: | |
print("エラー: ffmpegでの分割処理で音声長を取得できませんでした") | |
return [] | |
chunk_paths_list: List[str] = [] | |
audio_file_stem = Path(audio_path_str).stem | |
start_sec = 0 | |
chunk_idx = 0 | |
while start_sec < duration_sec: | |
# チャンク開始・終了時刻を計算 | |
actual_start_sec = max(0, start_sec - (overlap_sec if start_sec > 0 else 0)) | |
base_end_sec = start_sec + chunk_length_sec | |
actual_end_sec = min(base_end_sec + (overlap_sec if base_end_sec < duration_sec else 0), duration_sec) | |
if actual_start_sec >= actual_end_sec: | |
break | |
chunk_duration = actual_end_sec - actual_start_sec | |
chunk_file_name = f"{audio_file_stem}_chunk_{chunk_idx:03d}_temp.wav" | |
chunk_file_path = Path(output_dir_for_chunks) / chunk_file_name | |
# ffmpegコマンドで音声を抽出・変換 | |
cmd = [ | |
'ffmpeg', '-y', '-loglevel', 'error', | |
'-ss', str(actual_start_sec), | |
'-i', audio_path_str, | |
'-t', str(chunk_duration), | |
'-acodec', 'pcm_s16le', | |
'-ar', str(TARGET_SAMPLE_RATE), | |
'-ac', '1', # モノラル | |
str(chunk_file_path) | |
] | |
try: | |
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) | |
if result.returncode == 0: | |
chunk_paths_list.append(chunk_file_path.as_posix()) | |
print(f" チャンク {chunk_idx+1}: {actual_start_sec:.1f}s - {actual_end_sec:.1f}s -> {chunk_file_name}") | |
else: | |
print(f" エラー: チャンク {chunk_idx} の生成に失敗: {result.stderr}") | |
except subprocess.TimeoutExpired: | |
print(f" エラー: チャンク {chunk_idx} の生成がタイムアウトしました") | |
start_sec += chunk_length_sec | |
chunk_idx += 1 | |
print(f" ffmpegで音声を {len(chunk_paths_list)} 個のチャンクに分割しました。") | |
return chunk_paths_list | |
except Exception as e: | |
print(f" エラー: ffmpegでの音声分割中にエラー: {e}") | |
return [] | |
# --- 単一ファイル処理のメインロジック --- | |
def process_single_file( | |
input_file_path_obj: Path, | |
asr_model_instance: ASRModel, | |
device_to_use: str, | |
output_formats_list: List[str] | |
) -> bool: | |
input_file_stem = input_file_path_obj.stem | |
output_and_temp_dir_str = input_file_path_obj.parent.as_posix() | |
file_processing_start_time = time.time() | |
actual_audio_duration_sec: Optional[float] = None | |
success_status = False | |
temp_preprocessed_audio_path_str: Optional[str] = None | |
temp_chunk_file_paths_str_list: List[str] = [] | |
try: | |
print(f"--- ステップ1/3: {input_file_stem} の音声前処理 ---") | |
processed_path_for_asr, _, duration_sec_val = preprocess_audio_cli( | |
input_file_path_obj.as_posix(), output_and_temp_dir_str | |
) | |
if not processed_path_for_asr or duration_sec_val is None: | |
raise Exception("Preprocessing failed") | |
actual_audio_duration_sec = duration_sec_val | |
if processed_path_for_asr != input_file_path_obj.as_posix(): | |
temp_preprocessed_audio_path_str = processed_path_for_asr | |
print(f"--- ステップ2/3: {input_file_stem} の文字起こし (音声長: {actual_audio_duration_sec:.2f}秒) ---") | |
final_vis_data: Optional[List] = None | |
final_word_vis_data: Optional[List] = None | |
if actual_audio_duration_sec > VERY_LONG_AUDIO_THRESHOLD_SECONDS: | |
print(f" 情報: 音声長が{VERY_LONG_AUDIO_THRESHOLD_SECONDS/3600:.1f}時間を超えるため、分割処理します。") | |
chunk_file_paths_str = split_audio_with_overlap_cli( | |
processed_path_for_asr, output_and_temp_dir_str, | |
chunk_length_sec=CHUNK_LENGTH_SECONDS, overlap_sec=CHUNK_OVERLAP_SECONDS | |
) | |
if not chunk_file_paths_str: | |
raise Exception(f"{input_file_path_obj.name} のチャンク分割に失敗しました。") | |
temp_chunk_file_paths_str_list = chunk_file_paths_str[:] | |
all_vis_data_merged: List[List[str]] = [] | |
all_word_vis_data_merged: List[List[str]] = [] | |
current_global_time_offset_sec = 0.0 | |
last_global_segment_end_time_sec = 0.0 | |
# チャンク処理前にGPUメモリをクリア | |
if device_to_use == 'cuda': | |
torch.cuda.empty_cache() | |
gc.collect() | |
print(f" 初期GPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB") | |
for i, chunk_file_path_str in enumerate(temp_chunk_file_paths_str_list): | |
print(f" チャンク {i+1}/{len(temp_chunk_file_paths_str_list)} ({Path(chunk_file_path_str).name}) を処理中...") | |
try: | |
# 各チャンク処理前にGPUメモリをクリア | |
if device_to_use == 'cuda': | |
torch.cuda.empty_cache() | |
gc.collect() | |
print(f" チャンク処理前のGPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB") | |
estimated_chunk_duration_for_asr_settings = CHUNK_LENGTH_SECONDS + CHUNK_OVERLAP_SECONDS | |
vis_data_chunk, _, word_vis_data_chunk = transcribe_audio_cli( | |
chunk_file_path_str, asr_model_instance, | |
estimated_chunk_duration_for_asr_settings, device_to_use | |
) | |
# チャンク処理後のGPUメモリ使用量を確認 | |
if device_to_use == 'cuda': | |
print(f" チャンク処理後のGPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB") | |
if not vis_data_chunk: | |
print(f" 警告: チャンク {Path(chunk_file_path_str).name} の文字起こしに失敗。スキップします。") | |
current_global_time_offset_sec += CHUNK_LENGTH_SECONDS - (CHUNK_OVERLAP_SECONDS if i < len(temp_chunk_file_paths_str_list) - 1 else 0) | |
continue | |
# データのマージ処理 | |
for seg_row_list in vis_data_chunk: | |
s_local_sec = float(seg_row_list[0]) | |
e_local_sec = float(seg_row_list[1]) | |
text_seg = seg_row_list[2] | |
s_global_sec = s_local_sec + current_global_time_offset_sec | |
e_global_sec = e_local_sec + current_global_time_offset_sec | |
if s_global_sec >= last_global_segment_end_time_sec - 0.1: | |
all_vis_data_merged.append([f"{s_global_sec:.2f}", f"{e_global_sec:.2f}", text_seg]) | |
last_global_segment_end_time_sec = max(last_global_segment_end_time_sec, e_global_sec) | |
temp_last_word_global_end_time_sec = float(all_word_vis_data_merged[-1][1]) if all_word_vis_data_merged else 0.0 | |
if word_vis_data_chunk: | |
for word_row_list in word_vis_data_chunk: | |
w_s_local_sec = float(word_row_list[0]) | |
w_e_local_sec = float(word_row_list[1]) | |
text_word = word_row_list[2] | |
w_s_global_sec = w_s_local_sec + current_global_time_offset_sec | |
w_e_global_sec = w_e_local_sec + current_global_time_offset_sec | |
if w_s_global_sec >= temp_last_word_global_end_time_sec - 0.05: | |
all_word_vis_data_merged.append([f"{w_s_global_sec:.2f}", f"{w_e_global_sec:.2f}", text_word]) | |
temp_last_word_global_end_time_sec = max(temp_last_word_global_end_time_sec, w_e_global_sec) | |
if i < len(temp_chunk_file_paths_str_list) - 1: | |
current_global_time_offset_sec += (CHUNK_LENGTH_SECONDS - CHUNK_OVERLAP_SECONDS) | |
# チャンク処理後にGPUメモリをクリア | |
if device_to_use == 'cuda': | |
torch.cuda.empty_cache() | |
gc.collect() | |
print(f" メモリクリア後のGPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB") | |
except Exception as chunk_proc_e: | |
print(f" エラー: チャンク {Path(chunk_file_path_str).name} の処理中にエラー: {chunk_proc_e}") | |
if i < len(temp_chunk_file_paths_str_list) - 1: | |
current_global_time_offset_sec += (CHUNK_LENGTH_SECONDS - CHUNK_OVERLAP_SECONDS) | |
final_vis_data = all_vis_data_merged | |
final_word_vis_data = all_word_vis_data_merged | |
if not final_vis_data: | |
raise Exception("チャンク処理後、有効な文字起こしデータが得られませんでした。") | |
else: | |
vis_data_single, _, word_vis_data_single = transcribe_audio_cli( | |
processed_path_for_asr, asr_model_instance, actual_audio_duration_sec, device_to_use | |
) | |
if not vis_data_single: | |
raise Exception(f"{input_file_path_obj.name} の文字起こしに失敗しました。") | |
final_vis_data = vis_data_single | |
final_word_vis_data = word_vis_data_single | |
if final_vis_data: | |
print(f"--- ステップ3/3: {input_file_stem} の文字起こし結果保存 ---") | |
save_transcripts_cli(output_and_temp_dir_str, input_file_stem, | |
final_vis_data, final_word_vis_data if final_word_vis_data else [], | |
formats=output_formats_list) | |
success_status = True | |
else: | |
print(f"情報: {input_file_path_obj.name} の文字起こし結果が空のため、ファイルは保存しませんでした。") | |
success_status = True | |
except Exception as e: | |
print(f"エラー: ファイル {input_file_path_obj.name} の処理中にエラーが発生しました: {e}") | |
success_status = False | |
finally: | |
file_processing_end_time = time.time() | |
time_taken_seconds = file_processing_end_time - file_processing_start_time | |
proc_m = int(time_taken_seconds // 60) | |
proc_s = time_taken_seconds % 60 | |
summary_message = f" --- {input_file_stem}: 処理サマリー ---\n" | |
if actual_audio_duration_sec is not None: | |
audio_m = int(actual_audio_duration_sec // 60) | |
audio_s = actual_audio_duration_sec % 60 | |
summary_message += f" 音声長: {audio_m}分{audio_s:.2f}秒 ({actual_audio_duration_sec:.2f}秒)\n" | |
else: | |
summary_message += " 音声長: 不明 (前処理で失敗した可能性があります)\n" | |
summary_message += f" このファイルの総処理時間: {proc_m}分{proc_s:.2f}秒 ({time_taken_seconds:.2f}秒)\n" | |
summary_message += f" 処理ステータス: {'成功' if success_status else '失敗'}" | |
print(summary_message) | |
if temp_preprocessed_audio_path_str and Path(temp_preprocessed_audio_path_str).exists(): | |
try: os.remove(temp_preprocessed_audio_path_str); print(f" 一時ファイル {Path(temp_preprocessed_audio_path_str).name} を削除しました。") | |
except OSError as e_os: print(f" 警告: 一時ファイル {Path(temp_preprocessed_audio_path_str).name} の削除に失敗: {e_os}") | |
for chunk_f_str in temp_chunk_file_paths_str_list: | |
if Path(chunk_f_str).exists(): | |
try: os.remove(chunk_f_str); print(f" 一時チャンクファイル {Path(chunk_f_str).name} を削除しました。") | |
except OSError as e_os_chunk: print(f" 警告: 一時チャンクファイル {Path(chunk_f_str).name} の削除に失敗: {e_os_chunk}") | |
# process_single_file の最後では "ファイル処理終了" のログは batch_process_directory に任せる | |
return success_status | |
# --- ディレクトリ内ファイルの一括処理関数 --- | |
def batch_process_directory( | |
target_dir_str: str, | |
asr_model_instance: ASRModel, | |
device_to_use: str, | |
output_formats: Optional[List[str]] = None | |
): | |
batch_start_time = time.time() | |
if output_formats is None: | |
output_formats_to_use = DEFAULT_OUTPUT_FORMATS | |
else: | |
output_formats_to_use = output_formats | |
target_dir_path = Path(target_dir_str) | |
if not target_dir_path.is_dir(): | |
print(f"エラー: 指定されたパス '{target_dir_str}' は有効なディレクトリではありません。"); return | |
print(f"処理対象ディレクトリ: {target_dir_path.resolve()}") | |
print(f"入力ファイルの探索優先順位: {', '.join(INPUT_PRIORITY_EXTENSIONS)}") | |
print(f"出力ファイル形式: {', '.join(output_formats_to_use)}") | |
all_files_in_dir = list(target_dir_path.iterdir()) | |
potential_stems: Set[str] = set() | |
for f_path_obj in all_files_in_dir: | |
if f_path_obj.is_file() and f_path_obj.suffix.lower() in INPUT_PRIORITY_EXTENSIONS: | |
potential_stems.add(f_path_obj.stem) | |
if not potential_stems: | |
print(f"情報: ディレクトリ '{target_dir_path.name}' に対象拡張子のファイルは見つかりませんでした。"); return | |
print(f"{len(potential_stems)} 個のユニークなファイル名候補が見つかりました。優先順位に従って処理対象を選択します...") | |
files_to_actually_process: List[Path] = [] | |
for stem_name in sorted(list(potential_stems)): | |
selected_file_for_this_stem: Optional[Path] = None | |
for ext_priority in INPUT_PRIORITY_EXTENSIONS: | |
potential_file = target_dir_path / f"{stem_name}{ext_priority}" | |
if potential_file.exists() and potential_file.is_file(): | |
selected_file_for_this_stem = potential_file | |
print(f" ファイル名 '{stem_name}': '{potential_file.name}' を処理対象として選択。") | |
break | |
if selected_file_for_this_stem: files_to_actually_process.append(selected_file_for_this_stem) | |
if not files_to_actually_process: | |
print("情報: 優先順位適用後、実際に処理するファイルはありませんでした。"); return | |
print(f"実際に処理するファイル数: {len(files_to_actually_process)} 個") | |
processed_successfully_count = 0 | |
skipped_due_to_existing_csv_count = 0 | |
failed_count = 0 | |
for input_file_to_process_obj in files_to_actually_process: | |
print(f"\n======== ファイル処理開始: {input_file_to_process_obj.name} ========") # 各ファイルの開始ログ | |
is_skipped_at_batch_level = False | |
if "csv" in output_formats_to_use: | |
output_csv_path_check = input_file_to_process_obj.with_suffix('.csv') | |
if output_csv_path_check.exists(): | |
print(f"スキップ (バッチレベル): CSV '{output_csv_path_check.name}' は既に存在します。") | |
skipped_due_to_existing_csv_count += 1 | |
is_skipped_at_batch_level = True | |
print(f"======== ファイル処理終了 (スキップ): {input_file_to_process_obj.name} ========\n") # スキップ時の終了ログ | |
if not is_skipped_at_batch_level: | |
success_flag = process_single_file( | |
input_file_to_process_obj, | |
asr_model_instance, | |
device_to_use, | |
output_formats_to_use | |
) | |
if success_flag: | |
processed_successfully_count += 1 | |
else: | |
failed_count += 1 | |
# process_single_file内で "ファイル処理終了" ログが出力される | |
print("\n======== 全ファイルのバッチ処理が完了しました ========") | |
total_considered = len(files_to_actually_process) | |
print(f"総対象ファイル数(優先度選択後): {total_considered}") | |
print(f" 処理成功ファイル数: {processed_successfully_count}") | |
print(f" CSV既存によりスキップされたファイル数: {skipped_due_to_existing_csv_count}") | |
print(f" 処理失敗ファイル数: {failed_count}") | |
batch_end_time = time.time() | |
total_batch_time_seconds = batch_end_time - batch_start_time | |
batch_m = int(total_batch_time_seconds // 60) | |
batch_s = total_batch_time_seconds % 60 | |
print(f"バッチ処理全体の総所要時間: {batch_m}分{batch_s:.2f}秒 ({total_batch_time_seconds:.2f}秒)") | |
# --- スクリプト実行のエントリポイント --- | |
if __name__ == "__main__": | |
# ★ 引数処理とGUI分岐のための準備 | |
target_directory_arg: Optional[str] = None | |
formats_arg_str: str = ",".join(DEFAULT_OUTPUT_FORMATS) # GUI時のデフォルト | |
device_arg_str: Optional[str] = None # GUI時のデフォルト (自動判別) | |
if len(sys.argv) == 1: # コマンドライン引数なしの場合 | |
print("コマンドライン引数なしで起動されました。GUIでディレクトリを選択します。") | |
try: | |
import tkinter as tk | |
from tkinter import filedialog | |
def get_directory_from_gui_local() -> Optional[str]: | |
"""GUIでディレクトリ選択ダイアログを表示し、選択されたパスを返す""" | |
root = tk.Tk() | |
root.withdraw() # メインウィンドウは表示しない | |
# ダイアログを最前面に表示する試み (環境による) | |
root.attributes('-topmost', True) | |
# WSL環境での初期ディレクトリを設定 | |
initial_dir = "/mnt/t/demucs_folder/htdemucs" # Windowsのユーザーディレクトリを初期値として設定 | |
selected_path = filedialog.askdirectory( | |
title="処理対象のディレクトリを選択してください", | |
initialdir=initial_dir | |
) | |
root.attributes('-topmost', False) | |
root.destroy() # Tkinterウィンドウを破棄 | |
return selected_path if selected_path else None | |
target_directory_arg = get_directory_from_gui_local() | |
if not target_directory_arg: | |
print("ディレクトリが選択されませんでした。処理を中止します。") | |
sys.exit(0) # 正常終了 | |
# formats_arg_str と device_arg_str は初期化されたデフォルト値を使用 | |
print(f"GUIで選択されたディレクトリ: {target_directory_arg}") | |
print(f"出力フォーマット (デフォルト): {formats_arg_str}") | |
# device_arg_strがNoneの場合、後続の処理で自動判別される | |
except ImportError: | |
print("エラー: GUIモードに必要なTkinterライブラリが見つかりません。") | |
print("Tkinterをインストールするか、コマンドライン引数を使用してスクリプトを実行してください。例:") | |
print(f" python {Path(sys.argv[0]).name} /path/to/your/audio_directory") | |
sys.exit(1) # エラー終了 | |
except Exception as e_gui: | |
print(f"GUIの表示中に予期せぬエラーが発生しました: {e_gui}") | |
sys.exit(1) # エラー終了 | |
else: # コマンドライン引数がある場合 | |
parser = argparse.ArgumentParser( | |
description="指定されたディレクトリ内の音声/動画ファイルをNVIDIA Parakeet ASRモデルで文字起こしします。\n" | |
f"同じ名前のファイルが複数ある場合、{' > '.join(INPUT_PRIORITY_EXTENSIONS)} の優先順位で処理します。", | |
formatter_class=argparse.RawTextHelpFormatter | |
) | |
parser.add_argument( # 最初の引数は必須のディレクトリ | |
"target_directory", type=str, | |
help="処理対象のファイルが含まれるディレクトリのパス。" | |
) | |
parser.add_argument( | |
"--formats", type=str, default=",".join(DEFAULT_OUTPUT_FORMATS), | |
help=(f"出力する文字起こしファイルの形式をカンマ区切りで指定。\n" | |
f"例: csv,srt (デフォルト: {','.join(DEFAULT_OUTPUT_FORMATS)})\n" | |
f"利用可能な形式: {','.join(DEFAULT_OUTPUT_FORMATS)}") | |
) | |
parser.add_argument( | |
"--device", type=str, default=None, choices=['cuda', 'cpu'], | |
help="使用するデバイスを指定 (cuda または cpu)。指定がなければ自動判別。" | |
) | |
args = parser.parse_args() # sys.argv[1:] から解析 | |
target_directory_arg = args.target_directory | |
formats_arg_str = args.formats | |
device_arg_str = args.device | |
# --- 共通のセットアップ処理 --- | |
if device_arg_str: selected_device = device_arg_str | |
else: selected_device = "cuda" if torch.cuda.is_available() else "cpu" | |
print(f"使用デバイス: {selected_device.upper()}") | |
if selected_device == "cuda": | |
if not torch.cuda.is_available(): | |
print("警告: CUDA指定だが利用不可。CPUを使用します。"); selected_device = "cpu" | |
else: | |
try: print(f"CUDAデバイス名: {torch.cuda.get_device_name(0)}") | |
except Exception as e_cuda_name: print(f"CUDAデバイス名の取得失敗: {e_cuda_name}") | |
print(f"ASRモデル '{MODEL_NAME}' をロードしています...") | |
asr_model_main: Optional[ASRModel] = None | |
try: | |
asr_model_main = ASRModel.from_pretrained(model_name=MODEL_NAME) | |
asr_model_main.eval() | |
print(f"モデル '{MODEL_NAME}' のロード完了。") | |
except Exception as model_load_e: | |
print(f"致命的エラー: ASRモデル '{MODEL_NAME}' のロードに失敗: {model_load_e}"); sys.exit(1) | |
output_formats_requested = [fmt.strip().lower() for fmt in formats_arg_str.split(',') if fmt.strip()] | |
final_output_formats_to_use = [fmt for fmt in output_formats_requested if fmt in DEFAULT_OUTPUT_FORMATS] | |
if not output_formats_requested and formats_arg_str: | |
print(f"警告: 指定された出力フォーマット '{formats_arg_str}' は無効です。") | |
if not final_output_formats_to_use : | |
print(f"情報: 有効な出力フォーマットが指定されなかったため、デフォルトの全形式 ({','.join(DEFAULT_OUTPUT_FORMATS)}) で出力します。") | |
final_output_formats_to_use = DEFAULT_OUTPUT_FORMATS | |
# target_directory_arg が None でないことを確認 (GUIキャンセル時など) | |
if not target_directory_arg: | |
print("エラー: 処理対象のディレクトリが指定されていません。処理を中止します。") | |
sys.exit(1) | |
if not asr_model_main: # 通常、モデルロード失敗で既にexitしているはずだが念のため | |
print("致命的エラー: ASRモデルがロードされていません。処理を中止します。") | |
sys.exit(1) | |
batch_process_directory( | |
target_directory_arg, asr_model_main, selected_device, | |
output_formats=final_output_formats_to_use | |
) |