mgbam committed
Commit 41b47a8 · verified · 1 Parent(s): 87f981c

Update core/visual_engine.py

Files changed (1):
  1. core/visual_engine.py  +179 -136
core/visual_engine.py CHANGED
@@ -1,175 +1,218 @@
 # core/visual_engine.py
-import tempfile
-import logging
 from PIL import Image, ImageDraw, ImageFont
 from moviepy.editor import ImageClip, concatenate_videoclips
 import os
-
-# Set up logging
-logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.INFO)
+# For future AI image generation - uncomment and add to requirements.txt when ready
+# import torch
+# from diffusers import StableDiffusionPipeline
+# import requests  # For API-based image generation

 class VisualEngine:
-    def __init__(self, output_dir=None):
-        self.output_dir = output_dir or self._create_temp_output_dir()
-        logger.info(f"Using output directory: {self.output_dir}")
+    def __init__(self, output_dir="temp_generated_media"):
+        self.output_dir = output_dir
         os.makedirs(self.output_dir, exist_ok=True)

-        # Font configuration
+        # --- Font Setup for Placeholders ---
+        self.font_filename = "arial.ttf"  # Or your chosen font (e.g., DejaVuSans.ttf)
+        self.font_path_in_container = f"/usr/local/share/fonts/truetype/mycustomfonts/{self.font_filename}"
         self.font_size_pil = 24
-        self.font = self._load_system_font()

-        if self.font:
-            logger.info(f"Using font: {self.font.path if hasattr(self.font, 'path') else 'default'}")
-        else:
-            logger.warning("Could not load any suitable font. Falling back to default font.")
+        try:
+            self.font = ImageFont.truetype(self.font_path_in_container, self.font_size_pil)
+            print(f"Successfully loaded font: {self.font_path_in_container} for placeholders.")
+        except IOError:
+            print(f"Warning: Could not load font from '{self.font_path_in_container}'. "
+                  f"Placeholders will use default font.")
             self.font = ImageFont.load_default()
             self.font_size_pil = 11

-    def _create_temp_output_dir(self):
-        """Create a temporary directory with appropriate permissions"""
-        temp_dir = tempfile.mkdtemp(prefix="cinegen_media_")
-        os.chmod(temp_dir, 0o775)
-        return temp_dir
-
-    def _load_system_font(self):
-        """Load the best available system font"""
-        font_names = [
-            "DejaVuSans.ttf",              # Common in Linux
-            "FreeSans.ttf",                # From fonts-freefont-ttf
-            "LiberationSans-Regular.ttf",  # Red Hat font
-            "Arial.ttf",                   # Sometimes available
-            "Vera.ttf"                     # Bitstream Vera
-        ]
-
-        # Try to load each font in order
-        for font_name in font_names:
-            try:
-                return ImageFont.truetype(font_name, self.font_size_pil)
-            except IOError:
-                continue
-
-        # Try system default sans-serif as last resort
-        try:
-            return ImageFont.truetype("sans-serif", self.font_size_pil)
-        except:
-            return None
+        # --- AI Image Generation Model/Client (Conceptual) ---
+        # self.image_generation_pipe = None  # For diffusers
+        # self.image_api_client = None  # For API clients
+        # self.USE_AI_IMAGE_GENERATION = False  # Set to True when implemented
+
+        # Example: Initialize Stable Diffusion (uncomment and configure when ready)
+        # if self.USE_AI_IMAGE_GENERATION and torch.cuda.is_available():
+        #     try:
+        #         print("Attempting to load Stable Diffusion model...")
+        #         self.image_generation_pipe = StableDiffusionPipeline.from_pretrained(
+        #             "runwayml/stable-diffusion-v1-5",
+        #             torch_dtype=torch.float16,
+        #             # use_safetensors=True  # If available for the model
+        #         )
+        #         self.image_generation_pipe.to("cuda")
+        #         print("Stable Diffusion model loaded successfully on GPU.")
+        #     except Exception as e:
+        #         print(f"Error loading Stable Diffusion model: {e}. Will use placeholders.")
+        #         self.USE_AI_IMAGE_GENERATION = False  # Fallback
+        # elif self.USE_AI_IMAGE_GENERATION:
+        #     print("CUDA not available. AI Image generation (Stable Diffusion) disabled. Using placeholders.")
+        #     self.USE_AI_IMAGE_GENERATION = False

     def _get_text_dimensions(self, text_content, font_obj):
