"""
AI Video Studio (Runway Gen-4 / Gen-4 Turbo + Gemini + Tavily + ElevenLabs
+ Runway Audio Fallback)

Features:
- Quality Mode: choose 'gen4' (higher fidelity) or 'gen4_turbo' (faster
  iteration). Gen-4 / Turbo accept 5s or 10s durations only.
- Structured scene schema (Subject | Action | Camera | Lighting | Mood | Style)
  -> merged prompt.
- Multi-keyframe support (upload 1–4 images); automatic ratio cropping to
  supported Runway aspect ratios.
- ElevenLabs TTS with: pagination, retry, streaming/non-streaming, adjustable
  stability/similarity/style/speaker boost.
- Hard fallback default voice ID (env ELEVEN_DEFAULT_VOICE_ID) if dropdown
  fetch fails.
- Runway audio silent fallback placeholder (stub) if all TTS fails (replace
  later with real Runway audio call if available).
- Sharpness (edge density) heuristic; one automatic re-generation with detail
  suffix for blurry clips.
- Clean temporary file housekeeping; robust logging & progress reporting.

Environment Variables (required):
    GEMINI_API_KEY
    TAVILY_API_KEY
    RUNWAY_API_KEY (or RUNWAYML_API_SECRET)

Optional:
    ELEVENLABS_API_KEY (or XI_API_KEY)
    ELEVEN_DEFAULT_VOICE_ID (fallback voice id)

Security:
    NEVER hard-code real API keys in this file.
"""

import os
import json
import time
import random
import logging
import subprocess
import base64
from io import BytesIO
from pathlib import Path
from typing import List, Dict, Any, Optional

import gradio as gr
from PIL import Image, ImageDraw, ImageFont, ImageFilter
import numpy as np

# External SDKs
import google.generativeai as genai
from tavily import TavilyClient
from runwayml import RunwayML
import httpx

# ---- ElevenLabs (version-agnostic import) ----
try:
    from elevenlabs import ElevenLabs
    try:
        from elevenlabs.errors import ApiError  # may not exist in some versions
    except Exception:
        ApiError = Exception
except ImportError:
    ElevenLabs = None
    ApiError = Exception

# ---------------- Logging ----------------
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s %(asctime)s] %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("ai_video_studio")

# ---------------- Environment / Keys ----------------
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
RUNWAY_KEY = os.getenv("RUNWAY_API_KEY") or os.getenv("RUNWAYML_API_SECRET")
ELEVEN_KEY = os.getenv("ELEVENLABS_API_KEY") or os.getenv("XI_API_KEY")

required_missing = [
    k
    for k, v in {
        "GEMINI_API_KEY": GEMINI_API_KEY,
        "TAVILY_API_KEY": TAVILY_API_KEY,
        "RUNWAY_API_KEY": RUNWAY_KEY,
    }.items()
    if not v
]
if required_missing:
    raise RuntimeError(f"Missing required API keys: {', '.join(required_missing)}")

genai.configure(api_key=GEMINI_API_KEY)
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
runway_client = RunwayML(api_key=RUNWAY_KEY)
# ElevenLabs is optional: the client exists only when both the key and the SDK are present.
eleven_client = ElevenLabs(api_key=ELEVEN_KEY) if (ELEVEN_KEY and ElevenLabs) else None

# ---------------- Constants ----------------
DEFAULT_SCENES = 4
MAX_SCENES = 8
ALLOWED_DURATIONS = {5, 10}  # Runway Gen-4 / Turbo durations (5 or 10 seconds)
SUPPORTED_RATIOS = {
    "1280:720",
    "1584:672",
    "1104:832",
    "720:1280",
    "832:1104",
    "960:960",
}  # documented multiple aspect ratios
WORDS_PER_SEC = 2.5
PLACEHOLDER_BG = (16, 18, 24)
PLACEHOLDER_FG = (240, 242, 248)
FONT_CANDIDATES = [
    "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
]
SHARPNESS_MIN = 0.015
RETRY_DETAIL_SUFFIX = "ultra-detailed textures, crisp focus, refined edges"
GLOBAL_STYLE = (
    "cinematic, cohesive composition, natural volumetric light, filmic color grade, "
    "gentle camera motion, high detail"
)

