parakeet-tdt-0.6b-v2 / transcribe_cli.py
sungo-ganpare's picture
GPU制限エラーを検知する例外クラスを追加し、処理中にGPU制限に達した場合のエラーハンドリングを強化。音声ファイル処理時にエラーを適切に報告するように修正。
a7307d4
# coding: utf-8
import torch
import gc
from pathlib import Path
from pydub import AudioSegment
# numpy は直接は使用されていませんが、pydubやNeMoの依存関係で間接的に必要になる可能性があります。
# import numpy as np
import os
import csv
import json
from typing import List, Tuple, Optional, Set # Python 3.9+ では Optional, Set は typing から不要な場合あり
import argparse
import time # ★処理時間計測のために追加
import sys # ★コマンドライン引数チェックのために追加
from nemo.collections.asr.models import ASRModel # NeMo ASRモデル
import subprocess
import shutil
# --- グローバル設定 ---
MODEL_NAME = "nvidia/parakeet-tdt-0.6b-v2"
TARGET_SAMPLE_RATE = 16000
# 音声の長さに関する閾値 (秒)
LONG_AUDIO_THRESHOLD_SECONDS = 480 # 8分
VERY_LONG_AUDIO_THRESHOLD_SECONDS = 10800 # 3時間
# チャンク分割時の設定
CHUNK_LENGTH_SECONDS = 1800 # 30分
CHUNK_OVERLAP_SECONDS = 60 # 1分
# セグメント処理の設定
MAX_SEGMENT_LENGTH_SECONDS = 15 # 最大セグメント長(秒)を15秒に短縮
MAX_SEGMENT_CHARS = 100 # 最大セグメント文字数を100文字に短縮
MIN_SEGMENT_GAP_SECONDS = 0.3 # 最小セグメント間隔(秒)
# VTTファイルの最大サイズ(バイト)
MAX_VTT_SIZE_BYTES = 10 * 1024 * 1024 # 10MB
# 文の区切り文字
SENTENCE_ENDINGS = ['.', '!', '?', '。', '!', '?']
SENTENCE_PAUSES = [',', '、', ';', ';', ':', ':']
# ★ 入力ファイルの優先順位付き拡張子リスト
INPUT_PRIORITY_EXTENSIONS: List[str] = ['.wav', '.mp3', '.mp4']
# ★ デフォルトで出力するフォーマットリスト
DEFAULT_OUTPUT_FORMATS: List[str] = ["csv", "srt", "vtt", "json", "lrc"]
# --- 音声前処理関数 ---
def preprocess_audio_cli(audio_path_str: str, output_dir_for_temp_files: str) -> Tuple[Optional[str], Optional[str], Optional[float]]:
"""
オーディオファイルの前処理(リサンプリング、モノラル変換)を行います。
成功した場合、(処理済みファイルパス, 表示用名, 音声長) を返します。
失敗した場合、(None, None, None) を返します。
"""
try:
audio_file_path = Path(audio_path_str)
original_path_name = audio_file_path.name
audio_name_stem = audio_file_path.stem
print(f" 音声ファイルをロード中: {original_path_name}")
# まずffprobeで音声長を取得(4GB制限なし)
duration_sec = get_audio_duration_with_ffprobe(audio_path_str)
if duration_sec is None:
print("エラー: ffprobeで音声長の取得に失敗しました")
return None, None, None
print(f" 音声長: {duration_sec:.2f} 秒")
# ファイルサイズをチェック
file_size = Path(audio_path_str).stat().st_size
file_size_gb = file_size / (1024**3)
print(f" ファイルサイズ: {file_size_gb:.2f} GB")
# 4GB以上またはVERY_LONG_AUDIO_THRESHOLD_SECONDS以上の場合は直接ffmpegでチャンク分割
if file_size > 4 * 1024**3 or duration_sec > VERY_LONG_AUDIO_THRESHOLD_SECONDS:
print(f" 大容量ファイル({file_size_gb:.2f}GB, {duration_sec/3600:.2f}時間)のため、ffmpegで直接チャンク分割処理を行います。")
# 大容量ファイルの場合もモノラル変換を行う
temp_mono_path = Path(output_dir_for_temp_files) / f"{audio_name_stem}_mono_temp.wav"
try:
cmd = [
'ffmpeg', '-y', '-i', audio_path_str,
'-ac', '1', # モノラルに変換
'-ar', str(TARGET_SAMPLE_RATE), # サンプルレートを設定
str(temp_mono_path)
]
subprocess.run(cmd, capture_output=True, check=True)
return temp_mono_path.as_posix(), f"{original_path_name} (大容量・モノラル)", duration_sec
except subprocess.CalledProcessError as e:
print(f" ffmpegでのモノラル変換に失敗: {e}")
return audio_path_str, f"{original_path_name} (大容量)", duration_sec
# 4GB未満の場合は従来のpydub処理
try:
audio = AudioSegment.from_file(audio_path_str)
except Exception as pydub_e:
if "4GB" in str(pydub_e) or "Unable to process" in str(pydub_e):
print(f" pydubで4GB制限エラー。ffmpegで処理します: {pydub_e}")
return audio_path_str, f"{original_path_name} (大容量)", duration_sec
else:
raise pydub_e
resampled = False
mono_converted = False
# リサンプリング処理
if audio.frame_rate != TARGET_SAMPLE_RATE:
try:
print(f" リサンプリング中: {audio.frame_rate}Hz -> {TARGET_SAMPLE_RATE}Hz")
audio = audio.set_frame_rate(TARGET_SAMPLE_RATE)
resampled = True
except Exception as resample_e:
print(f"エラー: 音声のリサンプリングに失敗しました: {resample_e}")
return None, None, None
# モノラル変換処理
if audio.channels > 1:
try:
print(f" モノラルに変換中 ({audio.channels}ch -> 1ch)")
audio = audio.set_channels(1)
mono_converted = True
except Exception as mono_e:
print(f"エラー: 音声のモノラル変換に失敗しました: {mono_e}")
return None, None, None
elif audio.channels == 1:
print(" 音声は既にモノラルです。")
processed_temp_file_path_obj = None
# 前処理が行われた場合、一時ファイルに保存
if resampled or mono_converted:
try:
# ファイル名から特殊文字を除去してより安全な名前を生成
import re
safe_stem = re.sub(r'[^\w\-_\.]', '_', audio_name_stem)
temp_suffix = "_preprocessed_temp.wav"
processed_temp_file_path_obj = Path(output_dir_for_temp_files, f"{safe_stem}{temp_suffix}")
print(f" 前処理済み音声の一時保存先: {processed_temp_file_path_obj.name}")
audio.export(processed_temp_file_path_obj, format="wav")
path_for_transcription = processed_temp_file_path_obj.as_posix()
display_name_for_info = f"{original_path_name} (前処理済み)"
except Exception as export_e:
print(f"エラー: 前処理済み音声のエクスポートに失敗しました: {export_e}")
if processed_temp_file_path_obj and processed_temp_file_path_obj.exists():
try:
os.remove(processed_temp_file_path_obj)
except OSError:
pass
return None, None, None
else:
# 前処理が不要だった場合
print(" 前処理は不要でした。元のファイルを使用します。")
path_for_transcription = audio_path_str
display_name_for_info = original_path_name
return path_for_transcription, display_name_for_info, duration_sec
except FileNotFoundError:
print(f"エラー: 音声ファイルが見つかりません: {audio_path_str}")
return None, None, None
except Exception as load_e:
print(f"エラー: 音声ファイル '{original_path_name}' のロード/デコードに失敗しました: {load_e}")
return None, None, None
def get_audio_duration_with_ffprobe(audio_path_str: str) -> Optional[float]:
"""ffprobeを使用して音声ファイルの長さを取得(4GB制限なし)"""
try:
# ffprobeが利用可能かチェック
if not shutil.which('ffprobe'):
print("警告: ffprobeが見つかりません。pydubでの処理を試行します。")
return None
cmd = [
'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration',
'-of', 'csv=p=0', audio_path_str
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0 and result.stdout.strip():
duration = float(result.stdout.strip())
return duration
else:
print(f"ffprobeエラー: {result.stderr}")
return None
except subprocess.TimeoutExpired:
print("ffprobeがタイムアウトしました")
return None
except Exception as e:
print(f"ffprobeでの音声長取得エラー: {e}")
return None
# --- 文字起こしコア関数 ---
def find_natural_break_point(text: str, max_length: int) -> int:
"""テキスト内で自然な区切り点を探す"""
if len(text) <= max_length:
return len(text)
# 文末で区切る
for i in range(max_length, 0, -1):
if i < len(text) and text[i] in SENTENCE_ENDINGS:
return i + 1
# 文の区切りで区切る
for i in range(max_length, 0, -1):
if i < len(text) and text[i] in SENTENCE_PAUSES:
return i + 1
# スペースで区切る
for i in range(max_length, 0, -1):
if i < len(text) and text[i].isspace():
return i + 1
# それでも見つからない場合は最大長で区切る
return max_length
def split_segment(segment: dict, max_length_seconds: float, max_chars: int) -> List[dict]:
"""セグメントを自然な区切りで分割する"""
if (segment['end'] - segment['start']) <= max_length_seconds and len(segment['segment']) <= max_chars:
return [segment]
result = []
current_text = segment['segment']
current_start = segment['start']
total_duration = segment['end'] - segment['start']
while current_text:
# 文字数に基づく分割点を探す
break_point = find_natural_break_point(current_text, max_chars)
# 時間に基づく分割点を計算
text_ratio = break_point / len(segment['segment'])
segment_duration = total_duration * text_ratio
# 分割点が最大長を超えないように調整
if segment_duration > max_length_seconds:
time_ratio = max_length_seconds / total_duration
break_point = int(len(segment['segment']) * time_ratio)
break_point = find_natural_break_point(current_text, break_point)
segment_duration = max_length_seconds
# 新しいセグメントを作成
new_segment = {
'start': current_start,
'end': current_start + segment_duration,
'segment': current_text[:break_point].strip()
}
result.append(new_segment)
# 残りのテキストと開始時間を更新
current_text = current_text[break_point:].strip()
current_start = new_segment['end']
return result
def transcribe_audio_cli(
transcribe_path_str: str,
model: ASRModel,
duration_sec: float,
device: str
) -> Tuple[Optional[List], Optional[List], Optional[List]]:
long_audio_settings_applied = False
original_model_dtype = model.dtype
try:
if device == 'cuda':
torch.cuda.empty_cache()
gc.collect()
model.to(device)
# 音声長に応じてモデル設定を変更
if duration_sec > LONG_AUDIO_THRESHOLD_SECONDS:
try:
print(f" 情報: 音声長 ({duration_sec:.0f}s) が閾値 ({LONG_AUDIO_THRESHOLD_SECONDS}s) を超えるため、長尺音声向け設定を適用します。")
model.change_attention_model(
self_attention_model="rel_pos_local_attn",
att_context_size=[128, 128]
)
model.change_subsampling_conv_chunking_factor(1)
long_audio_settings_applied = True
if device == 'cuda':
torch.cuda.empty_cache()
gc.collect()
except Exception as setting_e:
print(f" 警告: 長尺音声向け設定の適用に失敗しました: {setting_e}。デフォルト設定で続行します。")
if device == 'cuda' and torch.cuda.is_bf16_supported():
print(" 情報: モデルを bfloat16 に変換して推論を実行します。")
model.to(torch.bfloat16)
elif model.dtype != original_model_dtype:
model.to(original_model_dtype)
print(f" 文字起こしを実行中 (デバイス: {device}, モデルdtype: {model.dtype})...")
output = model.transcribe(
[transcribe_path_str],
timestamps=True,
batch_size=2
)
if not output or not isinstance(output, list) or not output[0] or \
not hasattr(output[0], 'timestamp') or not output[0].timestamp or \
'segment' not in output[0].timestamp:
print(" エラー: 文字起こしに失敗したか、予期しない出力形式です。")
return None, None, None
segment_timestamps = output[0].timestamp['segment']
# セグメントの前処理:より適切なセグメント分割
processed_segments = []
current_segment = None
for ts in segment_timestamps:
if current_segment is None:
current_segment = ts
else:
# セグメント結合の条件を厳格化
time_gap = ts['start'] - current_segment['end']
current_text = current_segment['segment']
next_text = ts['segment']
# 結合条件のチェック
should_merge = (
time_gap < MIN_SEGMENT_GAP_SECONDS and # 時間間隔が短い
len(current_text) + len(next_text) < MAX_SEGMENT_CHARS and # 文字数制限
(current_segment['end'] - current_segment['start']) < MAX_SEGMENT_LENGTH_SECONDS and # 現在のセグメントが短い
(ts['end'] - ts['start']) < MAX_SEGMENT_LENGTH_SECONDS and # 次のセグメントが短い
not any(current_text.strip().endswith(p) for p in SENTENCE_ENDINGS) # 文の区切りでない
)
if should_merge:
current_segment['end'] = ts['end']
current_segment['segment'] += ' ' + ts['segment']
else:
# 現在のセグメントを分割
split_segments = split_segment(current_segment, MAX_SEGMENT_LENGTH_SECONDS, MAX_SEGMENT_CHARS)
processed_segments.extend(split_segments)
current_segment = ts
if current_segment is not None:
# 最後のセグメントも分割
split_segments = split_segment(current_segment, MAX_SEGMENT_LENGTH_SECONDS, MAX_SEGMENT_CHARS)
processed_segments.extend(split_segments)
# 処理済みセグメントからデータを生成
vis_data = [[f"{ts['start']:.2f}", f"{ts['end']:.2f}", ts['segment']] for ts in processed_segments]
raw_times_data = [[ts['start'], ts['end']] for ts in processed_segments]
# 単語タイムスタンプの処理を改善
word_timestamps_raw = output[0].timestamp.get("word", [])
word_vis_data = []
for w in word_timestamps_raw:
if not isinstance(w, dict) or not all(k in w for k in ['start', 'end', 'word']):
continue
# 単語のタイムスタンプを最も近いセグメントに割り当て
word_start = float(w['start'])
word_end = float(w['end'])
# 単語が完全に含まれるセグメントを探す
for seg in processed_segments:
if word_start >= seg['start'] - 0.05 and word_end <= seg['end'] + 0.05:
word_vis_data.append([f"{word_start:.2f}", f"{word_end:.2f}", w["word"]])
break
print(" 文字起こし完了。")
return vis_data, raw_times_data, word_vis_data
except torch.cuda.OutOfMemoryError as oom_e:
print(f" 致命的エラー: CUDAメモリ不足です。 {oom_e}")
print(" バッチサイズを小さくする、他のGPU利用アプリを終了するなどの対策を試みてください。")
return None, None, None
except Exception as e:
print(f" エラー: 文字起こし処理中に予期せぬエラーが発生しました: {e}")
import traceback
traceback.print_exc()
return None, None, None
finally:
if long_audio_settings_applied:
try:
print(" 長尺音声向け設定を元に戻しています。")
model.change_attention_model(self_attention_model="rel_pos")
model.change_subsampling_conv_chunking_factor(-1)
except Exception as revert_e:
print(f" 警告: 長尺音声設定の復元に失敗: {revert_e}")
model.to(original_model_dtype)
if model.device.type != 'cpu':
model.cpu()
if device == 'cuda':
torch.cuda.empty_cache()
gc.collect()
# --- 結果保存関数 ---
def save_transcripts_cli(output_dir_str: str, audio_file_stem: str,
vis_data: List, word_vis_data: List, formats: Optional[List[str]] = None):
if formats is None:
formats_to_save = DEFAULT_OUTPUT_FORMATS
else:
formats_to_save = formats
output_dir_path = Path(output_dir_str)
output_dir_path.mkdir(parents=True, exist_ok=True)
saved_files_count = 0
print(f" 結果を保存中 (対象形式: {', '.join(formats_to_save)})...")
try:
if "csv" in formats_to_save:
csv_file_path = output_dir_path / f"{audio_file_stem}.csv"
csv_headers = ["Start (s)", "End (s)", "Segment"]
with open(csv_file_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f); writer.writerow(csv_headers); writer.writerows(vis_data)
print(f" CSVファイルを保存: {csv_file_path.name}"); saved_files_count +=1
if "srt" in formats_to_save:
srt_file_path = output_dir_path / f"{audio_file_stem}.srt"
write_srt(vis_data, srt_file_path)
print(f" SRTファイルを保存: {srt_file_path.name}"); saved_files_count +=1
if "vtt" in formats_to_save:
vtt_file_path = output_dir_path / f"{audio_file_stem}.vtt"
try:
write_vtt(vis_data, word_vis_data, vtt_file_path)
print(f" VTTファイルを保存: {vtt_file_path.name}"); saved_files_count +=1
except ValueError as e:
if "VTTファイルサイズが制限を超えました" in str(e):
print(f" エラー: {e}")
# 既に作成されたVTTファイルを削除
if vtt_file_path.exists():
vtt_file_path.unlink()
raise # エラーを上位に伝播
if "json" in formats_to_save:
json_file_path = output_dir_path / f"{audio_file_stem}.json"
write_json(vis_data, word_vis_data, json_file_path)
print(f" JSONファイルを保存: {json_file_path.name}"); saved_files_count +=1
if "lrc" in formats_to_save:
lrc_file_path = output_dir_path / f"{audio_file_stem}.lrc"
write_lrc(vis_data, lrc_file_path)
print(f" LRCファイルを保存: {lrc_file_path.name}"); saved_files_count +=1
if saved_files_count == 0 and formats_to_save:
print(f" 警告: 指定されたフォーマット {formats_to_save} でのファイルの保存は行われませんでした。")
except Exception as e:
print(f" エラー: 文字起こしファイルの保存中にエラーが発生しました: {e}")
raise # エラーを上位に伝播
# --- 書き出しヘルパー関数群 (SRT, VTT, JSON, LRC) ---
def write_srt(segments: List, path: Path):
def sec2srt(t_float: float) -> str:
h, rem = divmod(int(t_float), 3600); m, s = divmod(rem, 60)
ms = int((t_float - int(t_float)) * 1000)
return f"{h:02}:{m:02}:{s:02},{ms:03}"
with open(path, "w", encoding="utf-8") as f:
for i, seg_list in enumerate(segments, 1):
f.write(f"{i}\n{sec2srt(float(seg_list[0]))} --> {sec2srt(float(seg_list[1]))}\n{seg_list[2]}\n\n")
def write_vtt(segments: List, words: List, path: Path):
def sec2vtt(t_float: float) -> str:
h, rem = divmod(int(t_float), 3600)
m, s = divmod(rem, 60)
ms = int((t_float - int(t_float)) * 1000)
return f"{h:02}:{m:02}:{s:02}.{ms:03}"
with open(path, "w", encoding="utf-8") as f:
f.write("WEBVTT\n\n")
f.write("STYLE\n")
f.write("::cue(.current) { color: #ffff00; font-weight: bold; }\n")
f.write("::cue(.past) { color: #888888; }\n")
f.write("::cue(.future) { color: #ffffff; }\n")
f.write("::cue(.line) { background: rgba(0,0,0,0.7); padding: 4px; }\n\n")
if not words:
# 単語タイムスタンプがない場合は、セグメント単位で出力
for i, seg_list in enumerate(segments, 1):
f.write(f"NOTE Segment {i}\n")
f.write(f"{sec2vtt(float(seg_list[0]))} --> {sec2vtt(float(seg_list[1]))}\n{seg_list[2]}\n\n")
# ファイルサイズをチェック
current_size = f.tell()
if current_size > MAX_VTT_SIZE_BYTES:
print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。")
raise ValueError("VTTファイルサイズが制限を超えました")
return
# セグメント単位で処理
for seg_data in segments:
seg_start = float(seg_data[0])
seg_end = float(seg_data[1])
# このセグメントに含まれる単語を特定
segment_words = []
for word_idx, word_data in enumerate(words):
word_start = float(word_data[0])
word_end = float(word_data[1])
if word_start >= seg_start - 0.1 and word_end <= seg_end + 0.1:
segment_words.append((word_idx, word_data))
if not segment_words:
continue
# セグメント内の全単語のテキストを一度だけ生成
all_words = [w_data[2] for _, w_data in segment_words]
# セグメント開始から最初の単語まで
first_word_start = float(segment_words[0][1][0])
if seg_start < first_word_start - 0.05:
f.write(f"{sec2vtt(seg_start)} --> {sec2vtt(first_word_start)}\n")
f.write(f'<c.line>{" ".join(f"<c.future>{w}</c>" for w in all_words)}</c>\n\n')
# ファイルサイズをチェック
current_size = f.tell()
if current_size > MAX_VTT_SIZE_BYTES:
print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。")
raise ValueError("VTTファイルサイズが制限を超えました")
# 各単語の処理
for local_idx, (_, word_data) in enumerate(segment_words):
w_start = float(word_data[0])
w_end = float(word_data[1])
# 単語の表示時間を出力
f.write(f"{sec2vtt(w_start)} --> {sec2vtt(w_end)}\n")
# 現在の単語をハイライトしたテキストを生成
line_parts = []
for i, w in enumerate(all_words):
if i == local_idx:
line_parts.append(f'<c.current>{w}</c>')
elif i < local_idx:
line_parts.append(f'<c.past>{w}</c>')
else:
line_parts.append(f'<c.future>{w}</c>')
f.write(f'<c.line>{" ".join(line_parts)}</c>\n\n')
# ファイルサイズをチェック
current_size = f.tell()
if current_size > MAX_VTT_SIZE_BYTES:
print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。")
raise ValueError("VTTファイルサイズが制限を超えました")
# 単語間の無音期間の処理
if local_idx < len(segment_words) - 1:
next_word_start = float(segment_words[local_idx + 1][1][0])
gap_duration = next_word_start - w_end
if gap_duration > 0.05: # 50ms以上の無音期間がある場合
f.write(f"{sec2vtt(w_end)} --> {sec2vtt(next_word_start)}\n")
f.write(f'<c.line>{" ".join(f"<c.past>{w}</c>" if i <= local_idx else f"<c.future>{w}</c>" for i, w in enumerate(all_words))}</c>\n\n')
# ファイルサイズをチェック
current_size = f.tell()
if current_size > MAX_VTT_SIZE_BYTES:
print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。")
raise ValueError("VTTファイルサイズが制限を超えました")
# 最後の単語からセグメント終了まで
last_word_end = float(segment_words[-1][1][1])
if last_word_end < seg_end - 0.05:
f.write(f"{sec2vtt(last_word_end)} --> {sec2vtt(seg_end)}\n")
f.write(f'<c.line>{" ".join(f"<c.past>{w}</c>" for w in all_words)}</c>\n\n')
# ファイルサイズをチェック
current_size = f.tell()
if current_size > MAX_VTT_SIZE_BYTES:
print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。")
raise ValueError("VTTファイルサイズが制限を超えました")
def write_json(segments: List, words: List, path: Path):
result = {"segments": []}; word_idx = 0
for seg_data in segments:
s_start_time = float(seg_data[0]); s_end_time = float(seg_data[1]); s_text = seg_data[2]
segment_words_list: List[dict] = []; temp_current_word_idx = word_idx
if words:
while temp_current_word_idx < len(words):
w_data = words[temp_current_word_idx]; w_start_time = float(w_data[0]); w_end_time = float(w_data[1])
if w_start_time >= s_start_time and w_end_time <= s_end_time + 0.1:
segment_words_list.append({"start": w_start_time, "end": w_end_time, "word": w_data[2]})
temp_current_word_idx += 1
elif w_start_time < s_start_time :
temp_current_word_idx += 1
elif w_start_time > s_end_time:
break
else:
temp_current_word_idx += 1
word_idx = temp_current_word_idx
result["segments"].append({"start": s_start_time, "end": s_end_time, "text": s_text, "words": segment_words_list})
with open(path, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
def write_lrc(segments: List, path: Path):
def sec2lrc(t_float: float) -> str:
m, s = divmod(float(t_float), 60)
return f"[{int(m):02d}:{s:05.2f}]"
with open(path, "w", encoding="utf-8") as f:
for seg_list in segments:
f.write(f"{sec2lrc(float(seg_list[0]))}{seg_list[2]}\n")
# --- 音声分割関数 ---
def split_audio_with_overlap_cli(
audio_path_str: str,
output_dir_for_chunks: str,
chunk_length_sec: int = CHUNK_LENGTH_SECONDS,
overlap_sec: int = CHUNK_OVERLAP_SECONDS
) -> List[str]:
print(f" 音声分割中: 基本チャンク長 {chunk_length_sec}s, オーバーラップ {overlap_sec}s")
# ファイルサイズをチェックして処理方法を決定
file_size = Path(audio_path_str).stat().st_size
file_size_gb = file_size / (1024**3)
# 4GB以上の場合はffmpegを使用
if file_size > 4 * 1024**3:
print(f" 大容量ファイル({file_size_gb:.2f}GB)のため、ffmpegで分割処理を実行します。")
return split_audio_with_ffmpeg(audio_path_str, output_dir_for_chunks, chunk_length_sec, overlap_sec)
# 4GB未満の場合は従来のpydub処理
try:
audio = AudioSegment.from_file(audio_path_str)
except Exception as e:
if "4GB" in str(e) or "Unable to process" in str(e):
print(f" pydubで4GB制限エラー。ffmpegで処理します: {e}")
return split_audio_with_ffmpeg(audio_path_str, output_dir_for_chunks, chunk_length_sec, overlap_sec)
else:
print(f" エラー: 音声ファイル '{Path(audio_path_str).name}' のロード中にエラー(分割処理): {e}")
return []
# 以下は既存のpydub処理...
duration_ms = len(audio); chunk_length_ms = chunk_length_sec * 1000; overlap_ms = overlap_sec * 1000
chunk_paths_list: List[str] = []; start_ms = 0; chunk_idx = 0
audio_file_stem = Path(audio_path_str).stem
while start_ms < duration_ms:
actual_chunk_start_ms = max(0, start_ms - (overlap_ms if start_ms > 0 else 0) )
base_chunk_end_ms = start_ms + chunk_length_ms
actual_chunk_end_ms = min(base_chunk_end_ms + (overlap_ms if base_chunk_end_ms < duration_ms else 0), duration_ms)
if actual_chunk_start_ms >= actual_chunk_end_ms :
if start_ms >= duration_ms: break
print(f" 警告: チャンク計算で予期せぬ状態。スキップします。")
start_ms += chunk_length_ms; continue
chunk_segment = audio[actual_chunk_start_ms:actual_chunk_end_ms]
chunk_file_name = f"{audio_file_stem}_chunk_{chunk_idx:03d}_temp.wav"
chunk_file_path_obj = Path(output_dir_for_chunks, chunk_file_name)
try:
chunk_segment.export(chunk_file_path_obj, format="wav")
chunk_paths_list.append(chunk_file_path_obj.as_posix())
except Exception as export_chunk_e:
print(f" エラー: 一時チャンクファイル {chunk_file_name} のエクスポートに失敗: {export_chunk_e}")
start_ms += chunk_length_ms; chunk_idx += 1
print(f" 音声を {len(chunk_paths_list)} 個のチャンクに分割しました。")
return chunk_paths_list
def split_audio_with_ffmpeg(
audio_path_str: str,
output_dir_for_chunks: str,
chunk_length_sec: int,
overlap_sec: int
) -> List[str]:
"""ffmpegを使用して大容量ファイルを分割"""
try:
if not shutil.which('ffmpeg'):
print("エラー: ffmpegが見つかりません。4GB以上のファイルを処理するにはffmpegが必要です。")
return []
# 音声長を取得
duration_sec = get_audio_duration_with_ffprobe(audio_path_str)
if duration_sec is None:
print("エラー: ffmpegでの分割処理で音声長を取得できませんでした")
return []
chunk_paths_list: List[str] = []
audio_file_stem = Path(audio_path_str).stem
start_sec = 0
chunk_idx = 0
while start_sec < duration_sec:
# チャンク開始・終了時刻を計算
actual_start_sec = max(0, start_sec - (overlap_sec if start_sec > 0 else 0))
base_end_sec = start_sec + chunk_length_sec
actual_end_sec = min(base_end_sec + (overlap_sec if base_end_sec < duration_sec else 0), duration_sec)
if actual_start_sec >= actual_end_sec:
break
chunk_duration = actual_end_sec - actual_start_sec
chunk_file_name = f"{audio_file_stem}_chunk_{chunk_idx:03d}_temp.wav"
chunk_file_path = Path(output_dir_for_chunks) / chunk_file_name
# ffmpegコマンドで音声を抽出・変換
cmd = [
'ffmpeg', '-y', '-loglevel', 'error',
'-ss', str(actual_start_sec),
'-i', audio_path_str,
'-t', str(chunk_duration),
'-acodec', 'pcm_s16le',
'-ar', str(TARGET_SAMPLE_RATE),
'-ac', '1', # モノラル
str(chunk_file_path)
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
chunk_paths_list.append(chunk_file_path.as_posix())
print(f" チャンク {chunk_idx+1}: {actual_start_sec:.1f}s - {actual_end_sec:.1f}s -> {chunk_file_name}")
else:
print(f" エラー: チャンク {chunk_idx} の生成に失敗: {result.stderr}")
except subprocess.TimeoutExpired:
print(f" エラー: チャンク {chunk_idx} の生成がタイムアウトしました")
start_sec += chunk_length_sec
chunk_idx += 1
print(f" ffmpegで音声を {len(chunk_paths_list)} 個のチャンクに分割しました。")
return chunk_paths_list
except Exception as e:
print(f" エラー: ffmpegでの音声分割中にエラー: {e}")
return []
# --- 単一ファイル処理のメインロジック ---
def process_single_file(
input_file_path_obj: Path,
asr_model_instance: ASRModel,
device_to_use: str,
output_formats_list: List[str]
) -> bool:
input_file_stem = input_file_path_obj.stem
output_and_temp_dir_str = input_file_path_obj.parent.as_posix()
file_processing_start_time = time.time()
actual_audio_duration_sec: Optional[float] = None
success_status = False
temp_preprocessed_audio_path_str: Optional[str] = None
temp_chunk_file_paths_str_list: List[str] = []
try:
print(f"--- ステップ1/3: {input_file_stem} の音声前処理 ---")
processed_path_for_asr, _, duration_sec_val = preprocess_audio_cli(
input_file_path_obj.as_posix(), output_and_temp_dir_str
)
if not processed_path_for_asr or duration_sec_val is None:
raise Exception("Preprocessing failed")
actual_audio_duration_sec = duration_sec_val
if processed_path_for_asr != input_file_path_obj.as_posix():
temp_preprocessed_audio_path_str = processed_path_for_asr
print(f"--- ステップ2/3: {input_file_stem} の文字起こし (音声長: {actual_audio_duration_sec:.2f}秒) ---")
final_vis_data: Optional[List] = None
final_word_vis_data: Optional[List] = None
if actual_audio_duration_sec > VERY_LONG_AUDIO_THRESHOLD_SECONDS:
print(f" 情報: 音声長が{VERY_LONG_AUDIO_THRESHOLD_SECONDS/3600:.1f}時間を超えるため、分割処理します。")
chunk_file_paths_str = split_audio_with_overlap_cli(
processed_path_for_asr, output_and_temp_dir_str,
chunk_length_sec=CHUNK_LENGTH_SECONDS, overlap_sec=CHUNK_OVERLAP_SECONDS
)
if not chunk_file_paths_str:
raise Exception(f"{input_file_path_obj.name} のチャンク分割に失敗しました。")
temp_chunk_file_paths_str_list = chunk_file_paths_str[:]
all_vis_data_merged: List[List[str]] = []
all_word_vis_data_merged: List[List[str]] = []
current_global_time_offset_sec = 0.0
last_global_segment_end_time_sec = 0.0
# チャンク処理前にGPUメモリをクリア
if device_to_use == 'cuda':
torch.cuda.empty_cache()
gc.collect()
print(f" 初期GPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB")
for i, chunk_file_path_str in enumerate(temp_chunk_file_paths_str_list):
print(f" チャンク {i+1}/{len(temp_chunk_file_paths_str_list)} ({Path(chunk_file_path_str).name}) を処理中...")
try:
# 各チャンク処理前にGPUメモリをクリア
if device_to_use == 'cuda':
torch.cuda.empty_cache()
gc.collect()
print(f" チャンク処理前のGPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB")
estimated_chunk_duration_for_asr_settings = CHUNK_LENGTH_SECONDS + CHUNK_OVERLAP_SECONDS
vis_data_chunk, _, word_vis_data_chunk = transcribe_audio_cli(
chunk_file_path_str, asr_model_instance,
estimated_chunk_duration_for_asr_settings, device_to_use
)
# チャンク処理後のGPUメモリ使用量を確認
if device_to_use == 'cuda':
print(f" チャンク処理後のGPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB")
if not vis_data_chunk:
print(f" 警告: チャンク {Path(chunk_file_path_str).name} の文字起こしに失敗。スキップします。")
current_global_time_offset_sec += CHUNK_LENGTH_SECONDS - (CHUNK_OVERLAP_SECONDS if i < len(temp_chunk_file_paths_str_list) - 1 else 0)
continue
# データのマージ処理
for seg_row_list in vis_data_chunk:
s_local_sec = float(seg_row_list[0])
e_local_sec = float(seg_row_list[1])
text_seg = seg_row_list[2]
s_global_sec = s_local_sec + current_global_time_offset_sec
e_global_sec = e_local_sec + current_global_time_offset_sec
if s_global_sec >= last_global_segment_end_time_sec - 0.1:
all_vis_data_merged.append([f"{s_global_sec:.2f}", f"{e_global_sec:.2f}", text_seg])
last_global_segment_end_time_sec = max(last_global_segment_end_time_sec, e_global_sec)
temp_last_word_global_end_time_sec = float(all_word_vis_data_merged[-1][1]) if all_word_vis_data_merged else 0.0
if word_vis_data_chunk:
for word_row_list in word_vis_data_chunk:
w_s_local_sec = float(word_row_list[0])
w_e_local_sec = float(word_row_list[1])
text_word = word_row_list[2]
w_s_global_sec = w_s_local_sec + current_global_time_offset_sec
w_e_global_sec = w_e_local_sec + current_global_time_offset_sec
if w_s_global_sec >= temp_last_word_global_end_time_sec - 0.05:
all_word_vis_data_merged.append([f"{w_s_global_sec:.2f}", f"{w_e_global_sec:.2f}", text_word])
temp_last_word_global_end_time_sec = max(temp_last_word_global_end_time_sec, w_e_global_sec)
if i < len(temp_chunk_file_paths_str_list) - 1:
current_global_time_offset_sec += (CHUNK_LENGTH_SECONDS - CHUNK_OVERLAP_SECONDS)
# チャンク処理後にGPUメモリをクリア
if device_to_use == 'cuda':
torch.cuda.empty_cache()
gc.collect()
print(f" メモリクリア後のGPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB")
except Exception as chunk_proc_e:
print(f" エラー: チャンク {Path(chunk_file_path_str).name} の処理中にエラー: {chunk_proc_e}")
if i < len(temp_chunk_file_paths_str_list) - 1:
current_global_time_offset_sec += (CHUNK_LENGTH_SECONDS - CHUNK_OVERLAP_SECONDS)
final_vis_data = all_vis_data_merged
final_word_vis_data = all_word_vis_data_merged
if not final_vis_data:
raise Exception("チャンク処理後、有効な文字起こしデータが得られませんでした。")
else:
vis_data_single, _, word_vis_data_single = transcribe_audio_cli(
processed_path_for_asr, asr_model_instance, actual_audio_duration_sec, device_to_use
)
if not vis_data_single:
raise Exception(f"{input_file_path_obj.name} の文字起こしに失敗しました。")
final_vis_data = vis_data_single
final_word_vis_data = word_vis_data_single
if final_vis_data:
print(f"--- ステップ3/3: {input_file_stem} の文字起こし結果保存 ---")
save_transcripts_cli(output_and_temp_dir_str, input_file_stem,
final_vis_data, final_word_vis_data if final_word_vis_data else [],
formats=output_formats_list)
success_status = True
else:
print(f"情報: {input_file_path_obj.name} の文字起こし結果が空のため、ファイルは保存しませんでした。")
success_status = True
except Exception as e:
print(f"エラー: ファイル {input_file_path_obj.name} の処理中にエラーが発生しました: {e}")
success_status = False
finally:
file_processing_end_time = time.time()
time_taken_seconds = file_processing_end_time - file_processing_start_time
proc_m = int(time_taken_seconds // 60)
proc_s = time_taken_seconds % 60
summary_message = f" --- {input_file_stem}: 処理サマリー ---\n"
if actual_audio_duration_sec is not None:
audio_m = int(actual_audio_duration_sec // 60)
audio_s = actual_audio_duration_sec % 60
summary_message += f" 音声長: {audio_m}{audio_s:.2f}秒 ({actual_audio_duration_sec:.2f}秒)\n"
else:
summary_message += " 音声長: 不明 (前処理で失敗した可能性があります)\n"
summary_message += f" このファイルの総処理時間: {proc_m}{proc_s:.2f}秒 ({time_taken_seconds:.2f}秒)\n"
summary_message += f" 処理ステータス: {'成功' if success_status else '失敗'}"
print(summary_message)
if temp_preprocessed_audio_path_str and Path(temp_preprocessed_audio_path_str).exists():
try: os.remove(temp_preprocessed_audio_path_str); print(f" 一時ファイル {Path(temp_preprocessed_audio_path_str).name} を削除しました。")
except OSError as e_os: print(f" 警告: 一時ファイル {Path(temp_preprocessed_audio_path_str).name} の削除に失敗: {e_os}")
for chunk_f_str in temp_chunk_file_paths_str_list:
if Path(chunk_f_str).exists():
try: os.remove(chunk_f_str); print(f" 一時チャンクファイル {Path(chunk_f_str).name} を削除しました。")
except OSError as e_os_chunk: print(f" 警告: 一時チャンクファイル {Path(chunk_f_str).name} の削除に失敗: {e_os_chunk}")
# process_single_file の最後では "ファイル処理終了" のログは batch_process_directory に任せる
return success_status
# --- ディレクトリ内ファイルの一括処理関数 ---
def batch_process_directory(
target_dir_str: str,
asr_model_instance: ASRModel,
device_to_use: str,
output_formats: Optional[List[str]] = None
):
batch_start_time = time.time()
if output_formats is None:
output_formats_to_use = DEFAULT_OUTPUT_FORMATS
else:
output_formats_to_use = output_formats
target_dir_path = Path(target_dir_str)
if not target_dir_path.is_dir():
print(f"エラー: 指定されたパス '{target_dir_str}' は有効なディレクトリではありません。"); return
print(f"処理対象ディレクトリ: {target_dir_path.resolve()}")
print(f"入力ファイルの探索優先順位: {', '.join(INPUT_PRIORITY_EXTENSIONS)}")
print(f"出力ファイル形式: {', '.join(output_formats_to_use)}")
all_files_in_dir = list(target_dir_path.iterdir())
potential_stems: Set[str] = set()
for f_path_obj in all_files_in_dir:
if f_path_obj.is_file() and f_path_obj.suffix.lower() in INPUT_PRIORITY_EXTENSIONS:
potential_stems.add(f_path_obj.stem)
if not potential_stems:
print(f"情報: ディレクトリ '{target_dir_path.name}' に対象拡張子のファイルは見つかりませんでした。"); return
print(f"{len(potential_stems)} 個のユニークなファイル名候補が見つかりました。優先順位に従って処理対象を選択します...")
files_to_actually_process: List[Path] = []
for stem_name in sorted(list(potential_stems)):
selected_file_for_this_stem: Optional[Path] = None
for ext_priority in INPUT_PRIORITY_EXTENSIONS:
potential_file = target_dir_path / f"{stem_name}{ext_priority}"
if potential_file.exists() and potential_file.is_file():
selected_file_for_this_stem = potential_file
print(f" ファイル名 '{stem_name}': '{potential_file.name}' を処理対象として選択。")
break
if selected_file_for_this_stem: files_to_actually_process.append(selected_file_for_this_stem)
if not files_to_actually_process:
print("情報: 優先順位適用後、実際に処理するファイルはありませんでした。"); return
print(f"実際に処理するファイル数: {len(files_to_actually_process)} 個")
processed_successfully_count = 0
skipped_due_to_existing_csv_count = 0
failed_count = 0
for input_file_to_process_obj in files_to_actually_process:
print(f"\n======== ファイル処理開始: {input_file_to_process_obj.name} ========") # 各ファイルの開始ログ
is_skipped_at_batch_level = False
if "csv" in output_formats_to_use:
output_csv_path_check = input_file_to_process_obj.with_suffix('.csv')
if output_csv_path_check.exists():
print(f"スキップ (バッチレベル): CSV '{output_csv_path_check.name}' は既に存在します。")
skipped_due_to_existing_csv_count += 1
is_skipped_at_batch_level = True
print(f"======== ファイル処理終了 (スキップ): {input_file_to_process_obj.name} ========\n") # スキップ時の終了ログ
if not is_skipped_at_batch_level:
success_flag = process_single_file(
input_file_to_process_obj,
asr_model_instance,
device_to_use,
output_formats_to_use
)
if success_flag:
processed_successfully_count += 1
else:
failed_count += 1
# process_single_file内で "ファイル処理終了" ログが出力される
print("\n======== 全ファイルのバッチ処理が完了しました ========")
total_considered = len(files_to_actually_process)
print(f"総対象ファイル数(優先度選択後): {total_considered}")
print(f" 処理成功ファイル数: {processed_successfully_count}")
print(f" CSV既存によりスキップされたファイル数: {skipped_due_to_existing_csv_count}")
print(f" 処理失敗ファイル数: {failed_count}")
batch_end_time = time.time()
total_batch_time_seconds = batch_end_time - batch_start_time
batch_m = int(total_batch_time_seconds // 60)
batch_s = total_batch_time_seconds % 60
print(f"バッチ処理全体の総所要時間: {batch_m}{batch_s:.2f}秒 ({total_batch_time_seconds:.2f}秒)")
# --- スクリプト実行のエントリポイント ---
if __name__ == "__main__":
# ★ 引数処理とGUI分岐のための準備
target_directory_arg: Optional[str] = None
formats_arg_str: str = ",".join(DEFAULT_OUTPUT_FORMATS) # GUI時のデフォルト
device_arg_str: Optional[str] = None # GUI時のデフォルト (自動判別)
if len(sys.argv) == 1: # コマンドライン引数なしの場合
print("コマンドライン引数なしで起動されました。GUIでディレクトリを選択します。")
try:
import tkinter as tk
from tkinter import filedialog
def get_directory_from_gui_local() -> Optional[str]:
"""GUIでディレクトリ選択ダイアログを表示し、選択されたパスを返す"""
root = tk.Tk()
root.withdraw() # メインウィンドウは表示しない
# ダイアログを最前面に表示する試み (環境による)
root.attributes('-topmost', True)
# WSL環境での初期ディレクトリを設定
initial_dir = "/mnt/t/demucs_folder/htdemucs" # Windowsのユーザーディレクトリを初期値として設定
selected_path = filedialog.askdirectory(
title="処理対象のディレクトリを選択してください",
initialdir=initial_dir
)
root.attributes('-topmost', False)
root.destroy() # Tkinterウィンドウを破棄
return selected_path if selected_path else None
target_directory_arg = get_directory_from_gui_local()
if not target_directory_arg:
print("ディレクトリが選択されませんでした。処理を中止します。")
sys.exit(0) # 正常終了
# formats_arg_str と device_arg_str は初期化されたデフォルト値を使用
print(f"GUIで選択されたディレクトリ: {target_directory_arg}")
print(f"出力フォーマット (デフォルト): {formats_arg_str}")
# device_arg_strがNoneの場合、後続の処理で自動判別される
except ImportError:
print("エラー: GUIモードに必要なTkinterライブラリが見つかりません。")
print("Tkinterをインストールするか、コマンドライン引数を使用してスクリプトを実行してください。例:")
print(f" python {Path(sys.argv[0]).name} /path/to/your/audio_directory")
sys.exit(1) # エラー終了
except Exception as e_gui:
print(f"GUIの表示中に予期せぬエラーが発生しました: {e_gui}")
sys.exit(1) # エラー終了
else: # コマンドライン引数がある場合
parser = argparse.ArgumentParser(
description="指定されたディレクトリ内の音声/動画ファイルをNVIDIA Parakeet ASRモデルで文字起こしします。\n"
f"同じ名前のファイルが複数ある場合、{' > '.join(INPUT_PRIORITY_EXTENSIONS)} の優先順位で処理します。",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument( # 最初の引数は必須のディレクトリ
"target_directory", type=str,
help="処理対象のファイルが含まれるディレクトリのパス。"
)
parser.add_argument(
"--formats", type=str, default=",".join(DEFAULT_OUTPUT_FORMATS),
help=(f"出力する文字起こしファイルの形式をカンマ区切りで指定。\n"
f"例: csv,srt (デフォルト: {','.join(DEFAULT_OUTPUT_FORMATS)})\n"
f"利用可能な形式: {','.join(DEFAULT_OUTPUT_FORMATS)}")
)
parser.add_argument(
"--device", type=str, default=None, choices=['cuda', 'cpu'],
help="使用するデバイスを指定 (cuda または cpu)。指定がなければ自動判別。"
)
args = parser.parse_args() # sys.argv[1:] から解析
target_directory_arg = args.target_directory
formats_arg_str = args.formats
device_arg_str = args.device
# --- 共通のセットアップ処理 ---
if device_arg_str: selected_device = device_arg_str
else: selected_device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用デバイス: {selected_device.upper()}")
if selected_device == "cuda":
if not torch.cuda.is_available():
print("警告: CUDA指定だが利用不可。CPUを使用します。"); selected_device = "cpu"
else:
try: print(f"CUDAデバイス名: {torch.cuda.get_device_name(0)}")
except Exception as e_cuda_name: print(f"CUDAデバイス名の取得失敗: {e_cuda_name}")
print(f"ASRモデル '{MODEL_NAME}' をロードしています...")
asr_model_main: Optional[ASRModel] = None
try:
asr_model_main = ASRModel.from_pretrained(model_name=MODEL_NAME)
asr_model_main.eval()
print(f"モデル '{MODEL_NAME}' のロード完了。")
except Exception as model_load_e:
print(f"致命的エラー: ASRモデル '{MODEL_NAME}' のロードに失敗: {model_load_e}"); sys.exit(1)
output_formats_requested = [fmt.strip().lower() for fmt in formats_arg_str.split(',') if fmt.strip()]
final_output_formats_to_use = [fmt for fmt in output_formats_requested if fmt in DEFAULT_OUTPUT_FORMATS]
if not output_formats_requested and formats_arg_str:
print(f"警告: 指定された出力フォーマット '{formats_arg_str}' は無効です。")
if not final_output_formats_to_use :
print(f"情報: 有効な出力フォーマットが指定されなかったため、デフォルトの全形式 ({','.join(DEFAULT_OUTPUT_FORMATS)}) で出力します。")
final_output_formats_to_use = DEFAULT_OUTPUT_FORMATS
# target_directory_arg が None でないことを確認 (GUIキャンセル時など)
if not target_directory_arg:
print("エラー: 処理対象のディレクトリが指定されていません。処理を中止します。")
sys.exit(1)
if not asr_model_main: # 通常、モデルロード失敗で既にexitしているはずだが念のため
print("致命的エラー: ASRモデルがロードされていません。処理を中止します。")
sys.exit(1)
batch_process_directory(
target_directory_arg, asr_model_main, selected_device,
output_formats=final_output_formats_to_use
)