-        """Get text dimensions with modern Pillow methods"""
-        if not text_content:
+        if text_content == "" or text_content is None:
             return 0, self.font_size_pil
         try:
             if hasattr(font_obj, 'getbbox'):
                 bbox = font_obj.getbbox(text_content)
-                return bbox[2] - bbox[0], bbox[3] - bbox[1]
+                width = bbox[2] - bbox[0]
+                height = bbox[3] - bbox[1]
+                return width, height if height > 0 else self.font_size_pil
             elif hasattr(font_obj, 'getsize'):
-                return font_obj.getsize(text_content)
+                width, height = font_obj.getsize(text_content)
+                return width, height if height > 0 else self.font_size_pil
+            else:
+                avg_char_width = self.font_size_pil * 0.6
+                height_estimate = self.font_size_pil * 1.2
+                return int(len(text_content) * avg_char_width), int(height_estimate if height_estimate > 0 else self.font_size_pil)
         except Exception as e:
-            logger.warning(f"Error measuring text: {str(e)}")
-
-        # Fallback calculation
-        avg_char_width = self.font_size_pil * 0.6
-        return int(len(text_content) * avg_char_width), self.font_size_pil
+            print(f"Warning: Error getting text dimensions for '{text_content}': {e}. Using estimates.")
+            avg_char_width = self.font_size_pil * 0.6
+            height_estimate = self.font_size_pil * 1.2
+            return int(len(text_content) * avg_char_width), int(height_estimate if height_estimate > 0 else self.font_size_pil)

