# NOTE: page-capture residue ("Spaces: Running") - not part of the program.
import gradio as gr | |
import torch | |
import soundfile as sf | |
import edge_tts | |
import asyncio | |
from transformers import GPT2Tokenizer, GPT2LMHeadModel | |
from keybert import KeyBERT | |
from moviepy.editor import ( | |
VideoFileClip, | |
AudioFileClip, | |
concatenate_videoclips, | |
concatenate_audioclips, | |
CompositeAudioClip, | |
AudioClip, | |
TextClip, | |
CompositeVideoClip, | |
VideoClip | |
) | |
import numpy as np | |
import json | |
import logging | |
import os | |
import requests | |
import re | |
import math | |
import tempfile | |
import shutil | |
import uuid | |
import threading | |
import time | |
from datetime import datetime, timedelta | |
# ------------------- Timeout configuration -------------------
# 30 minutes, expressed in seconds (presumably consumed by the Gradio
# server - TODO confirm that this variable is actually honoured).
os.environ["GRADIO_SERVER_TIMEOUT"] = str(30 * 60)

# ------------------- Configuration & globals -------------------
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# API key for the Pexels stock-video search; without it no footage is fetched.
PEXELS_API_KEY = os.environ.get("PEXELS_API_KEY")
if PEXELS_API_KEY is None or PEXELS_API_KEY == "":
    logger.warning("PEXELS_API_KEY no definido. Los videos no funcionarán.")

# Model singletons, filled in on first use by the get_* loader functions.
tokenizer = None
gpt2_model = None
kw_model = None

# Directory that receives the rendered videos.
RESULTS_DIR = "video_results"
os.makedirs(RESULTS_DIR, exist_ok=True)

# In-memory registry of video jobs, keyed by task id.
TASKS = {}
# ------------------- Motor Edge TTS ------------------- | |
class EdgeTTSEngine:
    """Synchronous facade over the asynchronous ``edge_tts`` client."""

    def __init__(self, voice="es-ES-AlvaroNeural"):
        """Remember the voice name used for every synthesis request."""
        self.voice = voice
        logger.info(f"Inicializando Edge TTS con voz: {voice}")

    async def _synthesize_async(self, text, output_path):
        """Synthesize ``text`` with Edge TTS and save it to ``output_path``.

        Returns:
            True when the save call completed, False when it raised (the
            error is logged, not propagated).
        """
        try:
            await edge_tts.Communicate(text, self.voice).save(output_path)
        except Exception as e:
            logger.error(f"Error en Edge TTS: {e}")
            return False
        return True

    def synthesize(self, text, output_path):
        """Blocking wrapper around :meth:`_synthesize_async`.

        Returns:
            The coroutine's boolean result, or False if running it raised.
        """
        try:
            # asyncio.run drives the coroutine on a fresh event loop.
            succeeded = asyncio.run(self._synthesize_async(text, output_path))
        except Exception as e:
            logger.error(f"Error al sintetizar con Edge TTS: {e}")
            succeeded = False
        return succeeded


# Global TTS engine instance
tts_engine = EdgeTTSEngine()
# ------------------- Carga Perezosa de Modelos ------------------- | |
def get_tokenizer():
    """Return the shared Spanish GPT-2 tokenizer, loading it on first use.

    The tokenizer is cached in the module-level ``tokenizer`` global. When
    the loaded tokenizer has no pad token, its EOS token is used instead.
    """
    global tokenizer
    if tokenizer is not None:
        return tokenizer

    logger.info("Cargando tokenizer GPT2 español...")
    tokenizer = GPT2Tokenizer.from_pretrained("datificate/gpt2-small-spanish")
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    return tokenizer
def get_gpt2_model():
    """Return the shared Spanish GPT-2 model, loading it on first use.

    The model is switched to evaluation mode via ``.eval()`` and cached in
    the module-level ``gpt2_model`` global.
    """
    global gpt2_model
    if gpt2_model is not None:
        return gpt2_model

    logger.info("Cargando modelo GPT-2 español...")
    loaded = GPT2LMHeadModel.from_pretrained("datificate/gpt2-small-spanish")
    gpt2_model = loaded.eval()
    return gpt2_model