# Fallback ElevenLabs voice ID (replace with your own or set env var)
DEFAULT_ELEVEN_VOICE_ID = os.getenv(
    "ELEVEN_DEFAULT_VOICE_ID", "21m00Tcm4TlvDq8ikWAM"
)  # example/published sample id
RUNWAY_AUDIO_FALLBACK = True  # Placeholder stub (replace with real Runway audio generation when available)


# ---------------- Utility ----------------
def uid() -> str:
    """Return a short id built from the current epoch second and a random 4-digit number."""
    return f"{int(time.time())}_{random.randint(1000,9999)}"


def sanitize_filename(name: str) -> str:
    """Keep only alphanumerics, '-' and '_' (max 60 chars); fall back to 'video' if nothing is left."""
    safe = "".join(c for c in name if c.isalnum() or c in ("-", "_"))[:60]
    return safe or "video"


def load_font(size: int = 44):
    """Return the first loadable TrueType font from FONT_CANDIDATES, else PIL's default font."""
    for p in FONT_CANDIDATES:
        if Path(p).exists():
            try:
                return ImageFont.truetype(p, size)
            except OSError as e:
                # Unreadable/corrupt font file: try the next candidate.
                log.debug("Could not load font %s: %s", p, e)
    return ImageFont.load_default()


def generate_placeholder_image(topic: str, width=768, height=432) -> str:
    """Render *topic* as centered, word-wrapped text on a flat background.

    The image is saved as ``placeholder_<uid>.png`` in the current working
    directory and that path is returned.
    """
    img = Image.new("RGB", (width, height), PLACEHOLDER_BG)
    draw = ImageDraw.Draw(img)
    font = load_font(44)

    # Greedy word wrap on a character budget.
    max_chars = 26
    lines, line = [], []
    for w in topic.split():
        test = " ".join(line + [w])
        if len(test) > max_chars:
            # Only flush a non-empty line; a single over-long word must not
            # produce a blank line ahead of it.
            if line:
                lines.append(" ".join(line))
            line = [w]
        else:
            line.append(w)
    if line:
        lines.append(" ".join(line))

    # Measure every line so the whole text block can be centered vertically.
    line_gap = 12
    metrics = []
    total_h = 0
    for ln in lines:
        bbox = draw.textbbox((0, 0), ln, font=font)
        h = bbox[3] - bbox[1]
        metrics.append((ln, h, bbox))
        total_h += h + line_gap

    y = (height - total_h) // 2
    for ln, h, bbox in metrics:
        w = bbox[2] - bbox[0]
        x = (width - w) // 2
        draw.text((x, y), ln, fill=PLACEHOLDER_FG, font=font)
        y += h + line_gap

    out = f"placeholder_{uid()}.png"
    img.save(out)
    return out


def closest_supported_ratio(w: int, h: int) -> str:
    """Return the entry of SUPPORTED_RATIOS whose width/height is nearest to ``w / h``."""
    cur_ratio = w / h

    def distance(r: str):
        rw, rh = map(int, r.split(":"))
        # Tuple key keeps ties deterministic (broken by the ratio string).
        return (abs(cur_ratio - (rw / rh)), r)

    return min(SUPPORTED_RATIOS, key=distance)


def crop_to_ratio(img: Image.Image, ratio: str) -> Image.Image:
    """Center-crop *img* to the ``"W:H"`` aspect *ratio*; return it unchanged if it already matches."""
    rw, rh = map(int, ratio.split(":"))
    target = rw / rh
    w, h = img.size
    cur = w / h
    if abs(cur - target) < 1e-3:
        return img
    if cur > target:  # too wide
        new_w = int(target * h)
        x0 = (w - new_w) // 2
        return img.crop((x0, 0, x0 + new_w, h))
    # too tall
    new_h = int(w / target)
    y0 = (h - new_h) // 2
    return img.crop((0, y0, w, y0 + new_h))


