Update core/visual_engine.py
Browse files- core/visual_engine.py +154 -138
core/visual_engine.py
CHANGED
@@ -1,192 +1,208 @@
|
|
1 |
# core/visual_engine.py
|
|
|
|
|
|
|
2 |
from PIL import Image, ImageDraw, ImageFont
|
3 |
from moviepy.editor import ImageClip, concatenate_videoclips
|
4 |
import os
|
5 |
|
|
|
|
|
|
|
|
|
6 |
class VisualEngine:
|
7 |
-
def __init__(self, output_dir=
|
8 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
9 |
os.makedirs(self.output_dir, exist_ok=True)
|
|
|
10 |
|
11 |
# --- Font Setup ---
|
12 |
-
|
13 |
-
# Example: COPY assets/fonts/arial.ttf /usr/local/share/fonts/truetype/mycustomfonts/arial.ttf
|
14 |
-
self.font_filename = "arial.ttf" # Or "DejaVuSans.ttf" or your chosen font
|
15 |
-
self.font_path_in_container = f"/usr/local/share/fonts/truetype/mycustomfonts/{self.font_filename}"
|
16 |
self.font_size_pil = 24
|
17 |
|
18 |
-
|
19 |
-
|
20 |
-
|
21 |
-
|
22 |
-
|
23 |
-
|
24 |
-
|
25 |
self.font = ImageFont.load_default()
|
26 |
-
# Adjust
|
27 |
-
|
28 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
29 |
|
30 |
def _get_text_dimensions(self, text_content, font_obj):
|
31 |
"""
|
32 |
Gets the width and height of a single line of text with the given font.
|
33 |
Returns (width, height).
|
34 |
"""
|
35 |
-
if
|
36 |
-
return 0, self.font_size_pil
|
37 |
|
38 |
try:
|
39 |
-
|
40 |
-
|
41 |
bbox = font_obj.getbbox(text_content)
|
42 |
-
|
43 |
-
|
44 |
-
|
45 |
-
return
|
46 |
-
elif hasattr(font_obj, 'getsize'): # For older Pillow versions
|
47 |
-
width, height = font_obj.getsize(text_content)
|
48 |
-
return width, height if height > 0 else self.font_size_pil
|
49 |
-
else: # Fallback for very basic font objects (like default font after failure)
|
50 |
-
avg_char_width = self.font_size_pil * 0.6 # Rough estimate
|
51 |
-
height_estimate = self.font_size_pil * 1.2
|
52 |
-
return int(len(text_content) * avg_char_width), int(height_estimate if height_estimate > 0 else self.font_size_pil)
|
53 |
except Exception as e:
|
54 |
-
|
55 |
-
|
56 |
-
|
57 |
-
|
58 |
-
|
59 |
-
|
60 |
|
61 |
def create_placeholder_image(self, text_description, filename, size=(1280, 720)):
|
62 |
-
|
63 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
64 |
|
65 |
-
|
66 |
-
|
|
|
67 |
lines = []
|
|
|
68 |
|
69 |
-
if not text_description: # Handle case where text_description might be None or empty
|
70 |
-
text_description = "(No description provided)"
|
71 |
-
|
72 |
-
words = text_description.split()
|
73 |
-
current_line = ""
|
74 |
-
|
75 |
-
if not self.font: # Should not happen if __init__ fallback works, but as a safeguard
|
76 |
-
print("Error: Font object is not initialized in create_placeholder_image. Cannot draw text.")
|
77 |
-
return None
|
78 |
-
|
79 |
for word in words:
|
80 |
-
|
81 |
-
|
82 |
-
|
83 |
-
if line_width <= max_text_width and current_line != "": # If it fits and current_line is not empty
|
84 |
-
current_line = test_line_candidate
|
85 |
-
elif line_width <= max_text_width and current_line == "": # First word, and it fits
|
86 |
-
current_line = test_line_candidate
|
87 |
-
elif current_line != "": # If it doesn't fit and current_line is not empty, finalize current_line
|
88 |
-
lines.append(current_line.strip())
|
89 |
-
current_line = word + " " # Start new line with current word
|
90 |
-
else: # Word itself is too long for a line
|
91 |
-
# Simple truncation for very long words (can be improved with character-level wrapping)
|
92 |
-
temp_word = word
|
93 |
-
while self._get_text_dimensions(temp_word, self.font)[0] > max_text_width and len(temp_word) > 0:
|
94 |
-
temp_word = temp_word[:-1]
|
95 |
-
lines.append(temp_word)
|
96 |
-
current_line = "" # Reset current line
|
97 |
-
|
98 |
-
if current_line.strip(): # Add the last line if it has content
|
99 |
-
lines.append(current_line.strip())
|
100 |
-
|
101 |
-
if not lines: # If after all that, lines is empty (e.g. only very long words that got truncated to nothing)
|
102 |
-
lines.append("(Text too long or error)")
|
103 |
-
|
104 |
-
# Calculate starting y position to center the text block
|
105 |
-
_, single_line_height = self._get_text_dimensions("Tg", self.font) # Get height of a typical line
|
106 |
-
if single_line_height == 0: # Safety for failed get_text_dimensions
|
107 |
-
single_line_height = self.font_size_pil
|
108 |
|
109 |
-
|
110 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
111 |
|
112 |
-
|
113 |
-
|
114 |
-
|
115 |
-
|
116 |
-
for line in lines:
|
117 |
-
line_width, _ = self._get_text_dimensions(line, self.font)
|
118 |
-
x_text = (size[0] - line_width) / 2.0
|
119 |
-
if x_text < padding: # Ensure text doesn't start too left
|
120 |
-
x_text = float(padding)
|
121 |
-
|
122 |
-
draw.text(
|
123 |
-
xy=(x_text, y_text),
|
124 |
-
text=line,
|
125 |
-
fill=(220, 220, 150), # Lighter Yellow text
|
126 |
-
font=self.font
|
127 |
-
)
|
128 |
-
y_text += single_line_height * line_spacing_factor
|
129 |
-
|
130 |
-
filepath = os.path.join(self.output_dir, filename)
|
131 |
-
try:
|
132 |
-
img.save(filepath)
|
133 |
-
# print(f"Placeholder image saved: {filepath}") # Can be noisy, uncomment for debugging
|
134 |
-
except Exception as e:
|
135 |
-
print(f"Error saving image {filepath}: {e}")
|
136 |
-
return None
|
137 |
-
return filepath
|
138 |
-
|
139 |
|
140 |
def create_video_from_images(self, image_paths, output_filename="final_video.mp4", fps=1, duration_per_image=3):
|
|
|
141 |
if not image_paths:
|
142 |
-
|
143 |
return None
|
144 |
-
|
145 |
-
|
146 |
-
if not
|
147 |
-
|
148 |
return None
|
|
|
|
|
149 |
|
150 |
-
print(f"Attempting to create video from {len(valid_image_paths)} images: {valid_image_paths}")
|
151 |
-
|
152 |
try:
|
153 |
clips = []
|
154 |
-
for
|
155 |
try:
|
156 |
-
clip = ImageClip(
|
157 |
clips.append(clip)
|
158 |
-
except Exception as
|
159 |
-
|
160 |
|
161 |
if not clips:
|
162 |
-
print("Could not create any ImageClips from the provided image paths.")
|
163 |
return None
|
164 |
|
165 |
-
|
166 |
output_path = os.path.join(self.output_dir, output_filename)
|
167 |
|
168 |
-
|
169 |
-
|
170 |
output_path,
|
171 |
fps=fps,
|
172 |
codec='libx264',
|
173 |
-
audio_codec='aac',
|
174 |
-
temp_audiofile=os.path.join(self.output_dir, '
|
175 |
remove_temp=True,
|
176 |
-
threads=os.cpu_count() or 2,
|
177 |
-
logger=
|
178 |
)
|
179 |
-
|
180 |
-
|
181 |
-
|
182 |
-
|
183 |
-
|
184 |
-
|
185 |
-
|
186 |
return output_path
|
|
|
187 |
except Exception as e:
|
188 |
-
|
189 |
-
if isinstance(e, OSError):
|
190 |
-
|
191 |
-
"its installation, or permissions to write temporary files.")
|
192 |
return None
|
|
|
1 |
# core/visual_engine.py
|
2 |
+
import tempfile
|
3 |
+
import logging
|
4 |
+
from pathlib import Path
|
5 |
from PIL import Image, ImageDraw, ImageFont
|
6 |
from moviepy.editor import ImageClip, concatenate_videoclips
|
7 |
import os
|
8 |
|
9 |
+
# Set up logging
|
10 |
+
logger = logging.getLogger(__name__)
|
11 |
+
logging.basicConfig(level=logging.INFO)
|
12 |
+
|
13 |
class VisualEngine:
|
14 |
+
def __init__(self, output_dir=None):
|
15 |
+
"""
|
16 |
+
Initialize the visual engine with a safe output directory.
|
17 |
+
|
18 |
+
If no output_dir is provided, creates a temporary directory in the system temp location.
|
19 |
+
This avoids permission issues as /tmp is typically writable by all users.
|
20 |
+
"""
|
21 |
+
self.output_dir = output_dir or self._create_temp_output_dir()
|
22 |
+
logger.info(f"Using output directory: {self.output_dir}")
|
23 |
+
|
24 |
+
# Ensure the directory exists and has proper permissions
|
25 |
os.makedirs(self.output_dir, exist_ok=True)
|
26 |
+
os.chmod(self.output_dir, 0o775) # Ensure writable by user and group
|
27 |
|
28 |
# --- Font Setup ---
|
29 |
+
self.font_filename = "arial.ttf"
|
|
|
|
|
|
|
30 |
self.font_size_pil = 24
|
31 |
|
32 |
+
# Try multiple font locations with fallbacks
|
33 |
+
self.font = self._load_font_with_fallbacks()
|
34 |
+
|
35 |
+
if self.font:
|
36 |
+
logger.info(f"Successfully loaded font: {self.font_path}")
|
37 |
+
else:
|
38 |
+
logger.warning("Could not load any custom font. Falling back to default font.")
|
39 |
self.font = ImageFont.load_default()
|
40 |
+
self.font_size_pil = 11 # Adjust for default font
|
41 |
+
|
42 |
+
def _create_temp_output_dir(self):
|
43 |
+
"""Create a temporary directory with appropriate permissions"""
|
44 |
+
temp_dir = tempfile.mkdtemp(prefix="cinegen_media_")
|
45 |
+
os.chmod(temp_dir, 0o775) # rwxrwxr-x permissions
|
46 |
+
return temp_dir
|
47 |
+
|
48 |
+
def _load_font_with_fallbacks(self):
|
49 |
+
"""Try multiple font locations with graceful fallback"""
|
50 |
+
# List of possible font locations (Docker and local development)
|
51 |
+
possible_font_paths = [
|
52 |
+
# Docker container path
|
53 |
+
f"/usr/local/share/fonts/truetype/mycustomfonts/{self.font_filename}",
|
54 |
+
# Local development path (relative to script)
|
55 |
+
os.path.join(os.path.dirname(__file__), "..", "assets", "fonts", self.font_filename),
|
56 |
+
# System fonts as last resort
|
57 |
+
f"/usr/share/fonts/truetype/{self.font_filename}",
|
58 |
+
f"/usr/share/fonts/TTF/{self.font_filename}"
|
59 |
+
]
|
60 |
+
|
61 |
+
for font_path in possible_font_paths:
|
62 |
+
try:
|
63 |
+
if os.path.exists(font_path):
|
64 |
+
self.font_path = font_path
|
65 |
+
return ImageFont.truetype(font_path, self.font_size_pil)
|
66 |
+
except IOError as e:
|
67 |
+
logger.warning(f"Could not load font from {font_path}: {str(e)}")
|
68 |
+
|
69 |
+
return None
|
70 |
|
71 |
def _get_text_dimensions(self, text_content, font_obj):
|
72 |
"""
|
73 |
Gets the width and height of a single line of text with the given font.
|
74 |
Returns (width, height).
|
75 |
"""
|
76 |
+
if not text_content:
|
77 |
+
return 0, self.font_size_pil
|
78 |
|
79 |
try:
|
80 |
+
# Modern Pillow (>=8.0.0)
|
81 |
+
if hasattr(font_obj, 'getbbox'):
|
82 |
bbox = font_obj.getbbox(text_content)
|
83 |
+
return bbox[2] - bbox[0], bbox[3] - bbox[1]
|
84 |
+
# Legacy Pillow
|
85 |
+
elif hasattr(font_obj, 'getsize'):
|
86 |
+
return font_obj.getsize(text_content)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
87 |
except Exception as e:
|
88 |
+
logger.warning(f"Error measuring text: {str(e)}")
|
89 |
+
|
90 |
+
# Fallback calculation
|
91 |
+
avg_char_width = self.font_size_pil * 0.6
|
92 |
+
return int(len(text_content) * avg_char_width), self.font_size_pil
|
|
|
93 |
|
94 |
def create_placeholder_image(self, text_description, filename, size=(1280, 720)):
|
95 |
+
"""Create placeholder image with wrapped text"""
|
96 |
+
try:
|
97 |
+
img = Image.new('RGB', size, color=(30, 30, 60))
|
98 |
+
draw = ImageDraw.Draw(img)
|
99 |
+
|
100 |
+
if not text_description:
|
101 |
+
text_description = "No description provided"
|
102 |
+
|
103 |
+
# Create text with wrapping
|
104 |
+
lines = self._wrap_text(text_description, size[0] - 80, draw)
|
105 |
+
|
106 |
+
# Calculate vertical position to center text
|
107 |
+
_, line_height = self._get_text_dimensions("Tg", self.font)
|
108 |
+
total_height = len(lines) * line_height * 1.3
|
109 |
+
y_pos = max(40, (size[1] - total_height) / 2)
|
110 |
+
|
111 |
+
# Draw each line
|
112 |
+
for line in lines:
|
113 |
+
line_width, _ = self._get_text_dimensions(line, self.font)
|
114 |
+
x_pos = (size[0] - line_width) / 2
|
115 |
+
draw.text((x_pos, y_pos), line, fill=(220, 220, 150), font=self.font)
|
116 |
+
y_pos += line_height * 1.3
|
117 |
+
|
118 |
+
# Save to output directory
|
119 |
+
output_path = os.path.join(self.output_dir, filename)
|
120 |
+
img.save(output_path)
|
121 |
+
logger.debug(f"Created placeholder image: {output_path}")
|
122 |
+
return output_path
|
123 |
+
|
124 |
+
except Exception as e:
|
125 |
+
logger.error(f"Error creating placeholder image: {str(e)}")
|
126 |
+
return None
|
127 |
|
128 |
+
def _wrap_text(self, text, max_width, draw):
|
129 |
+
"""Wrap text to fit within specified width"""
|
130 |
+
words = text.split()
|
131 |
lines = []
|
132 |
+
current_line = []
|
133 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
134 |
for word in words:
|
135 |
+
test_line = ' '.join(current_line + [word])
|
136 |
+
test_width, _ = self._get_text_dimensions(test_line, self.font)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
137 |
|
138 |
+
if test_width <= max_width:
|
139 |
+
current_line.append(word)
|
140 |
+
else:
|
141 |
+
if current_line:
|
142 |
+
lines.append(' '.join(current_line))
|
143 |
+
current_line = [word]
|
144 |
+
|
145 |
+
# Handle very long words
|
146 |
+
if self._get_text_dimensions(word, self.font)[0] > max_width:
|
147 |
+
# Break word if needed
|
148 |
+
while self._get_text_dimensions(''.join(current_line), self.font)[0] > max_width:
|
149 |
+
current_line[0] = current_line[0][:-1]
|
150 |
|
151 |
+
if current_line:
|
152 |
+
lines.append(' '.join(current_line))
|
153 |
+
|
154 |
+
return lines or ["(Text rendering error)"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
155 |
|
156 |
def create_video_from_images(self, image_paths, output_filename="final_video.mp4", fps=1, duration_per_image=3):
|
157 |
+
"""Create video from sequence of images"""
|
158 |
if not image_paths:
|
159 |
+
logger.error("No images provided for video creation")
|
160 |
return None
|
161 |
+
|
162 |
+
valid_paths = [p for p in image_paths if p and os.path.exists(p)]
|
163 |
+
if not valid_paths:
|
164 |
+
logger.error("No valid image paths found")
|
165 |
return None
|
166 |
+
|
167 |
+
logger.info(f"Creating video from {len(valid_paths)} images")
|
168 |
|
|
|
|
|
169 |
try:
|
170 |
clips = []
|
171 |
+
for img_path in valid_paths:
|
172 |
try:
|
173 |
+
clip = ImageClip(img_path).set_duration(duration_per_image)
|
174 |
clips.append(clip)
|
175 |
+
except Exception as e:
|
176 |
+
logger.error(f"Error processing {img_path}: {str(e)}")
|
177 |
|
178 |
if not clips:
|
|
|
179 |
return None
|
180 |
|
181 |
+
video = concatenate_videoclips(clips, method="compose")
|
182 |
output_path = os.path.join(self.output_dir, output_filename)
|
183 |
|
184 |
+
# Write video file
|
185 |
+
video.write_videofile(
|
186 |
output_path,
|
187 |
fps=fps,
|
188 |
codec='libx264',
|
189 |
+
audio_codec='aac',
|
190 |
+
temp_audiofile=os.path.join(self.output_dir, 'temp_audio.m4a'),
|
191 |
remove_temp=True,
|
192 |
+
threads=os.cpu_count() or 2,
|
193 |
+
logger=None # Suppress moviepy's verbose output
|
194 |
)
|
195 |
+
|
196 |
+
# Clean up resources
|
197 |
+
for clip in clips:
|
198 |
+
clip.close()
|
199 |
+
video.close()
|
200 |
+
|
201 |
+
logger.info(f"Video created: {output_path}")
|
202 |
return output_path
|
203 |
+
|
204 |
except Exception as e:
|
205 |
+
logger.error(f"Video creation failed: {str(e)}")
|
206 |
+
if isinstance(e, OSError):
|
207 |
+
logger.error("OSError: Check ffmpeg installation and file permissions")
|
|
|
208 |
return None
|