def get_kw_model():
    """Return the shared KeyBERT keyword model, loading it on first use.

    The instance is cached in the module-level ``kw_model`` global.
    """
    global kw_model
    if kw_model is not None:
        return kw_model

    logger.info("Cargando modelo KeyBERT multilingüe...")
    kw_model = KeyBERT("paraphrase-multilingual-MiniLM-L12-v2")
    return kw_model
# ------------------- Funciones del Pipeline ------------------- | |
def update_task_progress(task_id, message):
    """Store ``message`` as the task's latest progress line and log it.

    The registry entry is only touched when ``task_id`` is already present
    in ``TASKS``; the message is written to the log either way.
    """
    is_registered = task_id in TASKS
    if is_registered:
        TASKS[task_id]['progress_log'] = message
    logger.info(f"[{task_id}] {message}")
def gpt2_script(prompt: str) -> str:
    """Generate a short Spanish script about ``prompt`` using GPT-2.

    An instruction sentence ending in ``"sobre: <prompt>"`` is fed to the
    model, and the decoded output is cut at the FIRST ``"sobre:"`` marker.
    Cutting at the first marker (rather than the last) keeps the whole
    script even when the topic or the generated text itself contains
    ``"sobre:"``.

    Args:
        prompt: Topic the script should talk about.

    Returns:
        The text following the instruction marker; ``prompt`` itself when
        that text is empty; or a fixed fallback sentence mentioning
        ``prompt`` when generation raises.
    """
    try:
        local_tokenizer = get_tokenizer()
        local_gpt2_model = get_gpt2_model()
        instruction = f"Escribe un guion corto y coherente sobre: {prompt}"
        inputs = local_tokenizer(instruction, return_tensors="pt", truncation=True, max_length=512)
        outputs = local_gpt2_model.generate(
            **inputs,
            # Allow up to 160 tokens on top of the tokenized instruction.
            max_length=160 + inputs["input_ids"].shape[1],
            do_sample=True,
            top_p=0.9,
            top_k=40,
            temperature=0.7,
            no_repeat_ngram_size=3,
            pad_token_id=local_tokenizer.pad_token_id,
            eos_token_id=local_tokenizer.eos_token_id,
        )
        text = local_tokenizer.decode(outputs[0], skip_special_tokens=True)
        # Split once, at the instruction's own marker; if the marker is
        # missing from the decoded text, keep the whole text.
        _, marker, remainder = text.partition("sobre:")
        generated = (remainder if marker else text).strip()
        return generated if generated else prompt
    except Exception as e:
        logger.error(f"Error generando guión: {e}")
        return f"Hoy hablaremos sobre {prompt}. Este es un tema fascinante que merece nuestra atención."
def generate_tts_audio(text: str, output_path: str) -> bool:
    """Synthesize ``text`` to ``output_path`` with the global TTS engine.

    Returns:
        True only when the engine reports success and the output file
        exists with a non-zero size; False otherwise, including when an
        exception is raised (it is logged).
    """
    try:
        logger.info("Generando audio con Edge TTS...")
        audio_ready = (
            tts_engine.synthesize(text, output_path)
            and os.path.exists(output_path)
            and os.path.getsize(output_path) > 0
        )
        if not audio_ready:
            logger.error("El archivo de audio no se generó correctamente")
            return False
        logger.info(f"Audio generado exitosamente: {output_path}")
        return True
    except Exception as e:
        logger.error(f"Error generando TTS: {e}")
        return False
