Spaces:
Sleeping
Sleeping
File size: 7,206 Bytes
a38b4f9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# β
Updated media_gen.py with file logging + UI debug toggle
import os
import re
import logging
import streamlit as st
import requests
from PIL import Image, UnidentifiedImageError
from io import BytesIO
from dotenv import load_dotenv
from moviepy.editor import ImageClip, AudioFileClip
from elevenlabs import generate, save, set_api_key
from googletrans import Translator
from PIL import ImageEnhance, Image
import tempfile
# Load env vars
load_dotenv()
# Logging setup
logging.basicConfig(
filename="app.log",
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
# Constants
OUTPUT_DIR = "outputs"
DEFAULT_IMAGE = "assets/fallback.jpg"
WATERMARK_PATH = "assets/logo_watermark.png"
UNSPLASH_ACCESS_KEY = os.getenv("UNSPLASH_ACCESS_KEY")
os.makedirs("outputs/audio", exist_ok=True)
os.makedirs("outputs/images", exist_ok=True)
os.makedirs("outputs/videos", exist_ok=True)
def translate_text(text, target_lang):
return Translator().translate(text, dest=target_lang).text
def sanitize_filename(text):
return re.sub(r'\W+', '_', text).lower()[:50]
def apply_watermark(image_path, watermark_path=WATERMARK_PATH):
try:
base = Image.open(image_path).convert("RGBA")
watermark = Image.open(watermark_path).convert("RGBA").resize((100, 100))
base.paste(watermark, (base.width - 110, base.height - 110), watermark)
base.convert("RGB").save(image_path)
except Exception as e:
logging.error(f"Watermarking failed: {e}")
st.write(f"β Watermarking failed: {e}")
def use_fallback_image(prompt, add_watermark=False):
try:
fallback_path = DEFAULT_IMAGE
output_path = f"outputs/images/{sanitize_filename(prompt)}.jpg"
with Image.open(fallback_path) as img:
img.save(output_path)
if add_watermark:
apply_watermark(output_path)
return output_path
except UnidentifiedImageError:
logging.error("Could not open fallback image.")
st.write("β Could not open fallback image.")
return None
def generate_gtts_fallback(prompt, output_path, lang="en", debug_mode=False):
try:
from gtts import gTTS
tts = gTTS(text=prompt, lang=lang)
tts.save(output_path)
logging.info(f"gTTS fallback audio saved to {output_path}")
if debug_mode:
st.write(f"β
Fallback audio (gTTS) saved to {output_path}")
return output_path
except Exception as e:
logging.error(f"gTTS fallback failed: {e}")
st.write(f"β gTTS fallback failed: {str(e)}")
return None
def generate_image(prompt, file_tag, add_watermark=False, dark_mode=False, debug_mode=False):
try:
# Enhance prompt if dark mode is enabled
if dark_mode:
prompt += " at night, dark theme, low light, moody lighting"
url = f"https://api.unsplash.com/photos/random?query={requests.utils.quote(prompt)}&client_id={UNSPLASH_ACCESS_KEY}"
response = requests.get(url, timeout=10)
response.raise_for_status()
image_url = response.json()["urls"]["regular"]
image_response = requests.get(image_url, timeout=10)
image_response.raise_for_status()
output_path = f"outputs/images/{sanitize_filename(prompt)}.jpg"
img = Image.open(BytesIO(image_response.content))
img.convert("RGB").save(output_path)
if add_watermark:
apply_watermark(output_path)
return output_path
except Exception as e:
logging.error(f"Image generation failed: {e}")
st.write("π Unsplash failed. Using fallback.")
st.write(f"β Image generation failed: {e}")
return use_fallback_image(prompt, add_watermark=add_watermark)
# β
Updated generate_audio with proper language handling
def generate_audio(prompt, output_path, debug_mode=False, lang="en"):
try:
api_key = os.getenv("ELEVEN_API_KEY") or st.secrets.get("ELEVEN_API_KEY", None)
# Use gTTS for non-English languages
if lang != "en":
if debug_mode:
st.write(f"π Non-English language selected: {lang}. Using gTTS.")
return generate_gtts_fallback(prompt, output_path, lang=lang, debug_mode=debug_mode)
if api_key:
if debug_mode:
st.write(f"β
ELEVEN_API_KEY loaded: {api_key[:4]}...****")
set_api_key(api_key)
if debug_mode:
st.write(f"π§ Generating audio for prompt: {prompt}")
try:
audio = generate(text=prompt, voice="Aria", model="eleven_monolingual_v1")
save(audio, output_path)
logging.info(f"Audio saved successfully to {output_path}")
if debug_mode:
st.write(f"π File exists after save? {os.path.exists(output_path)}")
st.write(f"β
Audio saved successfully to {output_path}")
return output_path
except Exception as e:
logging.warning(f"ElevenLabs failed: {e}")
if debug_mode:
st.write(f"β οΈ ElevenLabs failed: {str(e)}")
st.write("π Falling back to gTTS...")
return generate_gtts_fallback(prompt, output_path, lang=lang, debug_mode=debug_mode)
else:
logging.warning("ELEVEN_API_KEY not found")
if debug_mode:
st.write("β ELEVEN_API_KEY not found. Falling back to gTTS.")
return generate_gtts_fallback(prompt, output_path, lang=lang, debug_mode=debug_mode)
except Exception as e:
logging.error(f"Exception during audio generation setup: {e}")
if debug_mode:
st.write(f"β Exception during audio generation setup: {str(e)}")
st.write("π Falling back to gTTS...")
return generate_gtts_fallback(prompt, output_path, lang=lang, debug_mode=debug_mode)
def generate_video(prompt, image_path, audio_path, output_path, add_watermark=False, dark_mode=False):
try:
# If dark_mode, darken the image temporarily
if dark_mode:
with Image.open(image_path) as img:
enhancer = ImageEnhance.Brightness(img)
darker_img = enhancer.enhance(0.5) # Reduce brightness to 50%
# Save to a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
temp_image_path = tmp.name
darker_img.save(temp_image_path)
image_path = temp_image_path
audio_clip = AudioFileClip(audio_path)
image_clip = ImageClip(image_path).set_duration(audio_clip.duration).resize(height=720)
video = image_clip.set_audio(audio_clip)
output_path = f"outputs/videos/{sanitize_filename(prompt)}.mp4"
video.write_videofile(output_path, fps=24, codec="libx264", audio_codec="aac", verbose=False, logger=None)
return output_path
except Exception as e:
logging.error(f"Video generation failed: {e}")
st.write(f"β Video generation failed: {e}")
return None
|