-    def create_placeholder_image(self, text_description, filename, size=(1280, 720)):
-        """Create placeholder image with wrapped text"""
-        try:
-            img = Image.new('RGB', size, color=(30, 30, 60))
-            draw = ImageDraw.Draw(img)
-
-            if not text_description:
-                text_description = "No description provided"
-
-            # Create text with wrapping
-            lines = self._wrap_text(text_description, size[0] - 80)
-
-            # Calculate vertical position to center text
-            _, line_height = self._get_text_dimensions("Tg", self.font)
-            total_height = len(lines) * line_height * 1.3
-            y_pos = max(40, (size[1] - total_height) / 2)
-
-            # Draw each line
-            for line in lines:
-                line_width, _ = self._get_text_dimensions(line, self.font)
-                x_pos = (size[0] - line_width) / 2
-                draw.text((x_pos, y_pos), line, fill=(220, 220, 150), font=self.font)
-                y_pos += line_height * 1.3
-
-            # Save to output directory
-            output_path = os.path.join(self.output_dir, filename)
-            img.save(output_path)
-            return output_path
-
-        except Exception as e:
-            logger.error(f"Error creating placeholder image: {str(e)}")
-            return None
-
-    def _wrap_text(self, text, max_width):
-        """Wrap text to fit within specified width"""
-        if not text:
-            return ["(No text)"]
-
-        words = text.split()
-        lines = []
-        current_line = []
-
-        for word in words:
-            test_line = ' '.join(current_line + [word])
-            test_width, _ = self._get_text_dimensions(test_line, self.font)
-
-            if test_width <= max_width:
-                current_line.append(word)
-            else:
-                if current_line:
-                    lines.append(' '.join(current_line))
-                current_line = [word]
-
-                # Handle very long words
-                if self._get_text_dimensions(word, self.font)[0] > max_width:
-                    while current_line and self._get_text_dimensions(''.join(current_line), self.font)[0] > max_width:
-                        current_line[0] = current_line[0][:-1]
-
-        if current_line:
-            lines.append(' '.join(current_line))
-
-        return lines or ["(Text rendering error)"]
+    def _create_placeholder_image_content(self, text_description, filename, size=(1024, 576)):  # Common 16:9 aspect
+        # This is the actual placeholder drawing logic, kept separate
+        img = Image.new('RGB', size, color=(30, 30, 60))
+        draw = ImageDraw.Draw(img)
+        padding = 30  # Reduced padding for smaller image
+        max_text_width = size[0] - (2 * padding)
+        lines = []
+        if not text_description: text_description = "(No description provided for placeholder)"
+        words = text_description.split()
+        current_line = ""
+
+        for word in words:
+            test_line_candidate = current_line + word + " "
+            line_width, _ = self._get_text_dimensions(test_line_candidate.strip(), self.font)
+            if line_width <= max_text_width and current_line != "": current_line = test_line_candidate
+            elif line_width <= max_text_width and current_line == "": current_line = test_line_candidate
+            elif current_line != "":
+                lines.append(current_line.strip())
+                current_line = word + " "
+            else:
+                temp_word = word
+                while self._get_text_dimensions(temp_word, self.font)[0] > max_text_width and len(temp_word) > 0: temp_word = temp_word[:-1]
+                lines.append(temp_word)
+                current_line = ""
+        if current_line.strip(): lines.append(current_line.strip())
+        if not lines: lines.append("(Text error in placeholder)")
+
+        _, single_line_height = self._get_text_dimensions("Tg", self.font)
+        if single_line_height == 0: single_line_height = self.font_size_pil
+        line_spacing_factor = 1.3
+        estimated_line_block_height = len(lines) * single_line_height * line_spacing_factor
+        y_text = (size[1] - estimated_line_block_height) / 2.0
+        if y_text < padding: y_text = float(padding)
+
+        for line_idx, line in enumerate(lines):
+            if line_idx >= 7 and len(lines) > 8:  # Limit lines displayed on placeholder if too many
+                draw.text(xy=(float(padding), y_text), text="...", fill=(200, 200, 130), font=self.font)
+                break
+            line_width, _ = self._get_text_dimensions(line, self.font)
+            x_text = (size[0] - line_width) / 2.0
+            if x_text < padding: x_text = float(padding)
+            draw.text(xy=(x_text, y_text), text=line, fill=(220, 220, 150), font=self.font)
+            y_text += single_line_height * line_spacing_factor
+
+        filepath = os.path.join(self.output_dir, filename)
+        try:
+            img.save(filepath)
+        except Exception as e:
+            print(f"Error saving placeholder image {filepath}: {e}")
+            return None
+        return filepath
+
+    def generate_image_visual(self, image_prompt_text, scene_identifier_filename):
+        """
+        Generates an image visual. Tries AI generation if enabled,
+        otherwise falls back to a placeholder.
+        image_prompt_text: The detailed prompt from Gemini for image generation.
+        scene_identifier_filename: A unique filename for this image (e.g., "scene_1_visual.png").
+        """
+        filepath = os.path.join(self.output_dir, scene_identifier_filename)
+
+        # --- UNCOMMENT AND IMPLEMENT ONE OF THESE SECTIONS WHEN READY ---
+        # if self.USE_AI_IMAGE_GENERATION and self.image_generation_pipe:  # Example for Diffusers
+        #     try:
+        #         print(f"Generating AI image (Diffusers) for: {image_prompt_text[:100]}...")
+        #         # Add parameters like negative_prompt, guidance_scale, num_inference_steps
+        #         ai_image = self.image_generation_pipe(image_prompt_text).images[0]
+        #         ai_image = ai_image.resize((1024, 576))  # Resize to a standard display size
+        #         ai_image.save(filepath)
+        #         print(f"AI Image (Diffusers) saved: {filepath}")
+        #         return filepath
+        #     except Exception as e:
+        #         print(f"Error generating AI image with Diffusers: {e}. Creating placeholder.")
+        #         return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
+
+        # elif self.USE_AI_IMAGE_GENERATION and self.image_api_client:  # Example for an API
+        #     try:
+        #         print(f"Generating AI image (API) for: {image_prompt_text[:100]}...")
+        #         # --- Replace with your actual API call logic ---
+        #         # response = self.image_api_client.generate(prompt=image_prompt_text, ...)
+        #         # image_data = response.get_image_data()
+        #         # with open(filepath, 'wb') as f:
+        #         #     f.write(image_data)
+        #         # --- End of API call logic ---
+        #         print(f"AI Image (API) saved: {filepath}")
+        #         return filepath
+        #     except Exception as e:
+        #         print(f"Error generating AI image with API: {e}. Creating placeholder.")
+        #         return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
+
+        # else: # Fallback to placeholder if AI generation is not enabled or failed initialization
+        print(f"AI image generation not enabled/ready. Creating placeholder for: {image_prompt_text[:70]}...")
+        return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)

     def create_video_from_images(self, image_paths, output_filename="final_video.mp4", fps=1, duration_per_image=3):
