"""
AI Video Studio (Runway Gen-4 / Gen-4 Turbo + Gemini + Tavily + ElevenLabs + Runway Audio Fallback)
Features:
- Quality Mode: choose 'gen4' (higher fidelity) or 'gen4_turbo' (faster iteration). Gen-4 / Turbo accept 5s or 10s durations only.
- Structured scene schema (Subject | Action | Camera | Lighting | Mood | Style) -> merged prompt.
- Multi-keyframe support (upload 1–4 images); automatic ratio cropping to supported Runway aspect ratios.
- ElevenLabs TTS with: pagination, retry, streaming/non-streaming, adjustable stability/similarity/style/speaker boost.
- Default fallback voice ID (env ELEVEN_DEFAULT_VOICE_ID) used when the voice dropdown fetch fails.
- Silent-track "Runway audio" fallback stub when all TTS fails (replace with a real Runway audio call if one becomes available).
- Sharpness (edge density) heuristic; one automatic re-generation with detail suffix for blurry clips.
- Clean temporary file housekeeping; robust logging & progress reporting.
Environment Variables (required):
GEMINI_API_KEY
TAVILY_API_KEY
RUNWAY_API_KEY (or RUNWAYML_API_SECRET)
Optional:
ELEVENLABS_API_KEY (or XI_API_KEY)
ELEVEN_DEFAULT_VOICE_ID (fallback voice id)
Security: NEVER hard-code real API keys in this file.
"""
import os
import json
import time
import random
import logging
import subprocess
import base64
from io import BytesIO
from pathlib import Path
from typing import List, Dict, Any, Optional
import gradio as gr
from PIL import Image, ImageDraw, ImageFont, ImageFilter
import numpy as np
# External SDKs
import google.generativeai as genai
from tavily import TavilyClient
from runwayml import RunwayML
import httpx
# ---- ElevenLabs (version-agnostic import) ----
try:
from elevenlabs import ElevenLabs
try:
from elevenlabs.errors import ApiError # may not exist in some versions
except Exception:
ApiError = Exception
except ImportError:
ElevenLabs = None
ApiError = Exception
# ---------------- Logging ----------------
logging.basicConfig(
level=logging.INFO,
format="[%(levelname)s %(asctime)s] %(message)s",
datefmt="%H:%M:%S"
)
log = logging.getLogger("ai_video_studio")
# ---------------- Environment / Keys ----------------
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
RUNWAY_KEY = os.getenv("RUNWAY_API_KEY") or os.getenv("RUNWAYML_API_SECRET")
ELEVEN_KEY = os.getenv("ELEVENLABS_API_KEY") or os.getenv("XI_API_KEY")
required_missing = [k for k, v in {
"GEMINI_API_KEY": GEMINI_API_KEY,
"TAVILY_API_KEY": TAVILY_API_KEY,
"RUNWAY_API_KEY": RUNWAY_KEY
}.items() if not v]
if required_missing:
raise RuntimeError(f"Missing required API keys: {', '.join(required_missing)}")
genai.configure(api_key=GEMINI_API_KEY)
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
runway_client = RunwayML(api_key=RUNWAY_KEY)
eleven_client = ElevenLabs(api_key=ELEVEN_KEY) if (ELEVEN_KEY and ElevenLabs) else None
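# eleven_client stays None when no ElevenLabs key (or SDK) is present; the
# pipeline then degrades to the silent-track fallback rather than failing.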
# ---------------- Constants ----------------
DEFAULT_SCENES = 4
MAX_SCENES = 8
ALLOWED_DURATIONS = {5, 10}  # Runway Gen-4 / Turbo accept only 5 s or 10 s clips
SUPPORTED_RATIOS = {"1280:720", "1584:672", "1104:832", "720:1280", "832:1104", "960:960"}  # documented Runway aspect ratios
WORDS_PER_SEC = 2.5
PLACEHOLDER_BG = (16, 18, 24)
PLACEHOLDER_FG = (240, 242, 248)
FONT_CANDIDATES = [
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
]
SHARPNESS_MIN = 0.015
RETRY_DETAIL_SUFFIX = "ultra-detailed textures, crisp focus, refined edges"
GLOBAL_STYLE = "cinematic, cohesive composition, natural volumetric light, filmic color grade, gentle camera motion, high detail"
# Fallback ElevenLabs voice ID (replace with your own or set env var)
DEFAULT_ELEVEN_VOICE_ID = os.getenv("ELEVEN_DEFAULT_VOICE_ID", "21m00Tcm4TlvDq8ikWAM") # example/published sample id
RUNWAY_AUDIO_FALLBACK = True # Placeholder stub (replace with real Runway audio generation when available)
# ---------------- Utility ----------------
def uid() -> str:
return f"{int(time.time())}_{random.randint(1000,9999)}"
def sanitize_filename(name: str) -> str:
    name = "_".join(name.split())  # collapse whitespace so words stay separated
    safe = "".join(c for c in name if c.isalnum() or c in ("-", "_"))[:60]
    return safe or "video"
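# Example: sanitize_filename("The history of coffee!") -> "The_history_of_coffee"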
def load_font(size: int = 44):
for p in FONT_CANDIDATES:
if Path(p).exists():
try:
return ImageFont.truetype(p, size)
except Exception:
pass
return ImageFont.load_default()
def generate_placeholder_image(topic: str, width=768, height=432) -> str:
img = Image.new("RGB", (width, height), PLACEHOLDER_BG)
draw = ImageDraw.Draw(img)
font = load_font(44)
words = topic.split()
lines, line = [], []
max_chars = 26
    for w in words:
        test = " ".join(line + [w])
        if len(test) > max_chars and line:  # guard: never emit an empty line for an over-long word
            lines.append(" ".join(line))
            line = [w]
        else:
            line.append(w)
    if line:
        lines.append(" ".join(line))
# center vertically
metrics=[]; total_h=0
for ln in lines:
bbox = draw.textbbox((0,0), ln, font=font)
h=bbox[3]-bbox[1]
metrics.append((ln,h,bbox))
total_h += h+12
y=(height-total_h)//2
for ln,h,bbox in metrics:
w=bbox[2]-bbox[0]
x=(width-w)//2
draw.text((x,y), ln, fill=PLACEHOLDER_FG, font=font)
y+=h+12
out=f"placeholder_{uid()}.png"
img.save(out)
return out
def closest_supported_ratio(w: int, h: int) -> str:
candidates=[]
cur_ratio = w / h
for r in SUPPORTED_RATIOS:
rw,rh = map(int,r.split(":"))
diff = abs(cur_ratio - (rw/rh))
candidates.append((diff,r))
candidates.sort()
return candidates[0][1]
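# Examples: a 1920x1080 keyframe (~1.78) maps to "1280:720"; a square
# 1024x1024 image maps to "960:960".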
def crop_to_ratio(img: Image.Image, ratio: str) -> Image.Image:
rw,rh = map(int, ratio.split(":"))
target = rw / rh
w,h = img.size
cur = w / h
if abs(cur-target)<1e-3:
return img
if cur>target: # too wide
new_w=int(target*h)
x0=(w-new_w)//2
return img.crop((x0,0,x0+new_w,h))
else: # too tall
new_h=int(w/target)
y0=(h-new_h)//2
return img.crop((0,y0,w,y0+new_h))
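# Example: cropping a 1000x1000 image to "1280:720" center-crops it to
# 1000x562, discarding ~219 px from the top and bottom.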
def research_topic(topic: str) -> str:
try:
res = tavily_client.search(
query=f"Key facts & interesting points about {topic}",
search_depth="basic"
)
        if res and "results" in res:
            facts = "\n".join(
                str(r.get("content", "")).strip()
                for r in res["results"] if r.get("content")
            )
            if facts:  # only return non-empty research text
                return facts
except Exception as e:
log.warning(f"Tavily failed: {e}")
return "No supplemental research facts available."
# ---------------- Gemini Script Generation ----------------
def gemini_script(topic: str, facts: str, scene_count: int) -> Dict[str,Any]:
"""
Request structured JSON with narration + scene objects containing schema fields.
"""
prompt = f"""
You are a creative director.
Topic: {topic}
Facts:
{facts}
Return STRICT JSON:
{{
"narration_script": "<cohesive narration (<= 230 words)>",
"scenes": [
{{
"subject": "...",
"action": "...",
"camera": "...",
"lighting": "...",
"mood": "...",
"style": "...",
"prompt": "<merged scene prompt (<=40 words)>"
}}
(exactly {scene_count} objects)
]
}}
Rules:
- Keep one consistent main subject across scenes unless evolution is explicitly helpful.
- camera: ONE motion (e.g. "slow dolly in", "handheld pan", "aerial sweep").
- lighting: descriptive & cinematic (e.g. "golden hour rim light").
- style: filmic adjectives (e.g. "35mm film grain, rich color palette").
- merged prompt must integrate key fields succinctly.
- No markdown, no lists, no commentary outside JSON.
"""
model = genai.GenerativeModel("gemini-1.5-flash")
response = model.generate_content(prompt)
raw=(response.text or "").strip()
if raw.startswith("```"):
raw=raw.strip("`")
if raw.lower().startswith("json"):
raw=raw[4:].strip()
data=None
try:
data=json.loads(raw)
except json.JSONDecodeError:
s=raw.find("{"); e=raw.rfind("}")
if s!=-1 and e!=-1:
try: data=json.loads(raw[s:e+1])
except Exception: pass
if not isinstance(data,dict):
raise gr.Error("Gemini did not return valid JSON.")
    narration = str(data.get("narration_script") or "").strip()
scenes=data.get("scenes",[])
if not narration:
raise gr.Error("Missing narration_script.")
norm=[]
for sc in scenes:
if not isinstance(sc,dict): continue
prompt_txt = sc.get("prompt") or ""
norm.append({
"subject": sc.get("subject",""),
"action": sc.get("action",""),
"camera": sc.get("camera",""),
"lighting": sc.get("lighting",""),
"mood": sc.get("mood",""),
"style": sc.get("style",""),
"prompt": prompt_txt[:160].strip()
})
while len(norm)<scene_count:
norm.append({
"subject":"main subject",
"action":"subtle motion",
"camera":"slow dolly in",
"lighting":"soft directional key light",
"mood":"cinematic",
"style":"filmic grain",
"prompt":f"Cinematic slow dolly in of main subject, soft directional light, filmic grain, {topic}"
})
norm=norm[:scene_count]
return {"narration": narration, "scenes": norm}
# ---------------- ElevenLabs Voice Handling ----------------
def fetch_voices_paginated(max_pages=5, page_size=50, delay=0.5) -> List[Dict[str,str]]:
if not eleven_client:
return []
voices=[]
token=None
for _ in range(max_pages):
try:
resp = eleven_client.voices.get_all(page_size=page_size, next_page_token=token)
except Exception as e:
log.error(f"Voice fetch error: {e}")
break
these = getattr(resp,"voices",[])
for v in these:
voices.append({"id": v.voice_id, "name": v.name})
token = getattr(resp,"next_page_token", None)
if not token:
break
time.sleep(delay)
log.info(f"Fetched {len(voices)} ElevenLabs voices.")
return voices
def tts_elevenlabs(text: str, voice_id: str, model_id: str,
stability: float, similarity: float,
style: float, speaker_boost: bool,
streaming: bool, out_path: str) -> bool:
if not eleven_client:
log.warning("ElevenLabs client not initialized.")
return False
if not voice_id:
log.warning("No voice_id provided for TTS.")
return False
try:
stability=max(0,min(1,stability))
similarity=max(0,min(1,similarity))
style=max(0,min(1,style))
settings = {
"stability": stability,
"similarity_boost": similarity,
"style": style,
"use_speaker_boost": speaker_boost
}
if streaming and hasattr(eleven_client.text_to_speech,"convert_as_stream"):
with open(out_path,"wb") as f:
for chunk in eleven_client.text_to_speech.convert_as_stream(
voice_id=voice_id,
model_id=model_id,
text=text,
optimize_streaming_latency=3,
voice_settings=settings
):
f.write(chunk)
else:
            audio = eleven_client.text_to_speech.convert(
                voice_id=voice_id,
                model_id=model_id,
                text=text,
                voice_settings=settings
            )
            # Depending on SDK version, convert() returns raw bytes or an
            # iterator of byte chunks; handle both.
            with open(out_path, "wb") as f:
                if isinstance(audio, (bytes, bytearray)):
                    f.write(audio)
                else:
                    for chunk in audio:
                        f.write(chunk)
# sanity size check
if os.path.getsize(out_path) < 800:
log.error("ElevenLabs audio too small; treating as failure.")
return False
return True
except ApiError as e:
log.error(f"ElevenLabs ApiError: {e}")
except Exception as e:
log.error(f"ElevenLabs TTS error: {e}")
return False
# ---------------- Runway Audio Fallback (placeholder silent track) ----------------
def runway_audio_fallback(text: str, out_path: str) -> bool:
    """Placeholder: writes a silent track sized to the narration. Swap in a real
    Runway audio call here if/when one becomes available."""
    if not RUNWAY_AUDIO_FALLBACK:
        return False
    try:
        silent_track(text, out_path)  # resolved at call time; defined just below
        return True
    except Exception as e:
        log.error(f"Runway audio fallback failed: {e}")
        return False
def silent_track(narration: str, out_path: str):
duration = max(2.0, min(300.0, len(narration.split())/WORDS_PER_SEC))
subprocess.run([
"ffmpeg","-f","lavfi","-i","anullsrc=r=44100:cl=mono",
"-t", f"{duration:.2f}", "-q:a","9","-acodec","libmp3lame",
out_path,"-y"
], check=True)
# ---------------- Runway Video Generation ----------------
def runway_generate_clip(model: str, prompt_image: str, text_prompt: str,
duration: int, ratio: str, max_wait=360) -> str:
try:
task = runway_client.image_to_video.create(
model=model,
prompt_image=prompt_image,
prompt_text=text_prompt,
duration=duration,
ratio=ratio
        )  # image-to-video task creation for gen4 / gen4_turbo
except Exception as e:
raise gr.Error(f"Runway task creation failed: {e}")
waited=0; interval=5
while True:
task = runway_client.tasks.retrieve(id=task.id)
status = getattr(task,"status",None)
if status=="SUCCEEDED":
break
if status=="FAILED":
raise gr.Error(f"Runway generation failed: {getattr(task,'error','Unknown error')}")
time.sleep(interval); waited+=interval
if waited>=max_wait:
raise gr.Error("Runway generation timeout.")
outputs = getattr(task,"output",None)
if not outputs or not isinstance(outputs,list):
raise gr.Error("Runway returned no outputs.")
video_url = outputs[0]
clip_path=f"runway_clip_{uid()}.mp4"
with httpx.stream("GET", video_url, timeout=240) as r:
r.raise_for_status()
with open(clip_path,"wb") as f:
for chunk in r.iter_bytes():
f.write(chunk)
return clip_path
# ---------------- Sharpness Heuristic ----------------
def clip_edge_density(path: str) -> float:
# Quick heuristic using FFmpeg + PIL (avoid heavy deps if opencv absent)
try:
tmp = f"frame_{uid()}.png"
subprocess.run([
"ffmpeg","-i",path,"-vf","scale=320:-1","-vframes","1",tmp,"-y"
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
img = Image.open(tmp).convert("L")
arr = np.array(img.filter(ImageFilter.FIND_EDGES))
val = arr.mean()/255.0
os.remove(tmp)
return val
except Exception:
return 1.0 # assume acceptable if analysis fails
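# Interpretation (rough, scene-dependent): the FIND_EDGES mean sits near 0 for
# flat or defocused frames and climbs with fine texture; SHARPNESS_MIN (0.015)
# is a conservative floor below which one detail-boosted retry is triggered.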
# ---------------- Concatenate & Mux ----------------
def concat_and_mux(video_paths: List[str], audio_path: str, out_path: str):
list_file=f"concat_{uid()}.txt"
with open(list_file,"w") as lf:
for p in video_paths:
lf.write(f"file '{p}'\n")
combined=f"combined_{uid()}.mp4"
subprocess.run([
"ffmpeg","-f","concat","-safe","0","-i",list_file,
"-c","copy",combined,"-y"
],check=True)
subprocess.run([
"ffmpeg","-i",combined,"-i",audio_path,
"-c:v","copy","-c:a","aac","-shortest",out_path,"-y"
],check=True)
for p in (list_file,combined):
try: os.remove(p)
except OSError: pass
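# Note: the concat demuxer with "-c copy" requires every clip to share codec,
# resolution and timebase. That holds here because all clips come from the same
# Runway model at one fixed ratio; re-encode instead if you ever mix sources.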
# ---------------- Prompt Assembly ----------------
def build_scene_prompt(sc: Dict[str,str]) -> str:
merged = sc.get("prompt") or ""
if merged:
return f"{merged}. {GLOBAL_STYLE}"
base = f"{sc['subject']} {sc['action']}, {sc['camera']}, {sc['lighting']}, {sc['mood']}, {sc['style']}"
return f"{base}. {GLOBAL_STYLE}"
# ---------------- Main Pipeline ----------------
def generate_video(
topic: str,
keyframes: list,
scene_count: int,
clip_duration: int,
ratio: str,
quality_mode: bool,
voice_choice: Optional[str],
model_id: str,
stability: float,
similarity: float,
style: float,
speaker_boost: bool,
streaming_tts: bool,
progress=gr.Progress(track_tqdm=True)
) -> str:
job=uid()
log.info(f"[JOB {job}] topic='{topic}'")
temp_files=[]
try:
if not topic.strip():
raise gr.Error("Please enter a topic.")
        scene_count = int(max(1, min(MAX_SCENES, scene_count)))  # sliders may deliver floats
if clip_duration not in ALLOWED_DURATIONS:
clip_duration=5
        runway_model = "gen4" if quality_mode else "gen4_turbo"  # fidelity vs speed trade-off
progress(0.05, desc="πŸ” Researching...")
facts = research_topic(topic)
progress(0.15, desc="🧠 Scripting (Gemini)...")
script = gemini_script(topic, facts, scene_count)
narration = script["narration"]
scene_objs = script["scenes"]
progress(0.30, desc="πŸŽ™οΈ Narration (TTS)...")
audio_path=f"narration_{job}.mp3"
temp_files.append(audio_path)
# Determine voice id (UI or default fallback)
if voice_choice and "|" in voice_choice:
voice_id = voice_choice.split("|",1)[1].strip()
else:
voice_id = DEFAULT_ELEVEN_VOICE_ID
log.info(f"[JOB {job}] Using voice_id='{voice_id}' model_id='{model_id}' (quality={quality_mode})")
tts_ok=False
if ELEVEN_KEY and voice_id:
tts_ok = tts_elevenlabs(
narration, voice_id, model_id,
stability, similarity, style, speaker_boost,
streaming_tts, audio_path
)
if not tts_ok and RUNWAY_AUDIO_FALLBACK:
tts_ok = runway_audio_fallback(narration, audio_path)
if not tts_ok:
silent_track(narration, audio_path)
progress(0.40, desc="πŸ–ΌοΈ Preparing keyframes...")
loaded_keyframes=[]
if keyframes:
for fp in keyframes[:4]:
try:
img=Image.open(fp).convert("RGB")
loaded_keyframes.append(img)
except Exception:
pass
if not loaded_keyframes:
placeholder = generate_placeholder_image(topic)
temp_files.append(placeholder)
loaded_keyframes=[Image.open(placeholder).convert("RGB")]
if ratio not in SUPPORTED_RATIOS:
ratio_choice = closest_supported_ratio(*loaded_keyframes[0].size)
else:
ratio_choice = ratio
processed=[]
for img in loaded_keyframes:
processed.append(crop_to_ratio(img, ratio_choice))
# Data URIs for Runway image_to_video
data_uris=[]
for img in processed:
buf=BytesIO()
img.save(buf, format="PNG")
data_uris.append("data:image/png;base64,"+base64.b64encode(buf.getvalue()).decode("utf-8"))
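        # Runway's prompt_image accepts an HTTPS URL or a base64 data URI; data
        # URIs keep the pipeline self-contained (no external image hosting).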
video_clips=[]
for idx, sc in enumerate(scene_objs, start=1):
progress(0.40 + 0.45*idx/scene_count,
desc=f"🎬 Scene {idx}/{scene_count}...")
img_uri = data_uris[(idx-1)%len(data_uris)]
prompt_text = build_scene_prompt(sc)
clip_path = runway_generate_clip(
model=runway_model,
prompt_image=img_uri,
text_prompt=prompt_text,
duration=clip_duration,
ratio=ratio_choice
)
video_clips.append(clip_path); temp_files.append(clip_path)
sharp = clip_edge_density(clip_path)
if sharp < SHARPNESS_MIN:
log.info(f"Scene {idx} low sharpness ({sharp:.4f}) - retrying with detail boost")
retry_prompt = prompt_text + ", " + RETRY_DETAIL_SUFFIX
retry_clip = runway_generate_clip(
model=runway_model,
prompt_image=img_uri,
text_prompt=retry_prompt,
duration=clip_duration,
ratio=ratio_choice
)
video_clips[-1]=retry_clip
temp_files.append(retry_clip)
progress(0.92, desc="🧡 Stitching & muxing...")
final_out=f"{sanitize_filename(topic)}_{job}.mp4"
concat_and_mux(video_clips, audio_path, final_out)
progress(1.0, desc="βœ… Complete")
log.info(f"[JOB {job}] done -> {final_out}")
return final_out
except Exception as e:
log.error(f"[JOB {job}] FAILED: {e}", exc_info=True)
raise gr.Error(f"Pipeline error: {e}")
finally:
# cleanup intermediates (keep final video)
for p in temp_files:
try:
if os.path.exists(p):
os.remove(p)
except OSError:
pass
# ---------------- UI Helpers ----------------
_cached_voices: List[str] = []
def refresh_voices():
global _cached_voices
voices = fetch_voices_paginated()
_cached_voices = [f"{v['name']}|{v['id']}" for v in voices]
return gr.update(choices=_cached_voices)
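# Dropdown entries have the form "<Name>|<voice_id>" (e.g. "Rachel|21m00Tcm4TlvDq8ikWAM");
# generate_video splits on the first "|" to recover the raw voice ID.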
# ---------------- Gradio Interface ----------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🎬 AI Video Studio (Gen-4 / Turbo + Gemini + ElevenLabs)")
gr.Markdown(
"Iterate with Turbo, finalize with Gen-4. Upload up to 4 keyframes for stronger subject consistency."
)
with gr.Row():
topic = gr.Textbox(label="Video Topic", placeholder="e.g. The history of coffee", scale=3)
keyframes = gr.Files(label="Optional Keyframe Images (1–4)")
with gr.Row():
scene_count = gr.Slider(1, MAX_SCENES, value=DEFAULT_SCENES, step=1, label="Scenes")
clip_duration = gr.Radio(choices=sorted(list(ALLOWED_DURATIONS)), value=5, label="Seconds/Scene")
ratio = gr.Dropdown(choices=sorted(list(SUPPORTED_RATIOS)), value="1280:720", label="Aspect Ratio")
quality_mode = gr.Checkbox(label="Quality Mode (gen4 vs gen4_turbo)", value=False)
gr.Markdown("### Narration (ElevenLabs primary; fallback silent track)")
with gr.Row():
refresh_btn = gr.Button("πŸ”„ Refresh Voices")
voices_dd = gr.Dropdown(choices=[], label="ElevenLabs Voice (Name|ID)")
model_dd = gr.Dropdown(
choices=["eleven_turbo_v2_5","eleven_multilingual_v2","eleven_flash_v2_5","eleven_monolingual_v1"],
value="eleven_turbo_v2_5",
label="ElevenLabs Model"
)
streaming_chk = gr.Checkbox(label="Streaming TTS", value=False)
with gr.Row():
stability = gr.Slider(0,1,value=0.55,step=0.01,label="Stability")
similarity = gr.Slider(0,1,value=0.80,step=0.01,label="Similarity")
style = gr.Slider(0,1,value=0.25,step=0.01,label="Style")
speaker_boost = gr.Checkbox(label="Speaker Boost", value=True)
generate_btn = gr.Button("πŸš€ Generate Video", variant="primary")
output_video = gr.Video(label="Final Video")
refresh_btn.click(fn=refresh_voices, outputs=voices_dd)
generate_btn.click(
fn=generate_video,
inputs=[
topic, keyframes, scene_count, clip_duration, ratio,
quality_mode, voices_dd, model_dd, stability, similarity,
style, speaker_boost, streaming_chk
],
outputs=output_video
)
gr.Markdown(
"### Tips\n"
"- Use detailed keyframes with clear subject & lighting.\n"
"- Add emotional descriptors directly in narration text for richer prosody.\n"
"- Iterate with Turbo then switch to Quality Mode to finalize.\n"
"- Adjust Stability/Similarity for expressiveness vs consistency."
)
if __name__ == '__main__':
demo.launch()