def research_topic(topic: str) -> str:
    """Fetch supplemental facts about *topic* from Tavily (best effort).

    Returns the newline-joined ``content`` of the search results, or a fixed
    fallback sentence when the search fails or yields no content.
    """
    fallback = "No supplemental research facts available."
    try:
        res = tavily_client.search(
            query=f"Key facts & interesting points about {topic}",
            search_depth="basic",
        )
        if res and "results" in res:
            facts = "\n".join(
                str(r.get("content", "")).strip()
                for r in res["results"]
                if r.get("content")
            )
            # Results without any content would otherwise hand Gemini an empty string.
            return facts or fallback
    except Exception as e:
        # Research is optional; never fail the pipeline because of it.
        log.warning("Tavily failed: %s", e)
    return fallback


# ---------------- Gemini Script Generation ----------------
def gemini_script(topic: str, facts: str, scene_count: int) -> Dict[str, Any]:
    """
    Request structured JSON with narration + scene objects containing schema fields.

    Returns a dict with keys ``"narration"`` (str) and ``"scenes"`` (list of
    exactly *scene_count* scene dicts). Raises ``gr.Error`` if the model reply
    is not usable JSON or lacks a narration.
    """
    # NOTE(review): in the stored file the example values for "narration_script"
    # and "prompt" were empty strings (they look like stripped <...>
    # placeholders); descriptive placeholders are used here - confirm wording.
    prompt = f"""
You are a creative director.
Topic: {topic}
Facts: {facts}

Return STRICT JSON:
{{
  "narration_script": "full voice-over narration text",
  "scenes": [
    {{
      "subject": "...",
      "action": "...",
      "camera": "...",
      "lighting": "...",
      "mood": "...",
      "style": "...",
      "prompt": "single merged prompt, at most 160 characters"
    }}
    (exactly {scene_count} objects)
  ]
}}

Rules:
- Keep one consistent main subject across scenes unless evolution is explicitly helpful.
- camera: ONE motion (e.g. "slow dolly in", "handheld pan", "aerial sweep").
- lighting: descriptive & cinematic (e.g. "golden hour rim light").
- style: filmic adjectives (e.g. "35mm film grain, rich color palette").
- merged prompt must integrate key fields succinctly.
- No markdown, no lists, no commentary outside JSON.
"""
    model = genai.GenerativeModel("gemini-1.5-flash")
    response = model.generate_content(prompt)
    raw = (response.text or "").strip()

    # Strip a Markdown code fence (``` or ```json) if the model added one.
    if raw.startswith("```"):
        raw = raw.strip("`")
        if raw.lower().startswith("json"):
            raw = raw[4:].strip()

    data = None
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # Salvage attempt: parse the outermost {...} span.
        s = raw.find("{")
        e = raw.rfind("}")
        if s != -1 and e != -1:
            try:
                data = json.loads(raw[s:e + 1])
            except json.JSONDecodeError:
                pass
    if not isinstance(data, dict):
        raise gr.Error("Gemini did not return valid JSON.")

    narration = str(data.get("narration_script") or "").strip()
    scenes = data.get("scenes", [])
    if not narration:
        raise gr.Error("Missing narration_script.")
    if not isinstance(scenes, list):
        scenes = []

    norm = []
    for sc in scenes:
        if not isinstance(sc, dict):
            continue
        prompt_txt = str(sc.get("prompt") or "")
        norm.append({
            "subject": sc.get("subject", ""),
            "action": sc.get("action", ""),
            "camera": sc.get("camera", ""),
            "lighting": sc.get("lighting", ""),
            "mood": sc.get("mood", ""),
            "style": sc.get("style", ""),
            "prompt": prompt_txt[:160].strip(),
        })

    # NOTE(review): the stored file is truncated from `while len(norm)` up to
    # the return annotation of fetch_voices_paginated. The padding/trim logic
    # and the return value below are reconstructed from how generate_video()
    # consumes the result (script["narration"], script["scenes"]); the content
    # of the padding scene is a guess - confirm against the original.
    while len(norm) < scene_count:
        norm.append({
            "subject": topic,
            "action": "",
            "camera": "",
            "lighting": "",
            "mood": "",
            "style": "",
            "prompt": f"Cinematic scene about {topic}"[:160].strip(),
        })
    norm = norm[:scene_count]
    return {"narration": narration, "scenes": norm}


