File size: 8,301 Bytes
287c9ca 5470dfc 287c9ca 5470dfc 287c9ca 5470dfc 287c9ca 5470dfc 50c620f 5470dfc 50c620f 5470dfc b97795f 5470dfc b97795f 5470dfc 50c620f 5470dfc 50c620f 5470dfc 50c620f 5470dfc 287c9ca 5470dfc b97795f 5470dfc 287c9ca 5470dfc 50c620f 5470dfc 50c620f 5470dfc 287c9ca 5470dfc b97795f 287c9ca 5470dfc 287c9ca 5470dfc 287c9ca 5470dfc 287c9ca 5470dfc b97795f 50c620f 5470dfc 50c620f 5470dfc 50c620f 5470dfc 50c620f b97795f 5470dfc b97795f 287c9ca 5470dfc b97795f 50c620f 5470dfc 50c620f 5470dfc b97795f 5470dfc b97795f 5470dfc b97795f 5470dfc b97795f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
# core/visual_engine.py
import tempfile
import logging
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
from moviepy.editor import ImageClip, concatenate_videoclips
import os
# Module-wide logging: request INFO-level output on the root logger (this call
# does nothing if the root logger already has handlers), then bind the logger
# named after this module that the rest of the file uses.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class VisualEngine:
    """Render text placeholder images and assemble images into a video.

    Attributes:
        output_dir: Directory that receives generated images and videos.
        font_filename: File name of the preferred TrueType font.
        font_size_pil: Point size used when loading/measuring the font.
        font_path: Path of the TrueType font that was loaded, or ``None``
            when the Pillow default font is in use.
        font: The Pillow font object used for all text rendering.
    """

    def __init__(self, output_dir=None):
        """Prepare the output directory and load a font.

        Args:
            output_dir: Directory for generated media. When falsy, a fresh
                temporary directory is created in the system temp location.
        """
        self.output_dir = output_dir or self._create_temp_output_dir()
        logger.info(f"Using output directory: {self.output_dir}")

        # Ensure the directory exists; widening permissions is best-effort
        # because a caller-supplied directory may be writable without being
        # owned by the current user (chmod would then fail).
        os.makedirs(self.output_dir, exist_ok=True)
        try:
            os.chmod(self.output_dir, 0o775)  # writable by user and group
        except OSError as e:
            logger.warning(
                f"Could not set permissions on {self.output_dir}: {str(e)}"
            )

        # --- Font Setup ---
        self.font_filename = "arial.ttf"
        self.font_size_pil = 24
        # Always defined, so readers never hit AttributeError on fallback.
        self.font_path = None

        self.font = self._load_font_with_fallbacks()
        if self.font:
            logger.info(f"Successfully loaded font: {self.font_path}")
        else:
            logger.warning("Could not load any custom font. Falling back to default font.")
            self.font = ImageFont.load_default()
            self.font_size_pil = 11  # Adjust for default font

    def _create_temp_output_dir(self):
        """Create and return a temporary directory with rwxrwxr-x permissions."""
        temp_dir = tempfile.mkdtemp(prefix="cinegen_media_")
        os.chmod(temp_dir, 0o775)
        return temp_dir

    def _load_font_with_fallbacks(self):
        """Try the known font locations in order and return the first that loads.

        Sets ``self.font_path`` only when a font was actually loaded.

        Returns:
            A Pillow TrueType font, or ``None`` when no candidate could be
            loaded.
        """
        possible_font_paths = [
            # Docker container path
            f"/usr/local/share/fonts/truetype/mycustomfonts/{self.font_filename}",
            # Local development path (relative to this file)
            os.path.join(os.path.dirname(__file__), "..", "assets", "fonts", self.font_filename),
            # System fonts as last resort
            f"/usr/share/fonts/truetype/{self.font_filename}",
            f"/usr/share/fonts/TTF/{self.font_filename}",
        ]
        for font_path in possible_font_paths:
            if not os.path.exists(font_path):
                continue
            try:
                font = ImageFont.truetype(font_path, self.font_size_pil)
            except OSError as e:
                logger.warning(f"Could not load font from {font_path}: {str(e)}")
                continue
            # Record the path only after the load succeeded.
            self.font_path = font_path
            return font
        return None

    def _get_text_dimensions(self, text_content, font_obj):
        """Measure a single line of text.

        Args:
            text_content: The text to measure.
            font_obj: Font object; ``getbbox`` is preferred, ``getsize`` is
                used when ``getbbox`` is absent.

        Returns:
            ``(width, height)``. Empty text yields ``(0, font_size_pil)``.
            When the font cannot measure the text, an estimate based on
            ``font_size_pil`` is returned instead.
        """
        if not text_content:
            return 0, self.font_size_pil
        try:
            if hasattr(font_obj, 'getbbox'):
                bbox = font_obj.getbbox(text_content)
                return bbox[2] - bbox[0], bbox[3] - bbox[1]
            elif hasattr(font_obj, 'getsize'):
                return font_obj.getsize(text_content)
        except Exception as e:
            logger.warning(f"Error measuring text: {str(e)}")
        # Fallback estimate: assume an average glyph is 0.6 em wide.
        avg_char_width = self.font_size_pil * 0.6
        return int(len(text_content) * avg_char_width), self.font_size_pil

    def create_placeholder_image(self, text_description, filename, size=(1280, 720)):
        """Create an image showing ``text_description`` wrapped and centred.

        Args:
            text_description: Text to render; a stock message is used when
                it is empty.
            filename: File name, joined onto ``self.output_dir``.
            size: ``(width, height)`` of the image in pixels.

        Returns:
            The path of the saved image, or ``None`` if anything failed
            (the error is logged).
        """
        try:
            img = Image.new('RGB', size, color=(30, 30, 60))
            draw = ImageDraw.Draw(img)
            if not text_description:
                text_description = "No description provided"

            # Leave a 40 px margin on each side.
            lines = self._wrap_text(text_description, size[0] - 80, draw)

            # Centre the text block vertically, never above a 40 px margin.
            _, line_height = self._get_text_dimensions("Tg", self.font)
            line_step = line_height * 1.3
            total_height = len(lines) * line_step
            y_pos = max(40, (size[1] - total_height) / 2)

            for line in lines:
                line_width, _ = self._get_text_dimensions(line, self.font)
                x_pos = (size[0] - line_width) / 2
                draw.text((x_pos, y_pos), line, fill=(220, 220, 150), font=self.font)
                y_pos += line_step

            output_path = os.path.join(self.output_dir, filename)
            img.save(output_path)
            logger.debug(f"Created placeholder image: {output_path}")
            return output_path
        except Exception as e:
            logger.error(f"Error creating placeholder image: {str(e)}")
            return None

    def _split_long_word(self, word, max_width):
        """Split one non-empty word into pieces that each fit ``max_width``.

        Every piece keeps at least one character, so the loop always makes
        progress even when ``max_width`` is zero or negative.
        """
        pieces = []
        remaining = word
        while remaining:
            cut = len(remaining)
            while cut > 1 and self._get_text_dimensions(remaining[:cut], self.font)[0] > max_width:
                cut -= 1
            pieces.append(remaining[:cut])
            remaining = remaining[cut:]
        return pieces

    def _wrap_text(self, text, max_width, draw):
        """Wrap ``text`` into lines no wider than ``max_width`` pixels.

        Words are split on whitespace. A word that is too wide on its own is
        broken across several lines rather than truncated, so no text is
        lost.

        Args:
            text: The text to wrap.
            max_width: Maximum line width in pixels.
            draw: Unused; kept for interface compatibility.

        Returns:
            A list of lines, or ``["(Text rendering error)"]`` when ``text``
            contains no words.
        """
        lines = []
        current = ""
        for word in text.split():
            candidate = f"{current} {word}" if current else word
            if self._get_text_dimensions(candidate, self.font)[0] <= max_width:
                current = candidate
                continue
            if current:
                lines.append(current)
            # The word starts a new line; break it up if it is too wide.
            pieces = self._split_long_word(word, max_width)
            lines.extend(pieces[:-1])
            current = pieces[-1]
        if current:
            lines.append(current)
        return lines or ["(Text rendering error)"]

    @staticmethod
    def _close_quietly(resource):
        """Close a clip, logging instead of raising if the close fails."""
        try:
            resource.close()
        except Exception as e:
            logger.debug(f"Error closing clip: {str(e)}")

    def create_video_from_images(self, image_paths, output_filename="final_video.mp4", fps=1, duration_per_image=3):
        """Create a video that shows each image for a fixed duration.

        Args:
            image_paths: Paths of the images; falsy or missing paths are
                skipped.
            output_filename: File name, joined onto ``self.output_dir``.
            fps: Frames per second of the written video.
            duration_per_image: Seconds each image stays on screen.

        Returns:
            The path of the written video, or ``None`` when there is nothing
            to render or writing failed (the error is logged).
        """
        if not image_paths:
            logger.error("No images provided for video creation")
            return None
        valid_paths = [p for p in image_paths if p and os.path.exists(p)]
        if not valid_paths:
            logger.error("No valid image paths found")
            return None
        logger.info(f"Creating video from {len(valid_paths)} images")

        clips = []
        video = None
        try:
            for img_path in valid_paths:
                try:
                    clips.append(ImageClip(img_path).set_duration(duration_per_image))
                except Exception as e:
                    logger.error(f"Error processing {img_path}: {str(e)}")
            if not clips:
                return None

            video = concatenate_videoclips(clips, method="compose")
            output_path = os.path.join(self.output_dir, output_filename)
            video.write_videofile(
                output_path,
                fps=fps,
                codec='libx264',
                audio_codec='aac',
                temp_audiofile=os.path.join(self.output_dir, 'temp_audio.m4a'),
                remove_temp=True,
                threads=os.cpu_count() or 2,
                logger=None,  # Suppress moviepy's verbose output
            )
            logger.info(f"Video created: {output_path}")
            return output_path
        except Exception as e:
            logger.error(f"Video creation failed: {str(e)}")
            if isinstance(e, OSError):
                logger.error("OSError: Check ffmpeg installation and file permissions")
            return None
        finally:
            # Release clip resources on every exit path, not only on success.
            for clip in clips:
                self._close_quietly(clip)
            if video is not None:
                self._close_quietly(video)