# coding: utf-8 import torch import gc from pathlib import Path from pydub import AudioSegment # numpy は直接は使用されていませんが、pydubやNeMoの依存関係で間接的に必要になる可能性があります。 # import numpy as np import os import csv import json from typing import List, Tuple, Optional, Set # Python 3.9+ では Optional, Set は typing から不要な場合あり import argparse import time # ★処理時間計測のために追加 import sys # ★コマンドライン引数チェックのために追加 from nemo.collections.asr.models import ASRModel # NeMo ASRモデル import subprocess import shutil # --- グローバル設定 --- MODEL_NAME = "nvidia/parakeet-tdt-0.6b-v2" TARGET_SAMPLE_RATE = 16000 # 音声の長さに関する閾値 (秒) LONG_AUDIO_THRESHOLD_SECONDS = 480 # 8分 VERY_LONG_AUDIO_THRESHOLD_SECONDS = 10800 # 3時間 # チャンク分割時の設定 CHUNK_LENGTH_SECONDS = 1800 # 30分 CHUNK_OVERLAP_SECONDS = 60 # 1分 # セグメント処理の設定 MAX_SEGMENT_LENGTH_SECONDS = 15 # 最大セグメント長(秒)を15秒に短縮 MAX_SEGMENT_CHARS = 100 # 最大セグメント文字数を100文字に短縮 MIN_SEGMENT_GAP_SECONDS = 0.3 # 最小セグメント間隔(秒) # VTTファイルの最大サイズ(バイト) MAX_VTT_SIZE_BYTES = 10 * 1024 * 1024 # 10MB # 文の区切り文字 SENTENCE_ENDINGS = ['.', '!', '?', '。', '!', '?'] SENTENCE_PAUSES = [',', '、', ';', ';', ':', ':'] # ★ 入力ファイルの優先順位付き拡張子リスト INPUT_PRIORITY_EXTENSIONS: List[str] = ['.wav', '.mp3', '.mp4'] # ★ デフォルトで出力するフォーマットリスト DEFAULT_OUTPUT_FORMATS: List[str] = ["csv", "srt", "vtt", "json", "lrc"] # --- 音声前処理関数 --- def preprocess_audio_cli(audio_path_str: str, output_dir_for_temp_files: str) -> Tuple[Optional[str], Optional[str], Optional[float]]: """ オーディオファイルの前処理(リサンプリング、モノラル変換)を行います。 成功した場合、(処理済みファイルパス, 表示用名, 音声長) を返します。 失敗した場合、(None, None, None) を返します。 """ try: audio_file_path = Path(audio_path_str) original_path_name = audio_file_path.name audio_name_stem = audio_file_path.stem print(f" 音声ファイルをロード中: {original_path_name}") # まずffprobeで音声長を取得(4GB制限なし) duration_sec = get_audio_duration_with_ffprobe(audio_path_str) if duration_sec is None: print("エラー: ffprobeで音声長の取得に失敗しました") return None, None, None print(f" 音声長: {duration_sec:.2f} 秒") # ファイルサイズをチェック file_size = Path(audio_path_str).stat().st_size file_size_gb = file_size / (1024**3) print(f" ファイルサイズ: {file_size_gb:.2f} GB") # 4GB以上またはVERY_LONG_AUDIO_THRESHOLD_SECONDS以上の場合は直接ffmpegでチャンク分割 if file_size > 4 * 1024**3 or duration_sec > VERY_LONG_AUDIO_THRESHOLD_SECONDS: print(f" 大容量ファイル({file_size_gb:.2f}GB, {duration_sec/3600:.2f}時間)のため、ffmpegで直接チャンク分割処理を行います。") # 大容量ファイルの場合もモノラル変換を行う temp_mono_path = Path(output_dir_for_temp_files) / f"{audio_name_stem}_mono_temp.wav" try: cmd = [ 'ffmpeg', '-y', '-i', audio_path_str, '-ac', '1', # モノラルに変換 '-ar', str(TARGET_SAMPLE_RATE), # サンプルレートを設定 str(temp_mono_path) ] subprocess.run(cmd, capture_output=True, check=True) return temp_mono_path.as_posix(), f"{original_path_name} (大容量・モノラル)", duration_sec except subprocess.CalledProcessError as e: print(f" ffmpegでのモノラル変換に失敗: {e}") return audio_path_str, f"{original_path_name} (大容量)", duration_sec # 4GB未満の場合は従来のpydub処理 try: audio = AudioSegment.from_file(audio_path_str) except Exception as pydub_e: if "4GB" in str(pydub_e) or "Unable to process" in str(pydub_e): print(f" pydubで4GB制限エラー。ffmpegで処理します: {pydub_e}") return audio_path_str, f"{original_path_name} (大容量)", duration_sec else: raise pydub_e resampled = False mono_converted = False # リサンプリング処理 if audio.frame_rate != TARGET_SAMPLE_RATE: try: print(f" リサンプリング中: {audio.frame_rate}Hz -> {TARGET_SAMPLE_RATE}Hz") audio = audio.set_frame_rate(TARGET_SAMPLE_RATE) resampled = True except Exception as resample_e: print(f"エラー: 音声のリサンプリングに失敗しました: {resample_e}") return None, None, None # モノラル変換処理 if audio.channels > 1: try: print(f" モノラルに変換中 ({audio.channels}ch -> 1ch)") audio = audio.set_channels(1) mono_converted = True except Exception as mono_e: print(f"エラー: 音声のモノラル変換に失敗しました: {mono_e}") return None, None, None elif audio.channels == 1: print(" 音声は既にモノラルです。") processed_temp_file_path_obj = None # 前処理が行われた場合、一時ファイルに保存 if resampled or mono_converted: try: # ファイル名から特殊文字を除去してより安全な名前を生成 import re safe_stem = re.sub(r'[^\w\-_\.]', '_', audio_name_stem) temp_suffix = "_preprocessed_temp.wav" processed_temp_file_path_obj = Path(output_dir_for_temp_files, f"{safe_stem}{temp_suffix}") print(f" 前処理済み音声の一時保存先: {processed_temp_file_path_obj.name}") audio.export(processed_temp_file_path_obj, format="wav") path_for_transcription = processed_temp_file_path_obj.as_posix() display_name_for_info = f"{original_path_name} (前処理済み)" except Exception as export_e: print(f"エラー: 前処理済み音声のエクスポートに失敗しました: {export_e}") if processed_temp_file_path_obj and processed_temp_file_path_obj.exists(): try: os.remove(processed_temp_file_path_obj) except OSError: pass return None, None, None else: # 前処理が不要だった場合 print(" 前処理は不要でした。元のファイルを使用します。") path_for_transcription = audio_path_str display_name_for_info = original_path_name return path_for_transcription, display_name_for_info, duration_sec except FileNotFoundError: print(f"エラー: 音声ファイルが見つかりません: {audio_path_str}") return None, None, None except Exception as load_e: print(f"エラー: 音声ファイル '{original_path_name}' のロード/デコードに失敗しました: {load_e}") return None, None, None def get_audio_duration_with_ffprobe(audio_path_str: str) -> Optional[float]: """ffprobeを使用して音声ファイルの長さを取得(4GB制限なし)""" try: # ffprobeが利用可能かチェック if not shutil.which('ffprobe'): print("警告: ffprobeが見つかりません。pydubでの処理を試行します。") return None cmd = [ 'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'csv=p=0', audio_path_str ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode == 0 and result.stdout.strip(): duration = float(result.stdout.strip()) return duration else: print(f"ffprobeエラー: {result.stderr}") return None except subprocess.TimeoutExpired: print("ffprobeがタイムアウトしました") return None except Exception as e: print(f"ffprobeでの音声長取得エラー: {e}") return None # --- 文字起こしコア関数 --- def find_natural_break_point(text: str, max_length: int) -> int: """テキスト内で自然な区切り点を探す""" if len(text) <= max_length: return len(text) # 文末で区切る for i in range(max_length, 0, -1): if i < len(text) and text[i] in SENTENCE_ENDINGS: return i + 1 # 文の区切りで区切る for i in range(max_length, 0, -1): if i < len(text) and text[i] in SENTENCE_PAUSES: return i + 1 # スペースで区切る for i in range(max_length, 0, -1): if i < len(text) and text[i].isspace(): return i + 1 # それでも見つからない場合は最大長で区切る return max_length def split_segment(segment: dict, max_length_seconds: float, max_chars: int) -> List[dict]: """セグメントを自然な区切りで分割する""" if (segment['end'] - segment['start']) <= max_length_seconds and len(segment['segment']) <= max_chars: return [segment] result = [] current_text = segment['segment'] current_start = segment['start'] total_duration = segment['end'] - segment['start'] while current_text: # 文字数に基づく分割点を探す break_point = find_natural_break_point(current_text, max_chars) # 時間に基づく分割点を計算 text_ratio = break_point / len(segment['segment']) segment_duration = total_duration * text_ratio # 分割点が最大長を超えないように調整 if segment_duration > max_length_seconds: time_ratio = max_length_seconds / total_duration break_point = int(len(segment['segment']) * time_ratio) break_point = find_natural_break_point(current_text, break_point) segment_duration = max_length_seconds # 新しいセグメントを作成 new_segment = { 'start': current_start, 'end': current_start + segment_duration, 'segment': current_text[:break_point].strip() } result.append(new_segment) # 残りのテキストと開始時間を更新 current_text = current_text[break_point:].strip() current_start = new_segment['end'] return result def transcribe_audio_cli( transcribe_path_str: str, model: ASRModel, duration_sec: float, device: str ) -> Tuple[Optional[List], Optional[List], Optional[List]]: long_audio_settings_applied = False original_model_dtype = model.dtype try: if device == 'cuda': torch.cuda.empty_cache() gc.collect() model.to(device) # 音声長に応じてモデル設定を変更 if duration_sec > LONG_AUDIO_THRESHOLD_SECONDS: try: print(f" 情報: 音声長 ({duration_sec:.0f}s) が閾値 ({LONG_AUDIO_THRESHOLD_SECONDS}s) を超えるため、長尺音声向け設定を適用します。") model.change_attention_model( self_attention_model="rel_pos_local_attn", att_context_size=[128, 128] ) model.change_subsampling_conv_chunking_factor(1) long_audio_settings_applied = True if device == 'cuda': torch.cuda.empty_cache() gc.collect() except Exception as setting_e: print(f" 警告: 長尺音声向け設定の適用に失敗しました: {setting_e}。デフォルト設定で続行します。") if device == 'cuda' and torch.cuda.is_bf16_supported(): print(" 情報: モデルを bfloat16 に変換して推論を実行します。") model.to(torch.bfloat16) elif model.dtype != original_model_dtype: model.to(original_model_dtype) print(f" 文字起こしを実行中 (デバイス: {device}, モデルdtype: {model.dtype})...") output = model.transcribe( [transcribe_path_str], timestamps=True, batch_size=2 ) if not output or not isinstance(output, list) or not output[0] or \ not hasattr(output[0], 'timestamp') or not output[0].timestamp or \ 'segment' not in output[0].timestamp: print(" エラー: 文字起こしに失敗したか、予期しない出力形式です。") return None, None, None segment_timestamps = output[0].timestamp['segment'] # セグメントの前処理:より適切なセグメント分割 processed_segments = [] current_segment = None for ts in segment_timestamps: if current_segment is None: current_segment = ts else: # セグメント結合の条件を厳格化 time_gap = ts['start'] - current_segment['end'] current_text = current_segment['segment'] next_text = ts['segment'] # 結合条件のチェック should_merge = ( time_gap < MIN_SEGMENT_GAP_SECONDS and # 時間間隔が短い len(current_text) + len(next_text) < MAX_SEGMENT_CHARS and # 文字数制限 (current_segment['end'] - current_segment['start']) < MAX_SEGMENT_LENGTH_SECONDS and # 現在のセグメントが短い (ts['end'] - ts['start']) < MAX_SEGMENT_LENGTH_SECONDS and # 次のセグメントが短い not any(current_text.strip().endswith(p) for p in SENTENCE_ENDINGS) # 文の区切りでない ) if should_merge: current_segment['end'] = ts['end'] current_segment['segment'] += ' ' + ts['segment'] else: # 現在のセグメントを分割 split_segments = split_segment(current_segment, MAX_SEGMENT_LENGTH_SECONDS, MAX_SEGMENT_CHARS) processed_segments.extend(split_segments) current_segment = ts if current_segment is not None: # 最後のセグメントも分割 split_segments = split_segment(current_segment, MAX_SEGMENT_LENGTH_SECONDS, MAX_SEGMENT_CHARS) processed_segments.extend(split_segments) # 処理済みセグメントからデータを生成 vis_data = [[f"{ts['start']:.2f}", f"{ts['end']:.2f}", ts['segment']] for ts in processed_segments] raw_times_data = [[ts['start'], ts['end']] for ts in processed_segments] # 単語タイムスタンプの処理を改善 word_timestamps_raw = output[0].timestamp.get("word", []) word_vis_data = [] for w in word_timestamps_raw: if not isinstance(w, dict) or not all(k in w for k in ['start', 'end', 'word']): continue # 単語のタイムスタンプを最も近いセグメントに割り当て word_start = float(w['start']) word_end = float(w['end']) # 単語が完全に含まれるセグメントを探す for seg in processed_segments: if word_start >= seg['start'] - 0.05 and word_end <= seg['end'] + 0.05: word_vis_data.append([f"{word_start:.2f}", f"{word_end:.2f}", w["word"]]) break print(" 文字起こし完了。") return vis_data, raw_times_data, word_vis_data except torch.cuda.OutOfMemoryError as oom_e: print(f" 致命的エラー: CUDAメモリ不足です。 {oom_e}") print(" バッチサイズを小さくする、他のGPU利用アプリを終了するなどの対策を試みてください。") return None, None, None except Exception as e: print(f" エラー: 文字起こし処理中に予期せぬエラーが発生しました: {e}") import traceback traceback.print_exc() return None, None, None finally: if long_audio_settings_applied: try: print(" 長尺音声向け設定を元に戻しています。") model.change_attention_model(self_attention_model="rel_pos") model.change_subsampling_conv_chunking_factor(-1) except Exception as revert_e: print(f" 警告: 長尺音声設定の復元に失敗: {revert_e}") model.to(original_model_dtype) if model.device.type != 'cpu': model.cpu() if device == 'cuda': torch.cuda.empty_cache() gc.collect() # --- 結果保存関数 --- def save_transcripts_cli(output_dir_str: str, audio_file_stem: str, vis_data: List, word_vis_data: List, formats: Optional[List[str]] = None): if formats is None: formats_to_save = DEFAULT_OUTPUT_FORMATS else: formats_to_save = formats output_dir_path = Path(output_dir_str) output_dir_path.mkdir(parents=True, exist_ok=True) saved_files_count = 0 print(f" 結果を保存中 (対象形式: {', '.join(formats_to_save)})...") try: if "csv" in formats_to_save: csv_file_path = output_dir_path / f"{audio_file_stem}.csv" csv_headers = ["Start (s)", "End (s)", "Segment"] with open(csv_file_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f); writer.writerow(csv_headers); writer.writerows(vis_data) print(f" CSVファイルを保存: {csv_file_path.name}"); saved_files_count +=1 if "srt" in formats_to_save: srt_file_path = output_dir_path / f"{audio_file_stem}.srt" write_srt(vis_data, srt_file_path) print(f" SRTファイルを保存: {srt_file_path.name}"); saved_files_count +=1 if "vtt" in formats_to_save: vtt_file_path = output_dir_path / f"{audio_file_stem}.vtt" try: write_vtt(vis_data, word_vis_data, vtt_file_path) print(f" VTTファイルを保存: {vtt_file_path.name}"); saved_files_count +=1 except ValueError as e: if "VTTファイルサイズが制限を超えました" in str(e): print(f" エラー: {e}") # 既に作成されたVTTファイルを削除 if vtt_file_path.exists(): vtt_file_path.unlink() raise # エラーを上位に伝播 if "json" in formats_to_save: json_file_path = output_dir_path / f"{audio_file_stem}.json" write_json(vis_data, word_vis_data, json_file_path) print(f" JSONファイルを保存: {json_file_path.name}"); saved_files_count +=1 if "lrc" in formats_to_save: lrc_file_path = output_dir_path / f"{audio_file_stem}.lrc" write_lrc(vis_data, lrc_file_path) print(f" LRCファイルを保存: {lrc_file_path.name}"); saved_files_count +=1 if saved_files_count == 0 and formats_to_save: print(f" 警告: 指定されたフォーマット {formats_to_save} でのファイルの保存は行われませんでした。") except Exception as e: print(f" エラー: 文字起こしファイルの保存中にエラーが発生しました: {e}") raise # エラーを上位に伝播 # --- 書き出しヘルパー関数群 (SRT, VTT, JSON, LRC) --- def write_srt(segments: List, path: Path): def sec2srt(t_float: float) -> str: h, rem = divmod(int(t_float), 3600); m, s = divmod(rem, 60) ms = int((t_float - int(t_float)) * 1000) return f"{h:02}:{m:02}:{s:02},{ms:03}" with open(path, "w", encoding="utf-8") as f: for i, seg_list in enumerate(segments, 1): f.write(f"{i}\n{sec2srt(float(seg_list[0]))} --> {sec2srt(float(seg_list[1]))}\n{seg_list[2]}\n\n") def write_vtt(segments: List, words: List, path: Path): def sec2vtt(t_float: float) -> str: h, rem = divmod(int(t_float), 3600) m, s = divmod(rem, 60) ms = int((t_float - int(t_float)) * 1000) return f"{h:02}:{m:02}:{s:02}.{ms:03}" with open(path, "w", encoding="utf-8") as f: f.write("WEBVTT\n\n") f.write("STYLE\n") f.write("::cue(.current) { color: #ffff00; font-weight: bold; }\n") f.write("::cue(.past) { color: #888888; }\n") f.write("::cue(.future) { color: #ffffff; }\n") f.write("::cue(.line) { background: rgba(0,0,0,0.7); padding: 4px; }\n\n") if not words: # 単語タイムスタンプがない場合は、セグメント単位で出力 for i, seg_list in enumerate(segments, 1): f.write(f"NOTE Segment {i}\n") f.write(f"{sec2vtt(float(seg_list[0]))} --> {sec2vtt(float(seg_list[1]))}\n{seg_list[2]}\n\n") # ファイルサイズをチェック current_size = f.tell() if current_size > MAX_VTT_SIZE_BYTES: print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。") raise ValueError("VTTファイルサイズが制限を超えました") return # セグメント単位で処理 for seg_data in segments: seg_start = float(seg_data[0]) seg_end = float(seg_data[1]) # このセグメントに含まれる単語を特定 segment_words = [] for word_idx, word_data in enumerate(words): word_start = float(word_data[0]) word_end = float(word_data[1]) if word_start >= seg_start - 0.1 and word_end <= seg_end + 0.1: segment_words.append((word_idx, word_data)) if not segment_words: continue # セグメント内の全単語のテキストを一度だけ生成 all_words = [w_data[2] for _, w_data in segment_words] # セグメント開始から最初の単語まで first_word_start = float(segment_words[0][1][0]) if seg_start < first_word_start - 0.05: f.write(f"{sec2vtt(seg_start)} --> {sec2vtt(first_word_start)}\n") f.write(f'{" ".join(f"{w}" for w in all_words)}\n\n') # ファイルサイズをチェック current_size = f.tell() if current_size > MAX_VTT_SIZE_BYTES: print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。") raise ValueError("VTTファイルサイズが制限を超えました") # 各単語の処理 for local_idx, (_, word_data) in enumerate(segment_words): w_start = float(word_data[0]) w_end = float(word_data[1]) # 単語の表示時間を出力 f.write(f"{sec2vtt(w_start)} --> {sec2vtt(w_end)}\n") # 現在の単語をハイライトしたテキストを生成 line_parts = [] for i, w in enumerate(all_words): if i == local_idx: line_parts.append(f'{w}') elif i < local_idx: line_parts.append(f'{w}') else: line_parts.append(f'{w}') f.write(f'{" ".join(line_parts)}\n\n') # ファイルサイズをチェック current_size = f.tell() if current_size > MAX_VTT_SIZE_BYTES: print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。") raise ValueError("VTTファイルサイズが制限を超えました") # 単語間の無音期間の処理 if local_idx < len(segment_words) - 1: next_word_start = float(segment_words[local_idx + 1][1][0]) gap_duration = next_word_start - w_end if gap_duration > 0.05: # 50ms以上の無音期間がある場合 f.write(f"{sec2vtt(w_end)} --> {sec2vtt(next_word_start)}\n") f.write(f'{" ".join(f"{w}" if i <= local_idx else f"{w}" for i, w in enumerate(all_words))}\n\n') # ファイルサイズをチェック current_size = f.tell() if current_size > MAX_VTT_SIZE_BYTES: print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。") raise ValueError("VTTファイルサイズが制限を超えました") # 最後の単語からセグメント終了まで last_word_end = float(segment_words[-1][1][1]) if last_word_end < seg_end - 0.05: f.write(f"{sec2vtt(last_word_end)} --> {sec2vtt(seg_end)}\n") f.write(f'{" ".join(f"{w}" for w in all_words)}\n\n') # ファイルサイズをチェック current_size = f.tell() if current_size > MAX_VTT_SIZE_BYTES: print(f"警告: VTTファイルが{MAX_VTT_SIZE_BYTES/1024/1024:.1f}MBを超えました。処理を中止します。") raise ValueError("VTTファイルサイズが制限を超えました") def write_json(segments: List, words: List, path: Path): result = {"segments": []}; word_idx = 0 for seg_data in segments: s_start_time = float(seg_data[0]); s_end_time = float(seg_data[1]); s_text = seg_data[2] segment_words_list: List[dict] = []; temp_current_word_idx = word_idx if words: while temp_current_word_idx < len(words): w_data = words[temp_current_word_idx]; w_start_time = float(w_data[0]); w_end_time = float(w_data[1]) if w_start_time >= s_start_time and w_end_time <= s_end_time + 0.1: segment_words_list.append({"start": w_start_time, "end": w_end_time, "word": w_data[2]}) temp_current_word_idx += 1 elif w_start_time < s_start_time : temp_current_word_idx += 1 elif w_start_time > s_end_time: break else: temp_current_word_idx += 1 word_idx = temp_current_word_idx result["segments"].append({"start": s_start_time, "end": s_end_time, "text": s_text, "words": segment_words_list}) with open(path, "w", encoding="utf-8") as f: json.dump(result, f, ensure_ascii=False, indent=2) def write_lrc(segments: List, path: Path): def sec2lrc(t_float: float) -> str: m, s = divmod(float(t_float), 60) return f"[{int(m):02d}:{s:05.2f}]" with open(path, "w", encoding="utf-8") as f: for seg_list in segments: f.write(f"{sec2lrc(float(seg_list[0]))}{seg_list[2]}\n") # --- 音声分割関数 --- def split_audio_with_overlap_cli( audio_path_str: str, output_dir_for_chunks: str, chunk_length_sec: int = CHUNK_LENGTH_SECONDS, overlap_sec: int = CHUNK_OVERLAP_SECONDS ) -> List[str]: print(f" 音声分割中: 基本チャンク長 {chunk_length_sec}s, オーバーラップ {overlap_sec}s") # ファイルサイズをチェックして処理方法を決定 file_size = Path(audio_path_str).stat().st_size file_size_gb = file_size / (1024**3) # 4GB以上の場合はffmpegを使用 if file_size > 4 * 1024**3: print(f" 大容量ファイル({file_size_gb:.2f}GB)のため、ffmpegで分割処理を実行します。") return split_audio_with_ffmpeg(audio_path_str, output_dir_for_chunks, chunk_length_sec, overlap_sec) # 4GB未満の場合は従来のpydub処理 try: audio = AudioSegment.from_file(audio_path_str) except Exception as e: if "4GB" in str(e) or "Unable to process" in str(e): print(f" pydubで4GB制限エラー。ffmpegで処理します: {e}") return split_audio_with_ffmpeg(audio_path_str, output_dir_for_chunks, chunk_length_sec, overlap_sec) else: print(f" エラー: 音声ファイル '{Path(audio_path_str).name}' のロード中にエラー(分割処理): {e}") return [] # 以下は既存のpydub処理... duration_ms = len(audio); chunk_length_ms = chunk_length_sec * 1000; overlap_ms = overlap_sec * 1000 chunk_paths_list: List[str] = []; start_ms = 0; chunk_idx = 0 audio_file_stem = Path(audio_path_str).stem while start_ms < duration_ms: actual_chunk_start_ms = max(0, start_ms - (overlap_ms if start_ms > 0 else 0) ) base_chunk_end_ms = start_ms + chunk_length_ms actual_chunk_end_ms = min(base_chunk_end_ms + (overlap_ms if base_chunk_end_ms < duration_ms else 0), duration_ms) if actual_chunk_start_ms >= actual_chunk_end_ms : if start_ms >= duration_ms: break print(f" 警告: チャンク計算で予期せぬ状態。スキップします。") start_ms += chunk_length_ms; continue chunk_segment = audio[actual_chunk_start_ms:actual_chunk_end_ms] chunk_file_name = f"{audio_file_stem}_chunk_{chunk_idx:03d}_temp.wav" chunk_file_path_obj = Path(output_dir_for_chunks, chunk_file_name) try: chunk_segment.export(chunk_file_path_obj, format="wav") chunk_paths_list.append(chunk_file_path_obj.as_posix()) except Exception as export_chunk_e: print(f" エラー: 一時チャンクファイル {chunk_file_name} のエクスポートに失敗: {export_chunk_e}") start_ms += chunk_length_ms; chunk_idx += 1 print(f" 音声を {len(chunk_paths_list)} 個のチャンクに分割しました。") return chunk_paths_list def split_audio_with_ffmpeg( audio_path_str: str, output_dir_for_chunks: str, chunk_length_sec: int, overlap_sec: int ) -> List[str]: """ffmpegを使用して大容量ファイルを分割""" try: if not shutil.which('ffmpeg'): print("エラー: ffmpegが見つかりません。4GB以上のファイルを処理するにはffmpegが必要です。") return [] # 音声長を取得 duration_sec = get_audio_duration_with_ffprobe(audio_path_str) if duration_sec is None: print("エラー: ffmpegでの分割処理で音声長を取得できませんでした") return [] chunk_paths_list: List[str] = [] audio_file_stem = Path(audio_path_str).stem start_sec = 0 chunk_idx = 0 while start_sec < duration_sec: # チャンク開始・終了時刻を計算 actual_start_sec = max(0, start_sec - (overlap_sec if start_sec > 0 else 0)) base_end_sec = start_sec + chunk_length_sec actual_end_sec = min(base_end_sec + (overlap_sec if base_end_sec < duration_sec else 0), duration_sec) if actual_start_sec >= actual_end_sec: break chunk_duration = actual_end_sec - actual_start_sec chunk_file_name = f"{audio_file_stem}_chunk_{chunk_idx:03d}_temp.wav" chunk_file_path = Path(output_dir_for_chunks) / chunk_file_name # ffmpegコマンドで音声を抽出・変換 cmd = [ 'ffmpeg', '-y', '-loglevel', 'error', '-ss', str(actual_start_sec), '-i', audio_path_str, '-t', str(chunk_duration), '-acodec', 'pcm_s16le', '-ar', str(TARGET_SAMPLE_RATE), '-ac', '1', # モノラル str(chunk_file_path) ] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) if result.returncode == 0: chunk_paths_list.append(chunk_file_path.as_posix()) print(f" チャンク {chunk_idx+1}: {actual_start_sec:.1f}s - {actual_end_sec:.1f}s -> {chunk_file_name}") else: print(f" エラー: チャンク {chunk_idx} の生成に失敗: {result.stderr}") except subprocess.TimeoutExpired: print(f" エラー: チャンク {chunk_idx} の生成がタイムアウトしました") start_sec += chunk_length_sec chunk_idx += 1 print(f" ffmpegで音声を {len(chunk_paths_list)} 個のチャンクに分割しました。") return chunk_paths_list except Exception as e: print(f" エラー: ffmpegでの音声分割中にエラー: {e}") return [] # --- 単一ファイル処理のメインロジック --- def process_single_file( input_file_path_obj: Path, asr_model_instance: ASRModel, device_to_use: str, output_formats_list: List[str] ) -> bool: input_file_stem = input_file_path_obj.stem output_and_temp_dir_str = input_file_path_obj.parent.as_posix() file_processing_start_time = time.time() actual_audio_duration_sec: Optional[float] = None success_status = False temp_preprocessed_audio_path_str: Optional[str] = None temp_chunk_file_paths_str_list: List[str] = [] try: print(f"--- ステップ1/3: {input_file_stem} の音声前処理 ---") processed_path_for_asr, _, duration_sec_val = preprocess_audio_cli( input_file_path_obj.as_posix(), output_and_temp_dir_str ) if not processed_path_for_asr or duration_sec_val is None: raise Exception("Preprocessing failed") actual_audio_duration_sec = duration_sec_val if processed_path_for_asr != input_file_path_obj.as_posix(): temp_preprocessed_audio_path_str = processed_path_for_asr print(f"--- ステップ2/3: {input_file_stem} の文字起こし (音声長: {actual_audio_duration_sec:.2f}秒) ---") final_vis_data: Optional[List] = None final_word_vis_data: Optional[List] = None if actual_audio_duration_sec > VERY_LONG_AUDIO_THRESHOLD_SECONDS: print(f" 情報: 音声長が{VERY_LONG_AUDIO_THRESHOLD_SECONDS/3600:.1f}時間を超えるため、分割処理します。") chunk_file_paths_str = split_audio_with_overlap_cli( processed_path_for_asr, output_and_temp_dir_str, chunk_length_sec=CHUNK_LENGTH_SECONDS, overlap_sec=CHUNK_OVERLAP_SECONDS ) if not chunk_file_paths_str: raise Exception(f"{input_file_path_obj.name} のチャンク分割に失敗しました。") temp_chunk_file_paths_str_list = chunk_file_paths_str[:] all_vis_data_merged: List[List[str]] = [] all_word_vis_data_merged: List[List[str]] = [] current_global_time_offset_sec = 0.0 last_global_segment_end_time_sec = 0.0 # チャンク処理前にGPUメモリをクリア if device_to_use == 'cuda': torch.cuda.empty_cache() gc.collect() print(f" 初期GPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB") for i, chunk_file_path_str in enumerate(temp_chunk_file_paths_str_list): print(f" チャンク {i+1}/{len(temp_chunk_file_paths_str_list)} ({Path(chunk_file_path_str).name}) を処理中...") try: # 各チャンク処理前にGPUメモリをクリア if device_to_use == 'cuda': torch.cuda.empty_cache() gc.collect() print(f" チャンク処理前のGPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB") estimated_chunk_duration_for_asr_settings = CHUNK_LENGTH_SECONDS + CHUNK_OVERLAP_SECONDS vis_data_chunk, _, word_vis_data_chunk = transcribe_audio_cli( chunk_file_path_str, asr_model_instance, estimated_chunk_duration_for_asr_settings, device_to_use ) # チャンク処理後のGPUメモリ使用量を確認 if device_to_use == 'cuda': print(f" チャンク処理後のGPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB") if not vis_data_chunk: print(f" 警告: チャンク {Path(chunk_file_path_str).name} の文字起こしに失敗。スキップします。") current_global_time_offset_sec += CHUNK_LENGTH_SECONDS - (CHUNK_OVERLAP_SECONDS if i < len(temp_chunk_file_paths_str_list) - 1 else 0) continue # データのマージ処理 for seg_row_list in vis_data_chunk: s_local_sec = float(seg_row_list[0]) e_local_sec = float(seg_row_list[1]) text_seg = seg_row_list[2] s_global_sec = s_local_sec + current_global_time_offset_sec e_global_sec = e_local_sec + current_global_time_offset_sec if s_global_sec >= last_global_segment_end_time_sec - 0.1: all_vis_data_merged.append([f"{s_global_sec:.2f}", f"{e_global_sec:.2f}", text_seg]) last_global_segment_end_time_sec = max(last_global_segment_end_time_sec, e_global_sec) temp_last_word_global_end_time_sec = float(all_word_vis_data_merged[-1][1]) if all_word_vis_data_merged else 0.0 if word_vis_data_chunk: for word_row_list in word_vis_data_chunk: w_s_local_sec = float(word_row_list[0]) w_e_local_sec = float(word_row_list[1]) text_word = word_row_list[2] w_s_global_sec = w_s_local_sec + current_global_time_offset_sec w_e_global_sec = w_e_local_sec + current_global_time_offset_sec if w_s_global_sec >= temp_last_word_global_end_time_sec - 0.05: all_word_vis_data_merged.append([f"{w_s_global_sec:.2f}", f"{w_e_global_sec:.2f}", text_word]) temp_last_word_global_end_time_sec = max(temp_last_word_global_end_time_sec, w_e_global_sec) if i < len(temp_chunk_file_paths_str_list) - 1: current_global_time_offset_sec += (CHUNK_LENGTH_SECONDS - CHUNK_OVERLAP_SECONDS) # チャンク処理後にGPUメモリをクリア if device_to_use == 'cuda': torch.cuda.empty_cache() gc.collect() print(f" メモリクリア後のGPUメモリ使用量: {torch.cuda.memory_allocated() / 1024**2:.1f}MB") except Exception as chunk_proc_e: print(f" エラー: チャンク {Path(chunk_file_path_str).name} の処理中にエラー: {chunk_proc_e}") if i < len(temp_chunk_file_paths_str_list) - 1: current_global_time_offset_sec += (CHUNK_LENGTH_SECONDS - CHUNK_OVERLAP_SECONDS) final_vis_data = all_vis_data_merged final_word_vis_data = all_word_vis_data_merged if not final_vis_data: raise Exception("チャンク処理後、有効な文字起こしデータが得られませんでした。") else: vis_data_single, _, word_vis_data_single = transcribe_audio_cli( processed_path_for_asr, asr_model_instance, actual_audio_duration_sec, device_to_use ) if not vis_data_single: raise Exception(f"{input_file_path_obj.name} の文字起こしに失敗しました。") final_vis_data = vis_data_single final_word_vis_data = word_vis_data_single if final_vis_data: print(f"--- ステップ3/3: {input_file_stem} の文字起こし結果保存 ---") save_transcripts_cli(output_and_temp_dir_str, input_file_stem, final_vis_data, final_word_vis_data if final_word_vis_data else [], formats=output_formats_list) success_status = True else: print(f"情報: {input_file_path_obj.name} の文字起こし結果が空のため、ファイルは保存しませんでした。") success_status = True except Exception as e: print(f"エラー: ファイル {input_file_path_obj.name} の処理中にエラーが発生しました: {e}") success_status = False finally: file_processing_end_time = time.time() time_taken_seconds = file_processing_end_time - file_processing_start_time proc_m = int(time_taken_seconds // 60) proc_s = time_taken_seconds % 60 summary_message = f" --- {input_file_stem}: 処理サマリー ---\n" if actual_audio_duration_sec is not None: audio_m = int(actual_audio_duration_sec // 60) audio_s = actual_audio_duration_sec % 60 summary_message += f" 音声長: {audio_m}分{audio_s:.2f}秒 ({actual_audio_duration_sec:.2f}秒)\n" else: summary_message += " 音声長: 不明 (前処理で失敗した可能性があります)\n" summary_message += f" このファイルの総処理時間: {proc_m}分{proc_s:.2f}秒 ({time_taken_seconds:.2f}秒)\n" summary_message += f" 処理ステータス: {'成功' if success_status else '失敗'}" print(summary_message) if temp_preprocessed_audio_path_str and Path(temp_preprocessed_audio_path_str).exists(): try: os.remove(temp_preprocessed_audio_path_str); print(f" 一時ファイル {Path(temp_preprocessed_audio_path_str).name} を削除しました。") except OSError as e_os: print(f" 警告: 一時ファイル {Path(temp_preprocessed_audio_path_str).name} の削除に失敗: {e_os}") for chunk_f_str in temp_chunk_file_paths_str_list: if Path(chunk_f_str).exists(): try: os.remove(chunk_f_str); print(f" 一時チャンクファイル {Path(chunk_f_str).name} を削除しました。") except OSError as e_os_chunk: print(f" 警告: 一時チャンクファイル {Path(chunk_f_str).name} の削除に失敗: {e_os_chunk}") # process_single_file の最後では "ファイル処理終了" のログは batch_process_directory に任せる return success_status # --- ディレクトリ内ファイルの一括処理関数 --- def batch_process_directory( target_dir_str: str, asr_model_instance: ASRModel, device_to_use: str, output_formats: Optional[List[str]] = None ): batch_start_time = time.time() if output_formats is None: output_formats_to_use = DEFAULT_OUTPUT_FORMATS else: output_formats_to_use = output_formats target_dir_path = Path(target_dir_str) if not target_dir_path.is_dir(): print(f"エラー: 指定されたパス '{target_dir_str}' は有効なディレクトリではありません。"); return print(f"処理対象ディレクトリ: {target_dir_path.resolve()}") print(f"入力ファイルの探索優先順位: {', '.join(INPUT_PRIORITY_EXTENSIONS)}") print(f"出力ファイル形式: {', '.join(output_formats_to_use)}") all_files_in_dir = list(target_dir_path.iterdir()) potential_stems: Set[str] = set() for f_path_obj in all_files_in_dir: if f_path_obj.is_file() and f_path_obj.suffix.lower() in INPUT_PRIORITY_EXTENSIONS: potential_stems.add(f_path_obj.stem) if not potential_stems: print(f"情報: ディレクトリ '{target_dir_path.name}' に対象拡張子のファイルは見つかりませんでした。"); return print(f"{len(potential_stems)} 個のユニークなファイル名候補が見つかりました。優先順位に従って処理対象を選択します...") files_to_actually_process: List[Path] = [] for stem_name in sorted(list(potential_stems)): selected_file_for_this_stem: Optional[Path] = None for ext_priority in INPUT_PRIORITY_EXTENSIONS: potential_file = target_dir_path / f"{stem_name}{ext_priority}" if potential_file.exists() and potential_file.is_file(): selected_file_for_this_stem = potential_file print(f" ファイル名 '{stem_name}': '{potential_file.name}' を処理対象として選択。") break if selected_file_for_this_stem: files_to_actually_process.append(selected_file_for_this_stem) if not files_to_actually_process: print("情報: 優先順位適用後、実際に処理するファイルはありませんでした。"); return print(f"実際に処理するファイル数: {len(files_to_actually_process)} 個") processed_successfully_count = 0 skipped_due_to_existing_csv_count = 0 failed_count = 0 for input_file_to_process_obj in files_to_actually_process: print(f"\n======== ファイル処理開始: {input_file_to_process_obj.name} ========") # 各ファイルの開始ログ is_skipped_at_batch_level = False if "csv" in output_formats_to_use: output_csv_path_check = input_file_to_process_obj.with_suffix('.csv') if output_csv_path_check.exists(): print(f"スキップ (バッチレベル): CSV '{output_csv_path_check.name}' は既に存在します。") skipped_due_to_existing_csv_count += 1 is_skipped_at_batch_level = True print(f"======== ファイル処理終了 (スキップ): {input_file_to_process_obj.name} ========\n") # スキップ時の終了ログ if not is_skipped_at_batch_level: success_flag = process_single_file( input_file_to_process_obj, asr_model_instance, device_to_use, output_formats_to_use ) if success_flag: processed_successfully_count += 1 else: failed_count += 1 # process_single_file内で "ファイル処理終了" ログが出力される print("\n======== 全ファイルのバッチ処理が完了しました ========") total_considered = len(files_to_actually_process) print(f"総対象ファイル数(優先度選択後): {total_considered}") print(f" 処理成功ファイル数: {processed_successfully_count}") print(f" CSV既存によりスキップされたファイル数: {skipped_due_to_existing_csv_count}") print(f" 処理失敗ファイル数: {failed_count}") batch_end_time = time.time() total_batch_time_seconds = batch_end_time - batch_start_time batch_m = int(total_batch_time_seconds // 60) batch_s = total_batch_time_seconds % 60 print(f"バッチ処理全体の総所要時間: {batch_m}分{batch_s:.2f}秒 ({total_batch_time_seconds:.2f}秒)") # --- スクリプト実行のエントリポイント --- if __name__ == "__main__": # ★ 引数処理とGUI分岐のための準備 target_directory_arg: Optional[str] = None formats_arg_str: str = ",".join(DEFAULT_OUTPUT_FORMATS) # GUI時のデフォルト device_arg_str: Optional[str] = None # GUI時のデフォルト (自動判別) if len(sys.argv) == 1: # コマンドライン引数なしの場合 print("コマンドライン引数なしで起動されました。GUIでディレクトリを選択します。") try: import tkinter as tk from tkinter import filedialog def get_directory_from_gui_local() -> Optional[str]: """GUIでディレクトリ選択ダイアログを表示し、選択されたパスを返す""" root = tk.Tk() root.withdraw() # メインウィンドウは表示しない # ダイアログを最前面に表示する試み (環境による) root.attributes('-topmost', True) # WSL環境での初期ディレクトリを設定 initial_dir = "/mnt/t/demucs_folder/htdemucs" # Windowsのユーザーディレクトリを初期値として設定 selected_path = filedialog.askdirectory( title="処理対象のディレクトリを選択してください", initialdir=initial_dir ) root.attributes('-topmost', False) root.destroy() # Tkinterウィンドウを破棄 return selected_path if selected_path else None target_directory_arg = get_directory_from_gui_local() if not target_directory_arg: print("ディレクトリが選択されませんでした。処理を中止します。") sys.exit(0) # 正常終了 # formats_arg_str と device_arg_str は初期化されたデフォルト値を使用 print(f"GUIで選択されたディレクトリ: {target_directory_arg}") print(f"出力フォーマット (デフォルト): {formats_arg_str}") # device_arg_strがNoneの場合、後続の処理で自動判別される except ImportError: print("エラー: GUIモードに必要なTkinterライブラリが見つかりません。") print("Tkinterをインストールするか、コマンドライン引数を使用してスクリプトを実行してください。例:") print(f" python {Path(sys.argv[0]).name} /path/to/your/audio_directory") sys.exit(1) # エラー終了 except Exception as e_gui: print(f"GUIの表示中に予期せぬエラーが発生しました: {e_gui}") sys.exit(1) # エラー終了 else: # コマンドライン引数がある場合 parser = argparse.ArgumentParser( description="指定されたディレクトリ内の音声/動画ファイルをNVIDIA Parakeet ASRモデルで文字起こしします。\n" f"同じ名前のファイルが複数ある場合、{' > '.join(INPUT_PRIORITY_EXTENSIONS)} の優先順位で処理します。", formatter_class=argparse.RawTextHelpFormatter ) parser.add_argument( # 最初の引数は必須のディレクトリ "target_directory", type=str, help="処理対象のファイルが含まれるディレクトリのパス。" ) parser.add_argument( "--formats", type=str, default=",".join(DEFAULT_OUTPUT_FORMATS), help=(f"出力する文字起こしファイルの形式をカンマ区切りで指定。\n" f"例: csv,srt (デフォルト: {','.join(DEFAULT_OUTPUT_FORMATS)})\n" f"利用可能な形式: {','.join(DEFAULT_OUTPUT_FORMATS)}") ) parser.add_argument( "--device", type=str, default=None, choices=['cuda', 'cpu'], help="使用するデバイスを指定 (cuda または cpu)。指定がなければ自動判別。" ) args = parser.parse_args() # sys.argv[1:] から解析 target_directory_arg = args.target_directory formats_arg_str = args.formats device_arg_str = args.device # --- 共通のセットアップ処理 --- if device_arg_str: selected_device = device_arg_str else: selected_device = "cuda" if torch.cuda.is_available() else "cpu" print(f"使用デバイス: {selected_device.upper()}") if selected_device == "cuda": if not torch.cuda.is_available(): print("警告: CUDA指定だが利用不可。CPUを使用します。"); selected_device = "cpu" else: try: print(f"CUDAデバイス名: {torch.cuda.get_device_name(0)}") except Exception as e_cuda_name: print(f"CUDAデバイス名の取得失敗: {e_cuda_name}") print(f"ASRモデル '{MODEL_NAME}' をロードしています...") asr_model_main: Optional[ASRModel] = None try: asr_model_main = ASRModel.from_pretrained(model_name=MODEL_NAME) asr_model_main.eval() print(f"モデル '{MODEL_NAME}' のロード完了。") except Exception as model_load_e: print(f"致命的エラー: ASRモデル '{MODEL_NAME}' のロードに失敗: {model_load_e}"); sys.exit(1) output_formats_requested = [fmt.strip().lower() for fmt in formats_arg_str.split(',') if fmt.strip()] final_output_formats_to_use = [fmt for fmt in output_formats_requested if fmt in DEFAULT_OUTPUT_FORMATS] if not output_formats_requested and formats_arg_str: print(f"警告: 指定された出力フォーマット '{formats_arg_str}' は無効です。") if not final_output_formats_to_use : print(f"情報: 有効な出力フォーマットが指定されなかったため、デフォルトの全形式 ({','.join(DEFAULT_OUTPUT_FORMATS)}) で出力します。") final_output_formats_to_use = DEFAULT_OUTPUT_FORMATS # target_directory_arg が None でないことを確認 (GUIキャンセル時など) if not target_directory_arg: print("エラー: 処理対象のディレクトリが指定されていません。処理を中止します。") sys.exit(1) if not asr_model_main: # 通常、モデルロード失敗で既にexitしているはずだが念のため print("致命的エラー: ASRモデルがロードされていません。処理を中止します。") sys.exit(1) batch_process_directory( target_directory_arg, asr_model_main, selected_device, output_formats=final_output_formats_to_use )