# ---------------- ElevenLabs ----------------
# NOTE(review): this signature was lost in the stored file; the parameter names
# come from the body, the default values are guesses - confirm.
def fetch_voices_paginated(max_pages: int = 5, page_size: int = 50,
                           delay: float = 0.6) -> List[Dict[str, str]]:
    """Collect ElevenLabs voices page by page as ``{"id": ..., "name": ...}`` dicts.

    Returns an empty list when no ElevenLabs client is configured. Stops at
    the first fetch error, when no next-page token is returned, or after
    *max_pages* pages; sleeps *delay* seconds between pages.
    """
    if not eleven_client:
        return []
    voices = []
    token = None
    for _ in range(max_pages):
        try:
            resp = eleven_client.voices.get_all(page_size=page_size, next_page_token=token)
        except Exception as e:
            log.error("Voice fetch error: %s", e)
            break
        for v in getattr(resp, "voices", []):
            voices.append({"id": v.voice_id, "name": v.name})
        token = getattr(resp, "next_page_token", None)
        if not token:
            break
        time.sleep(delay)
    log.info("Fetched %d ElevenLabs voices.", len(voices))
    return voices


def _clamp01(value: float) -> float:
    """Clamp *value* into the closed interval [0, 1]."""
    return max(0.0, min(1.0, float(value)))


def tts_elevenlabs(text: str, voice_id: str, model_id: str,
                   stability: float, similarity: float, style: float,
                   speaker_boost: bool, streaming: bool, out_path: str) -> bool:
    """Synthesize *text* with ElevenLabs and write the audio to *out_path*.

    Returns True on success. Returns False (after logging) when the client or
    voice id is missing, the API call fails, or the resulting file is
    implausibly small (< 800 bytes).
    """
    if not eleven_client:
        log.warning("ElevenLabs client not initialized.")
        return False
    if not voice_id:
        log.warning("No voice_id provided for TTS.")
        return False
    try:
        settings = {
            "stability": _clamp01(stability),
            "similarity_boost": _clamp01(similarity),
            "style": _clamp01(style),
            "use_speaker_boost": speaker_boost,
        }
        if streaming and hasattr(eleven_client.text_to_speech, "convert_as_stream"):
            with open(out_path, "wb") as f:
                for chunk in eleven_client.text_to_speech.convert_as_stream(
                    voice_id=voice_id,
                    model_id=model_id,
                    text=text,
                    optimize_streaming_latency=3,
                    voice_settings=settings,
                ):
                    f.write(chunk)
        else:
            audio = eleven_client.text_to_speech.convert(
                voice_id=voice_id,
                model_id=model_id,
                text=text,
                voice_settings=settings,
            )
            with open(out_path, "wb") as f:
                # Depending on the SDK version convert() yields either raw
                # bytes or an iterator of byte chunks; accept both.
                if isinstance(audio, (bytes, bytearray)):
                    f.write(audio)
                else:
                    for chunk in audio:
                        if chunk:
                            f.write(chunk)
        # sanity size check
        if os.path.getsize(out_path) < 800:
            log.error("ElevenLabs audio too small; treating as failure.")
            return False
        return True
    except ApiError as e:
        log.error("ElevenLabs ApiError: %s", e)
    except Exception as e:
        log.error("ElevenLabs TTS error: %s", e)
    return False


