import gradio as gr
from pydub import AudioSegment
import json
import uuid
import edge_tts
import asyncio
import aiofiles
import os
import time
import torch
import re
from typing import List, Dict, Optional
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
import PyPDF2
import traceback
import shutil
from pathlib import Path

model_subdir = Path.home() / ".cache" / "huggingface" / "hub" / "models--unsloth--Llama-3.2-3B"

# Enable persistent caching on Hugging Face Spaces (if persistent storage is enabled).
# Note: newer transformers releases prefer the HF_HOME variable for this.
os.environ["TRANSFORMERS_CACHE"] = "/data/models"

#from git import Repo
#Repo.clone_from("https://huggingface.co/unsloth/Llama-3.2-3B-bnb-4bit", "./local_model_dir")

# Constants
MAX_FILE_SIZE_MB = 20
MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024
MODEL_ID = "meta-llama/Meta-Llama-3-8B"
# Alternatives tried: unsloth/Llama-3.2-3B, unsloth/Llama-3.2-1B, unsloth/Qwen2.5-1.5B,
# unsloth/Llama-4-Scout-17B-16E-Instruct-GGUF
glotoken = os.environ.get("Tokentest")  # Hugging Face access token from the Space secret "Tokentest"

# Global logging system
logs = []

def add_log(message):
    """Thread-safe logging function"""
    logs.append(f"[{time.strftime('%H:%M:%S')}] {message}")
    print(message)

# Initialize model with comprehensive error handling
model = None
tokenizer = None
generation_config = None

def test_llm_generation():
    """Quick smoke test that the loaded model can generate a few tokens."""
    try:
        test_prompt = "Hello, how are you today?"
        inputs = tokenizer(test_prompt, return_tensors="pt").to(model.device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=10,
                do_sample=False,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id
            )
        result = tokenizer.decode(outputs[0], skip_special_tokens=True)
        add_log(f"🧪 Test LLM response: {result[:100]}")
    except Exception as e:
        add_log(f"❌ LLM quick test failed: {e}")

def initialize_model():
    global model, tokenizer, generation_config
    try:
        add_log("🔄 Initializing model...")

        tokenizer = AutoTokenizer.from_pretrained(
            MODEL_ID,
            cache_dir="/data/models",
            token=glotoken,
            trust_remote_code=True,
            use_fast=False
        )
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
            add_log("✅ Set pad_token to eos_token")

        # Force GPU settings
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_ID,
            torch_dtype=torch.float16,
            cache_dir="/data/models",
            trust_remote_code=True,
            token=glotoken,
            device_map={"": 0},  # <- force GPU:0
            low_cpu_mem_usage=True
        )
        # model = AutoModelForCausalLM.from_pretrained(
        #     MODEL_ID,
        #     cache_dir="/data/models",
        #     trust_remote_code=True
        # )
        model.eval()

        generation_config = GenerationConfig(
            max_new_tokens=4096,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            repetition_penalty=1.1,
            length_penalty=1.0
        )

        add_log(f"✅ Model loaded successfully on device: {model.device}")
        return True
    except Exception as e:
        error_msg = f"❌ Model initialization failed: {str(e)}"
        add_log(error_msg)
        add_log(f"Traceback: {traceback.format_exc()}")
        return False

# Initialize model at startup
model_loaded = initialize_model()
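# model_loaded is consulted in three places below: PodcastGenerator.generate_script(),
# process_input(), and the "Generate Podcast" button's interactive flag in
# create_interface(), so a failed model load degrades to a visible error rather
# than crashing the app.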
class PodcastGenerator:
    def __init__(self):
        self.model = model
        self.tokenizer = tokenizer
        self.generation_config = generation_config

    def extract_text_from_pdf(self, file_path: str) -> str:
        """Extract text from PDF file - CRITICAL FIX #3"""
        try:
            add_log(f"📖 Extracting text from PDF: {file_path}")
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                text = ""
                for page_num, page in enumerate(pdf_reader.pages):
                    try:
                        page_text = page.extract_text()
                        text += page_text + "\n"
                        add_log(f"✅ Extracted page {page_num + 1}")
                    except Exception as e:
                        add_log(f"⚠️ Failed to extract page {page_num + 1}: {e}")
                        continue

            if not text.strip():
                raise Exception("No text could be extracted from PDF")

            add_log(f"✅ PDF extraction complete. Text length: {len(text)} characters")
            return text.strip()
        except Exception as e:
            error_msg = f"❌ PDF extraction failed: {str(e)}"
            add_log(error_msg)
            raise Exception(error_msg)

    async def postprocess_conversation(self, raw_text: str) -> str:
        """Run the LLM again to enforce a strict Speaker 1/2 format"""
        prompt = f"""
You are a podcast formatter. You reformat text so that it reads as a conversation between two people.
- Every line begins exactly and strictly with `Speaker 1:` or `Speaker 2:` (with the colon)
- No timestamps, no names, no parentheses, no extra formatting, no chapter names, no special characters besides ":"
- No blank lines allowed
- Do not invent or change the content; do not add or use any person or speaker names, chapter names, timestamps, etc.
- You are not allowed to use the characters +#-*<>"()[] anywhere in the text

Example output - you have to follow this structure:
Speaker 1: Hello and welcome.
Speaker 2: Thanks! Glad to be here.
Speaker 1: ...
Speaker 2: ...
Speaker 1: ...
Speaker 2: ...

Now format the following text according to the instructions above:
{raw_text}
"""
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=2048
        )
        inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
        #inputs = {k: v for k, v in inputs.items()}
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=1024,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id
            )
        formatted = self.tokenizer.decode(
            outputs[0][inputs['input_ids'].shape[1]:],
            skip_special_tokens=True
        )
        return formatted.strip()

    def clean_and_validate_json(self, text: str) -> Dict:
        """Improved JSON extraction and validation - CRITICAL FIX #4"""
        add_log("🔍 Attempting to extract JSON from generated text")

        # Multiple strategies for JSON extraction
        strategies = [
            # Strategy 1: Look for complete JSON objects
            r'\{[^{}]*"topic"[^{}]*"podcast"[^{}]*\[[^\]]*\][^{}]*\}',
            # Strategy 2: More flexible pattern
            r'\{.*?"topic".*?"podcast".*?\[.*?\].*?\}',
            # Strategy 3: Extract content between first { and last }
            r'\{.*\}'
        ]

        for i, pattern in enumerate(strategies):
            add_log(f"🎯 Trying extraction strategy {i+1}")
            matches = re.findall(pattern, text, re.DOTALL | re.IGNORECASE)
            for match in matches:
                try:
                    # Clean the match
                    cleaned = match.strip()
                    # Fix common JSON issues
                    cleaned = re.sub(r',\s*}', '}', cleaned)  # Remove trailing commas
                    cleaned = re.sub(r',\s*]', ']', cleaned)  # Remove trailing commas in arrays

                    parsed = json.loads(cleaned)

                    # Validate structure
                    if self.validate_podcast_structure(parsed):
                        add_log("✅ Valid JSON structure found")
                        return parsed
                except json.JSONDecodeError as e:
                    add_log(f"⚠️ JSON parse error in strategy {i+1}: {e}")
                    continue

        add_log("⚠️ No valid JSON found, creating fallback")
        return self.create_fallback_podcast(text)
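    # Illustrative examples (assumed, not from the original) of the kind of input
    # normalize_speaker_lines() below is meant to turn into canonical form:
    #   "SPEAKER 1 - Hello there"     ->  "Speaker 1: Hello there"
    #   "[2]: Thanks for having me."  ->  "Speaker 2: Thanks for having me."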
    def normalize_speaker_lines(self, text: str) -> str:
        """Normalize lines to 'Speaker 1: text' format based on the presence of a 1 or 2 and a ':' or '-'."""
        # Convert markdown and bracketed formats to 'Speaker X: ...'
        text = re.sub(
            r'(?i)^.*?([12])[^a-zA-Z0-9]*[:\-]\s*',
            lambda m: f"Speaker {m.group(1)}: ",
            text,
            flags=re.MULTILINE
        )
        return text

    def conversation_to_json(self, text: str) -> Dict:
        """Convert speaker-formatted text to podcast JSON structure"""
        text = self.normalize_speaker_lines(text)
        # Match strict "Speaker X: ..." lines only
        lines = re.findall(r'^Speaker\s+([12]):\s*(.+)', text, flags=re.MULTILINE)
        podcast = [{"speaker": int(s), "line": l.strip()} for s, l in lines]
        return {
            "topic": "Generated from Input",
            "podcast": podcast
        }

    def validate_podcast_structure(self, data: Dict) -> bool:
        """Validate podcast JSON structure"""
        try:
            if not isinstance(data, dict):
                return False
            if 'topic' not in data or 'podcast' not in data:
                return False
            if not isinstance(data['podcast'], list):
                return False

            for item in data['podcast']:
                if not isinstance(item, dict):
                    return False
                if 'speaker' not in item or 'line' not in item:
                    return False
                if not isinstance(item['speaker'], int) or item['speaker'] not in [1, 2]:
                    return False
                if not isinstance(item['line'], str) or len(item['line'].strip()) == 0:
                    return False

            return len(data['podcast']) > 0
        except Exception:
            return False

    def create_fallback_podcast(self, text: str) -> Dict:
        """Create fallback podcast structure - IMPROVED"""
        add_log("🔧 Creating fallback podcast structure")

        # Extract meaningful content from the original text
        sentences = [s.strip() for s in text.split('.') if len(s.strip()) > 20]

        if not sentences:
            add_log("🔧 Could not extract sentences, using standard fallback text")
            sentences = [
                "Welcome to our podcast discussion",
                "Today we're exploring an interesting topic",
                "Let's dive into the key points",
                "That's a fascinating perspective",
                "What are your thoughts on this matter",
                "I think there are multiple angles to consider",
                "This is definitely worth exploring further",
                "Thank you for this engaging conversation"
            ]

        # Create balanced conversation
        podcast_lines = []
        for i, sentence in enumerate(sentences[:12]):  # Limit to 12 exchanges
            speaker = (i % 2) + 1
            line = sentence + "." if not sentence.endswith('.') else sentence
            podcast_lines.append({
                "speaker": speaker,
                "line": line
            })

        result = {
            "topic": "Generated Discussion",
            "podcast": podcast_lines
        }

        add_log(f"✅ Fallback podcast created with {len(podcast_lines)} lines")
        return result

    async def generate_script(self, prompt: str, language: str, file_obj=None, progress=None) -> Dict:
        """Improved script generation with better error handling"""
        if not model_loaded or not self.model or not self.tokenizer:
            raise Exception("❌ Model not properly initialized. Please restart the application.")

        add_log("🎬 Starting script generation")

        # Process file if provided - CRITICAL FIX #5
        if file_obj is not None:
            try:
                add_log(f"📁 Processing uploaded file: {file_obj}")

                if file_obj.endswith('.pdf'):
                    extracted_text = self.extract_text_from_pdf(file_obj)
                    # Truncate if too long
                    if len(extracted_text) > 2000:
                        extracted_text = extracted_text[:2000] + "..."
                        add_log("✂️ Text truncated to 2000 characters")
                    prompt = extracted_text
                elif file_obj.endswith('.txt'):
                    with open(file_obj, 'r', encoding='utf-8') as f:
                        file_content = f.read()
                    if len(file_content) > 2000:
                        file_content = file_content[:2000] + "..."
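                    # Hand the (possibly truncated) text to the model as the prompt,
                    # mirroring the PDF branch above.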
                    prompt = file_content
            except Exception as e:
                add_log(f"⚠️ File processing error: {e}")
                # Continue with original prompt

        # Create focused prompt - CRITICAL FIX #6
        # example_json is kept for reference; it is only used by the commented-out prompt variant below.
        example_json = {
            "topic": "AI Technology",
            "podcast": [
                {"speaker": 1, "line": "Welcome to our discussion about AI technology."},
                {"speaker": 2, "line": "Thanks for having me. This is such an exciting field."},
                {"speaker": 1, "line": "What aspects of AI do you find most interesting?"},
                {"speaker": 2, "line": "I'm particularly fascinated by machine learning applications."}
            ]
        }

        # Simplified and more reliable prompt
        system_prompt = f"""Create a podcast script

Requirements:
- Exactly two speakers: Speaker 1 and Speaker 2
- The podcast should fill 4-5 minutes, focusing on the core context of the input text
- DO NOT copy the example below; only use it as a conversation reference
- The podcast should be professional, lively, witty and engaging, and hook the listener from the start.
- The input text might be disorganized or unformatted. Ignore any formatting inconsistencies or irrelevant details; your task is to distill the essential points.

{{
  "topic": "Short and engaging title",
  "podcast": [
    {{"speaker": 1, "line": "Welcome to the podcast."}},
    {{"speaker": 2, "line": "Thank you, great to be here."}},
    {{"speaker": 1, "line": "..."}},
    {{"speaker": 2, "line": "..."}}
  ]
}}

Return only valid JSON. Do not include explanation, markdown, or comments.
"""
        #Example JSON structure:
        #{json.dumps(example_json, indent=2)}
        #user_prompt = f"\nInput Text:\n{prompt}\n\nPodcast Script:"
        #user_prompt = f"\nInput Text:\n{prompt}\n\nJSON:"  # f"\nTopic: {prompt}\nJSON:"
        user_prompt = f"\nInput Text:\n{prompt}\n\nJSON:"
        full_prompt = system_prompt + user_prompt
        add_log("🔍 Prompt Preview:\n" + full_prompt[:2000])

        try:
            if progress:
                progress(0.3, "🤖 Generating script...")

            add_log("🔤 Tokenizing input...")

            # Tokenize with proper handling
            inputs = self.tokenizer(
                full_prompt,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=1200,  # Reduced for stability
                return_attention_mask=True
            )

            # Move to correct device
            inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
            add_log(f"✅ Inputs moved to device: {self.model.device}")

            add_log("🧠 Generating with model...")

            # Generate with timeout and better parameters
            with torch.no_grad():
                torch.cuda.empty_cache() if torch.cuda.is_available() else None
                outputs = self.model.generate(
                    **inputs,
                    generation_config=self.generation_config,
                    pad_token_id=self.tokenizer.pad_token_id,
                    # attention_mask=inputs.get('attention_mask'),
                    use_cache=True
                )

            add_log("✅ Model generation complete")

            # Decode only new tokens
            generated_text = self.tokenizer.decode(
                outputs[0][inputs['input_ids'].shape[1]:],
                skip_special_tokens=True,
                clean_up_tokenization_spaces=True
            )

            add_log(f"📝 Generated text length: {len(generated_text)} characters")
            add_log(f"🔍 Generated text preview: {generated_text[:2000]}...")

            #formatted_text = await self.postprocess_conversation(generated_text)
            #add_log(f"🧼 Post-processed text:\n{formatted_text[:2000]}")

            if progress:
                progress(0.4, "🔍 Processing generated script...")

            # Extract and validate JSON
            result = self.clean_and_validate_json(generated_text)
            #result = self.conversation_to_json(formatted_text)

            if progress:
                progress(0.5, "✅ Script generated successfully!")

            add_log(f"📄 Full generated text:\n{generated_text}")
            add_log(f"✅ Final script has {len(result.get('podcast', []))} lines")
            return result

        except Exception as e:
            error_msg = f"❌ Script generation error: {str(e)}"
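            # Any generation or JSON-parsing failure ends up here; the canned
            # fallback script below keeps the TTS stage supplied with input.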
            add_log(error_msg)
            add_log("🔍 Script generation failed, falling back")
            add_log(f"🔍 Traceback: {traceback.format_exc()}")
            # Return robust fallback
            return self.create_fallback_podcast("Welcome to our podcast")

    async def tts_generate(self, text: str, speaker: int, speaker1: str, speaker2: str) -> str:
        """Improved TTS generation with better error handling - CRITICAL FIX #7"""
        voice = speaker1 if speaker == 1 else speaker2
        add_log(f"🎙️ Generating TTS for speaker {speaker} with voice {voice}")

        # Clean text for TTS
        text = text.strip()
        if not text:
            raise Exception("Empty text for TTS")

        # Remove problematic characters
        text = re.sub(r'[^\w\s.,!?;:\-\'"()]', '', text)

        temp_filename = f"temp_audio_{uuid.uuid4().hex[:8]}.wav"
        max_retries = 3

        for attempt in range(max_retries):
            try:
                add_log(f"🎵 TTS attempt {attempt + 1} for: {text[:50]}...")
                communicate = edge_tts.Communicate(text, voice)

                # Use asyncio.wait_for with timeout
                await asyncio.wait_for(
                    communicate.save(temp_filename),
                    timeout=30.0
                )

                # Verify file was created and has content
                if os.path.exists(temp_filename) and os.path.getsize(temp_filename) > 1000:
                    add_log(f"✅ TTS successful: {os.path.getsize(temp_filename)} bytes")
                    return temp_filename
                else:
                    raise Exception("Generated audio file is too small or empty")

            except asyncio.TimeoutError:
                add_log(f"⏰ TTS timeout on attempt {attempt + 1}")
                if os.path.exists(temp_filename):
                    os.remove(temp_filename)
                if attempt == max_retries - 1:
                    raise Exception("TTS generation timed out after multiple attempts")
                await asyncio.sleep(2)
            except Exception as e:
                add_log(f"❌ TTS error on attempt {attempt + 1}: {str(e)}")
                if os.path.exists(temp_filename):
                    os.remove(temp_filename)
                if attempt == max_retries - 1:
                    raise Exception(f"TTS generation failed after {max_retries} attempts: {str(e)}")
                await asyncio.sleep(2)

    async def combine_audio_files(self, audio_files: List[str], progress=None) -> str:
        """Improved audio combination - CRITICAL FIX #8"""
        if progress:
            progress(0.9, "🎵 Combining audio files...")
        add_log(f"🔗 Combining {len(audio_files)} audio files")

        try:
            combined_audio = AudioSegment.empty()
            silence_padding = AudioSegment.silent(duration=800)  # 800ms silence

            for i, audio_file in enumerate(audio_files):
                try:
                    add_log(f"📁 Processing audio file {i+1}: {audio_file}")

                    if not os.path.exists(audio_file):
                        add_log(f"⚠️ Audio file not found: {audio_file}")
                        continue

                    file_size = os.path.getsize(audio_file)
                    add_log(f"📊 File size: {file_size} bytes")

                    if file_size < 2000:
                        add_log(f"⚠️ Audio file too small, skipping: {audio_file}")
                        continue

                    audio_segment = AudioSegment.from_file(audio_file)

                    if len(audio_segment) < 500:  # Less than 500ms
                        add_log("⚠️ Audio segment too short, skipping")
                        continue

                    combined_audio += audio_segment

                    # Add silence between speakers (except for the last file)
                    if i < len(audio_files) - 1:
                        combined_audio += silence_padding

                    add_log(f"✅ Added audio segment {i+1}, total duration: {len(combined_audio)}ms")

                except Exception as e:
                    add_log(f"⚠️ Could not process audio file {audio_file}: {e}")
                    continue
                finally:
                    # Clean up temporary file
                    try:
                        if os.path.exists(audio_file):
                            os.remove(audio_file)
                            add_log(f"🗑️ Cleaned up temp file: {audio_file}")
                    except Exception:
                        pass

            if len(combined_audio) == 0:
                raise Exception("No valid audio content was generated")

            if len(combined_audio) < 5000:  # Less than 5 seconds
                raise Exception("Combined audio is too short")

            output_filename = f"podcast_output_{uuid.uuid4().hex[:8]}.wav"
            combined_audio.export(output_filename, format="wav")
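            # Log the final size and duration before returning the path that
            # Gradio's Audio component will play.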
            file_size = os.path.getsize(output_filename)
            duration = len(combined_audio) / 1000  # Duration in seconds
            add_log(f"✅ Final podcast: {output_filename} ({file_size} bytes, {duration:.1f}s)")

            if progress:
                progress(1.0, "🎉 Podcast generated successfully!")

            return output_filename

        except Exception as e:
            error_msg = f"❌ Audio combination failed: {str(e)}"
            add_log(error_msg)
            # Clean up any remaining temp files
            for audio_file in audio_files:
                try:
                    if os.path.exists(audio_file):
                        os.remove(audio_file)
                except Exception:
                    pass
            raise Exception(error_msg)

    async def generate_podcast(self, input_text: str, language: str, speaker1: str, speaker2: str, file_obj=None, progress=None) -> str:
        """Main podcast generation pipeline - CRITICAL FIX #9"""
        start_time = time.time()
        add_log("🎬 Starting podcast generation pipeline")

        try:
            if progress:
                progress(0.1, "🚀 Starting podcast generation...")

            # Generate script
            add_log("📝 Generating podcast script...")
            podcast_json = await self.generate_script(input_text, language, file_obj, progress)

            if not podcast_json.get('podcast') or len(podcast_json['podcast']) == 0:
                raise Exception("No podcast content was generated")

            add_log(f"✅ Script generated with {len(podcast_json['podcast'])} dialogue lines")

            if progress:
                progress(0.5, "🎙️ Converting text to speech...")

            # Generate TTS with proper error handling
            audio_files = []
            total_lines = len(podcast_json['podcast'])
            successful_lines = 0

            for i, item in enumerate(podcast_json['podcast']):
                try:
                    add_log(f"🎵 Processing line {i+1}/{total_lines}: Speaker {item['speaker']}")
                    clean_line = item['line']

                    # 🔧 Sanitize malformed lines
                    if not isinstance(clean_line, str) or len(clean_line.strip()) == 0 or clean_line.strip().startswith('"') or "{" in clean_line:
                        add_log(f"⚠️ Malformed line detected for speaker {item['speaker']}: {repr(clean_line)[:80]}")
                        # Try to recover from JSON-like noise
                        candidates = re.findall(r'\"line\"\s*:\s*\"([^\"]+)\"', clean_line)
                        if candidates:
                            clean_line = candidates[0]
                            add_log(f"✅ Recovered line: {clean_line}")
                        else:
                            # Fallback: strip bad characters
                            clean_line = re.sub(r'[^A-Za-z0-9\s.,!?;:\-\'"]+', '', clean_line)
                            add_log(f"🛠️ Cleaned fallback line: {clean_line}")

                    audio_file = await self.tts_generate(
                        clean_line,  # item['line'],
                        item['speaker'],
                        speaker1,
                        speaker2
                    )
                    audio_files.append(audio_file)
                    successful_lines += 1

                    # Update progress
                    if progress:
                        current_progress = 0.5 + (0.4 * (i + 1) / total_lines)
                        progress(current_progress, f"🎙️ Generated speech {successful_lines}/{total_lines}")

                except Exception as e:
                    add_log(f"❌ TTS failed for line {i+1}: {e}")
                    # Continue with remaining lines rather than failing completely
                    continue

            if not audio_files:
                raise Exception("No audio files were generated successfully")

            if successful_lines < len(podcast_json['podcast']) / 2:
                add_log(f"⚠️ Warning: Only {successful_lines}/{total_lines} lines processed successfully")

            add_log(f"✅ TTS generation complete: {len(audio_files)} audio files")

            # Combine audio files
            combined_audio = await self.combine_audio_files(audio_files, progress)

            elapsed_time = time.time() - start_time
            add_log(f"🎉 Podcast generation completed in {elapsed_time:.1f} seconds")
            return combined_audio

        except Exception as e:
            elapsed_time = time.time() - start_time
            error_msg = f"❌ Podcast generation failed after {elapsed_time:.1f}s: {str(e)}"
            add_log(error_msg)
            add_log(f"🔍 Full traceback: {traceback.format_exc()}")
            raise Exception(error_msg)
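# Optional sketch (not wired into the app): the voice IDs in VOICE_MAPPING below
# are assumed to be valid edge-tts short names; edge_tts.list_voices() can be used
# to cross-check them at startup, e.g. asyncio.run(_verify_voices(VOICE_MAPPING)).
async def _verify_voices(mapping: Dict[str, str]) -> None:
    available = {voice["ShortName"] for voice in await edge_tts.list_voices()}
    for label, short_name in mapping.items():
        if short_name not in available:
            add_log(f"⚠️ Voice not found in edge-tts catalogue: {label} -> {short_name}")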
# Voice mapping
VOICE_MAPPING = {
    "Andrew - English (United States)": "en-US-AndrewMultilingualNeural",
    "Ava - English (United States)": "en-US-AvaMultilingualNeural",
    "Brian - English (United States)": "en-US-BrianMultilingualNeural",
    "Emma - English (United States)": "en-US-EmmaMultilingualNeural",
    "Florian - German (Germany)": "de-DE-FlorianMultilingualNeural",
    "Seraphina - German (Germany)": "de-DE-SeraphinaMultilingualNeural",
    "Remy - French (France)": "fr-FR-RemyMultilingualNeural",
    "Vivienne - French (France)": "fr-FR-VivienneMultilingualNeural"
}

async def process_input(input_text: str, input_file, language: str, speaker1: str, speaker2: str, progress=None) -> str:
    """Process input and generate podcast - MAIN ENTRY POINT"""
    add_log("=" * 50)
    add_log("🎬 NEW PODCAST GENERATION REQUEST")
    add_log("=" * 50)

    try:
        if progress:
            progress(0.05, "🔍 Processing input...")

        # Map speaker names to voice IDs
        speaker1_voice = VOICE_MAPPING.get(speaker1, "en-US-AndrewMultilingualNeural")
        speaker2_voice = VOICE_MAPPING.get(speaker2, "en-US-AvaMultilingualNeural")

        add_log(f"🎭 Speaker 1: {speaker1} -> {speaker1_voice}")
        add_log(f"🎭 Speaker 2: {speaker2} -> {speaker2_voice}")

        # Validate input
        if not input_text or input_text.strip() == "":
            if input_file is None:
                raise Exception("❌ Please provide either text input or upload a file")
            add_log("📝 No text input provided, will process uploaded file")
        else:
            add_log(f"📝 Text input provided: {len(input_text)} characters")

        if input_file:
            add_log(f"📎 File uploaded: {input_file}")

        # Check model status
        if not model_loaded:
            raise Exception("❌ Model not loaded. Please restart the application.")

        podcast_generator = PodcastGenerator()
        result = await podcast_generator.generate_podcast(
            input_text,
            language,
            speaker1_voice,
            speaker2_voice,
            input_file,
            progress
        )

        add_log("🎉 PODCAST GENERATION COMPLETED SUCCESSFULLY")
        return result

    except Exception as e:
        error_msg = f"❌ CRITICAL ERROR: {str(e)}"
        add_log(error_msg)
        add_log(f"🔍 Traceback: {traceback.format_exc()}")
        raise Exception(error_msg)

def generate_podcast_gradio(input_text, input_file, language, speaker1, speaker2):
    """Gradio interface function - CRITICAL FIX #10"""
    global logs
    logs = []  # Reset logs for each generation

    try:
        add_log("🎬 Gradio function called")
        add_log(f"📋 Parameters: text={bool(input_text)}, file={bool(input_file)}, lang={language}")

        # Validate inputs
        if not input_text and input_file is None:
            add_log("❌ No input provided")
            return None, "\n".join(logs)

        if input_text and len(input_text.strip()) == 0:
            input_text = None

        # Progress tracking
        def progress_callback(value, text):
            add_log(f"📊 Progress: {value:.1%} - {text}")

        # Create new event loop for this request - CRITICAL FIX
        try:
            # Try to get existing loop
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

            if loop.is_running():
                # If the loop is already running, run the coroutine in a separate thread
                import concurrent.futures
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(
                        lambda: asyncio.run(
                            process_input(input_text, input_file, language, speaker1, speaker2, progress_callback)
                        )
                    )
                    result = future.result(timeout=300)  # 5 minute timeout
            else:
                result = loop.run_until_complete(
                    process_input(input_text, input_file, language, speaker1, speaker2, progress_callback)
                )
        except RuntimeError:
            # No event loop exists, create a new one
            result = asyncio.run(
                process_input(input_text, input_file, language, speaker1, speaker2, progress_callback)
            )

        add_log("✅ Gradio function completed successfully")
        return result, "\n".join(logs)

    except Exception as e:
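        # Surface the failure in the log pane and return no audio so the UI stays usable.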
error_msg = f"โŒ Gradio function error: {str(e)}" add_log(error_msg) add_log(f"๐Ÿ” Traceback: {traceback.format_exc()}") return None, "\n".join(logs) def create_interface(): #model_loaded = initialize_model() if model_loaded: test_llm_generation() """Create the Gradio interface""" language_options = [ "Auto Detect", "English", "German", "French", "Spanish", "Italian", "Portuguese", "Dutch", "Russian", "Chinese", "Japanese", "Korean" ] voice_options = list(VOICE_MAPPING.keys()) with gr.Blocks( title="Pasching Podcast 2๐ŸŽ™๏ธ", theme=gr.themes.Soft(), css=".gradio-container {max-width: 1200px; margin: auto;}" ) as demo: gr.Markdown("# ๐ŸŽ™๏ธ Pasching Podcast 2") gr.Markdown("Generate professional 2-speaker podcasts from text input!") # Model status indicator if model_loaded: gr.Markdown("โœ… **Model Status: Ready**") else: gr.Markdown("โŒ **Model Status: Failed to Load**") with gr.Row(): with gr.Column(scale=2): input_text = gr.Textbox( label="Input Text", lines=8, placeholder="Enter your topic or text for podcast generation...", info="Describe what you want the podcast to discuss" ) with gr.Column(scale=1): input_file = gr.File( label="Upload File (Optional)", file_types=[".pdf", ".txt"], type="filepath", #info=f"Max size: {MAX_FILE_SIZE_MB}MB" ) with gr.Row(): language = gr.Dropdown( label="Language", choices=language_options, value="Auto Detect", info="Select output language" ) speaker1 = gr.Dropdown( label="Speaker 1 Voice", choices=voice_options, value="Andrew - English (United States)" ) speaker2 = gr.Dropdown( label="Speaker 2 Voice", choices=voice_options, value="Ava - English (United States)" ) generate_btn = gr.Button( "๐ŸŽ™๏ธ Generate Podcast", variant="primary", size="lg", interactive=model_loaded ) log_output = gr.Textbox( label="๐Ÿชต Debug & Transcript Log", lines=15, interactive=False, info="Real-time generation logs and debugging information" ) output_audio = gr.Audio( label="Generated Podcast", type="filepath", format="wav", show_download_button=True ) # Connect the interface generate_btn.click( fn=generate_podcast_gradio, inputs=[input_text, input_file, language, speaker1, speaker2], outputs=[output_audio, log_output], show_progress=True ) return demo if __name__ == "__main__": demo = create_interface() demo.launch( server_name="0.0.0.0", server_port=7860, show_error=True, share=False )