import gradio as gr # from PIL import Image # Keep commented unless needed import torch from transformers import pipeline # Keep pipeline for standard models from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, AutoConfig, AutoModelForCausalLM import yt_dlp import tempfile import os import shutil import numpy as np import time # For timestamp formatting import soundfile as sf # For reading audio info import traceback # For printing full errors import platform import re import subprocess # --- 硬體檢查函數 --- def get_hardware_info(): """獲取 CPU 和 GPU 信息""" # 獲取 CPU 信息 cpu_info = "Unknown CPU" try: if platform.system() == "Windows": output = subprocess.check_output("wmic cpu get name", shell=True).decode().strip().split('\n') if len(output) >= 2: cpu_info = output[1].strip() elif platform.system() == "Linux": with open('/proc/cpuinfo', 'r') as f: for line in f: if line.startswith('model name'): cpu_info = line.split(':')[1].strip() break elif platform.system() == "Darwin": # macOS output = subprocess.check_output("sysctl -n machdep.cpu.brand_string", shell=True).decode().strip() cpu_info = output except Exception as e: print(f"Error getting CPU info: {e}") # 獲取 GPU 信息 gpu_info = None if torch.cuda.is_available(): try: gpu_info = torch.cuda.get_device_name(0) # print(f"GPU detected: {gpu_info}") except Exception as e: print(f"Error getting GPU info: {e}") # 保留 CPU 和 GPU 的完整名稱 # 不進行簡化,直接返回完整名稱 return cpu_info, gpu_info # --- Global Variables --- pipe = None phi4_model = None phi4_processor = None current_model_name = None current_device = "cpu" # 默認使用 CPU # --- Model Data --- PHI4_MODEL_ID = "microsoft/Phi-4-multimodal-instruct" MERALION_MODEL_ID = "MERaLiON/MERaLiON-AudioLLM-Whisper-SEA-LION" SEALLM_MODEL_ID = "SeaLLMs/SeaLLMs-Audio-7B" MODEL_DATA = [ {"id": "openai/whisper-tiny", "params": "~39M", "size": "151 MB", "status_en": "Available", "status_zh": "可用", "type": "whisper"}, {"id": "openai/whisper-base", "params": "~74M", "size": "290 MB", "status_en": "Available", "status_zh": "可用", "type": "whisper"}, {"id": "openai/whisper-small", "params": "~244M", "size": "967 MB", "status_en": "Available", "status_zh": "可用", "type": "whisper"}, {"id": "openai/whisper-medium", "params": "~769M", "size": "3.06 GB", "status_en": "Available (CPU Slow)", "status_zh": "可用 (CPU 慢)", "type": "whisper"}, {"id": "openai/whisper-large", "params": "~1.55B", "size": "6.17 GB", "status_en": "Available (CPU Very Slow)", "status_zh": "可用 (CPU 極慢)", "type": "whisper"}, {"id": "openai/whisper-large-v2", "params": "~1.55B", "size": "6.17 GB", "status_en": "Available (CPU Very Slow)", "status_zh": "可用 (CPU 極慢)", "type": "whisper"}, {"id": "openai/whisper-large-v3", "params": "~1.55B", "size": "3.09 GB", "status_en": "Available (CPU Very Slow)", "status_zh": "可用 (CPU 極慢)", "type": "whisper"}, {"id": "openai/whisper-large-v3-turbo", "params": "~809M", "size": "1.62 GB", "status_en": "Available (Optimized, CPU Slow)", "status_zh": "可用 (優化, CPU 慢)", "type": "whisper"}, {"id": PHI4_MODEL_ID, "params": "~5.57B", "size": "11.15 GB", "status_en": "Multimodal (Need Trust, High RAM)", "status_zh": "多模態 (需信任,高RAM)", "type": "phi4"}, # {"id": MERALION_MODEL_ID, "params": "~9.93B", "size": "19.85 GB", "status_en": "Experimental (Need Trust, High RAM)", "status_zh": "實驗性 (需信任,高RAM)", "type": "other"}, # {"id": SEALLM_MODEL_ID, "params": "~8.29B", "size": "16.57 GB", "status_en": "Experimental (Need Trust, High RAM)", "status_zh": "實驗性 (需信任,高RAM)", "type": "other"}, ] MODEL_INFO_DICT = {m['id']: m for m in MODEL_DATA} MODEL_CHOICES_WITH_PARAMS = [ (f"{m['id'].split('/')[-1]} ({m['params']}, {m['size']}) - {m['status_en']} / {m['status_zh']}", m['id']) for m in MODEL_DATA ] DEFAULT_MODEL = "openai/whisper-tiny" # --- Language Data --- BILINGUAL_LANGUAGES_DICT = { "auto": "Auto-detect / 自動偵測", "en": "English / 英文", "zh": "Chinese / 中文", "de": "German / 德文", "es": "Spanish / 西班牙文", "ru": "Russian / 俄文", "ko": "Korean / 韓文", "fr": "French / 法文", "ja": "Japanese / 日文", "pt": "Portuguese / 葡萄牙文", "tr": "Turkish / 土耳其文", "pl": "Polish / 波蘭文", "ca": "Catalan / 加泰隆尼亞文", "nl": "Dutch / 荷蘭文", "ar": "Arabic / 阿拉伯文", "sv": "Swedish / 瑞典文", "it": "Italian / 義大利文", "id": "Indonesian / 印尼文", "hi": "Hindi / 印地文", "fi": "Finnish / 芬蘭文", "vi": "Vietnamese / 越南文", "he": "Hebrew / 希伯來文", "uk": "Ukrainian / 烏克蘭文", "el": "Greek / 希臘文", "ms": "Malay / 馬來文", "cs": "Czech / 捷克文", "ro": "Romanian / 羅馬尼亞文", "da": "Danish / 丹麥文", "hu": "Hungarian / 匈牙利文", "ta": "Tamil / 坦米爾文", "no": "Norwegian / 挪威文", "th": "Thai / 泰文", "ur": "Urdu / 烏爾都文", "hr": "Croatian / 克羅埃西亞文", "bg": "Bulgarian / 保加利亞文", "lt": "Lithuanian / 立陶宛文", "la": "Latin / 拉丁文", "mi": "Maori / 毛利文", "ml": "Malayalam / 馬拉雅拉姆文", "cy": "Welsh / 威爾斯文", "sk": "Slovak / 斯洛伐克文", "te": "Telugu / 泰盧固文", "fa": "Persian / 波斯文", "lv": "Latvian / 拉脫維亞文", "bn": "Bengali / 孟加拉文", "sr": "Serbian / 塞爾維亞文", "az": "Azerbaijani / 亞塞拜然文", "sl": "Slovenian / 斯洛維尼亞文", "kn": "Kannada / 坎那達文", "et": "Estonian / 愛沙尼亞文", "mk": "Macedonian / 馬其頓文", "br": "Breton / 布列塔尼文", "eu": "Basque / 巴斯克文", "is": "Icelandic / 冰島文", "hy": "Armenian / 亞美尼亞文", "ne": "Nepali / 尼泊爾文", "mn": "Mongolian / 蒙古文", "bs": "Bosnian / 波士尼亞文", "kk": "Kazakh / 哈薩克文", "sq": "Albanian / 阿爾巴尼亞文", "sw": "Swahili / 史瓦希里文", "gl": "Galician / 加利西亞文", "mr": "Marathi / 馬拉地文", "pa": "Punjabi / 旁遮普文", "si": "Sinhala / 僧伽羅文", "km": "Khmer / 高棉文", "sn": "Shona / 修納文", "yo": "Yoruba / 約魯巴文", "so": "Somali / 索馬利文", "af": "Afrikaans / 南非荷蘭文", "oc": "Occitan / 奧克西坦文", "ka": "Georgian / 喬治亞文", "be": "Belarusian / 白俄羅斯文", "tg": "Tajik / 塔吉克文", "sd": "Sindhi / 信德文", "gu": "Gujarati / 古吉拉特文", "am": "Amharic / 安哈拉文", "yi": "Yiddish / 意第緒文", "lo": "Lao / 寮文", "uz": "Uzbek / 烏茲別克文", "fo": "Faroese / 法羅文", "ht": "Haitian Creole / 海地克里奧爾文", "ps": "Pashto / 普什圖文", "tk": "Turkmen / 土庫曼文", "nn": "Nynorsk / 新挪威文", "mt": "Maltese / 馬爾他文", "sa": "Sanskrit / 梵文", "lb": "Luxembourgish / 盧森堡文", "my": "Myanmar / 緬甸文", "bo": "Tibetan / 藏文", "tl": "Tagalog / 他加祿文", "mg": "Malagasy / 馬達加斯加文", "as": "Assamese / 阿薩姆文", "tt": "Tatar / 韃靼文", "haw": "Hawaiian / 夏威夷文", "ln": "Lingala / 林加拉文", "ha": "Hausa / 豪沙文", "ba": "Bashkir / 巴什基爾文", "jw": "Javanese / 爪哇文", "su": "Sundanese / 巽他文", "yue": "Cantonese / 粵語", } WHISPER_LANGUAGES_LIST = [] WHISPER_LANGUAGES_LIST.append((BILINGUAL_LANGUAGES_DICT["auto"], "auto")) def get_english_name(display_name_tuple): return display_name_tuple[0].split('/')[0].strip() sorted_languages = sorted( [(display_name, code) for code, display_name in BILINGUAL_LANGUAGES_DICT.items() if code != "auto"], key=get_english_name ) WHISPER_LANGUAGES_LIST.extend(sorted_languages) PHI4_AUDIO_LANG_CODES = ["auto", "en", "zh", "de", "fr", "it", "ja", "es", "pt"] PHI4_LANGUAGES_LIST = [(BILINGUAL_LANGUAGES_DICT.get(code, code), code) for code in PHI4_AUDIO_LANG_CODES] # --- Microphone Prompt --- MIC_PROMPT = """**Try Reading / 試著朗讀:** "Success is stumbling from failure to failure with no loss of enthusiasm." - Winston Churchill 「成功是在一次又一次失敗中,依然熱情不減地前行。」 - 溫斯頓・邱吉爾""" # --- YouTube Audio Download Function --- def download_youtube_audio(url): # 使用固定的目錄來存儲下載的音訊文件,這樣它們就不會被刪除 download_dir = os.path.join(tempfile.gettempdir(), "youtube_downloads") os.makedirs(download_dir, exist_ok=True) # 從 URL 中提取視頻 ID 作為文件名的一部分 video_id = url.split("v=")[-1].split("&")[0] if "v=" in url else str(int(time.time())) filename = f"youtube_{video_id}_{int(time.time())}" temp_dir = tempfile.mkdtemp() downloaded_path = None try: temp_filepath_tmpl = os.path.join(download_dir, f"{filename}.%(ext)s") ydl_opts = { 'format': 'bestaudio/best', 'outtmpl': temp_filepath_tmpl, 'noplaylist': True, 'quiet': True, 'postprocessors': [{'key': 'FFmpegExtractAudio','preferredcodec': 'mp3','preferredquality': '192',}], 'ffmpeg_location': shutil.which("ffmpeg"), } if not ydl_opts['ffmpeg_location']: print("Warning: ffmpeg not found... / 警告:找不到 ffmpeg...") with yt_dlp.YoutubeDL(ydl_opts) as ydl: info_dict = ydl.extract_info(url, download=True) duration = info_dict.get('duration') title = info_dict.get('title', 'unknown') final_filepath = ydl.prepare_filename(info_dict) if not final_filepath.endswith('.mp3'): base_name = final_filepath.rsplit('.', 1)[0] final_filepath = base_name + '.mp3' if os.path.exists(final_filepath): downloaded_path = final_filepath print(f"YouTube audio downloaded: {downloaded_path}") print(f"Title: {title}, Duration: {duration}s") else: potential_files = [os.path.join(download_dir, f) for f in os.listdir(download_dir) if f.startswith(filename) and f.endswith(".mp3")] if potential_files: downloaded_path = potential_files[0] print(f"Warning: Could not find expected MP3, using fallback: {downloaded_path}") duration = None else: raise FileNotFoundError(f"Audio file not found after download in {download_dir}") return downloaded_path, temp_dir, duration except Exception as e: print(f"Error processing YouTube URL: {e}") if temp_dir and os.path.exists(temp_dir): try: shutil.rmtree(temp_dir) except Exception as cleanup_e: print(f"Error cleaning temp directory {temp_dir}: {cleanup_e}") return None, None, None # --- Timestamp Formatting --- def format_timestamp(seconds): if seconds is None: return "N/A" milliseconds = round(seconds * 1000) seconds_int = int(milliseconds // 1000) milliseconds_rem = milliseconds % 1000 minutes = seconds_int // 60 seconds_rem = seconds_int % 60 hours = minutes // 60 minutes_rem = minutes % 60 return f"{hours:01d}:{minutes_rem:02d}:{seconds_rem:02d}.{milliseconds_rem:03d}" # --- 下載功能 --- def update_download_file(filepath): """當有音訊檔案時更新下載檔案""" if filepath and os.path.exists(filepath): return filepath return None # --- YouTube 音訊處理 --- def process_youtube_url(youtube_url): """處理 YouTube URL,下載音訊並返回播放器和下載按鈕的更新""" if not youtube_url or not youtube_url.strip(): return gr.update(visible=False, value=None), gr.update(visible=False, value=None) try: print(f"Processing YouTube URL: {youtube_url}") # 只使用我們需要的返回值 audio_path, _, _ = download_youtube_audio(youtube_url) if audio_path and os.path.exists(audio_path): # 返回音訊播放器和下載按鈕的更新 return gr.update(visible=True, value=audio_path), gr.update(visible=True, value=audio_path) else: return gr.update(visible=False, value=None), gr.update(visible=False, value=None) except Exception as e: print(f"Error processing YouTube URL: {e}") return gr.update(visible=False, value=None), gr.update(visible=False, value=None) # --- Load ASR Pipeline --- def load_asr_pipeline(model_id): global pipe, phi4_model, phi4_processor, current_device print(f"DEBUG: Loading ASR pipeline for {model_id} on device: {current_device}") trust_code = model_id in [MERALION_MODEL_ID, SEALLM_MODEL_ID] if trust_code: print(f"DEBUG: Setting trust_remote_code=True for pipeline model {model_id}") try: phi4_model = None phi4_processor = None # 根據選擇的設備設置模型加載參數 if current_device == "gpu": # 檢查 CUDA 是否可用 if torch.cuda.is_available(): try: # 嘗試直接使用 CUDA 設備 pipe = pipeline( "automatic-speech-recognition", model=model_id, trust_remote_code=trust_code, device="cuda" ) # 注意:第一次運行時可能會出現 attention mask 警告,這是正常的,不影響功能 print(f"DEBUG: Using GPU (CUDA) for ASR pipeline. Available GPU: {torch.cuda.get_device_name(0)}") except Exception as e: # 如果直接使用 CUDA 失敗,嘗試使用 device=0 pipe = pipeline( "automatic-speech-recognition", model=model_id, trust_remote_code=trust_code, device=0 ) print(f"DEBUG: Using GPU (device=0) for ASR pipeline. Reason for device_map failure: {str(e)}") else: # 如果 CUDA 不可用,回退到 CPU 並警告用戶 pipe = pipeline( "automatic-speech-recognition", model=model_id, trust_remote_code=trust_code, device="cpu" ) print("WARNING: GPU selected but CUDA is not available. Falling back to CPU.") else: # CPU # 使用 CPU pipe = pipeline( "automatic-speech-recognition", model=model_id, trust_remote_code=trust_code, device="cpu" ) # 注意:第一次運行時可能會出現 attention mask 警告,這是正常的,不影響功能 print("DEBUG: Using CPU for ASR pipeline.") print(f"DEBUG: Model loaded on device: {pipe.device}") return pipe except Exception as e: print(f"Error loading ASR pipeline for {model_id}:") traceback.print_exc() raise e # --- Load Phi-4 Model --- def load_phi4_model(model_id): global pipe, phi4_model, phi4_processor, current_device print(f"DEBUG: Loading Phi-4 model {model_id} on device: {current_device}") try: pipe = None phi4_processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True) # 根據選擇的設備設置模型加載參數 if current_device == "gpu": # 檢查 CUDA 是否可用 if torch.cuda.is_available(): try: # 嘗試直接使用 CUDA 設備 phi4_model = AutoModelForCausalLM.from_pretrained( model_id, trust_remote_code=True, torch_dtype=torch.float16, # 使用半精度以節省 GPU 記憶體 _attn_implementation="eager", ) phi4_model = phi4_model.to("cuda") print(f"DEBUG: Using GPU (CUDA) for Phi-4. Available GPU: {torch.cuda.get_device_name(0)}") except Exception as e: # 如果直接使用 CUDA 失敗,嘗試使用 device=0 try: phi4_model = AutoModelForCausalLM.from_pretrained( model_id, trust_remote_code=True, torch_dtype=torch.float16, _attn_implementation="eager", ) phi4_model = phi4_model.to("cuda:0") print(f"DEBUG: Using GPU (device=0) for Phi-4. Reason for first attempt failure: {str(e)}") except Exception as e2: # 如果仍然失敗,回退到 CPU phi4_model = AutoModelForCausalLM.from_pretrained( model_id, trust_remote_code=True, torch_dtype=torch.float32, _attn_implementation="eager", ) phi4_model = phi4_model.to("cpu") print(f"WARNING: Failed to use GPU for Phi-4, falling back to CPU. Error: {str(e2)}") else: # 如果 CUDA 不可用,回退到 CPU 並警告用戶 phi4_model = AutoModelForCausalLM.from_pretrained( model_id, trust_remote_code=True, torch_dtype=torch.float32, # CPU 通常使用全精度 _attn_implementation="eager", ) phi4_model = phi4_model.to("cpu") print("WARNING: GPU selected but CUDA is not available. Falling back to CPU for Phi-4.") else: # CPU # 使用 CPU phi4_model = AutoModelForCausalLM.from_pretrained( model_id, trust_remote_code=True, torch_dtype=torch.float32, # CPU 通常使用全精度 _attn_implementation="eager", ) phi4_model = phi4_model.to("cpu") print("DEBUG: Using CPU for Phi-4.") print(f"DEBUG: Phi-4 model loaded on device: {next(phi4_model.parameters()).device}") return phi4_model, phi4_processor except Exception as e: print(f"Error loading Phi-4 model {model_id}:") traceback.print_exc() if "scipy" in str(e) or "torchvision" in str(e) or "peft" in str(e): missing_pkg = "scipy" if "scipy" in str(e) else "torchvision" if "torchvision" in str(e) else "peft" raise type(e)(f"{e}. Please ensure '{missing_pkg}' is in requirements.txt") from e else: raise e # --- Main Transcription Function --- def transcribe_audio(mic_input, file_input, youtube_url, selected_model_identifier, task, language, return_timestamps, phi4_prompt_text, device_choice, previous_output_text, active_tab): global pipe, phi4_model, phi4_processor, current_model_name, current_device audio_source = None source_type_en = "" source_type_zh = "" temp_dir_to_clean = None audio_duration = None model_name_for_display = selected_model_identifier model_load_time = 0.0 inference_time = 0.0 model_type = MODEL_INFO_DICT.get(selected_model_identifier, {}).get("type", "other") output_text_accumulated = previous_output_text if previous_output_text else "" status_update_prefix = output_text_accumulated + ("\n\n---\n\n" if output_text_accumulated else "") final_output_text = output_text_accumulated # 更新設備選擇 if device_choice != current_device: current_device = device_choice print(f"DEBUG: Device changed to {current_device}") # 設備變更時強制重新加載模型 pipe = None phi4_model = None phi4_processor = None current_model_name = None # --- Load Model --- model_changed = selected_model_identifier != current_model_name model_needs_load = (model_type == "phi4" and phi4_model is None) or (model_type != "phi4" and pipe is None) if model_changed or model_needs_load: warning_message = "" # 移除未使用的 trust_code 變量 if selected_model_identifier in [PHI4_MODEL_ID, MERALION_MODEL_ID, SEALLM_MODEL_ID]: warning_message += f"Warning: Model {selected_model_identifier} requires executing remote code.\n警告: 模型 {selected_model_identifier} 需要執行遠端程式碼。\n" if "seallms" in selected_model_identifier.lower() or "meralion" in selected_model_identifier.lower(): warning_message += f"Warning: Model {selected_model_identifier} likely requires >16GB RAM.\n警告: 模型 {selected_model_identifier} 可能需要 >16GB RAM。\n" if model_type == "phi4": warning_message += f"Warning: Phi-4 uses a different process.\n警告: Phi-4 使用不同處理流程。\n" print(f"Attempting to load model / 嘗試載入模型: {selected_model_identifier} (Type / 類型: {model_type})") status_update_str = warning_message + f"Loading model / 正在載入模型: {selected_model_identifier}..." # 不使用 yield,而是更新 output_text_accumulated output_text_accumulated = status_update_prefix + status_update_str load_start_time = time.monotonic() try: if model_type == "phi4": phi4_model, phi4_processor = load_phi4_model(selected_model_identifier) pipe = None else: pipe = load_asr_pipeline(selected_model_identifier) phi4_model = None phi4_processor = None load_end_time = time.monotonic() model_load_time = load_end_time - load_start_time current_model_name = selected_model_identifier model_name_for_display = current_model_name print(f"Model {current_model_name} loaded successfully ({model_load_time:.2f}s). / 模型 {current_model_name} 載入成功 ({model_load_time:.2f} 秒).") status_update_str = warning_message + f"Model {current_model_name} loaded successfully / 載入成功 ({model_load_time:.2f}s)." # 更新 output_text_accumulated output_text_accumulated = status_update_prefix + status_update_str except Exception as e: load_end_time = time.monotonic() model_load_time = load_end_time - load_start_time print(f"Failed to load model {selected_model_identifier} ({model_load_time:.2f}s). / 載入模型 {selected_model_identifier} 失敗 ({model_load_time:.2f} 秒).") error_msg = f"Error: Failed to load model {selected_model_identifier}:\n錯誤: 載入模型 {selected_model_identifier} 失敗:\n{e}\n({model_load_time:.2f}s)" if "requires `accelerate`" in str(e): error_msg += "\n**Missing 'accelerate'. Please install. / 缺少 'accelerate',請安裝.**" if isinstance(e, (MemoryError, RuntimeError)) and "out of memory" in str(e).lower(): error_msg += "\n**Out of Memory. Try a smaller model. / 記憶體不足,請嘗試較小模型.**" if "trust_remote_code=True" in str(e): error_msg += "\n**Requires trusting remote code. Model might be unsafe. / 需要信任遠端代碼,模型可能不安全.**" if "scipy" in str(e) or "torchvision" in str(e) or "peft" in str(e): missing_pkg = "scipy" if "scipy" in str(e) else "torchvision" if "torchvision" in str(e) else "peft" error_msg += f"\n**Missing '{missing_pkg}'. Please install. / 缺少 '{missing_pkg}',請安裝.**" status_update_str = warning_message + error_msg pipe = None phi4_model = None phi4_processor = None current_model_name = None # 更新 output_text_accumulated output_text_accumulated = status_update_prefix + status_update_str return (output_text_accumulated, gr.update(), gr.update(), gr.update()) # Keep inputs # --- Check if model loaded --- if (model_type == "phi4" and phi4_model is None) or (model_type != "phi4" and pipe is None): output_text_accumulated = status_update_prefix + "Error: Cannot use model. / 錯誤: 無法使用模型." return (output_text_accumulated, gr.update(), gr.update(), gr.update()) # --- Determine Input Source & Get Duration --- # 根據當前活動的標籤選擇正確的輸入源 print(f"DEBUG: Active tab is {active_tab}") if active_tab == "mic" and mic_input is not None: audio_source = mic_input source_type_en = "Microphone" source_type_zh = "麥克風" elif active_tab == "file" and file_input is not None: # 處理 File 組件的輸出,它可能是一個文件路徑或一個包含文件路徑的列表 if isinstance(file_input, list) and len(file_input) > 0: # 如果是列表,取第一個文件 audio_source = file_input[0] else: # 否則直接使用 audio_source = file_input source_type_en = "File Upload" source_type_zh = "檔案上傳" elif active_tab == "youtube" and youtube_url and youtube_url.strip(): source_type_en = "YouTube" source_type_zh = "YouTube" status_update_str = f"Downloading YouTube Audio / 正在下載 YouTube 音訊..." output_text_accumulated = status_update_prefix + status_update_str audio_path, temp_dir, duration_yt = download_youtube_audio(youtube_url) if audio_path: audio_source = audio_path temp_dir_to_clean = temp_dir audio_duration = duration_yt else: output_text_accumulated = status_update_prefix + "Error: Failed to download YouTube audio. / 錯誤:無法下載 YouTube 音訊。" return (output_text_accumulated, gr.update(), gr.update(), gr.update()) else: # 如果沒有選擇任何輸入源或當前標籤沒有有效輸入 return (previous_output_text, gr.update(), gr.update(), gr.update()) # No input if audio_source is None: output_text_accumulated = status_update_prefix + f"Error: No audio file provided. / 錯誤:未提供音訊檔案." return (output_text_accumulated, gr.update(), gr.update(), gr.update()) # 確保音頻文件存在 if not os.path.exists(audio_source): output_text_accumulated = status_update_prefix + f"Error: Audio file not found '{audio_source}'. / 錯誤:找不到音訊檔案 '{audio_source}'." return (output_text_accumulated, gr.update(), gr.update(), gr.update()) # 檢查文件是否為有效的音頻文件 valid_audio_extensions = ['.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac'] file_ext = os.path.splitext(audio_source)[1].lower() if file_ext not in valid_audio_extensions: output_text_accumulated = status_update_prefix + f"Error: Invalid audio file format '{file_ext}'. / 錯誤:無效的音訊檔案格式 '{file_ext}'." return (output_text_accumulated, gr.update(), gr.update(), gr.update()) if audio_duration is None: try: # 根據文件格式選擇適當的方法獲取音頻時長 if file_ext == '.wav': # 對於 WAV 文件,使用 wave 模塊 import wave try: with wave.open(audio_source, 'rb') as wf: frames = wf.getnframes() rate = wf.getframerate() audio_duration = frames / float(rate) print(f"Got audio duration from wave module / 從 wave 模塊獲取音檔時長: {audio_duration:.2f}s") except Exception as wave_err: print(f"Could not get audio duration from wave module / 無法從 wave 模塊獲取音檔時長: {wave_err}") # 如果 wave 模塊失敗,嘗試使用 soundfile info = sf.info(audio_source) audio_duration = info.duration print(f"Got audio duration from soundfile / 從 soundfile 獲取音檔時長: {audio_duration:.2f}s") else: # 對於其他格式,使用 soundfile info = sf.info(audio_source) audio_duration = info.duration print(f"Got audio duration from soundfile / 從 soundfile 獲取音檔時長: {audio_duration:.2f}s") except Exception as e: print(f"Could not get audio duration / 無法獲取音檔時長: {e}") # 如果無法獲取時長,設置一個默認值 audio_duration = 0.0 print(f"Using default audio duration / 使用默認音檔時長: {audio_duration:.2f}s") print(f"Processing with {current_model_name} from [{source_type_en} / {source_type_zh}]: {audio_source}") print(f"Options: Task='{task}', Language(Source)='{language}', Timestamps='{return_timestamps}'") if model_type == "phi4": print(f"Phi-4 Prompt: '{phi4_prompt_text}'") status_update_str = f"Processing, please wait... / 正在處理,請稍候...\n(Model / 模型: {model_name_for_display})" output_text_accumulated = status_update_prefix + status_update_str # --- Execute & Timing --- inference_start_time = time.monotonic() current_run_output = "" timing_info_str = "" try: if model_type == "phi4": print("DEBUG: Processing with Phi-4...") if not phi4_model or not phi4_processor: raise ValueError("Phi-4 model/processor not loaded / Phi-4 模型/處理器未載入") if not phi4_prompt_text: raise ValueError("Phi-4 requires a prompt text / Phi-4 需要提示文字") user_prompt_tag='<|user|>' assistant_prompt_tag='<|assistant|>' end_tag='<|end|>' prompt = f"{user_prompt_tag}<|audio_1|>{phi4_prompt_text}{end_tag}{assistant_prompt_tag}" audio_data, samplerate = sf.read(audio_source) inputs = phi4_processor(text=prompt, audios=[(audio_data, samplerate)], return_tensors='pt').to(phi4_model.device) with torch.no_grad(): generate_ids = phi4_model.generate(**inputs, max_new_tokens=500, num_logits_to_keep=0) # Added num_logits_to_keep=0 generate_ids = generate_ids[:, inputs['input_ids'].shape[1]:] result_text = phi4_processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0] current_run_output = result_text.strip() return_timestamps = False else: # Whisper or other pipeline models print("DEBUG: Processing with ASR pipeline...") if not pipe: raise ValueError("ASR pipeline not loaded / ASR pipeline 未載入") generate_kwargs_pipe = {"task": task} # 根據任務處理語言參數 if task == "transcribe": # 在轉錄任務中,language 表示源語言 if language != "auto": generate_kwargs_pipe["language"] = language print(f"DEBUG: Setting source language to {language} for transcription") else: # translate # 在翻譯任務中,Whisper 只支持翻譯為英文,所以我們忽略 language 參數 # 但我們仍然可以在日誌中記錄目標語言 print(f"DEBUG: Translation target language is {language}, but Whisper only supports English as target") # 設置 pipeline 參數 pipeline_kwargs = { "chunk_length_s": 30, "batch_size": 1, "return_timestamps": "chunks" if return_timestamps else False, "generate_kwargs": generate_kwargs_pipe } # 使用 pipeline 調用處理音訊 # 注意:第一次運行時可能會出現 attention mask 警告,這是正常的,不影響功能 # 第二次及後續運行不會出現警告,且處理速度會更快 result = pipe(audio_source, **pipeline_kwargs) print("DEBUG: pipe() call finished.") print("DEBUG: Raw result type:", type(result)) print("DEBUG: Raw result content:", result) # 處理不同格式的結果 if return_timestamps and isinstance(result, dict) and "chunks" in result: formatted_chunks = [f"[{format_timestamp(chunk.get('timestamp', (None,))[0])} -> {format_timestamp(chunk.get('timestamp', (None, None))[1])}] {chunk.get('text', '').strip()}" for chunk in result["chunks"]] current_run_output = "\n".join(formatted_chunks).strip() elif isinstance(result, dict) and "text" in result: current_run_output = result["text"].strip() elif isinstance(result, str): current_run_output = result.strip() elif isinstance(result, list) and len(result) > 0 and isinstance(result[0], dict) and 'generated_text' in result[0]: current_run_output = result[0]['generated_text'].strip() else: current_run_output = f"(Unrecognized result format / 無法識別的結果格式: {type(result)})" print("DEBUG: Processed result:", current_run_output[:100] + "..." if len(current_run_output) > 100 else current_run_output) inference_end_time = time.monotonic() inference_time = inference_end_time - inference_start_time if not current_run_output: current_run_output = "(Audio empty or unrecognizable / 音檔空白或無法辨識)" # --- Format Timing Info (Plain Text, EN / ZH) --- timing_info_str = f"Model / 模型: {model_name_for_display}\n" if model_load_time > 0: timing_info_str += f"Model Load Time / 模型載入時間: {model_load_time:.2f} seconds / 秒\n" timing_info_str += f"Inference Time / 推論時間: {inference_time:.2f} seconds / 秒\n" relative_speed_str = "(relative time unavailable / 無法計算相對時間)" if audio_duration is not None and audio_duration > 0: relative_speed = inference_time / audio_duration # Corrected format for relative speed relative_speed_str = f"audio duration / 音檔長度 x {relative_speed:.2f}" timing_info_str += f"audio duration / 音檔時長: {audio_duration:.2f} seconds / 秒\n" timing_info_str += f"relative speed / 相對速度: {relative_speed_str}" # Corrected format print(f"Processing finished. / 處理完成。") print(timing_info_str.replace('\n', ' | ')) print(f"Result Text / 結果文字:\n{current_run_output}") # Print result text # 確保 current_run_output 不為空 if not current_run_output or current_run_output.strip() == "": current_run_output = "No text detected in audio / 音頻中未檢測到文字" # 構建最終輸出文本,確保包含所有必要信息 final_output_text = "" if status_update_prefix and status_update_prefix.strip(): final_output_text += status_update_prefix + "\n" # 添加模型和時間信息 final_output_text += timing_info_str + "\n\n" # 添加結果文本,並確保它被正確標記 final_output_text += "Result Text / 結果文字:\n" + current_run_output # 確保最終輸出不是空的或只有一個點 final_output_text = final_output_text.strip() if final_output_text == "." or not final_output_text: final_output_text = timing_info_str + "\n\nResult Text / 結果文字:\n" + current_run_output # 返回完整的文本結果,包括模型信息和處理時間 # 確保返回的是有效的文本,而不是單個點 if final_output_text == ".": print("DEBUG: Detected dot-only output, fixing...") # 構建更有意義的輸出 fixed_output = f"{timing_info_str}\n\nResult Text / 結果文字:\n{current_run_output}" return fixed_output return final_output_text except Exception as e: inference_end_time = time.monotonic() inference_time = inference_end_time - inference_start_time print(f"DEBUG: Exception occurred during processing / 處理過程中發生錯誤:") traceback.print_exc() error_message = f"Processing Failed / 處理失敗:\n{e}" final_output_text = (status_update_prefix + error_message).strip() timing_info_str = f"Model / 模型: {model_name_for_display}\n" if model_load_time > 0: timing_info_str += f"Model Load Time / 模型載入時間: {model_load_time:.2f} seconds / 秒\n" timing_info_str += f"Inference Time (until error) / 推論時間 (至錯誤): {inference_time:.2f} seconds / 秒\n" timing_info_str += "Processing Failed / 處理失敗" final_output_text += "\n\n" + timing_info_str if isinstance(e, (MemoryError, RuntimeError)) and "out of memory" in str(e).lower(): final_output_text += "\n\nOut of Memory, try smaller model. / 記憶體不足,請用小模型." finally: if temp_dir_to_clean: print(f"Cleaning YouTube temp files / 清理 YouTube 暫存: {temp_dir_to_clean}") # Corrected finally block syntax try: shutil.rmtree(temp_dir_to_clean) except Exception as e: print(f"Failed to clean temp files / 清理暫存失敗: {e}") print("DEBUG: Returning final result tuple...") # Return final tuple: Update output_text, KEEP inputs by using gr.update() # 如果 final_output_text 是字典(ASR pipeline 的輸出),直接返回它 # 否則,返回標準的元組格式 if isinstance(final_output_text, dict): return final_output_text else: return (final_output_text, gr.update(), gr.update(), gr.update()) # --- UI Update Functions --- # 添加一個函數來更新音頻播放器 def update_file_audio_player(file_path): if file_path is None: return gr.update(value=None, visible=False) # 如果是列表,取第一個文件 if isinstance(file_path, list) and len(file_path) > 0: file_path = file_path[0] # 檢查文件是否存在 if not os.path.exists(file_path): return gr.update(value=None, visible=False) # 檢查是否為有效的音頻文件 valid_audio_extensions = ['.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac'] file_ext = os.path.splitext(file_path)[1].lower() if file_ext not in valid_audio_extensions: return gr.update(value=None, visible=False) # 返回更新的音頻播放器 return gr.update(value=file_path, visible=True) def update_task_choices(selected_model_id): model_type = MODEL_INFO_DICT.get(selected_model_id, {}).get("type", "other") if model_type == "whisper": new_choices = [ ("Transcribe / 轉錄", "transcribe"), ("Translate (Whisper only to English) / 翻譯 (Whisper 僅支援轉譯至英文)", "translate") ] else: new_choices = [ ("Transcribe / 轉錄", "transcribe"), ("Translate / 轉譯", "translate") ] return gr.update(choices=new_choices) def update_phi4_prompt_ui(selected_model_id, task, language_code): model_type = MODEL_INFO_DICT.get(selected_model_id, {}).get("type", "other") is_phi4 = model_type == "phi4" prompt_text = "" if is_phi4: if task == "transcribe": if language_code == "auto": prompt_text = "Transcribe the audio to text." else: lang_display_name = BILINGUAL_LANGUAGES_DICT.get(language_code, language_code) lang_english_name = lang_display_name.split('/')[0].strip() prompt_text = f"Transcribe the audio in {lang_english_name}." elif task == "translate": # 在翻譯任務中,language_code 表示目標語言 lang_display_name = BILINGUAL_LANGUAGES_DICT.get(language_code, language_code) lang_english_name = lang_display_name.split('/')[0].strip() if language_code == "auto" or language_code == "en": # 如果目標語言是自動或英文,默認翻譯為英文 prompt_text = "Translate the audio to text." else: # 如果指定了目標語言,翻譯為該語言 prompt_text = f"Detect the language in the audio and translate it to {lang_english_name}." # Return update for Textbox visibility and value directly return gr.update(visible=is_phi4, value=prompt_text) def update_language_choices(selected_model_id): model_type = MODEL_INFO_DICT.get(selected_model_id, {}).get("type", "other") if model_type == "phi4": return gr.update(choices=PHI4_LANGUAGES_LIST, value="auto") else: return gr.update(choices=WHISPER_LANGUAGES_LIST, value="auto") def update_timestamp_visibility(selected_model_id): model_type = MODEL_INFO_DICT.get(selected_model_id, {}).get("type", "other") print(f"DEBUG: Updating timestamp visibility for {selected_model_id}. Type: {model_type}. Visible: {model_type != 'phi4'}") # Debug print return gr.update(visible=(model_type != "phi4")) def update_language_ui(model_id, task): """根據模型和任務更新語言選擇器的標籤和可見性""" model_type = MODEL_INFO_DICT.get(model_id, {}).get("type", "other") # 如果是 Whisper 模型且任務是翻譯,則隱藏語言選擇器(因為 Whisper 只能翻譯成英文) if model_type == "whisper" and task == "translate": return gr.update(visible=False, label="Target Language / 目標語言") # 否則,根據任務更新標籤並顯示 if task == "transcribe": return gr.update(visible=True, label="Source Language / 來源語言") else: # translate return gr.update(visible=True, label="Target Language / 目標語言") # --- Gradio Interface --- # Preserving user's CSS choices compact_css = """ .tabitem { margin: 0rem !important; padding: 0rem !important;} .compact-file > div { min-height: unset !important; } """ # 移除 JavaScript 代碼,改用純 CSS 解決方案 with gr.Blocks(css=compact_css, theme=gr.themes.Default(spacing_size=gr.themes.sizes.spacing_sm, text_size=gr.themes.sizes.text_sm)) as demo: # 只顯示標題,不顯示 GPU 狀態 gr.Markdown("# Automatic Speech Recognition(ASR) & Speech to Text(STT) / 語音辨識、語音轉文字 🔊🔄📝\nUse AI models to transcribe or translate speech from microphone, file uploads, or YouTube. / 使用 AI 模型轉錄或翻譯來自麥克風、上傳檔案或 YouTube 的語音。") with gr.Row(): # Left Column: Input & Options with gr.Column(scale=4): # Preserving user's scale # 添加一個隱藏的狀態變量來跟踪當前活動的標籤 active_tab = gr.State(value="mic") # 默認為麥克風標籤 # 定義標籤切換函數 def set_active_tab(tab_name): return tab_name with gr.Tabs() as tabs: with gr.TabItem("🎤 Microphone / 麥克風") as mic_tab: gr.Markdown(MIC_PROMPT, elem_classes="compact-markdown") mic_input = gr.Audio(sources=["microphone"], type="filepath", label="Record Audio / 錄製音訊") download_output = gr.File(label="Download Recording / 下載錄音檔", interactive=False, elem_classes="compact-file") with gr.TabItem("📁 Upload File / 上傳檔案") as file_tab: # 使用 File 組件代替 Audio 組件,避免音頻處理問題 file_input = gr.File(label="Upload Audio File / 上傳音訊檔", file_types=["audio"], type="filepath") # 添加音頻播放器 file_audio_player = gr.Audio(label="Audio Preview / 音訊預覽", interactive=False, visible=False) with gr.TabItem("▶️ YouTube") as youtube_tab: youtube_input = gr.Textbox(label="YouTube URL / 網址", placeholder="Paste YouTube link here / 在此貼上 YouTube 連結") gr.Examples(examples=[["https://www.youtube.com/watch?v=5D7l0tqQJ7k"]], inputs=[youtube_input], label="Example YouTube URL / 範例 YouTube 網址") # 添加 YouTube 音訊播放器和下載按鈕 with gr.Row(): youtube_audio_player = gr.Audio(label="YouTube Audio / YouTube 音訊", interactive=False, visible=False) youtube_download = gr.File(label="Download YouTube Audio / 下載 YouTube 音訊", interactive=False, visible=False, elem_classes="compact-file") # 添加標籤切換事件 mic_tab.select(fn=lambda: set_active_tab("mic"), inputs=[], outputs=[active_tab]) file_tab.select(fn=lambda: set_active_tab("file"), inputs=[], outputs=[active_tab]) youtube_tab.select(fn=lambda: set_active_tab("youtube"), inputs=[], outputs=[active_tab]) # Options in a single column with ID for spacing with gr.Column(elem_id="options-block"): # elem_id for CSS targeting if needed model_select = gr.Dropdown(choices=MODEL_CHOICES_WITH_PARAMS, label="Model / 模型", value=DEFAULT_MODEL, elem_classes="compact-label") # 獲取硬體信息並顯示具體的 CPU 和 GPU 型號 cpu_info, gpu_info = get_hardware_info() device_choices = [(f"CPU ({cpu_info})", "cpu")] if torch.cuda.is_available() and gpu_info: device_choices.append((f"GPU ({gpu_info})", "gpu")) device_input = gr.Radio(choices=device_choices, label="Device / 設備", value="cpu", elem_classes="compact-label radio-align") task_input = gr.Radio(choices=[("Transcribe / 轉錄", "transcribe"), ("Translate / 轉譯", "translate")], label="Task / 任務", value="transcribe", elem_classes="compact-label radio-align") language_input = gr.Dropdown(choices=WHISPER_LANGUAGES_LIST, label="Source Language / 來源語言", value="auto", elem_classes="compact-label") # Phi-4 prompt directly in the column, no Accordion phi4_prompt_input = gr.Textbox(label="Only for Phi-4 Prompt / 僅用於 Phi-4 指令", placeholder="e.g., Transcribe the audio to text.", lines=1, visible=False, elem_classes="compact-label") # Preserving user label and params timestamp_input = gr.Checkbox(label="Show Timestamps / 顯示時間戳", value=False, elem_classes="compact-label checkbox-align") # Preserving user label # Right Column: Output with gr.Column(scale=6): # Preserving user's scale submit_button = gr.Button("Submit / 提交", variant="primary") # Preserving user's text and placement output_text = gr.Textbox( label="Result / 結果", lines=25, # 設置顯示的行數 max_lines=25, # 設置最大行數,超過會顯示滾動條 interactive=True, placeholder="Results appear here (new results appended). / 結果將顯示在此 (新結果會附加在後面)", elem_classes="result-textbox", # 保留 CSS 類 autoscroll=False # 不自動滾動到底部,讓用戶可以控制滾動 ) # --- Event Listeners --- model_select.change(fn=update_language_choices, inputs=model_select, outputs=language_input) model_select.change(fn=update_task_choices, inputs=[model_select], outputs=[task_input]) # Link prompt update function correctly model_select.change(fn=update_phi4_prompt_ui, inputs=[model_select, task_input, language_input], outputs=[phi4_prompt_input]) task_input.change(fn=update_phi4_prompt_ui, inputs=[model_select, task_input, language_input], outputs=[phi4_prompt_input]) language_input.change(fn=update_phi4_prompt_ui, inputs=[model_select, task_input, language_input], outputs=[phi4_prompt_input]) # 根據模型和任務更新語言選擇器 task_input.change(fn=update_language_ui, inputs=[model_select, task_input], outputs=language_input) model_select.change(fn=update_language_ui, inputs=[model_select, task_input], outputs=language_input) # Link timestamp visibility function model_select.change(fn=update_timestamp_visibility, inputs=model_select, outputs=timestamp_input) # 連接下載按鈕功能 mic_input.change(fn=update_download_file, inputs=mic_input, outputs=download_output) # 連接文件上傳音頻播放器 file_input.change(fn=update_file_audio_player, inputs=file_input, outputs=file_audio_player) # 連接 YouTube 處理功能 youtube_input.change( fn=process_youtube_url, inputs=youtube_input, outputs=[youtube_audio_player, youtube_download], show_progress=True ) # 添加一個函數來直接調用 transcribe_audio,不使用 yield def transcribe_audio_with_error_handling(*args): try: # 獲取模型信息 selected_model_identifier = args[3] # 第四個參數是 selected_model_identifier model_name_for_display = selected_model_identifier # 獲取音頻時長,用於估算處理時間 audio_source = None active_tab = args[-1] # 最後一個參數是 active_tab if active_tab == "mic" and args[0] is not None: audio_source = args[0] elif active_tab == "file" and args[1] is not None: if isinstance(args[1], list) and len(args[1]) > 0: audio_source = args[1][0] else: audio_source = args[1] elif active_tab == "youtube" and args[2] and args[2].strip(): # YouTube 處理較複雜,暫不估算時間 pass # 檢查音頻文件是否存在 if audio_source and os.path.exists(audio_source): print(f"Processing audio file: {audio_source}") # 清除之前的輸出,確保結果顯示正確 print("\n" + "="*50) print("NEW TRANSCRIPTION PROCESS STARTED") print("="*50 + "\n") # 開始計時 start_time = time.time() # 直接調用 transcribe_audio 函數 result = transcribe_audio(*args) # 處理完成 elapsed_time = time.time() - start_time # 處理結果(現在應該是文本字符串) print("DEBUG: Result type:", type(result)) print("DEBUG: Final result:", result) # 檢查結果是否為字符串 if isinstance(result, str): if result.strip() == ".": # 如果結果只是一個點 ".",這是一個已知問題 print("DEBUG: Detected dot-only output in handler, fixing...") # 從控制台輸出中提取最後一個處理結果 # 這是一個臨時解決方案 model_info = f"Model / 模型: {model_name_for_display}" inference_time_info = f"Processing Time / 處理時間: {elapsed_time:.2f} seconds / 秒" # 嘗試從控制台日誌中提取結果文本 # 這裡我們假設結果已經在控制台中打印出來了 final_text = f"{model_info}\n{inference_time_info}\n\nResult Text / 結果文字:\n" final_text += "(Please check console for complete transcription / 請查看控制台獲取完整轉錄)" print("DEBUG: Created replacement result:", final_text[:100] + "..." if len(final_text) > 100 else final_text) else: # 正常結果,直接使用 final_text = result print("DEBUG: Using original result text") else: # 如果結果不是字符串,創建一個新的結果字符串 final_text = f"Model / 模型: {model_name_for_display}\n" final_text += f"Processing Time / 處理時間: {elapsed_time:.2f} seconds / 秒\n\n" final_text += "(No text detected in audio / 音頻中未檢測到文字)" print("DEBUG: Created new result for non-string:", final_text[:100] + "..." if len(final_text) > 100 else final_text) return final_text, gr.update(), gr.update(), gr.update() except Exception as e: import traceback error_msg = f"Error during processing: {str(e)}\n\n{traceback.format_exc()}" print(error_msg) # 返回錯誤訊息,保持其他輸出不變 return f"處理過程中發生錯誤 / Error during processing:\n{str(e)}", gr.update(), gr.update(), gr.update() # Main submit action - Corrected outputs list submit_button.click( fn=transcribe_audio_with_error_handling, inputs=[mic_input, file_input, youtube_input, model_select, task_input, language_input, timestamp_input, phi4_prompt_input, device_input, output_text, active_tab], outputs=[output_text, mic_input, file_input, youtube_input], # 保持原始輸出 show_progress="full" # 顯示完整進度條 ) # --- Launch App --- if __name__ == "__main__": # 獲取硬體信息 cpu_info, gpu_info = get_hardware_info() has_gpu = gpu_info is not None print(f"CPU: {cpu_info}") if has_gpu: print(f"GPU: {gpu_info}") else: print("No GPU detected") # REMEMBER: Update requirements.txt with accelerate, scipy, torchvision, peft # 檢查是否在 Hugging Face Spaces 環境中 import os is_spaces = os.environ.get("SPACE_ID") is not None demo.launch( # debug=True, # max_threads=4, # 減少最大線程數,提高穩定性 # show_error=True, # 顯示錯誤詳情 # server_name="0.0.0.0", # 明確指定監聽所有接口 # server_port=7860, # 指定端口 # quiet=False, # 顯示所有日誌 # prevent_thread_lock=True, # 防止線程鎖定 # share=is_spaces # 在 Spaces 環境中啟用分享 )