# ---------------- Runway Audio Fallback (placeholder silent track) ----------------
def _narration_seconds(text: str) -> float:
    """Estimate spoken duration of *text* at WORDS_PER_SEC, clamped to 2..300 seconds."""
    return max(2.0, min(300.0, len(text.split()) / WORDS_PER_SEC))


def runway_audio_fallback(text: str, out_path: str) -> bool:
    """Stub fallback: write a silent track sized to *text*; return True on success.

    Returns False when RUNWAY_AUDIO_FALLBACK is disabled or ffmpeg fails.
    """
    if not RUNWAY_AUDIO_FALLBACK:
        return False
    try:
        silent_track(text, out_path)
        return True
    except Exception as e:
        log.error("Runway audio fallback failed: %s", e)
        return False


def silent_track(narration: str, out_path: str):
    """Write a silent mono MP3 to *out_path*, sized to the estimated narration length.

    Raises whatever ``subprocess.run(..., check=True)`` raises if ffmpeg is
    missing or exits non-zero.
    """
    duration = _narration_seconds(narration)
    subprocess.run([
        "ffmpeg", "-f", "lavfi", "-i", "anullsrc=r=44100:cl=mono",
        "-t", f"{duration:.2f}", "-q:a", "9", "-acodec", "libmp3lame",
        out_path, "-y",
    ], check=True)


# ---------------- Runway Video Generation ----------------
def runway_generate_clip(model: str, prompt_image: str, text_prompt: str,
                         duration: int, ratio: str, max_wait=360) -> str:
    """Create a Runway image-to-video task, poll it, and download the clip.

    Returns the local path ``runway_clip_<uid>.mp4``. Raises ``gr.Error`` when
    task creation fails, the task ends as FAILED/CANCELLED, polling exceeds
    *max_wait* seconds, or no output URL is returned.
    """
    try:
        task = runway_client.image_to_video.create(
            model=model,
            prompt_image=prompt_image,
            prompt_text=text_prompt,
            duration=duration,
            ratio=ratio,
        )  # API pattern for gen4 / turbo image-to-video
    except Exception as e:
        raise gr.Error(f"Runway task creation failed: {e}")

    waited = 0
    interval = 5
    while True:
        task = runway_client.tasks.retrieve(id=task.id)
        status = getattr(task, "status", None)
        if status == "SUCCEEDED":
            break
        if status == "FAILED":
            reason = (getattr(task, "failure", None)
                      or getattr(task, "error", None)
                      or "Unknown error")
            raise gr.Error(f"Runway generation failed: {reason}")
        if status == "CANCELLED":
            # A cancelled task never succeeds; fail now instead of waiting for the timeout.
            raise gr.Error("Runway generation was cancelled.")
        time.sleep(interval)
        waited += interval
        if waited >= max_wait:
            raise gr.Error("Runway generation timeout.")

    outputs = getattr(task, "output", None)
    if not outputs or not isinstance(outputs, list):
        raise gr.Error("Runway returned no outputs.")
    video_url = outputs[0]

    clip_path = f"runway_clip_{uid()}.mp4"
    with httpx.stream("GET", video_url, timeout=240) as r:
        r.raise_for_status()
        with open(clip_path, "wb") as f:
            for chunk in r.iter_bytes():
                f.write(chunk)
    return clip_path


