Spaces:
Running
on
Zero
Running
on
Zero
import os | |
import json | |
import time | |
import sys | |
import requests | |
from pathlib import Path | |
# from pydub import AudioSegment # 現在のコードでは直接使用されていません | |
import argparse | |
from typing import List, Optional, Dict | |
import shutil | |
import subprocess | |
import collections # 追加 | |
import tkinter as tk | |
from tkinter import filedialog | |
try: | |
from gradio_client import Client, file as gradio_file | |
GRADIO_CLIENT_AVAILABLE = True | |
except ImportError: | |
GRADIO_CLIENT_AVAILABLE = False | |
print("Warning: gradio_client not found. Please install it with: pip install gradio_client") | |
# グローバル設定 | |
CHUNK_LENGTH_SECONDS = 3600 | |
CHUNK_OVERLAP_SECONDS = 30 | |
SPACE_URL = "https://sungo-ganpare-parakeet-tdt-0-6b-v2.hf.space" | |
MAX_VTT_SIZE_BYTES = 100 * 1024 * 1024 | |
TARGET_AUDIO_VIDEO_EXTENSIONS = [ | |
'.wav', '.mp3', '.m4a', '.flac', '.ogg', | |
'.mp4', '.mkv', '.mov', '.avi', '.webm' | |
] | |
# スキップ判定に使用する代表的な出力ファイルの拡張子 | |
PRIMARY_OUTPUT_EXTENSION_FOR_SKIP_CHECK = '.json' | |
def get_audio_duration_with_ffprobe(audio_path: str) -> Optional[float]: | |
"""ffprobeを使用して音声ファイルの長さを取得""" | |
try: | |
if not shutil.which('ffprobe'): | |
print("Warning: ffprobe not found") | |
return None | |
cmd = ['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'csv=p=0', audio_path] | |
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) | |
if result.returncode == 0 and result.stdout.strip(): | |
return float(result.stdout.strip()) | |
print(f"Warning: Could not get duration for {Path(audio_path).name} using ffprobe. Return code: {result.returncode}, Error: {result.stderr.strip()}") | |
return None | |
except Exception as e: | |
print(f"Error getting audio duration for {Path(audio_path).name}: {e}") | |
return None | |
def split_audio_with_ffmpeg(audio_path: str, output_dir_base: str, chunk_length_sec: int, overlap_sec: int) -> List[str]: | |
"""ffmpegを使用して音声ファイルを分割。一時チャンクは output_dir_base/temp_chunks 以下に保存""" | |
audio_file_obj = Path(audio_path) | |
try: | |
if not shutil.which('ffmpeg'): | |
print(f"Error: ffmpeg not found. Cannot split {audio_file_obj.name}.") | |
return [] | |
duration_sec = get_audio_duration_with_ffprobe(audio_path) | |
if duration_sec is None: | |
print(f"Could not determine duration for {audio_file_obj.name}. Skipping split.") | |
return [] | |
chunk_paths = [] | |
audio_stem = audio_file_obj.stem | |
# 一時チャンク保存用ディレクトリパス (毎回フルパスで指定) | |
temp_chunk_storage_dir = Path(output_dir_base) / "temp_chunks" / audio_stem | |
temp_chunk_storage_dir.mkdir(parents=True, exist_ok=True) | |
start_sec = 0 | |
chunk_idx = 0 | |
print(f"Splitting {audio_file_obj.name} into chunks (max {chunk_length_sec}s each)...") | |
while start_sec < duration_sec: | |
actual_start_sec = max(0, start_sec - (overlap_sec if start_sec > 0 else 0)) | |
base_end_sec = start_sec + chunk_length_sec | |
actual_end_sec = min(base_end_sec + (overlap_sec if base_end_sec < duration_sec else 0), duration_sec) | |
if actual_start_sec >= actual_end_sec: break | |
chunk_duration = actual_end_sec - actual_start_sec | |
chunk_file_name = f"{audio_stem}_chunk_{chunk_idx:03d}.wav" | |
chunk_file_path = temp_chunk_storage_dir / chunk_file_name | |
cmd = [ | |
'ffmpeg', '-y', '-loglevel', 'error', '-ss', str(actual_start_sec), | |
'-i', audio_path, '-t', str(chunk_duration), | |
'-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', str(chunk_file_path) | |
] | |
try: | |
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) | |
if result.returncode == 0: | |
chunk_paths.append(chunk_file_path.as_posix()) | |
# print(f" Created chunk {chunk_idx+1}: {actual_start_sec:.1f}s - {actual_end_sec:.1f}s -> {chunk_file_name}") | |
else: | |
print(f" Error creating chunk {chunk_idx+1} for {audio_file_obj.name}: {result.stderr.strip()}") | |
except subprocess.TimeoutExpired: | |
print(f" Timeout creating chunk {chunk_idx+1} for {audio_file_obj.name}") | |
start_sec += chunk_length_sec | |
chunk_idx += 1 | |
if chunk_paths: print(f" Finished splitting {audio_file_obj.name} into {len(chunk_paths)} chunks.") | |
else: print(f" No chunks created for {audio_file_obj.name}.") | |
return chunk_paths | |
except Exception as e: | |
print(f"Error splitting audio {audio_file_obj.name}: {e}") | |
return [] | |
# test_space_connection, process_chunk, write_srt, write_vtt, write_json_output, write_lrc は前回とほぼ同じ | |
# (ログ出力にファイル名を追加するなどの微調整は有効) | |
class GPUQuotaExceededError(Exception): | |
"""GPU制限に達した場合の例外""" | |
pass | |
def process_chunk(chunk_path: str, original_audio_filename: str) -> Optional[Dict]: | |
"""チャンクをSpaceに送信して処理""" | |
chunk_name = Path(chunk_path).name | |
if not GRADIO_CLIENT_AVAILABLE: | |
print(f"Error (gradio_client unavailable) processing {chunk_name} for {original_audio_filename}") | |
return None | |
try: | |
client = None | |
for attempt in range(3): | |
try: | |
# print(f" Connecting to Space (attempt {attempt + 1}/3) for {chunk_name}...") | |
client = Client(SPACE_URL) | |
break | |
except Exception as e: | |
error_msg = str(e).lower() | |
# GPU制限エラーを検知 | |
if any(keyword in error_msg for keyword in ['gpu', 'quota', 'limit', 'exceeded', 'unavailable']): | |
print(f" GPU quota exceeded detected: {e}") | |
raise GPUQuotaExceededError(f"GPU quota exceeded: {e}") | |
print(f" Connection attempt {attempt + 1} for {chunk_name} (from {original_audio_filename}) failed: {e}") | |
if attempt < 2: time.sleep(5) | |
else: raise | |
if client is None: return None | |
# print(f" Sending chunk to Space: {chunk_name} (from {original_audio_filename})") | |
result = None | |
api_methods_to_try = [{"name": "fn_index=1", "fn_index": 1}, {"name": "fn_index=0", "fn_index": 0}, {"name": "default", "fn_index": None}] | |
for method_info in api_methods_to_try: | |
try: | |
if method_info["fn_index"] is not None: | |
result = client.predict(gradio_file(chunk_path), fn_index=method_info["fn_index"]) | |
else: | |
result = client.predict(gradio_file(chunk_path)) | |
# print(f" Successfully used API method '{method_info['name']}' for {chunk_name}") | |
break | |
except Exception as api_e: | |
error_msg = str(api_e).lower() | |
# GPU制限エラーを検知 | |
if any(keyword in error_msg for keyword in ['gpu', 'quota', 'limit', 'exceeded', 'unavailable', 'out of memory', 'resource']): | |
print(f" GPU quota exceeded during API call: {api_e}") | |
raise GPUQuotaExceededError(f"GPU quota exceeded during API call: {api_e}") | |
# print(f" API method '{method_info['name']}' for {chunk_name} failed: {api_e}") | |
result = None | |
if result is None: | |
print(f" All API call methods failed for {chunk_name} (from {original_audio_filename})") | |
return None | |
# print(f" Received response from Space for {chunk_name} (type: {type(result)})") | |
if isinstance(result, dict): | |
return result | |
elif isinstance(result, str): | |
try: return json.loads(result) | |
except json.JSONDecodeError: | |
print(f" Failed to parse JSON response for {chunk_name}: {result[:100]}...") | |
return None | |
else: | |
print(f" Unexpected response format for {chunk_name}: {type(result)}") | |
return None | |
except GPUQuotaExceededError: | |
# GPU制限エラーは再発生させて上位で処理 | |
raise | |
except Exception as e: | |
error_msg = str(e).lower() | |
# 最後の砦としてもう一度GPU制限エラーをチェック | |
if any(keyword in error_msg for keyword in ['gpu', 'quota', 'limit', 'exceeded', 'unavailable', 'out of memory', 'resource']): | |
print(f"GPU quota exceeded detected in general exception: {e}") | |
raise GPUQuotaExceededError(f"GPU quota exceeded: {e}") | |
print(f"Error sending chunk {chunk_name} (from {original_audio_filename}) to Space: {e}") | |
return None | |
def merge_transcripts(chunk_results: List[Dict], overlap_sec: int, audio_filename: str) -> Dict: | |
merged_segments = [] | |
# print(f"Merging {len(chunk_results)} chunk results for {audio_filename}...") | |
cumulative_offset = 0.0 # チャンク間の累積オフセット | |
for i, chunk_result in enumerate(chunk_results): | |
if not isinstance(chunk_result, dict) or "segments" not in chunk_result: | |
# print(f" Skipping chunk {i+1} (invalid format) for {audio_filename}.") | |
continue | |
if "error" in chunk_result: | |
# print(f" Skipping chunk {i+1} (contains error: {chunk_result['error']}) for {audio_filename}.") | |
continue | |
chunk_start_time_in_global = cumulative_offset | |
last_segment_end_from_this_chunk = 0.0 | |
for seg_idx, seg in enumerate(chunk_result["segments"]): | |
if not (isinstance(seg, dict) and "start" in seg and "end" in seg and "text" in seg): | |
# print(f" Skipping invalid segment in chunk {i+1} of {audio_filename}.") | |
continue | |
original_seg_start = float(seg["start"]) | |
original_seg_end = float(seg["end"]) | |
# 最初のチャンク以外で、セグメントがオーバーラップ期間よりかなり手前から始まる場合はスキップ | |
# (これはチャンク分割とAPIの特性に依存するかもしれないので、慎重に) | |
if i > 0 and original_seg_end < overlap_sec * 0.5: # オーバーラップの中間点より前で終わるものは無視 | |
continue | |
# グローバルタイムラインにマッピング | |
seg_start = original_seg_start + chunk_start_time_in_global | |
seg_end = original_seg_end + chunk_start_time_in_global | |
# 前のマージ済みセグメントとの重複調整 | |
if merged_segments: | |
last_merged_seg_end = merged_segments[-1]["end"] | |
if seg_start < last_merged_seg_end: # 開始がかぶる場合 | |
if seg_end <= last_merged_seg_end: # 完全に内包されるか、同じ終端ならスキップ | |
continue | |
seg_start = last_merged_seg_end # 開始時刻を調整 | |
if seg_start >= seg_end: continue # 調整の結果、無効になったセグメント | |
processed_words = [] | |
if "words" in seg and isinstance(seg["words"], list): | |
for word_data in seg["words"]: | |
if not (isinstance(word_data, dict) and "start" in word_data and "end" in word_data and "word" in word_data): | |
continue | |
w_start = float(word_data["start"]) + chunk_start_time_in_global | |
w_end = float(word_data["end"]) + chunk_start_time_in_global | |
# 単語もセグメントの調整に合わせて調整 | |
w_start = max(w_start, seg_start) | |
w_end = min(w_end, seg_end) | |
if w_start >= w_end: continue | |
processed_words.append({"start": round(w_start, 3), "end": round(w_end, 3), "word": word_data["word"]}) | |
merged_segments.append({ | |
"start": round(seg_start, 3), "end": round(seg_end, 3), | |
"text": seg["text"], "words": processed_words | |
}) | |
last_segment_end_from_this_chunk = max(last_segment_end_from_this_chunk, original_seg_end) | |
# 次のチャンクのためのオフセットを更新 | |
# チャンクの有効長は CHUNK_LENGTH_SECONDS - overlap_sec だが、実際の文字起こし結果の長さに合わせる方が良い場合もある。 | |
# ここでは固定長で進める。APIが必ずしもチャンクいっぱいまで返さない可能性を考慮すると、 | |
# 実際の文字起こしセグメントの最後の終了時刻を基準にする方法もあるが、複雑になる。 | |
if last_segment_end_from_this_chunk > overlap_sec : # 少なくともオーバーラップ分は超えて文字起こしされた | |
cumulative_offset += max(0, last_segment_end_from_this_chunk - overlap_sec) | |
else: # オーバーラップ分すらまともに文字起こしされなかった場合、固定で進める | |
cumulative_offset += (CHUNK_LENGTH_SECONDS - overlap_sec) | |
if merged_segments: print(f" Finished merging transcripts for {audio_filename}.") | |
else: print(f" No segments to merge for {audio_filename}.") | |
return {"segments": merged_segments} | |
def save_transcript(result: Dict, output_path_stem_str: str, audio_filename: str): | |
output_path_obj = Path(output_path_stem_str) | |
# print(f"Saving transcripts for {audio_filename} to files starting with {output_path_obj.name}...") | |
segments_for_output = [] | |
all_words_for_output = [] | |
if "segments" in result and isinstance(result["segments"], list): | |
for seg in result["segments"]: | |
if isinstance(seg, dict) and "start" in seg and "end" in seg and "text" in seg: | |
segments_for_output.append( (seg["start"], seg["end"], seg["text"]) ) | |
if "words" in seg and isinstance(seg["words"], list): | |
for word_info in seg["words"]: | |
if isinstance(word_info, dict) and "start" in word_info and "end" in word_info and "word" in word_info: | |
all_words_for_output.append( (word_info["start"], word_info["end"], word_info["word"]) ) | |
if not segments_for_output: | |
print(f" No segments to write for {audio_filename}. Output files will be empty or not created.") | |
# 空でもファイルを作るか、作らないか。ここでは作る前提で進むが、内容は空になる。 | |
# return # 何も保存しない場合はここでリターン | |
# JSON | |
json_path = output_path_obj.with_suffix(".json") | |
write_json_output(segments_for_output, all_words_for_output, json_path) | |
print(f" Transcript saved: {json_path.name}") | |
# SRT | |
srt_path = output_path_obj.with_suffix(".srt") | |
write_srt(segments_for_output, srt_path) | |
print(f" Transcript saved: {srt_path.name}") | |
# VTT | |
vtt_path = output_path_obj.with_suffix(".vtt") | |
try: | |
write_vtt(segments_for_output, all_words_for_output, vtt_path) | |
print(f" Transcript saved: {vtt_path.name}") | |
except ValueError as e: #主にファイルサイズ超過 | |
print(f" Error saving VTT for {audio_filename} ({vtt_path.name}): {e}") | |
if vtt_path.exists(): | |
try: vtt_path.unlink() | |
except OSError as ose: print(f" Could not delete incomplete VTT file {vtt_path}: {ose}") | |
# LRC | |
lrc_path = output_path_obj.with_suffix(".lrc") | |
write_lrc(segments_for_output, lrc_path) | |
print(f" Transcript saved: {lrc_path.name}") | |
# write_srt, write_vtt, write_json_output, write_lrc は前回から変更なしでOK | |
def write_srt(segments: List, path: Path): | |
def sec2srt(t_float: float) -> str: | |
h, rem = divmod(int(t_float), 3600); m, s = divmod(rem, 60) | |
ms = int((t_float - int(t_float)) * 1000) | |
return f"{h:02}:{m:02}:{s:02},{ms:03}" | |
with open(path, "w", encoding="utf-8") as f: | |
if not segments: f.write("") # 空なら空ファイル | |
for i, seg_list in enumerate(segments, 1): | |
f.write(f"{i}\n{sec2srt(float(seg_list[0]))} --> {sec2srt(float(seg_list[1]))}\n{seg_list[2]}\n\n") | |
def write_vtt(segments: List, words: List, path: Path): # words は all_words_for_output が渡される | |
def sec2vtt(t_float: float) -> str: | |
h, rem = divmod(int(t_float), 3600); m, s = divmod(rem, 60) | |
ms = int((t_float - int(t_float)) * 1000) | |
return f"{h:02}:{m:02}:{s:02}.{ms:03}" | |
with open(path, "w", encoding="utf-8") as f: | |
f.write("WEBVTT\n\n") | |
if not segments: return # セグメントがなければヘッダだけ | |
f.write("STYLE\n") | |
f.write("::cue(.current) { color: #ffff00; font-weight: bold; }\n") | |
f.write("::cue(.past) { color: #888888; }\n") | |
f.write("::cue(.future) { color: #ffffff; }\n") | |
f.write("::cue(.line) { background: rgba(0,0,0,0.7); padding: 4px; }\n\n") | |
# words (単語タイムスタンプ) が提供されていればそれを使う、なければセグメント単位 | |
use_word_timestamps = bool(words) # wordsが空リストでもFalseになる | |
if not use_word_timestamps: | |
for i, seg_data in enumerate(segments, 1): # segments は (start, end, text) のタプルのリスト | |
f.write(f"NOTE Segment {i}\n") | |
f.write(f"{sec2vtt(float(seg_data[0]))} --> {sec2vtt(float(seg_data[1]))}\n{seg_data[2]}\n\n") | |
if f.tell() > MAX_VTT_SIZE_BYTES: | |
raise ValueError(f"VTT file size limit ({MAX_VTT_SIZE_BYTES/1024/1024:.1f}MB) exceeded for {path.name}") | |
return | |
# 以下、単語タイムスタンプがある場合の詳細なVTT生成ロジック (前回と同様) | |
segment_word_map = collections.defaultdict(list) | |
word_iter = iter(sorted(words, key=lambda x: float(x[0]))) # wordsは(start,end,text)のタプルリスト | |
current_word = next(word_iter, None) | |
for seg_idx, seg_data in enumerate(segments): | |
seg_start, seg_end, seg_text_full = float(seg_data[0]), float(seg_data[1]), seg_data[2] | |
while current_word: | |
word_start, word_end_time, word_text = float(current_word[0]), float(current_word[1]), current_word[2] | |
# 単語が現在のセグメントに属するか (開始時間で判断) | |
if word_start < seg_end - 0.01: # わずかな誤差を許容 | |
if word_start >= seg_start - 0.01 : | |
segment_word_map[seg_idx].append(current_word) | |
current_word = next(word_iter, None) # 次の単語へ | |
else: # この単語は次のセグメント以降に属する | |
break | |
# current_wordがNoneになった後も、残りのセグメントを処理する必要があるため、 | |
# word_iterを再初期化するか、またはこのループ構造を見直す必要がある。 | |
# より単純には、各セグメントについて全単語リストをフィルタリングする方が確実。 | |
# 単純化のため、セグメントごとに全単語をフィルタリングする方式に戻す | |
for seg_idx, seg_data in enumerate(segments): | |
seg_start, seg_end, seg_text_full = float(seg_data[0]), float(seg_data[1]), seg_data[2] | |
# このセグメントに含まれる単語を特定 | |
current_segment_words = [] | |
for word_data in words: # words は (start, end, text) のタプルのリスト | |
w_start, w_end = float(word_data[0]), float(word_data[1]) | |
# 単語がセグメントの範囲内にあるか(中央が範囲内、または一部がオーバーラップ) | |
if max(seg_start, w_start) < min(seg_end, w_end): | |
current_segment_words.append(word_data) | |
current_segment_words.sort(key=lambda x: float(x[0])) # 開始時間でソート | |
if not current_segment_words: | |
f.write(f"{sec2vtt(seg_start)} --> {sec2vtt(seg_end)}\n{seg_text_full}\n\n") | |
if f.tell() > MAX_VTT_SIZE_BYTES: raise ValueError(f"VTT size limit for {path.name}") | |
continue | |
all_words_text_in_segment = [w[2] for w in current_segment_words] | |
# セグメント開始から最初の単語まで (必要なら) | |
first_word_actual_start = float(current_segment_words[0][0]) | |
if seg_start < first_word_actual_start - 0.05: | |
f.write(f"{sec2vtt(seg_start)} --> {sec2vtt(first_word_actual_start)}\n") | |
f.write(f'<c.line>{" ".join(f"<c.future>{w_txt}</c>" for w_txt in all_words_text_in_segment)}</c>\n\n') | |
if f.tell() > MAX_VTT_SIZE_BYTES: raise ValueError(f"VTT size limit for {path.name}") | |
for local_idx, word_data in enumerate(current_segment_words): | |
w_s, w_e, w_txt = float(word_data[0]), float(word_data[1]), word_data[2] | |
f.write(f"{sec2vtt(w_s)} --> {sec2vtt(w_e)}\n") | |
line_parts = [f'<c.past>{t}</c>' for i, t in enumerate(all_words_text_in_segment) if i < local_idx] | |
line_parts.append(f'<c.current>{w_txt}</c>') | |
line_parts.extend(f'<c.future>{t}</c>' for i, t in enumerate(all_words_text_in_segment) if i > local_idx) | |
f.write(f'<c.line>{" ".join(line_parts)}</c>\n\n') | |
if f.tell() > MAX_VTT_SIZE_BYTES: raise ValueError(f"VTT size limit for {path.name}") | |
# 単語間の無音期間 (必要なら) | |
if local_idx < len(current_segment_words) - 1: | |
next_word_actual_start = float(current_segment_words[local_idx + 1][0]) | |
if w_e < next_word_actual_start - 0.05: # 50ms以上のギャップ | |
f.write(f"{sec2vtt(w_e)} --> {sec2vtt(next_word_actual_start)}\n") | |
# 現在の単語までpast、残りはfuture | |
past_part = [f'<c.past>{t}</c>' for i, t in enumerate(all_words_text_in_segment) if i <= local_idx] | |
future_part = [f'<c.future>{t}</c>' for i, t in enumerate(all_words_text_in_segment) if i > local_idx] | |
f.write(f'<c.line>{" ".join(past_part + future_part)}</c>\n\n') | |
if f.tell() > MAX_VTT_SIZE_BYTES: raise ValueError(f"VTT size limit for {path.name}") | |
# 最後の単語からセグメント終了まで (必要なら) | |
last_word_actual_end = float(current_segment_words[-1][1]) | |
if last_word_actual_end < seg_end - 0.05: | |
f.write(f"{sec2vtt(last_word_actual_end)} --> {sec2vtt(seg_end)}\n") | |
f.write(f'<c.line>{" ".join(f"<c.past>{w_txt}</c>" for w_txt in all_words_text_in_segment)}</c>\n\n') | |
if f.tell() > MAX_VTT_SIZE_BYTES: raise ValueError(f"VTT size limit for {path.name}") | |
def write_json_output(segments: List, words: List, path: Path): # segments: (start,end,text), words: (start,end,text) | |
result_data = {"segments": []} | |
# wordsをセグメントに割り当てる (write_vttのロジックと似たようなものが必要) | |
# もしwordsが空なら、segmentsのtextだけを使う | |
for seg_start, seg_end, seg_text in segments: | |
segment_entry = {"start": seg_start, "end": seg_end, "text": seg_text, "words": []} | |
if words: | |
# このセグメントに属する単語をフィルタリング | |
# words はソートされていると仮定 (呼び出し元でソートするか、ここでソート) | |
# words.sort(key=lambda x: float(x[0])) # 必要なら | |
for w_start, w_end, w_text in words: | |
# 単語がセグメントの範囲内にあるか (中央が範囲内、または一部がオーバーラップ) | |
if max(seg_start, w_start) < min(seg_end, w_end): # よりシンプルなオーバーラップ判定 | |
segment_entry["words"].append({"start": w_start, "end": w_end, "word": w_text}) | |
result_data["segments"].append(segment_entry) | |
with open(path, "w", encoding="utf-8") as f: | |
json.dump(result_data, f, ensure_ascii=False, indent=2) | |
def write_lrc(segments: List, path: Path): | |
def sec2lrc(t_float: float) -> str: | |
m, s = divmod(float(t_float), 60) | |
return f"[{int(m):02d}:{s:05.2f}]" | |
with open(path, "w", encoding="utf-8") as f: | |
if not segments: f.write("") | |
for seg_list in segments: # seg_list is (start, end, text) | |
f.write(f"{sec2lrc(float(seg_list[0]))}{seg_list[2]}\n") | |
def process_audio_file(input_path_str: str, output_dir_str: str): | |
original_input_path_obj = Path(input_path_str) | |
audio_filename = original_input_path_obj.name | |
print(f"Processing: {audio_filename}") | |
temp_wav_path_obj: Optional[Path] = None | |
current_processing_input_path = input_path_str # MP4等の場合、変換後のWAVパスに更新 | |
output_dir_path = Path(output_dir_str) | |
# 一時ファイル用ディレクトリ (入力ファイルごと) | |
# 例: output_dir/temp_processing/input_file_stem/ | |
base_temp_dir = output_dir_path / "temp_processing" / original_input_path_obj.stem | |
temp_conversion_dir = base_temp_dir / "conversion" | |
# チャンクは split_audio_with_ffmpeg 内で output_dir_path / "temp_chunks" / audio_stem に保存される | |
try: # WAV以外の入力はWAV (16kHz, mono) に変換 | |
if original_input_path_obj.suffix.lower() not in ['.wav']: | |
print(f" Converting {audio_filename} to WAV...") | |
temp_conversion_dir.mkdir(parents=True, exist_ok=True) | |
temp_wav_path_obj = temp_conversion_dir / f"{original_input_path_obj.stem}_converted.wav" | |
if not shutil.which('ffmpeg'): | |
print(f" Error: ffmpeg not found. Cannot convert {audio_filename}.") | |
return | |
cmd = [ | |
'ffmpeg', '-y', '-loglevel', 'error', '-i', input_path_str, | |
'-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', | |
temp_wav_path_obj.as_posix() | |
] | |
try: | |
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600) | |
if result.returncode == 0: | |
print(f" Successfully converted to {temp_wav_path_obj.name}") | |
current_processing_input_path = temp_wav_path_obj.as_posix() | |
else: | |
print(f" Error converting {audio_filename} to WAV: {result.stderr.strip()}") | |
return | |
except subprocess.TimeoutExpired: | |
print(f" Timeout converting {audio_filename} to WAV.") | |
return | |
except Exception as e_conv: | |
print(f" Exception during WAV conversion for {audio_filename}: {e_conv}") | |
return | |
# 音声分割 (出力ディレクトリのベースパスを渡す) | |
chunk_paths = split_audio_with_ffmpeg( | |
current_processing_input_path, output_dir_path.as_posix(), | |
CHUNK_LENGTH_SECONDS, CHUNK_OVERLAP_SECONDS | |
) | |
if not chunk_paths: | |
print(f" Failed to split {audio_filename}. Skipping transcription.") | |
return | |
print(f" Processing {len(chunk_paths)} chunks for {audio_filename} via API...") | |
chunk_results = [] | |
for i, chunk_p_str in enumerate(chunk_paths): | |
try: | |
api_result = process_chunk(chunk_p_str, audio_filename) | |
if api_result: | |
chunk_results.append(api_result) | |
print(f" Successfully processed chunk {i+1}/{len(chunk_paths)}") | |
else: | |
print(f" Failed to process chunk {i+1}/{len(chunk_paths)}") | |
except GPUQuotaExceededError as gpu_error: | |
print(f" GPU quota exceeded while processing {audio_filename}") | |
print(f" Error: {gpu_error}") | |
print(f" GPU制限に達しました。処理を強制終了します。") | |
raise # main()関数で捕捉するために再発生 | |
# APIリクエスト間の待機時間を追加 | |
if i < len(chunk_paths) - 1: # 最後のチャンクの後は待機不要 | |
wait_seconds = 5 | |
print(f" Waiting for {wait_seconds} seconds before processing next chunk...") | |
time.sleep(wait_seconds) | |
if not chunk_results: | |
print(f" No chunks successfully processed via API for {audio_filename}.") | |
return | |
merged_result = merge_transcripts(chunk_results, CHUNK_OVERLAP_SECONDS, audio_filename) | |
output_stem_str = (output_dir_path / original_input_path_obj.stem).as_posix() | |
save_transcript(merged_result, output_stem_str, audio_filename) | |
except Exception as e_main_proc: | |
print(f"An unexpected error occurred while processing {audio_filename}: {e_main_proc}") | |
import traceback | |
traceback.print_exc() | |
finally: | |
# 一時ファイル/ディレクトリの削除 | |
# split_audio_with_ffmpeg で作られたチャンク用ディレクトリを削除 | |
chunk_temp_parent_dir = output_dir_path / "temp_chunks" / original_input_path_obj.stem | |
if chunk_temp_parent_dir.exists(): | |
try: | |
shutil.rmtree(chunk_temp_parent_dir) | |
# print(f" Deleted temporary chunk directory: {chunk_temp_parent_dir}") | |
except OSError as e_del_chunk: | |
print(f" Error deleting temp chunk dir {chunk_temp_parent_dir}: {e_del_chunk}") | |
if temp_conversion_dir.exists() and temp_conversion_dir.parent == base_temp_dir : # base_temp_dirごと消すので個別削除は不要 | |
pass | |
# temp_processing/input_file_stem ディレクトリ全体を削除 | |
if base_temp_dir.exists(): | |
try: | |
shutil.rmtree(base_temp_dir) | |
print(f" Cleaned up temporary processing directory: {base_temp_dir}") | |
except OSError as e_del_base: | |
print(f" Error deleting base temp dir {base_temp_dir}: {e_del_base}") | |
def main(): | |
parser = argparse.ArgumentParser( | |
description="Transcribes audio/video files from a specified path (file or directory). " | |
"Outputs are saved in the same location as input files. " | |
"Skips already processed files (checks for .json output). " | |
"Prefers MP3 over MP4 if both exist with the same base name." | |
) | |
parser.add_argument( | |
"input_path", | |
nargs="?", # オプショナルにする | |
help="Path to an input audio/video file or a directory containing such files." | |
) | |
args = parser.parse_args() | |
# 引数が指定されていない場合はGUIでファイル選択 | |
if args.input_path is None: | |
root = tk.Tk() | |
root.withdraw() # メインウィンドウを非表示 | |
input_path = filedialog.askdirectory( | |
title="処理したい音声/動画ファイルのあるフォルダを選択してください", | |
initialdir=os.getcwd() | |
) | |
if not input_path: # キャンセルされた場合 | |
print("フォルダが選択されませんでした。") | |
return | |
input_path_obj = Path(input_path) | |
else: | |
input_path_obj = Path(args.input_path) | |
if not input_path_obj.exists(): | |
print(f"Error: Input path '{args.input_path}' does not exist.") | |
return | |
# 1. 処理対象候補のファイルリストを作成 (MP3優先ロジックを含む) | |
files_to_consider_processing = [] | |
if input_path_obj.is_file(): | |
if input_path_obj.suffix.lower() in TARGET_AUDIO_VIDEO_EXTENSIONS: | |
files_to_consider_processing.append(input_path_obj) | |
else: | |
print(f"Input file '{input_path_obj.name}' is not a supported type. Supported: {TARGET_AUDIO_VIDEO_EXTENSIONS}") | |
elif input_path_obj.is_dir(): | |
print(f"Scanning directory: {input_path_obj.resolve()}") | |
# ベース名でファイルをグループ化 | |
grouped_files = collections.defaultdict(list) | |
for item in sorted(input_path_obj.iterdir()): # sortedで処理順をある程度一定に | |
if item.is_file() and item.suffix.lower() in TARGET_AUDIO_VIDEO_EXTENSIONS: | |
grouped_files[item.stem].append(item) | |
if not grouped_files: | |
print(f"No supported files found in directory: {input_path_obj.resolve()}") | |
return | |
for base_name, file_group in grouped_files.items(): | |
mp3_file = next((f for f in file_group if f.suffix.lower() == '.mp3'), None) | |
mp4_file = next((f for f in file_group if f.suffix.lower() == '.mp4'), None) | |
chosen_file = None | |
if mp3_file: | |
chosen_file = mp3_file | |
if mp4_file and mp4_file != mp3_file: # MP4も存在する場合 (通常は別ファイルのはず) | |
print(f" MP3 found for '{base_name}', prioritizing '{mp3_file.name}' over '{mp4_file.name}'.") | |
elif mp4_file: | |
chosen_file = mp4_file | |
else: # MP3もMP4もない場合、リストの最初のファイル(何らかの音声/動画ファイル) | |
# TARGET_AUDIO_VIDEO_EXTENSIONS の順序やファイル名のソート順に依存する可能性あり | |
if file_group: chosen_file = file_group[0] | |
if chosen_file: | |
files_to_consider_processing.append(chosen_file) | |
else: | |
print(f"Error: Input path '{args.input_path}' is not a valid file or directory.") | |
return | |
if not files_to_consider_processing: | |
print("No files selected for processing.") | |
return | |
# 2. 処理済みファイルをスキップ | |
actual_files_to_process = [] | |
print(f"\nFound {len(files_to_consider_processing)} potential file(s). Checking for existing transcripts...") | |
for file_path in files_to_consider_processing: | |
output_dir = file_path.parent | |
# 代表的な出力ファイル (例: .json) の存在でスキップ判定 | |
expected_output_file = output_dir / f"{file_path.stem}{PRIMARY_OUTPUT_EXTENSION_FOR_SKIP_CHECK}" | |
if expected_output_file.exists(): | |
print(f" Skipping '{file_path.name}': Output '{expected_output_file.name}' already exists.") | |
else: | |
actual_files_to_process.append(file_path) | |
if not actual_files_to_process: | |
print("\nNo new files to process. All selected files seem to have existing transcripts.") | |
return | |
total_to_process_count = len(actual_files_to_process) | |
print(f"\nStarting processing for {total_to_process_count} new file(s)...") | |
for i, file_to_process_obj in enumerate(actual_files_to_process): | |
print(f"\n--- [{i+1}/{total_to_process_count}] Processing: {file_to_process_obj.name} ---") | |
output_dir_for_this_file = file_to_process_obj.parent.as_posix() | |
try: | |
process_audio_file(file_to_process_obj.as_posix(), output_dir_for_this_file) | |
print(f"--- Finished: {file_to_process_obj.name} ---") | |
except GPUQuotaExceededError as gpu_error: | |
print(f"\n=== GPU QUOTA EXCEEDED ===") | |
print(f"処理を中断します。GPU制限に達しました。") | |
print(f"Error details: {gpu_error}") | |
sys.exit(1) # 即座に強制終了 | |
print(f"\nAll {total_to_process_count} new file(s) processed.") | |
if __name__ == "__main__": | |
if not GRADIO_CLIENT_AVAILABLE: | |
print("Critical: gradio_client library is not installed. Please run: pip install gradio_client") | |
else: | |
main() |