# Spanish stop words handed to KeyBERT's vectorizer. scikit-learn only ships
# a built-in list for "english", so Spanish needs an explicit list (it must
# be a real ``list`` for recent scikit-learn parameter validation).
_SPANISH_STOP_WORDS = [
    "a", "al", "algo", "algunas", "algunos", "ante", "antes", "cada", "como",
    "con", "contra", "cual", "cuando", "de", "del", "desde", "donde",
    "durante", "e", "el", "él", "ella", "ellas", "ellos", "en", "entre",
    "era", "eran", "es", "esa", "esas", "ese", "eso", "esos", "esta", "está",
    "están", "estas", "este", "esto", "estos", "fue", "fueron", "ha", "han",
    "hasta", "hay", "hoy", "la", "las", "le", "les", "lo", "los", "más", "me",
    "mi", "mis", "mucho", "muy", "ni", "no", "nos", "nosotros", "nuestra",
    "nuestro", "o", "os", "otra", "otro", "otros", "para", "pero", "poco",
    "por", "porque", "que", "qué", "quien", "se", "sea", "ser", "si", "sí",
    "sin", "sobre", "son", "su", "sus", "también", "tanto", "te", "tiene",
    "tienen", "todo", "todos", "tu", "tus", "un", "una", "unas", "uno",
    "unos", "y", "ya", "yo",
]


def extract_keywords(text: str) -> list[str]:
    """Extract up to five search keywords from ``text`` for the video search.

    The text is lower-cased and stripped of punctuation, then passed to
    KeyBERT together with an explicit Spanish stop-word list. The original
    call used ``stop_words="spanish"``, which scikit-learn rejects, so
    extraction never succeeded and a fallback list was always returned.

    Args:
        text: Script text to analyse.

    Returns:
        The extracted keywords with spaces replaced by ``+``;
        ``["naturaleza", "paisaje"]`` when nothing was extracted; or a
        fixed English fallback list when extraction raises.
    """
    try:
        local_kw_model = get_kw_model()
        clean_text = re.sub(r"[^\w\sáéíóúñÁÉÍÓÚÑ]", "", text.lower())
        kws = local_kw_model.extract_keywords(
            clean_text,
            stop_words=_SPANISH_STOP_WORDS,
            top_n=5,
        )
        keywords = [k.replace(" ", "+") for k, _ in kws if k]
        return keywords if keywords else ["naturaleza", "paisaje"]
    except Exception as e:
        logger.error(f"Error extrayendo keywords: {e}")
        # Fallback list kept exactly as before for backward compatibility.
        return ["mystery", "conspiracy", "alien", "UFO", "secret", "cover-up", "illusion", "paranoia",
                "secret society", "lie", "simulation", "matrix", "terror", "darkness", "shadow", "enigma",
                "urban legend", "unknown", "hidden", "mistrust", "experiment", "government", "control",
                "surveillance", "propaganda", "deception", "whistleblower", "anomaly", "extraterrestrial",
                "shadow government", "cabal", "deep state", "new world order", "mind control", "brainwashing",
                "disinformation", "false flag", "assassin", "black ops", "anomaly", "men in black", "abduction",
                "hybrid", "ancient aliens", "hollow earth", "simulation theory", "alternate reality", "predictive programming",
                "symbolism", "occult", "eerie", "haunting", "unexplained", "forbidden knowledge", "redacted", "conspiracy theorist"]
def search_pexels_videos(query: str, count: int = 3) -> list[dict]:
    """Query the Pexels video search endpoint for landscape videos.

    Args:
        query: Search term.
        count: Value sent as ``per_page``.

    Returns:
        The ``"videos"`` list of the JSON response, or an empty list when
        no API key is configured or the request fails (failure is logged).
    """
    if not PEXELS_API_KEY:
        return []

    auth_headers = {"Authorization": PEXELS_API_KEY}
    search_params = {"query": query, "per_page": count, "orientation": "landscape"}
    try:
        response = requests.get(
            "https://api.pexels.com/videos/search",
            headers=auth_headers,
            params=search_params,
            timeout=20,
        )
        response.raise_for_status()
        payload = response.json()
        return payload.get("videos", [])
    except Exception as e:
        logger.error(f"Error buscando videos en Pexels: {e}")
        return []
