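# parakeet-tdt-0.6b-v2 / local_controller.py
# Author: sungo-ganpare
# Commit message: Added an exception class that detects GPU quota errors and strengthened error handling for the case where the GPU quota is reached during processing. Errors that occur while processing an audio file are now reported properly.
# Commit: a7307d4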
import os
import json
import time
import sys
import requests
from pathlib import Path
# from pydub import AudioSegment # 現在のコードでは直接使用されていません
import argparse
from typing import List, Optional, Dict
import shutil
import subprocess
import collections # 追加
import tkinter as tk
from tkinter import filedialog
try:
from gradio_client import Client, file as gradio_file
GRADIO_CLIENT_AVAILABLE = True
except ImportError:
GRADIO_CLIENT_AVAILABLE = False
print("Warning: gradio_client not found. Please install it with: pip install gradio_client")
# Global settings
# Maximum nominal length of one audio chunk, in seconds (passed to split_audio_with_ffmpeg).
CHUNK_LENGTH_SECONDS = 3600
# Extra audio, in seconds, added around chunk boundaries so that neighbouring chunks overlap.
CHUNK_OVERLAP_SECONDS = 30
# URL of the Space that gradio_client.Client connects to.
SPACE_URL = "https://sungo-ganpare-parakeet-tdt-0-6b-v2.hf.space"
# Upper bound for the size of a generated VTT file (100 * 1024 * 1024 bytes).
MAX_VTT_SIZE_BYTES = 100 * 1024 * 1024
# Lower-case file extensions (including the dot) that are accepted as input.
TARGET_AUDIO_VIDEO_EXTENSIONS = [
'.wav', '.mp3', '.m4a', '.flac', '.ogg',
'.mp4', '.mkv', '.mov', '.avi', '.webm'
]
# Extension of the representative output file used to decide whether an input can be skipped
PRIMARY_OUTPUT_EXTENSION_FOR_SKIP_CHECK = '.json'
def get_audio_duration_with_ffprobe(audio_path: str) -> Optional[float]:
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        audio_path: Path of the media file to inspect.

    Returns:
        The duration in seconds, or ``None`` when ffprobe is not on PATH,
        exits with a non-zero status, prints nothing, or any other error
        occurs (a message is printed in each of those cases).
    """
    try:
        ffprobe_location = shutil.which('ffprobe')
        if not ffprobe_location:
            print("Warning: ffprobe not found")
            return None

        probe_command = [
            'ffprobe',
            '-v', 'quiet',
            '-show_entries', 'format=duration',
            '-of', 'csv=p=0',
            audio_path,
        ]
        completed = subprocess.run(probe_command, capture_output=True, text=True, timeout=30)
        reported_duration = completed.stdout.strip()
        if completed.returncode != 0 or not reported_duration:
            print(f"Warning: Could not get duration for {Path(audio_path).name} using ffprobe. Return code: {completed.returncode}, Error: {completed.stderr.strip()}")
            return None
        return float(reported_duration)
    except Exception as probe_error:
        # Covers timeouts, unparsable output and anything else ffprobe throws at us.
        print(f"Error getting audio duration for {Path(audio_path).name}: {probe_error}")
        return None
def split_audio_with_ffmpeg(audio_path: str, output_dir_base: str, chunk_length_sec: int, overlap_sec: int) -> List[str]:
    """Split an audio file into 16 kHz mono WAV chunks using ffmpeg.

    Chunks are written to ``<output_dir_base>/temp_chunks/<audio stem>/``.
    Every chunk except the first starts ``overlap_sec`` seconds early, and
    every chunk that does not reach the end of the file runs ``overlap_sec``
    seconds late.

    Args:
        audio_path: Path of the audio file to split.
        output_dir_base: Directory under which ``temp_chunks`` is created.
        chunk_length_sec: Nominal chunk length in seconds; must be positive.
        overlap_sec: Overlap added around chunk boundaries; must not be negative.

    Returns:
        POSIX-style paths of the chunks that were created successfully, in
        order. An empty list is returned when the settings are invalid,
        ffmpeg is missing, the duration cannot be determined, or an
        unexpected error occurs. Chunks whose ffmpeg call fails or times out
        are reported and left out of the list.
    """
    audio_file_obj = Path(audio_path)
    try:
        # A non-positive chunk length never advances start_sec, which would
        # make the loop below spawn ffmpeg forever.
        if chunk_length_sec <= 0 or overlap_sec < 0:
            print(f"Error: invalid chunk settings (chunk_length_sec={chunk_length_sec}, overlap_sec={overlap_sec}). Cannot split {audio_file_obj.name}.")
            return []
        if not shutil.which('ffmpeg'):
            print(f"Error: ffmpeg not found. Cannot split {audio_file_obj.name}.")
            return []
        duration_sec = get_audio_duration_with_ffprobe(audio_path)
        if duration_sec is None:
            print(f"Could not determine duration for {audio_file_obj.name}. Skipping split.")
            return []
        chunk_paths = []
        audio_stem = audio_file_obj.stem
        # Directory that holds the temporary chunks (always built as a full path).
        temp_chunk_storage_dir = Path(output_dir_base) / "temp_chunks" / audio_stem
        temp_chunk_storage_dir.mkdir(parents=True, exist_ok=True)
        start_sec = 0
        chunk_idx = 0
        print(f"Splitting {audio_file_obj.name} into chunks (max {chunk_length_sec}s each)...")
        while start_sec < duration_sec:
            # Extend the nominal window by the overlap on each side that has a neighbour.
            actual_start_sec = max(0, start_sec - (overlap_sec if start_sec > 0 else 0))
            base_end_sec = start_sec + chunk_length_sec
            actual_end_sec = min(base_end_sec + (overlap_sec if base_end_sec < duration_sec else 0), duration_sec)
            if actual_start_sec >= actual_end_sec:
                break
            chunk_duration = actual_end_sec - actual_start_sec
            chunk_file_name = f"{audio_stem}_chunk_{chunk_idx:03d}.wav"
            chunk_file_path = temp_chunk_storage_dir / chunk_file_name
            cmd = [
                'ffmpeg', '-y', '-loglevel', 'error', '-ss', str(actual_start_sec),
                '-i', audio_path, '-t', str(chunk_duration),
                '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', str(chunk_file_path)
            ]
            try:
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
                if result.returncode == 0:
                    chunk_paths.append(chunk_file_path.as_posix())
                else:
                    print(f" Error creating chunk {chunk_idx+1} for {audio_file_obj.name}: {result.stderr.strip()}")
            except subprocess.TimeoutExpired:
                print(f" Timeout creating chunk {chunk_idx+1} for {audio_file_obj.name}")
            start_sec += chunk_length_sec
            chunk_idx += 1
        if chunk_paths:
            print(f" Finished splitting {audio_file_obj.name} into {len(chunk_paths)} chunks.")
        else:
            print(f" No chunks created for {audio_file_obj.name}.")
        return chunk_paths
    except Exception as e:
        print(f"Error splitting audio {audio_file_obj.name}: {e}")
        return []
# test_space_connection, process_chunk, write_srt, write_vtt, write_json_output and write_lrc are almost the same as in the previous version
# (small tweaks, such as adding the file name to log output, are in effect)
class GPUQuotaExceededError(Exception):
    """Raised when an error from the Space looks like its GPU quota/limit was reached."""
# Lower-case substrings that mark a failure to connect as a GPU quota problem.
_GPU_QUOTA_KEYWORDS_ON_CONNECT = ('gpu', 'quota', 'limit', 'exceeded', 'unavailable')
# Wider list used for failures raised while calling the API (and as a last-resort check).
_GPU_QUOTA_KEYWORDS_ON_CALL = _GPU_QUOTA_KEYWORDS_ON_CONNECT + ('out of memory', 'resource')


def _is_gpu_quota_error(error: Exception, keywords) -> bool:
    """Return True when the message of *error* contains any of *keywords* (case-insensitive)."""
    message = str(error).lower()
    return any(keyword in message for keyword in keywords)


def process_chunk(chunk_path: str, original_audio_filename: str) -> Optional[Dict]:
    """Send one audio chunk to the Space and return its parsed response.

    The connection is attempted up to three times with a five second pause
    between attempts. The prediction is then tried with ``fn_index=1``,
    ``fn_index=0`` and finally without ``fn_index``; the first call that
    does not raise wins.

    Args:
        chunk_path: Path of the chunk file to upload.
        original_audio_filename: Name of the source file, used in messages only.

    Returns:
        The response as a dict (a JSON string response is decoded first), or
        ``None`` when gradio_client is unavailable, every call fails, or the
        response has an unexpected type or is not valid JSON.

    Raises:
        GPUQuotaExceededError: When an error message contains one of the GPU
            quota keywords. The triggering exception is attached as the cause.
    """
    chunk_name = Path(chunk_path).name
    if not GRADIO_CLIENT_AVAILABLE:
        print(f"Error (gradio_client unavailable) processing {chunk_name} for {original_audio_filename}")
        return None
    try:
        client = None
        for attempt in range(3):
            try:
                client = Client(SPACE_URL)
                break
            except Exception as e:
                if _is_gpu_quota_error(e, _GPU_QUOTA_KEYWORDS_ON_CONNECT):
                    print(f" GPU quota exceeded detected: {e}")
                    raise GPUQuotaExceededError(f"GPU quota exceeded: {e}") from e
                print(f" Connection attempt {attempt + 1} for {chunk_name} (from {original_audio_filename}) failed: {e}")
                if attempt < 2:
                    time.sleep(5)
                else:
                    # Out of retries: let the outer handler report the failure.
                    raise

        result = None
        last_api_error = None
        api_methods_to_try = [{"name": "fn_index=1", "fn_index": 1}, {"name": "fn_index=0", "fn_index": 0}, {"name": "default", "fn_index": None}]
        for method_info in api_methods_to_try:
            try:
                if method_info["fn_index"] is not None:
                    result = client.predict(gradio_file(chunk_path), fn_index=method_info["fn_index"])
                else:
                    result = client.predict(gradio_file(chunk_path))
                break
            except Exception as api_e:
                if _is_gpu_quota_error(api_e, _GPU_QUOTA_KEYWORDS_ON_CALL):
                    print(f" GPU quota exceeded during API call: {api_e}")
                    raise GPUQuotaExceededError(f"GPU quota exceeded during API call: {api_e}") from api_e
                # Falling through to the next call variant is intentional, but
                # remember the error so a total failure can be diagnosed.
                last_api_error = api_e
                result = None
        if result is None:
            print(f" All API call methods failed for {chunk_name} (from {original_audio_filename})")
            if last_api_error is not None:
                print(f" Last API error for {chunk_name}: {last_api_error}")
            return None

        if isinstance(result, dict):
            return result
        if isinstance(result, str):
            try:
                return json.loads(result)
            except json.JSONDecodeError:
                print(f" Failed to parse JSON response for {chunk_name}: {result[:100]}...")
                return None
        print(f" Unexpected response format for {chunk_name}: {type(result)}")
        return None
    except GPUQuotaExceededError:
        # Quota errors are handled by the caller; do not downgrade them to None.
        raise
    except Exception as e:
        # Last line of defence: check once more for a GPU quota message.
        if _is_gpu_quota_error(e, _GPU_QUOTA_KEYWORDS_ON_CALL):
            print(f"GPU quota exceeded detected in general exception: {e}")
            raise GPUQuotaExceededError(f"GPU quota exceeded: {e}") from e
        print(f"Error sending chunk {chunk_name} (from {original_audio_filename}) to Space: {e}")
        return None
def merge_transcripts(chunk_results: List[Dict], overlap_sec: int, audio_filename: str) -> Dict:
    """Combine per-chunk transcription results into one global timeline.

    Chunks that are not dicts, lack a ``"segments"`` key or carry an
    ``"error"`` key are ignored. Segment and word times are shifted by a
    running offset, trimmed so they never start before the end of the
    previously merged segment, and rounded to three decimals.

    Args:
        chunk_results: Responses for the chunks, in chunk order.
        overlap_sec: Overlap between neighbouring chunks, in seconds.
        audio_filename: Name of the source file, used in messages only.

    Returns:
        ``{"segments": [...]}`` where every segment has ``start``, ``end``,
        ``text`` and ``words`` (a possibly empty list of
        ``{"start", "end", "word"}`` dicts).
    """
    def has_fields(candidate, *fields) -> bool:
        return isinstance(candidate, dict) and all(name in candidate for name in fields)

    timeline = []
    global_offset = 0.0  # running offset of the current chunk on the global timeline

    for chunk_no, chunk in enumerate(chunk_results):
        if not has_fields(chunk, "segments") or "error" in chunk:
            continue

        furthest_local_end = 0.0
        for segment in chunk["segments"]:
            if not has_fields(segment, "start", "end", "text"):
                continue

            local_start = float(segment["start"])
            local_end = float(segment["end"])
            # For every chunk but the first, drop segments that end before
            # the midpoint of the overlap period.
            if chunk_no > 0 and local_end < overlap_sec * 0.5:
                continue

            # Map onto the global timeline.
            begin = local_start + global_offset
            finish = local_end + global_offset

            # Resolve overlap with the segment merged last.
            if timeline:
                previous_finish = timeline[-1]["end"]
                if begin < previous_finish:
                    if finish <= previous_finish:
                        continue  # fully covered already
                    begin = previous_finish
            if begin >= finish:
                continue

            words_out = []
            raw_words = segment.get("words")
            if isinstance(raw_words, list):
                for entry in raw_words:
                    if not has_fields(entry, "start", "end", "word"):
                        continue
                    # Clamp each word into the (possibly trimmed) segment.
                    word_begin = max(float(entry["start"]) + global_offset, begin)
                    word_finish = min(float(entry["end"]) + global_offset, finish)
                    if word_begin >= word_finish:
                        continue
                    words_out.append({
                        "start": round(word_begin, 3),
                        "end": round(word_finish, 3),
                        "word": entry["word"],
                    })

            timeline.append({
                "start": round(begin, 3),
                "end": round(finish, 3),
                "text": segment["text"],
                "words": words_out,
            })
            furthest_local_end = max(furthest_local_end, local_end)

        # Advance the offset for the next chunk: by what was actually
        # transcribed beyond the overlap, or by a fixed step when the chunk
        # produced nothing past the overlap.
        if furthest_local_end > overlap_sec:
            global_offset += max(0, furthest_local_end - overlap_sec)
        else:
            global_offset += (CHUNK_LENGTH_SECONDS - overlap_sec)

    if timeline:
        print(f" Finished merging transcripts for {audio_filename}.")
    else:
        print(f" No segments to merge for {audio_filename}.")
    return {"segments": timeline}
def save_transcript(result: Dict, output_path_stem_str: str, audio_filename: str):
    """Write a merged transcript as JSON, SRT, VTT and LRC files.

    Args:
        result: Dict with a ``"segments"`` list; each usable segment has
            ``start``, ``end`` and ``text`` and optionally a ``words`` list
            of ``{"start", "end", "word"}`` dicts.
        output_path_stem_str: Output path without extension; the format
            extension is appended to it.
        audio_filename: Name of the source file, used in messages only.

    A VTT file that fails with ``ValueError`` (raised by ``write_vtt``, mainly
    when the size limit is exceeded) is deleted again; the other formats are
    unaffected.
    """
    output_path_obj = Path(output_path_stem_str)

    def output_path_for(extension: str) -> Path:
        # Append instead of Path.with_suffix(): with_suffix replaces whatever
        # follows the last dot, so a stem such as "episode.1" would be written
        # to "episode.json" (colliding with "episode.2" and never matching the
        # "<stem>.json" name the skip check looks for).
        return output_path_obj.with_name(output_path_obj.name + extension)

    segments_for_output = []
    all_words_for_output = []
    if "segments" in result and isinstance(result["segments"], list):
        for seg in result["segments"]:
            if isinstance(seg, dict) and "start" in seg and "end" in seg and "text" in seg:
                segments_for_output.append((seg["start"], seg["end"], seg["text"]))
                if "words" in seg and isinstance(seg["words"], list):
                    for word_info in seg["words"]:
                        if isinstance(word_info, dict) and "start" in word_info and "end" in word_info and "word" in word_info:
                            all_words_for_output.append((word_info["start"], word_info["end"], word_info["word"]))
    if not segments_for_output:
        # The files are still created below; they simply have no content.
        print(f" No segments to write for {audio_filename}. Output files will be empty or not created.")

    # JSON
    json_path = output_path_for(".json")
    write_json_output(segments_for_output, all_words_for_output, json_path)
    print(f" Transcript saved: {json_path.name}")

    # SRT
    srt_path = output_path_for(".srt")
    write_srt(segments_for_output, srt_path)
    print(f" Transcript saved: {srt_path.name}")

    # VTT
    vtt_path = output_path_for(".vtt")
    try:
        write_vtt(segments_for_output, all_words_for_output, vtt_path)
        print(f" Transcript saved: {vtt_path.name}")
    except ValueError as e:  # mainly the file size limit
        print(f" Error saving VTT for {audio_filename} ({vtt_path.name}): {e}")
        if vtt_path.exists():
            try:
                vtt_path.unlink()
            except OSError as ose:
                print(f" Could not delete incomplete VTT file {vtt_path}: {ose}")

    # LRC
    lrc_path = output_path_for(".lrc")
    write_lrc(segments_for_output, lrc_path)
    print(f" Transcript saved: {lrc_path.name}")
# write_srt, write_vtt, write_json_output and write_lrc can stay unchanged from the previous version
def write_srt(segments: List, path: Path):
    """Write segments as a SubRip (.srt) file.

    Args:
        segments: Sequence of ``(start, end, text)`` items with times in seconds.
        path: Destination file; it is created even when ``segments`` is empty.
    """
    def sec2srt(t_float: float) -> str:
        # Round the whole timestamp to milliseconds once. Truncating the
        # fractional part on its own loses a millisecond to float error
        # (1.001 -> ",000", 1.2 -> ",199"). Negative times are clamped to zero.
        total_ms = max(0, int(round(t_float * 1000)))
        h, rem = divmod(total_ms, 3_600_000)
        m, rem = divmod(rem, 60_000)
        s, ms = divmod(rem, 1000)
        return f"{h:02}:{m:02}:{s:02},{ms:03}"

    with open(path, "w", encoding="utf-8") as f:
        for i, seg_list in enumerate(segments, 1):
            f.write(f"{i}\n{sec2srt(float(seg_list[0]))} --> {sec2srt(float(seg_list[1]))}\n{seg_list[2]}\n\n")
def write_vtt(segments: List, words: List, path: Path, max_size_bytes: Optional[int] = None):
    """Write segments as a WebVTT file, highlighting words when timings exist.

    Without word timings every segment becomes one cue preceded by a
    ``NOTE Segment n`` line. With word timings each segment is expanded into
    a series of cues showing the whole line, with words tagged as ``past``,
    ``current`` or ``future``; gaps of more than 50 ms before the first word,
    between words and after the last word get their own cue.

    Args:
        segments: Sequence of ``(start, end, text)`` items, times in seconds.
        words: Sequence of ``(start, end, text)`` word items; may be empty.
        path: Destination file.
        max_size_bytes: Size limit for the file; defaults to
            ``MAX_VTT_SIZE_BYTES`` when ``None``.

    Raises:
        ValueError: When the file grows beyond the size limit. The partially
            written file is left on disk for the caller to remove.
    """
    size_limit = MAX_VTT_SIZE_BYTES if max_size_bytes is None else max_size_bytes
    file_name = Path(path).name

    def sec2vtt(t_float: float) -> str:
        # Round the whole timestamp to milliseconds once. Truncating the
        # fractional part on its own loses a millisecond to float error
        # (1.2 -> ".199"). Negative times are clamped to zero.
        total_ms = max(0, int(round(t_float * 1000)))
        h, rem = divmod(total_ms, 3_600_000)
        m, rem = divmod(rem, 60_000)
        s, ms = divmod(rem, 1000)
        return f"{h:02}:{m:02}:{s:02}.{ms:03}"

    def render_line(texts, past_count, current_idx=None) -> str:
        # Words before past_count are "past", the word at current_idx is
        # "current", everything else is "future".
        parts = []
        for idx, text in enumerate(texts):
            if idx == current_idx:
                css_class = "current"
            elif idx < past_count:
                css_class = "past"
            else:
                css_class = "future"
            parts.append(f"<c.{css_class}>{text}</c>")
        return f'<c.line>{" ".join(parts)}</c>'

    with open(path, "w", encoding="utf-8") as f:
        def emit(block: str) -> None:
            # Write one cue and enforce the size limit right afterwards.
            f.write(block)
            if f.tell() > size_limit:
                raise ValueError(f"VTT file size limit ({size_limit/1024/1024:.1f}MB) exceeded for {file_name}")

        f.write("WEBVTT\n\n")
        if not segments:
            return  # header only when there is nothing to write
        f.write("STYLE\n")
        f.write("::cue(.current) { color: #ffff00; font-weight: bold; }\n")
        f.write("::cue(.past) { color: #888888; }\n")
        f.write("::cue(.future) { color: #ffffff; }\n")
        f.write("::cue(.line) { background: rgba(0,0,0,0.7); padding: 4px; }\n\n")

        # Segment-level cues when no word timestamps are available.
        if not words:
            for i, seg_data in enumerate(segments, 1):
                emit(
                    f"NOTE Segment {i}\n"
                    f"{sec2vtt(float(seg_data[0]))} --> {sec2vtt(float(seg_data[1]))}\n{seg_data[2]}\n\n"
                )
            return

        for seg_data in segments:
            seg_start, seg_end, seg_text_full = float(seg_data[0]), float(seg_data[1]), seg_data[2]

            # Words whose time span overlaps this segment, ordered by start time.
            current_segment_words = sorted(
                (w for w in words if max(seg_start, float(w[0])) < min(seg_end, float(w[1]))),
                key=lambda w: float(w[0]),
            )
            if not current_segment_words:
                emit(f"{sec2vtt(seg_start)} --> {sec2vtt(seg_end)}\n{seg_text_full}\n\n")
                continue

            texts = [w[2] for w in current_segment_words]
            word_count = len(current_segment_words)

            # Lead-in from the segment start to the first word.
            first_word_start = float(current_segment_words[0][0])
            if seg_start < first_word_start - 0.05:
                emit(f"{sec2vtt(seg_start)} --> {sec2vtt(first_word_start)}\n{render_line(texts, 0)}\n\n")

            for local_idx, word_data in enumerate(current_segment_words):
                w_s, w_e = float(word_data[0]), float(word_data[1])
                emit(f"{sec2vtt(w_s)} --> {sec2vtt(w_e)}\n{render_line(texts, local_idx, local_idx)}\n\n")

                # Silence between this word and the next one (gap over 50 ms).
                if local_idx < word_count - 1:
                    next_word_start = float(current_segment_words[local_idx + 1][0])
                    if w_e < next_word_start - 0.05:
                        emit(f"{sec2vtt(w_e)} --> {sec2vtt(next_word_start)}\n{render_line(texts, local_idx + 1)}\n\n")

            # Tail from the last word to the segment end.
            last_word_end = float(current_segment_words[-1][1])
            if last_word_end < seg_end - 0.05:
                emit(f"{sec2vtt(last_word_end)} --> {sec2vtt(seg_end)}\n{render_line(texts, word_count)}\n\n")
def write_json_output(segments: List, words: List, path: Path):
    """Write segments, each with the words overlapping it, as a JSON file.

    Args:
        segments: Sequence of ``(start, end, text)`` items.
        words: Sequence of ``(start, end, text)`` word items; when empty,
            every segment gets an empty ``words`` list.
        path: Destination file (UTF-8, non-ASCII kept as-is, 2-space indent).
    """
    def words_overlapping(range_start, range_end):
        # A word belongs to a segment when their time spans intersect.
        return [
            {"start": word_start, "end": word_end, "word": word_text}
            for word_start, word_end, word_text in words
            if max(range_start, word_start) < min(range_end, word_end)
        ]

    payload = {"segments": []}
    for seg_start, seg_end, seg_text in segments:
        payload["segments"].append({
            "start": seg_start,
            "end": seg_end,
            "text": seg_text,
            "words": words_overlapping(seg_start, seg_end) if words else [],
        })

    with open(path, "w", encoding="utf-8") as out_file:
        json.dump(payload, out_file, ensure_ascii=False, indent=2)
def write_lrc(segments: List, path: Path):
    """Write segments as an LRC lyrics file, one ``[mm:ss.xx]text`` line each.

    Args:
        segments: Sequence of ``(start, end, text)`` items; only the start
            time and the text are used.
        path: Destination file; it is created even when ``segments`` is empty.
    """
    def sec2lrc(t_float: float) -> str:
        # Round to centiseconds before splitting into minutes and seconds.
        # Formatting the seconds part on its own can round up to "60.00"
        # (59.999 -> "[00:60.00]"). Negative times are clamped to zero.
        total_cs = max(0, int(round(float(t_float) * 100)))
        minutes, cs = divmod(total_cs, 6000)
        return f"[{minutes:02d}:{cs // 100:02d}.{cs % 100:02d}]"

    with open(path, "w", encoding="utf-8") as f:
        for seg_list in segments:  # seg_list is (start, end, text)
            f.write(f"{sec2lrc(float(seg_list[0]))}{seg_list[2]}\n")
def process_audio_file(input_path_str: str, output_dir_str: str):
    """Transcribe one audio/video file and write the transcripts.

    Steps: convert non-WAV input to a 16 kHz mono WAV, split it into chunks,
    send each chunk to the Space (waiting five seconds between chunks), merge
    the chunk results and save them next to ``output_dir_str/<input stem>``.
    Temporary conversion and chunk directories are removed afterwards,
    whether or not processing succeeded.

    Args:
        input_path_str: Path of the input file.
        output_dir_str: Directory for the transcripts and temporary files.

    Raises:
        GPUQuotaExceededError: Propagated from ``process_chunk`` so the caller
            can abort the whole run. Every other error is reported and
            swallowed.
    """
    original_input_path_obj = Path(input_path_str)
    audio_filename = original_input_path_obj.name
    print(f"Processing: {audio_filename}")
    temp_wav_path_obj: Optional[Path] = None
    # Replaced by the converted WAV path when the input is not a WAV file.
    current_processing_input_path = input_path_str
    output_dir_path = Path(output_dir_str)
    # Per-input temporary directory, e.g. output_dir/temp_processing/<input stem>/
    base_temp_dir = output_dir_path / "temp_processing" / original_input_path_obj.stem
    temp_conversion_dir = base_temp_dir / "conversion"
    try:
        # Anything that is not WAV is converted to WAV (16 kHz, mono) first.
        if original_input_path_obj.suffix.lower() not in ['.wav']:
            print(f" Converting {audio_filename} to WAV...")
            temp_conversion_dir.mkdir(parents=True, exist_ok=True)
            temp_wav_path_obj = temp_conversion_dir / f"{original_input_path_obj.stem}_converted.wav"
            if not shutil.which('ffmpeg'):
                print(f" Error: ffmpeg not found. Cannot convert {audio_filename}.")
                return
            cmd = [
                'ffmpeg', '-y', '-loglevel', 'error', '-i', input_path_str,
                '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
                temp_wav_path_obj.as_posix()
            ]
            try:
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
                if result.returncode == 0:
                    print(f" Successfully converted to {temp_wav_path_obj.name}")
                    current_processing_input_path = temp_wav_path_obj.as_posix()
                else:
                    print(f" Error converting {audio_filename} to WAV: {result.stderr.strip()}")
                    return
            except subprocess.TimeoutExpired:
                print(f" Timeout converting {audio_filename} to WAV.")
                return
            except Exception as e_conv:
                print(f" Exception during WAV conversion for {audio_filename}: {e_conv}")
                return

        # Split the audio; chunks are stored below the output directory.
        chunk_paths = split_audio_with_ffmpeg(
            current_processing_input_path, output_dir_path.as_posix(),
            CHUNK_LENGTH_SECONDS, CHUNK_OVERLAP_SECONDS
        )
        if not chunk_paths:
            print(f" Failed to split {audio_filename}. Skipping transcription.")
            return

        print(f" Processing {len(chunk_paths)} chunks for {audio_filename} via API...")
        chunk_results = []
        for i, chunk_p_str in enumerate(chunk_paths):
            try:
                api_result = process_chunk(chunk_p_str, audio_filename)
                if api_result:
                    chunk_results.append(api_result)
                    print(f" Successfully processed chunk {i+1}/{len(chunk_paths)}")
                else:
                    print(f" Failed to process chunk {i+1}/{len(chunk_paths)}")
            except GPUQuotaExceededError as gpu_error:
                print(f" GPU quota exceeded while processing {audio_filename}")
                print(f" Error: {gpu_error}")
                print(f" GPU制限に達しました。処理を強制終了します。")
                raise  # handled in main()
            # Pause between API requests; not needed after the last chunk.
            if i < len(chunk_paths) - 1:
                wait_seconds = 5
                print(f" Waiting for {wait_seconds} seconds before processing next chunk...")
                time.sleep(wait_seconds)

        if not chunk_results:
            print(f" No chunks successfully processed via API for {audio_filename}.")
            return

        merged_result = merge_transcripts(chunk_results, CHUNK_OVERLAP_SECONDS, audio_filename)
        output_stem_str = (output_dir_path / original_input_path_obj.stem).as_posix()
        save_transcript(merged_result, output_stem_str, audio_filename)
    except GPUQuotaExceededError:
        # Must come before the generic handler: GPUQuotaExceededError is an
        # Exception subclass and would otherwise be swallowed here, so main()
        # would never get the chance to abort the run.
        raise
    except Exception as e_main_proc:
        print(f"An unexpected error occurred while processing {audio_filename}: {e_main_proc}")
        import traceback
        traceback.print_exc()
    finally:
        # Remove the chunk directory created by split_audio_with_ffmpeg. It is
        # named after the file that was actually split, which is
        # "<stem>_converted" for converted inputs, so both names are checked.
        chunk_dir_names = {original_input_path_obj.stem, Path(current_processing_input_path).stem}
        for chunk_dir_name in sorted(chunk_dir_names):
            chunk_temp_parent_dir = output_dir_path / "temp_chunks" / chunk_dir_name
            if chunk_temp_parent_dir.exists():
                try:
                    shutil.rmtree(chunk_temp_parent_dir)
                except OSError as e_del_chunk:
                    print(f" Error deleting temp chunk dir {chunk_temp_parent_dir}: {e_del_chunk}")
        # Remove temp_processing/<input stem> as a whole (includes the conversion directory).
        if base_temp_dir.exists():
            try:
                shutil.rmtree(base_temp_dir)
                print(f" Cleaned up temporary processing directory: {base_temp_dir}")
            except OSError as e_del_base:
                print(f" Error deleting base temp dir {base_temp_dir}: {e_del_base}")
def main():
    """Command-line entry point: select inputs, skip finished ones, transcribe the rest.

    The input is a file or directory given as the optional positional
    argument, or a folder picked in a Tk dialog when the argument is
    omitted. In a directory, files are grouped by base name and one file per
    group is chosen (MP3 first, then MP4, otherwise the first in sorted
    order). Inputs that already have a ``.json`` transcript next to them are
    skipped. The process exits with status 1 when a GPU quota error is
    raised.
    """
    parser = argparse.ArgumentParser(
        description="Transcribes audio/video files from a specified path (file or directory). "
                    "Outputs are saved in the same location as input files. "
                    "Skips already processed files (checks for .json output). "
                    "Prefers MP3 over MP4 if both exist with the same base name."
    )
    parser.add_argument(
        "input_path",
        nargs="?",  # optional
        help="Path to an input audio/video file or a directory containing such files."
    )
    args = parser.parse_args()

    # Without an argument, let the user pick a folder in a dialog.
    if args.input_path is None:
        root = tk.Tk()
        try:
            root.withdraw()  # hide the main window
            input_path = filedialog.askdirectory(
                title="処理したい音声/動画ファイルのあるフォルダを選択してください",
                initialdir=os.getcwd()
            )
        finally:
            # Release the hidden Tk root once the dialog is closed.
            root.destroy()
        if not input_path:  # dialog cancelled
            print("フォルダが選択されませんでした。")
            return
        input_path_obj = Path(input_path)
    else:
        input_path_obj = Path(args.input_path)

    # Report the path that is actually used: args.input_path is None when the
    # folder came from the dialog.
    if not input_path_obj.exists():
        print(f"Error: Input path '{input_path_obj}' does not exist.")
        return

    # 1. Build the list of candidate files (including the MP3-first rule).
    files_to_consider_processing = []
    if input_path_obj.is_file():
        if input_path_obj.suffix.lower() in TARGET_AUDIO_VIDEO_EXTENSIONS:
            files_to_consider_processing.append(input_path_obj)
        else:
            print(f"Input file '{input_path_obj.name}' is not a supported type. Supported: {TARGET_AUDIO_VIDEO_EXTENSIONS}")
    elif input_path_obj.is_dir():
        print(f"Scanning directory: {input_path_obj.resolve()}")
        # Group files by base name.
        grouped_files = collections.defaultdict(list)
        for item in sorted(input_path_obj.iterdir()):  # sorted for a stable processing order
            if item.is_file() and item.suffix.lower() in TARGET_AUDIO_VIDEO_EXTENSIONS:
                grouped_files[item.stem].append(item)
        if not grouped_files:
            print(f"No supported files found in directory: {input_path_obj.resolve()}")
            return
        for base_name, file_group in grouped_files.items():
            mp3_file = next((f for f in file_group if f.suffix.lower() == '.mp3'), None)
            mp4_file = next((f for f in file_group if f.suffix.lower() == '.mp4'), None)
            chosen_file = None
            if mp3_file:
                chosen_file = mp3_file
                if mp4_file and mp4_file != mp3_file:
                    print(f" MP3 found for '{base_name}', prioritizing '{mp3_file.name}' over '{mp4_file.name}'.")
            elif mp4_file:
                chosen_file = mp4_file
            elif file_group:
                # Neither MP3 nor MP4: take the first file of the group,
                # which depends on the sort order of the file names.
                chosen_file = file_group[0]
            if chosen_file:
                files_to_consider_processing.append(chosen_file)
    else:
        print(f"Error: Input path '{input_path_obj}' is not a valid file or directory.")
        return

    if not files_to_consider_processing:
        print("No files selected for processing.")
        return

    # 2. Skip files that already have a transcript.
    actual_files_to_process = []
    print(f"\nFound {len(files_to_consider_processing)} potential file(s). Checking for existing transcripts...")
    for file_path in files_to_consider_processing:
        output_dir = file_path.parent
        # The presence of the representative output file (e.g. .json) decides.
        expected_output_file = output_dir / f"{file_path.stem}{PRIMARY_OUTPUT_EXTENSION_FOR_SKIP_CHECK}"
        if expected_output_file.exists():
            print(f" Skipping '{file_path.name}': Output '{expected_output_file.name}' already exists.")
        else:
            actual_files_to_process.append(file_path)
    if not actual_files_to_process:
        print("\nNo new files to process. All selected files seem to have existing transcripts.")
        return

    total_to_process_count = len(actual_files_to_process)
    print(f"\nStarting processing for {total_to_process_count} new file(s)...")
    for i, file_to_process_obj in enumerate(actual_files_to_process):
        print(f"\n--- [{i+1}/{total_to_process_count}] Processing: {file_to_process_obj.name} ---")
        output_dir_for_this_file = file_to_process_obj.parent.as_posix()
        try:
            process_audio_file(file_to_process_obj.as_posix(), output_dir_for_this_file)
            print(f"--- Finished: {file_to_process_obj.name} ---")
        except GPUQuotaExceededError as gpu_error:
            print(f"\n=== GPU QUOTA EXCEEDED ===")
            print(f"処理を中断します。GPU制限に達しました。")
            print(f"Error details: {gpu_error}")
            sys.exit(1)  # stop immediately
    print(f"\nAll {total_to_process_count} new file(s) processed.")
# Script entry point: run main() only when gradio_client could be imported.
if __name__ == "__main__":
    if GRADIO_CLIENT_AVAILABLE:
        main()
    else:
        print("Critical: gradio_client library is not installed. Please run: pip install gradio_client")