# ---------------- Sharpness Heuristic ----------------
def clip_edge_density(path: str) -> float:
    """Return the mean FIND_EDGES response (0..1) of the first frame of the video at *path*.

    Quick heuristic using FFmpeg + PIL (avoid heavy deps if opencv absent).
    Returns 1.0 ("assume acceptable") when the analysis fails.
    """
    tmp = f"frame_{uid()}.png"
    try:
        subprocess.run([
            "ffmpeg", "-i", path, "-vf", "scale=320:-1", "-vframes", "1", tmp, "-y",
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
        with Image.open(tmp) as frame:
            arr = np.asarray(frame.convert("L").filter(ImageFilter.FIND_EDGES))
        return float(arr.mean()) / 255.0
    except Exception as e:
        log.warning("Sharpness analysis failed for %s: %s", path, e)
        return 1.0  # assume acceptable if analysis fails
    finally:
        # Remove the extracted frame on failure as well.
        try:
            os.remove(tmp)
        except OSError:
            pass


# ---------------- Concatenate & Mux ----------------
def concat_and_mux(video_paths: List[str], audio_path: str, out_path: str):
    """Concatenate *video_paths* (stream copy) and mux *audio_path* into *out_path*.

    Uses ffmpeg's concat demuxer, then a second pass that copies video,
    encodes audio as AAC and stops at the shorter stream. Intermediate files
    are removed even when ffmpeg fails; ffmpeg errors propagate.
    """
    list_file = f"concat_{uid()}.txt"
    combined = f"combined_{uid()}.mp4"
    try:
        with open(list_file, "w") as lf:
            for p in video_paths:
                lf.write(f"file '{p}'\n")
        subprocess.run([
            "ffmpeg", "-f", "concat", "-safe", "0", "-i", list_file,
            "-c", "copy", combined, "-y",
        ], check=True)
        subprocess.run([
            "ffmpeg", "-i", combined, "-i", audio_path,
            "-c:v", "copy", "-c:a", "aac", "-shortest", out_path, "-y",
        ], check=True)
    finally:
        for p in (list_file, combined):
            try:
                os.remove(p)
            except OSError:
                pass


# ---------------- Prompt Assembly ----------------
def build_scene_prompt(sc: Dict[str, str]) -> str:
    """Return the scene's merged prompt (or one assembled from its fields) plus GLOBAL_STYLE."""
    merged = sc.get("prompt") or ""
    if merged:
        return f"{merged}. {GLOBAL_STYLE}"
    base = f"{sc['subject']} {sc['action']}, {sc['camera']}, {sc['lighting']}, {sc['mood']}, {sc['style']}"
    return f"{base}. {GLOBAL_STYLE}"


# ---------------- Main Pipeline ----------------
def _load_keyframes(keyframes: Optional[list], limit: int = 4) -> List[Image.Image]:
    """Open up to *limit* uploaded keyframes as RGB images, skipping unreadable ones."""
    images = []
    for item in (keyframes or [])[:limit]:
        # Uploads may arrive as plain paths or as objects exposing `.name`.
        path = getattr(item, "name", item)
        try:
            with Image.open(path) as im:
                images.append(im.convert("RGB"))
        except Exception as e:
            log.warning("Skipping unreadable keyframe %r: %s", path, e)
    return images


def _image_to_data_uri(img: Image.Image) -> str:
    """Encode *img* as a base64 PNG data URI."""
    buf = BytesIO()
    img.save(buf, format="PNG")
    return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode("utf-8")


def generate_video(
    topic: str,
    keyframes: list,
    scene_count: int,
    clip_duration: int,
    ratio: str,
    quality_mode: bool,
    voice_choice: Optional[str],
    model_id: str,
    stability: float,
    similarity: float,
    style: float,
    speaker_boost: bool,
    streaming_tts: bool,
    progress=gr.Progress(track_tqdm=True)
) -> str:
    """Run the full pipeline and return the path of the final MP4.

    Steps: research (Tavily) -> script (Gemini) -> narration audio (ElevenLabs,
    else silent track) -> keyframe preparation -> one Runway clip per scene
    (with a single detail-boost retry for low-sharpness clips) -> concat + mux.

    Raises ``gr.Error`` on any failure. Intermediate files are removed in all
    cases; the final video is kept.
    """
    job = uid()
    log.info("[JOB %s] topic='%s'", job, topic)
    temp_files = []
    try:
        if not (topic or "").strip():
            raise gr.Error("Please enter a topic.")
        # UI widgets may deliver floats; normalize before validating.
        scene_count = max(1, min(MAX_SCENES, int(scene_count)))
        if clip_duration not in ALLOWED_DURATIONS:
            clip_duration = 5
        clip_duration = int(clip_duration)
        runway_model = "gen4" if quality_mode else "gen4_turbo"  # trade speed vs fidelity

        progress(0.05, desc="🔍 Researching...")
        facts = research_topic(topic)

        progress(0.15, desc="🧠 Scripting (Gemini)...")
        script = gemini_script(topic, facts, scene_count)
        narration = script["narration"]
        scene_objs = script["scenes"]

        progress(0.30, desc="🎙️ Narration (TTS)...")
        audio_path = f"narration_{job}.mp3"
        temp_files.append(audio_path)

        # Determine voice id (UI or default fallback)
        if voice_choice and "|" in voice_choice:
            voice_id = voice_choice.split("|", 1)[1].strip()
        else:
            voice_id = DEFAULT_ELEVEN_VOICE_ID
        log.info("[JOB %s] Using voice_id='%s' model_id='%s' (quality=%s)",
                 job, voice_id, model_id, quality_mode)

        tts_ok = False
        if ELEVEN_KEY and voice_id:
            tts_ok = tts_elevenlabs(
                narration, voice_id, model_id,
                stability, similarity, style, speaker_boost,
                streaming_tts, audio_path
            )
        if not tts_ok and RUNWAY_AUDIO_FALLBACK:
            tts_ok = runway_audio_fallback(narration, audio_path)
        if not tts_ok:
            silent_track(narration, audio_path)

        progress(0.40, desc="🖼️ Preparing keyframes...")
        loaded_keyframes = _load_keyframes(keyframes)
        if not loaded_keyframes:
            placeholder = generate_placeholder_image(topic)
            temp_files.append(placeholder)
            # Close the file handle right away; the placeholder is deleted in `finally`.
            with Image.open(placeholder) as im:
                loaded_keyframes = [im.convert("RGB")]

        if ratio not in SUPPORTED_RATIOS:
            ratio_choice = closest_supported_ratio(*loaded_keyframes[0].size)
        else:
            ratio_choice = ratio

        processed = [crop_to_ratio(img, ratio_choice) for img in loaded_keyframes]

        # Data URIs for Runway image_to_video
        data_uris = [_image_to_data_uri(img) for img in processed]

        video_clips = []
        for idx, sc in enumerate(scene_objs, start=1):
            progress(0.40 + 0.45 * idx / scene_count, desc=f"🎬 Scene {idx}/{scene_count}...")
            # Cycle through the available keyframes when there are more scenes than images.
            img_uri = data_uris[(idx - 1) % len(data_uris)]
            prompt_text = build_scene_prompt(sc)
            clip_path = runway_generate_clip(
                model=runway_model,
                prompt_image=img_uri,
                text_prompt=prompt_text,
                duration=clip_duration,
                ratio=ratio_choice
            )
            video_clips.append(clip_path)
            temp_files.append(clip_path)

            sharp = clip_edge_density(clip_path)
            if sharp < SHARPNESS_MIN:
                log.info("Scene %d low sharpness (%.4f) - retrying with detail boost", idx, sharp)
                retry_prompt = prompt_text + ", " + RETRY_DETAIL_SUFFIX
                retry_clip = runway_generate_clip(
                    model=runway_model,
                    prompt_image=img_uri,
                    text_prompt=retry_prompt,
                    duration=clip_duration,
                    ratio=ratio_choice
                )
                # The retry replaces the blurry clip; both stay registered for cleanup.
                video_clips[-1] = retry_clip
                temp_files.append(retry_clip)

        progress(0.92, desc="🧵 Stitching & muxing...")
        final_out = f"{sanitize_filename(topic)}_{job}.mp4"
        concat_and_mux(video_clips, audio_path, final_out)

        progress(1.0, desc="✅ Complete")
        log.info("[JOB %s] done -> %s", job, final_out)
        return final_out
    except Exception as e:
        log.error("[JOB %s] FAILED: %s", job, e, exc_info=True)
        if isinstance(e, gr.Error):
            # Already a user-facing error; do not wrap it a second time.
            raise
        raise gr.Error(f"Pipeline error: {e}") from e
    finally:
        # cleanup intermediates (keep final video)
        for p in temp_files:
            try:
                if os.path.exists(p):
                    os.remove(p)
            except OSError:
                pass


# ---------------- UI Helpers ----------------
_cached_voices: List[str] = []


def refresh_voices():
    """Re-fetch ElevenLabs voices and return a dropdown update with ``"Name|ID"`` choices."""
    global _cached_voices
    voices = fetch_voices_paginated()
    _cached_voices = [f"{v['name']}|{v['id']}" for v in voices]
    return gr.update(choices=_cached_voices)


# ---------------- Gradio Interface ----------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🎬 AI Video Studio (Gen-4 / Turbo + Gemini + ElevenLabs)")
    gr.Markdown(
        "Iterate with Turbo, finalize with Gen-4. Upload up to 4 keyframes for stronger subject consistency."
    )

    with gr.Row():
        topic = gr.Textbox(label="Video Topic", placeholder="e.g. The history of coffee", scale=3)
        keyframes = gr.Files(label="Optional Keyframe Images (1–4)")

    with gr.Row():
        scene_count = gr.Slider(1, MAX_SCENES, value=DEFAULT_SCENES, step=1, label="Scenes")
        clip_duration = gr.Radio(choices=sorted(ALLOWED_DURATIONS), value=5, label="Seconds/Scene")
        ratio = gr.Dropdown(choices=sorted(SUPPORTED_RATIOS), value="1280:720", label="Aspect Ratio")
        quality_mode = gr.Checkbox(label="Quality Mode (gen4 vs gen4_turbo)", value=False)

    gr.Markdown("### Narration (ElevenLabs primary; fallback silent track)")
    with gr.Row():
        refresh_btn = gr.Button("🔄 Refresh Voices")
        voices_dd = gr.Dropdown(choices=[], label="ElevenLabs Voice (Name|ID)")
        model_dd = gr.Dropdown(
            choices=["eleven_turbo_v2_5", "eleven_multilingual_v2", "eleven_flash_v2_5", "eleven_monolingual_v1"],
            value="eleven_turbo_v2_5",
            label="ElevenLabs Model"
        )
        streaming_chk = gr.Checkbox(label="Streaming TTS", value=False)

    with gr.Row():
        stability = gr.Slider(0, 1, value=0.55, step=0.01, label="Stability")
        similarity = gr.Slider(0, 1, value=0.80, step=0.01, label="Similarity")
        style = gr.Slider(0, 1, value=0.25, step=0.01, label="Style")
        speaker_boost = gr.Checkbox(label="Speaker Boost", value=True)

    generate_btn = gr.Button("🚀 Generate Video", variant="primary")
    output_video = gr.Video(label="Final Video")

    refresh_btn.click(fn=refresh_voices, outputs=voices_dd)
    generate_btn.click(
        fn=generate_video,
        inputs=[
            topic, keyframes, scene_count, clip_duration, ratio,
            quality_mode, voices_dd, model_dd, stability, similarity, style,
            speaker_boost, streaming_chk
        ],
        outputs=output_video
    )

    gr.Markdown(
        "### Tips\n"
        "- Use detailed keyframes with clear subject & lighting.\n"
        "- Add emotional descriptors directly in narration text for richer prosody.\n"
        "- Iterate with Turbo then switch to Quality Mode to finalize.\n"
        "- Adjust Stability/Similarity for expressiveness vs consistency."
    )

if __name__ == '__main__':
    demo.launch()