def download_video(url: str, folder: str) -> str | None:
    """Stream the video at ``url`` into a uniquely named ``.mp4`` in ``folder``.

    Returns:
        The path of the downloaded file, or None when the request fails,
        an error is raised, or the resulting file is 1000 bytes or smaller.
    """
    try:
        filepath = os.path.join(folder, f"{uuid.uuid4().hex}.mp4")
        with requests.get(url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                # Write the body in 1 MiB chunks.
                f.writelines(response.iter_content(chunk_size=1024 * 1024))

        looks_valid = os.path.exists(filepath) and os.path.getsize(filepath) > 1000
        if not looks_valid:
            logger.error(f"Archivo descargado inválido: {filepath}")
            return None
        return filepath
    except Exception as e:
        logger.error(f"Error descargando video {url}: {e}")
        return None
def create_subtitle_clips(script: str, video_width: int, video_height: int, duration: float):
    """Build one timed subtitle ``TextClip`` per sentence of ``script``.

    The script is split on sentence punctuation and each sentence receives
    a share of ``duration`` proportional to its word count. Sentences whose
    share is under 0.5 s get no clip, but their time slot is still consumed
    so later subtitles stay aligned with the narration. Previously a
    skipped sentence did not advance the clock, so every following
    subtitle started too early.

    Args:
        script: Narration text.
        video_width: Width of the target video; captions use 90% of it.
        video_height: Height of the target video; the font size is 5% of
            it, with a minimum of 20.
        duration: Total duration, in seconds, to spread the subtitles over.

    Returns:
        A list of positioned, timed text clips; an empty list when there
        are no sentences or when clip creation raises (the error is logged).
    """
    try:
        sentences = [s.strip() for s in re.split(r"[.!?¿¡]", script) if s.strip()]
        if not sentences:
            return []

        total_words = sum(len(s.split()) for s in sentences) or 1
        time_per_word = duration / total_words

        # Loop-invariant layout values.
        font_size = max(20, int(video_height * 0.05))
        caption_size = (int(video_width * 0.9), None)

        clips = []
        current_time = 0.0
        for sentence in sentences:
            sentence_duration = len(sentence.split()) * time_per_word
            start_time = current_time
            # Advance the clock before the skip check so that short,
            # skipped sentences still occupy their slot on the timeline.
            current_time += sentence_duration
            if sentence_duration < 0.5:
                continue
            txt_clip = (
                TextClip(
                    sentence,
                    fontsize=font_size,
                    color="white",
                    stroke_color="black",
                    stroke_width=2,
                    method="caption",
                    size=caption_size,
                    font="Arial-Bold",
                )
                .set_start(start_time)
                .set_duration(sentence_duration)
                .set_position(("center", "bottom"))
            )
            clips.append(txt_clip)
        return clips
    except Exception as e:
        logger.error(f"Error creando subtítulos: {e}")
        return []
def loop_audio_to_duration(audio_clip: AudioFileClip, target_duration: float) -> AudioFileClip:
    """Return ``audio_clip`` trimmed or repeated to last ``target_duration``.

    A clip that is already long enough is simply cut. A shorter clip is
    concatenated with itself as many times as needed and then cut. If
    anything raises, the error is logged and the original clip is returned
    unchanged.
    """
    try:
        source_length = audio_clip.duration
        if source_length >= target_duration:
            return audio_clip.subclip(0, target_duration)

        repeats = math.ceil(target_duration / source_length)
        repeated = concatenate_audioclips([audio_clip for _ in range(repeats)])
        return repeated.subclip(0, target_duration)
    except Exception as e:
        logger.error(f"Error haciendo loop del audio: {e}")
        return audio_clip
def create_video(script_text: str, generate_script: bool, music_path: str | None, task_id: str) -> str:
    """Build the narrated video for a task and return the rendered file path.

    Steps: prepare the script, synthesize the narration, download stock
    footage for up to three keywords (at most six videos), trim and
    concatenate the footage to the narration length, mix optional
    background music, add subtitles and render the result.

    Every clip opened along the way is released in the ``finally`` block,
    so readers are closed on failure as well as on success, before the
    temporary directory is removed.

    Args:
        script_text: Topic (when ``generate_script`` is true) or the full
            script text.
        generate_script: Whether to expand ``script_text`` with
            ``gpt2_script``.
        music_path: Optional background-music file; ignored when falsy or
            when the file does not exist.
        task_id: Identifier used for progress updates and for the output
            file name.

    Returns:
        Path of the rendered MP4 inside ``RESULTS_DIR``.

    Raises:
        ValueError: If the script is empty or the narration lasts under 1 s.
        RuntimeError: If TTS fails, or no stock video could be downloaded
            or opened.
        Exception: Any other error is logged and re-raised unchanged.
    """
    temp_dir = tempfile.mkdtemp()
    # Clips that own resources; closed in ``finally`` whatever happens.
    open_clips = []
    try:
        # Step 1: generate or reuse the script
        update_task_progress(task_id, "Paso 1/7: Preparando guión...")
        if generate_script:
            script = gpt2_script(script_text)
        else:
            script = script_text.strip()
        if not script:
            raise ValueError("El guión está vacío")

        # Step 2: synthesize the narration
        update_task_progress(task_id, "Paso 2/7: Generando audio con Edge TTS...")
        # Edge TTS produces MP3 data, so give the file a matching extension.
        audio_path = os.path.join(temp_dir, "voice.mp3")
        if not generate_tts_audio(script, audio_path):
            raise RuntimeError("Error generando el audio TTS")
        voice_clip = AudioFileClip(audio_path)
        open_clips.append(voice_clip)
        video_duration = voice_clip.duration
        if video_duration < 1:
            raise ValueError("El audio generado es demasiado corto")

        # Step 3: search and download stock videos
        update_task_progress(task_id, "Paso 3/7: Buscando videos en Pexels...")
        video_paths = []
        selected_keywords = extract_keywords(script)[:3]  # at most 3 keywords
        for i, keyword in enumerate(selected_keywords):
            update_task_progress(
                task_id,
                f"Paso 3/7: Buscando videos para '{keyword}' ({i+1}/{len(selected_keywords)})",
            )
            videos = search_pexels_videos(keyword, 2)
            for video_data in videos:
                if len(video_paths) >= 6:  # at most 6 videos
                    break
                video_files = video_data.get("video_files", [])
                if not video_files:
                    continue
                # Pick the widest rendition; a null width counts as 0 so
                # that ``max`` never compares None with an int.
                best_file = max(video_files, key=lambda f: f.get("width") or 0)
                video_url = best_file.get("link")
                if not video_url:
                    continue
                downloaded_path = download_video(video_url, temp_dir)
                if downloaded_path:
                    video_paths.append(downloaded_path)
        if not video_paths:
            raise RuntimeError("No se pudieron descargar videos de Pexels")

        # Step 4: open and trim the downloaded videos
        update_task_progress(task_id, f"Paso 4/7: Procesando {len(video_paths)} videos...")
        video_clips = []
        for path in video_paths:
            try:
                clip = VideoFileClip(path)
                open_clips.append(clip)
                # Use at most 8 seconds of each clip
                duration = min(8, clip.duration)
                video_clips.append(clip.subclip(0, duration))
            except Exception as e:
                logger.error(f"Error procesando video {path}: {e}")
                continue
        if not video_clips:
            raise RuntimeError("No se pudieron procesar los videos")

        # Concatenate the footage
        base_video = concatenate_videoclips(video_clips, method="chain")
        # Repeat the footage when it is shorter than the narration
        if base_video.duration < video_duration:
            loops_needed = math.ceil(video_duration / base_video.duration)
            base_video = concatenate_videoclips([base_video] * loops_needed)
        # Cut to the exact narration length
        base_video = base_video.subclip(0, video_duration)
        open_clips.append(base_video)

        # Step 5: compose the final audio track
        update_task_progress(task_id, "Paso 5/7: Componiendo audio...")
        final_audio = voice_clip
        if music_path and os.path.exists(music_path):
            try:
                music_source = AudioFileClip(music_path)
                open_clips.append(music_source)
                music_clip = loop_audio_to_duration(music_source, video_duration).volumex(0.2)
                final_audio = CompositeAudioClip([music_clip, voice_clip])
            except Exception as e:
                # Music is optional: fall back to narration only.
                logger.error(f"Error con música: {e}")
                final_audio = voice_clip

        # Step 6: build subtitles
        update_task_progress(task_id, "Paso 6/7: Agregando subtítulos...")
        subtitle_clips = create_subtitle_clips(script, base_video.w, base_video.h, video_duration)

        # Step 7: render the final video
        update_task_progress(task_id, "Paso 7/7: Renderizando video final...")
        final_video = CompositeVideoClip([base_video] + subtitle_clips).set_audio(final_audio)
        open_clips.append(final_video)
        output_path = os.path.join(RESULTS_DIR, f"video_{task_id}.mp4")
        final_video.write_videofile(
            output_path,
            fps=24,
            codec="libx264",
            audio_codec="aac",
            threads=2,
            logger=None,
            verbose=False,
        )
        return output_path
    except Exception as e:
        logger.error(f"Error creando video: {e}")
        raise
    finally:
        # Close clips (most recently opened first) before deleting the
        # temporary files they read from.
        for opened in reversed(open_clips):
            try:
                opened.close()
            except Exception as e:
                logger.warning(f"No se pudo cerrar un clip: {e}")
        # Best-effort removal of the temporary directory.
        shutil.rmtree(temp_dir, ignore_errors=True)
def worker_thread(task_id: str, mode: str, topic: str, user_script: str, music_path: str | None):
    """Thread target that renders one video and records the outcome in ``TASKS``.

    On success the task entry is marked ``"done"`` with the output path; on
    any exception it is marked ``"error"`` with the exception text.
    """
    try:
        use_ai_script = mode == "Generar Guion con IA"
        source_text = topic if use_ai_script else user_script
        video_file = create_video(source_text, use_ai_script, music_path, task_id)
        TASKS[task_id].update(
            status="done",
            result=video_file,
            progress_log="✅ ¡Video completado exitosamente!",
        )
    except Exception as e:
        logger.error(f"Error en worker {task_id}: {e}")
        TASKS[task_id].update(
            status="error",
            error=str(e),
            progress_log=f"❌ Error: {str(e)}",
        )
def generate_video_with_progress(mode, topic, user_script, music):
    """Generator that runs a video job and streams its progress.

    Each yielded value is a ``(progress_text, video_path, file_path)``
    tuple. While the job runs only the progress text is set; the final
    tuple carries the result path twice on success, or only the error
    message on failure. Blank input yields a single error tuple and stops.
    """
    # Validate the input
    content = user_script if mode != "Generar Guion con IA" else topic
    if not (content and content.strip()):
        yield "❌ Error: Por favor, ingresa un tema o guion.", None, None
        return

    # Register the task
    task_id = uuid.uuid4().hex[:8]
    TASKS[task_id] = dict(
        status="processing",
        progress_log="🚀 Iniciando generación de video...",
        timestamp=datetime.utcnow(),
    )

    # Start the background worker
    threading.Thread(
        target=worker_thread,
        args=(task_id, mode, topic, user_script, music),
        daemon=True,
    ).start()

    # Poll once per second while the worker is still running
    while TASKS[task_id]["status"] == "processing":
        yield TASKS[task_id]['progress_log'], None, None
        time.sleep(1)

    # Emit the final state
    final_status = TASKS[task_id]["status"]
    if final_status == "error":
        yield TASKS[task_id]['progress_log'], None, None
    elif final_status == "done":
        result_path = TASKS[task_id]['result']
        yield TASKS[task_id]['progress_log'], result_path, result_path
# ------------------- Limpieza automática ------------------- | |
def cleanup_old_files():
    """Loop forever, purging tasks older than 24 hours once per hour.

    For each expired task the rendered file (if it still exists) is
    deleted and the task is dropped from ``TASKS``. Errors are logged and
    the loop continues.
    """
    max_age = timedelta(hours=24)
    while True:
        try:
            time.sleep(3600)  # 1 hour
            now = datetime.utcnow()
            logger.info("Ejecutando limpieza de archivos antiguos...")
            # Iterate over a snapshot because entries are deleted below.
            for task_id, info in list(TASKS.items()):
                if "timestamp" not in info or now - info["timestamp"] <= max_age:
                    continue
                result_file = info.get("result")
                if result_file and os.path.exists(result_file):
                    try:
                        os.remove(info["result"])
                        logger.info(f"Archivo eliminado: {info['result']}")
                    except Exception as e:
                        logger.error(f"Error eliminando archivo: {e}")
                del TASKS[task_id]
        except Exception as e:
            logger.error(f"Error en cleanup: {e}")


# Start the cleanup thread
threading.Thread(daemon=True, target=cleanup_old_files).start()
# ------------------- Interfaz Gradio ------------------- | |
def toggle_input_fields(mode):
    """Return visibility updates for the topic and script inputs.

    The first update is visible only in AI-script mode; the second is
    visible only otherwise.
    """
    ai_mode = mode == "Generar Guion con IA"
    topic_update = gr.update(visible=ai_mode)
    script_update = gr.update(visible=not ai_mode)
    return topic_update, script_update
# Crear interfaz | |
# Build the Gradio UI. ``demo`` is the Blocks app launched by the entry point.
with gr.Blocks(title="🎬 Generador de Videos IA", theme=gr.themes.Soft()) as demo:
    # Introductory header.
    gr.Markdown("""
# 🎬 Generador de Videos con IA
Crea videos profesionales a partir de texto usando:
- **Edge TTS** para voz en español
- **GPT-2** para generación de guiones
- **Pexels API** para videos de stock
- **Subtítulos automáticos** y efectos visuales
El progreso se mostrará en tiempo real.
""")
    with gr.Row():
        # First column: inputs.
        with gr.Column(scale=2):
            gr.Markdown("### ⚙️ Configuración")
            # Selects between AI-generated script and user-provided script.
            mode_radio = gr.Radio(
                choices=["Generar Guion con IA", "Usar Mi Guion"],
                value="Generar Guion con IA",
                label="Método de creación"
            )
            # Topic box, shown in AI mode (the initial mode).
            topic_input = gr.Textbox(
                label="💡 Tema para la IA",
                placeholder="Ej: Los misterios del océano profundo",
                lines=2
            )
            # Full-script box, hidden until the user switches mode.
            script_input = gr.Textbox(
                label="📝 Tu Guion Completo",
                placeholder="Escribe aquí tu guion personalizado...",
                lines=8,
                visible=False
            )
            # Optional background music, passed to the handler as a file path.
            music_input = gr.Audio(
                type="filepath",
                label="🎵 Música de fondo (opcional)"
            )
            generate_btn = gr.Button(
                "🎬 Generar Video",
                variant="primary",
                size="lg"
            )
        # Second column: progress log and results.
        with gr.Column(scale=2):
            gr.Markdown("### 📊 Progreso y Resultados")
            progress_output = gr.Textbox(
                label="📋 Log de progreso en tiempo real",
                lines=12,
                interactive=False,
                show_copy_button=True
            )
            video_output = gr.Video(
                label="🎥 Video generado",
                height=400
            )
            download_output = gr.File(
                label="📥 Descargar archivo"
            )
    # Event handlers
    # Swap which text box is visible when the mode changes.
    mode_radio.change(
        fn=toggle_input_fields,
        inputs=[mode_radio],
        outputs=[topic_input, script_input]
    )
    # The handler is a generator; each yielded tuple maps onto the three outputs.
    generate_btn.click(
        fn=generate_video_with_progress,
        inputs=[mode_radio, topic_input, script_input, music_input],
        outputs=[progress_output, video_output, download_output]
    )
    # Usage instructions shown below the main row.
    gr.Markdown("""
### 📋 Instrucciones:
1. **Elige el método**: Genera un guion con IA o usa el tuyo propio
2. **Configura el contenido**: Ingresa un tema interesante o tu guion
3. **Música opcional**: Sube un archivo de audio para fondo musical
4. **Genera**: Presiona el botón y observa el progreso en tiempo real
⏱️ **Tiempo estimado**: 2-5 minutos dependiendo de la duración del contenido.
""")
# Ejecutar aplicación | |
if __name__ == "__main__":
    logger.info("🚀 Iniciando aplicación Generador de Videos IA...")

    # Configure the request queue (max_size=10).
    demo.queue(max_size=10)

    # Launch the app on all interfaces, port 7860, with the API page disabled.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_api": False,
    }
    demo.launch(**launch_options)