mgbam committed
Commit 8583908 · verified · 1 Parent(s): 238ec73

Update core/visual_engine.py

Files changed (1):
  core/visual_engine.py  +125 -98
core/visual_engine.py CHANGED
@@ -1,10 +1,13 @@
 # core/visual_engine.py
 from PIL import Image, ImageDraw, ImageFont
-from moviepy.editor import ImageClip, concatenate_videoclips
+from moviepy.editor import (ImageClip, concatenate_videoclips, TextClip,
+                            CompositeVideoClip, vfx) # Added vfx for effects
+import moviepy.video.fx.all as vfx # Same fx module as above, imported explicitly for resize
+import numpy as np # For converting PIL images to numpy arrays for moviepy
 import os
-import openai # Import OpenAI library
-import requests # To download images from URLs
-import io # To handle image data in memory
+import openai
+import requests
+import io
 
 class VisualEngine:
     def __init__(self, output_dir="temp_generated_media"):
@@ -13,8 +16,13 @@ class VisualEngine:
 
         self.font_filename = "arial.ttf"
         self.font_path_in_container = f"/usr/local/share/fonts/truetype/mycustomfonts/{self.font_filename}"
-        self.font_size_pil = 24
-
+        self.font_size_pil = 24 # For placeholder images
+        self.video_overlay_font_size = 36 # For text overlays on video
+        self.video_overlay_font_color = 'white'
+        # For video overlays, try to use a system font that moviepy/ImageMagick can find,
+        # or provide a path to a .ttf file for TextClip's font parameter.
+        self.video_overlay_font = 'Arial' # Generic name ImageMagick might find; or use self.font_path_in_container
+
         try:
             self.font = ImageFont.truetype(self.font_path_in_container, self.font_size_pil)
             print(f"Successfully loaded font: {self.font_path_in_container} for placeholders.")
@@ -23,39 +31,27 @@ class VisualEngine:
             self.font = ImageFont.load_default()
             self.font_size_pil = 11
 
-        # --- OpenAI API Client Setup ---
         self.openai_api_key = None
-        self.USE_AI_IMAGE_GENERATION = False # Default to False
-
-        try:
-            # Try to get API key from Hugging Face secrets (via Streamlit's secrets)
-            # This assumes app.py has loaded st.secrets["OPENAI_API_KEY"] into st.session_state
-            # A better way for a library class is to pass the key in or have it set globally
-            # For now, let's assume it will be set via a method or directly if running outside Streamlit context
-            # In app.py, you would do: st.session_state.visual_engine.set_openai_api_key(st.secrets["OPENAI_API_KEY"])
-            pass # Key will be set by set_openai_api_key method
-        except Exception as e:
-            print(f"OpenAI API key not immediately available for VisualEngine: {e}")
-
-        # You can choose DALL-E 2 or DALL-E 3. DALL-E 3 is generally better.
-        # For DALL-E 3, the 'model' parameter is "dall-e-3"
-        # For DALL-E 2, the 'model' parameter is "dall-e-2" (or implicitly if not specified for older image create)
+        self.USE_AI_IMAGE_GENERATION = False
         self.dalle_model = "dall-e-3"
-        self.image_size = "1024x1024" # DALL-E 3 supports 1024x1024, 1792x1024, or 1024x1792
+        self.image_size = "1024x1024"
+        # For DALL-E 3, you might want a slightly larger video frame to accommodate 1024x1024 images.
+        self.video_frame_size = (1024, 576) # 16:9; DALL-E images will be letterboxed or cropped if not 16:9.
+                                            # Or (1024, 1024) if you want square video frames.
 
     def set_openai_api_key(self, api_key):
+        # ... (remains the same) ...
         if api_key:
            self.openai_api_key = api_key
-            openai.api_key = self.openai_api_key # Set it for the openai library
+            # openai.api_key = self.openai_api_key # Older versions; the new client takes the key per call.
            self.USE_AI_IMAGE_GENERATION = True
            print("OpenAI API key set. AI Image Generation Enabled with DALL-E.")
        else:
            self.USE_AI_IMAGE_GENERATION = False
            print("OpenAI API key not provided. AI Image Generation Disabled. Using placeholders.")
 
-
     def _get_text_dimensions(self, text_content, font_obj):
-        # ... (this method remains the same as your last working version) ...
+        # ... (remains the same) ...
         if text_content == "" or text_content is None:
             return 0, self.font_size_pil
         try:
@@ -77,9 +73,8 @@ class VisualEngine:
         height_estimate = self.font_size_pil * 1.2
         return int(len(text_content) * avg_char_width), int(height_estimate if height_estimate > 0 else self.font_size_pil)
 
-
     def _create_placeholder_image_content(self, text_description, filename, size=(1024, 576)):
-        # ... (this method remains the same as your last working version) ...
+        # ... (remains the same) ...
         img = Image.new('RGB', size, color=(30, 30, 60))
         draw = ImageDraw.Draw(img)
         padding = 30
@@ -126,103 +121,135 @@ class VisualEngine:
             return None
         return filepath
 
-
     def generate_image_visual(self, image_prompt_text, scene_identifier_filename):
+        # ... (DALL-E logic remains the same, including fallback to _create_placeholder_image_content) ...
         filepath = os.path.join(self.output_dir, scene_identifier_filename)
-
         if self.USE_AI_IMAGE_GENERATION and self.openai_api_key:
             try:
                 print(f"Generating DALL-E ({self.dalle_model}) image for: {image_prompt_text[:100]}...")
-
-                # Note: Prompts for DALL-E 3 are often best if they are quite descriptive.
-                # DALL-E 3 also automatically revises prompts to be more detailed if they are too short.
-                # You might want to consider passing the "revised_prompt" back to the UI if you display it.
-
-                # Using the newer client syntax for openai >= 1.0.0
-                client = openai.OpenAI(api_key=self.openai_api_key) # Initialize client with key
-
+                client = openai.OpenAI(api_key=self.openai_api_key)
                 response = client.images.generate(
-                    model=self.dalle_model,
-                    prompt=image_prompt_text,
-                    n=1, # Number of images to generate
-                    size=self.image_size, # e.g., "1024x1024"
-                    quality="standard", # or "hd" for DALL-E 3 (hd costs more)
-                    response_format="url" # Get a URL to download the image
-                    # style="vivid" # or "natural" for DALL-E 3
+                    model=self.dalle_model, prompt=image_prompt_text, n=1,
+                    size=self.image_size, quality="standard", response_format="url"
                 )
-
                 image_url = response.data[0].url
-                revised_prompt_dalle3 = response.data[0].revised_prompt # DALL-E 3 provides this
-                if revised_prompt_dalle3:
-                    print(f"DALL-E 3 revised prompt: {revised_prompt_dalle3[:150]}...")
-
-                # Download the image from the URL
-                image_response = requests.get(image_url, timeout=30) # Added timeout
-                image_response.raise_for_status() # Raise an exception for bad status codes
-
-                # Save the image
+                revised_prompt_dalle3 = response.data[0].revised_prompt
+                if revised_prompt_dalle3: print(f"DALL-E 3 revised prompt: {revised_prompt_dalle3[:150]}...")
+                image_response = requests.get(image_url, timeout=60) # Increased timeout for image download
+                image_response.raise_for_status()
                 img_data = Image.open(io.BytesIO(image_response.content))
 
-                # DALL-E images are usually PNG. Let's ensure we save as PNG.
-                # The filename already has .png from app.py, so this should be fine.
-                img_data.save(filepath)
+                # Ensure image is RGB before saving as PNG (some APIs might return RGBA)
+                if img_data.mode == 'RGBA':
+                    img_data = img_data.convert('RGB')
 
+                img_data.save(filepath)
                 print(f"AI Image (DALL-E) saved: {filepath}")
                 return filepath
-            except openai.APIError as e: # Catch OpenAI specific errors
-                print(f"OpenAI API Error generating image: {e}")
-                print(f"Status Code: {e.status_code}, Error Type: {e.type}")
-                print(f"Message: {e.message}")
+            except openai.APIError as e:
+                print(f"OpenAI API Error: {e}")
             except requests.exceptions.RequestException as e:
-                print(f"Error downloading image from DALL-E URL: {e}")
+                print(f"Requests Error downloading DALL-E image: {e}")
             except Exception as e:
-                print(f"Generic error during DALL-E image generation or saving: {e}")
-
-            # Fallback to placeholder if any AI generation error occurs
+                print(f"Generic error during DALL-E image generation: {e}")
             print("Falling back to placeholder image due to DALL-E error.")
             return self._create_placeholder_image_content(
-                f"[DALL-E Generation Failed] Original Prompt: {image_prompt_text}",
-                scene_identifier_filename
+                f"[DALL-E Failed] Prompt: {image_prompt_text[:150]}...",
+                scene_identifier_filename, size=self.video_frame_size # Use video frame size for placeholder
+            )
+        else:
+            return self._create_placeholder_image_content(
+                image_prompt_text, scene_identifier_filename, size=self.video_frame_size
             )
-
-        else: # Fallback to placeholder if AI generation is not enabled or API key missing
-            # print(f"AI image generation not enabled/ready. Creating placeholder for: {image_prompt_text[:70]}...")
-            return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
 
 
-    def create_video_from_images(self, image_paths, output_filename="final_video.mp4", fps=1, duration_per_image=3):
-        # ... (this method remains the same as your last working version) ...
-        if not image_paths:
-            print("No images provided to create video.")
+    def create_video_from_images(self, image_data_list, output_filename="final_video.mp4", fps=24, duration_per_image=3):
+        """
+        Creates a video from a list of image file paths and associated text.
+        image_data_list: List of dictionaries, each like:
+            {'path': 'path/to/image.png', 'scene_num': 1, 'key_action': 'Some action'}
+        """
+        if not image_data_list:
+            print("No image data provided to create video.")
             return None
-        valid_image_paths = [p for p in image_paths if p and os.path.exists(p)]
-        if not valid_image_paths:
-            print("No valid image paths found to create video.")
+
+        print(f"Attempting to create video from {len(image_data_list)} images.")
+        processed_clips = []
+
+        for i, data in enumerate(image_data_list):
+            img_path = data.get('path')
+            scene_num = data.get('scene_num', i + 1)
+            key_action = data.get('key_action', '')
+
+            if not (img_path and os.path.exists(img_path)):
+                print(f"Image path invalid or not found: {img_path}. Skipping for video.")
+                continue
+            try:
+                # Load image and resize to fit video_frame_size, maintaining aspect ratio (letterbox/pillarbox)
+                pil_image = Image.open(img_path)
+                pil_image.thumbnail(self.video_frame_size, Image.Resampling.LANCZOS) # Resizes in place
+
+                # Create a black background matching video_frame_size
+                background = Image.new('RGB', self.video_frame_size, (0, 0, 0))
+                # Paste the thumbnail onto the center of the background
+                paste_x = (self.video_frame_size[0] - pil_image.width) // 2
+                paste_y = (self.video_frame_size[1] - pil_image.height) // 2
+                background.paste(pil_image, (paste_x, paste_y))
+
+                # Convert PIL image to numpy array for MoviePy
+                frame_np = np.array(background)
+                img_clip = ImageClip(frame_np).set_duration(duration_per_image)
+
+                # Simple Ken Burns effect (zoom in slightly)
+                # end_scale 1.05 = 5% zoom in by the end of the clip; adjust for desired effect.
+                end_scale = 1.05
+                img_clip = img_clip.fx(vfx.resize, lambda t: 1 + (end_scale - 1) * (t / duration_per_image))
+                # To keep it centered while zooming:
+                img_clip = img_clip.set_position('center')
+
+                # Add text overlay for scene number and key action
+                overlay_text = f"Scene {scene_num}\n{key_action}"
+                txt_clip = TextClip(overlay_text, fontsize=self.video_overlay_font_size,
+                                    color=self.video_overlay_font_color,
+                                    font=self.video_overlay_font, # Must be findable by ImageMagick
+                                    bg_color='rgba(0,0,0,0.5)', # Semi-transparent black background
+                                    size=(img_clip.w * 0.9, None), # Width 90% of frame, height auto
+                                    method='caption', # Auto-wrap text
+                                    align='West', # Left align
+                                    kerning=-1
+                                    ).set_duration(duration_per_image - 0.5).set_start(0.25) # Show for most of the duration
+
+                txt_clip = txt_clip.set_position(('center', 0.85), relative=True) # 85% from the top, horizontally centered
+
+                # Composite the image and text
+                video_with_text_overlay = CompositeVideoClip([img_clip, txt_clip], size=self.video_frame_size)
+                processed_clips.append(video_with_text_overlay)
+
+            except Exception as e_clip:
+                print(f"Error processing image/creating clip for {img_path}: {e_clip}. Skipping.")
+
+        if not processed_clips:
+            print("No clips could be processed for the video.")
             return None
-        print(f"Attempting to create video from {len(valid_image_paths)} images.")
+
+        # Concatenate, overlapping consecutive clips by 0.5s
+        final_video_clip = concatenate_videoclips(processed_clips, padding=-0.5, method="compose").fx(vfx.fadein, 0.5).fx(vfx.fadeout, 0.5)
+        # padding=-0.5 overlaps clips by 0.5s and requires method="compose"; for a true crossfade, the overlapped clips also need .crossfadein(0.5).
+
+        output_path = os.path.join(self.output_dir, output_filename)
+        print(f"Writing final video to: {output_path}")
         try:
-            clips = []
-            for m_path in valid_image_paths:
-                try:
-                    clip = ImageClip(m_path).set_duration(duration_per_image)
-                    clips.append(clip)
-                except Exception as e_clip:
-                    print(f"Error creating ImageClip for {m_path}: {e_clip}. Skipping.")
-            if not clips:
-                print("Could not create any ImageClips.")
-                return None
-            video_clip = concatenate_videoclips(clips, method="compose")
-            output_path = os.path.join(self.output_dir, output_filename)
-            print(f"Writing video to: {output_path}")
-            video_clip.write_videofile(
+            final_video_clip.write_videofile(
                 output_path, fps=fps, codec='libx264', audio_codec='aac',
                 temp_audiofile=os.path.join(self.output_dir, f'temp-audio-{os.urandom(4).hex()}.m4a'),
                 remove_temp=True, threads=os.cpu_count() or 2, logger='bar'
             )
-            for clip_to_close in clips: clip_to_close.close()
-            if hasattr(video_clip, 'close'): video_clip.close()
             print(f"Video successfully created: {output_path}")
             return output_path
         except Exception as e:
-            print(f"Error during video creation: {e}")
-            return None
+            print(f"Error writing final video file: {e}")
+            return None
+        finally: # Ensure clips are closed
+            for clip in processed_clips: clip.close()
+            if hasattr(final_video_clip, 'close'): final_video_clip.close()
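
A minimal usage sketch of the updated API (not part of the commit): it assumes the class is importable as core.visual_engine.VisualEngine and, purely for illustration, reads the key from an OPENAI_API_KEY environment variable; the app itself passes st.secrets["OPENAI_API_KEY"] through set_openai_api_key(). The scene prompts and filenames below are hypothetical.

    # Hypothetical driver for the updated VisualEngine API.
    import os
    from core.visual_engine import VisualEngine

    engine = VisualEngine(output_dir="temp_generated_media")
    # With no key, generate_image_visual falls back to placeholder images.
    engine.set_openai_api_key(os.environ.get("OPENAI_API_KEY"))

    scenes = [
        {"prompt": "A rain-soaked neon alley at night, cinematic wide shot", "key_action": "Hero enters the alley"},
        {"prompt": "Close-up of a hand lifting a glowing key", "key_action": "The key is found"},
    ]

    image_data_list = []
    for i, scene in enumerate(scenes, start=1):
        # Returns the saved image path (DALL-E or placeholder), or None on total failure.
        path = engine.generate_image_visual(scene["prompt"], f"scene_{i}.png")
        if path:
            image_data_list.append({"path": path, "scene_num": i, "key_action": scene["key_action"]})

    # New signature: a list of dicts with 'path', 'scene_num', and 'key_action'.
    video_path = engine.create_video_from_images(image_data_list, output_filename="final_video.mp4")
    print("Video written to:", video_path)

Note that the TextClip overlays require ImageMagick with a resolvable font; if 'Arial' is not available in the container, point self.video_overlay_font at self.font_path_in_container instead.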