-        """Create video from sequence of images"""
         if not image_paths:
-            logger.error("No images provided for video creation")
+            print("No images provided to create video.")
             return None
-
-        valid_paths = [p for p in image_paths if p and os.path.exists(p)]
-        if not valid_paths:
-            logger.error("No valid image paths found")
+        valid_image_paths = [p for p in image_paths if p and os.path.exists(p)]
+        if not valid_image_paths:
+            print("No valid image paths found to create video.")
             return None
-
+        print(f"Attempting to create video from {len(valid_image_paths)} images.")
+
         try:
-            clips = [ImageClip(img_path).set_duration(duration_per_image) for img_path in valid_paths]
-            video = concatenate_videoclips(clips, method="compose")
+            clips = []
+            for m_path in valid_image_paths:
+                try:
+                    # Ensure image is a reasonable size for video, MoviePy can struggle with huge images
+                    img_for_clip = Image.open(m_path)
+                    if img_for_clip.width > 1920 or img_for_clip.height > 1080:
+                        img_for_clip.thumbnail((1920, 1080))  # Resize if too large
+                        # Save back to a temp path or directly use the PIL image object if ImageClip supports it well
+                        # For simplicity, let's assume ImageClip handles PIL Image objects
+
+                    clip = ImageClip(m_path).set_duration(duration_per_image)  # Using path directly for now
+                    # clip = ImageClip(np.array(img_for_clip)).set_duration(duration_per_image)  # If using PIL image
+                    clips.append(clip)
+                except Exception as e_clip:
+                    print(f"Error creating ImageClip for {m_path}: {e_clip}. Skipping.")
+
+            if not clips:
+                print("Could not create any ImageClips.")
+                return None
+
+            video_clip = concatenate_videoclips(clips, method="compose")
             output_path = os.path.join(self.output_dir, output_filename)

-            video.write_videofile(
-                output_path,
-                fps=fps,
-                codec='libx264',
-                audio_codec='aac',
-                temp_audiofile=os.path.join(self.output_dir, 'temp_audio.m4a'),
-                remove_temp=True,
-                threads=os.cpu_count() or 2,
-                logger=None
+            print(f"Writing video to: {output_path}")
+            video_clip.write_videofile(
+                output_path, fps=fps, codec='libx264', audio_codec='aac',
+                temp_audiofile=os.path.join(self.output_dir, f'temp-audio-{os.urandom(4).hex()}.m4a'),
+                remove_temp=True, threads=os.cpu_count() or 2, logger='bar'
             )
-
-            # Clean up resources
-            for clip in clips:
-                clip.close()
-            video.close()
-
+            for clip_to_close in clips: clip_to_close.close()
+            if hasattr(video_clip, 'close'): video_clip.close()
+            print(f"Video successfully created: {output_path}")
             return output_path
-
         except Exception as e:
-            logger.error(f"Video creation failed: {str(e)}")
+            print(f"Error during video creation: {e}")
             return None