# core/visual_engine.py
from PIL import Image, ImageDraw, ImageFont
from moviepy.editor import ImageClip, concatenate_videoclips
import os

# For future AI image generation - uncomment and add to requirements.txt when ready
# import torch
# from diffusers import StableDiffusionPipeline
# import requests  # For API-based image generation


class VisualEngine:
    """Render placeholder images for scene prompts and stitch images into a video.

    All generated files (placeholder images, downscaled copies made for video
    assembly, and the final video) are written to ``output_dir``.
    """

    # A placeholder shows at most this many rows; when the wrapped text is
    # longer, the last row is replaced by "...".
    _MAX_PLACEHOLDER_LINES = 8

    # Images larger than this (width, height) are downscaled before being
    # turned into video clips.
    _MAX_VIDEO_IMAGE_SIZE = (1920, 1080)

    def __init__(self, output_dir="temp_generated_media"):
        """Create the output directory and load the font used on placeholders.

        Args:
            output_dir: Directory that receives every generated file. It is
                created if it does not exist.
        """
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

        # --- Font Setup for Placeholders ---
        self.font_filename = "arial.ttf"  # Or your chosen font (e.g., DejaVuSans.ttf)
        self.font_path_in_container = f"/usr/local/share/fonts/truetype/mycustomfonts/{self.font_filename}"
        self.font_size_pil = 24
        try:
            self.font = ImageFont.truetype(self.font_path_in_container, self.font_size_pil)
            print(f"Successfully loaded font: {self.font_path_in_container} for placeholders.")
        except OSError:
            print(f"Warning: Could not load font from '{self.font_path_in_container}'. "
                  f"Placeholders will use default font.")
            self.font = ImageFont.load_default()
            # Size used for estimates and fallbacks while the default font is active.
            self.font_size_pil = 11

        # --- AI Image Generation Model/Client (Conceptual) ---
        # self.image_generation_pipe = None  # For diffusers
        # self.image_api_client = None  # For API clients
        # self.USE_AI_IMAGE_GENERATION = False  # Set to True when implemented

        # Example: Initialize Stable Diffusion (uncomment and configure when ready)
        # if self.USE_AI_IMAGE_GENERATION and torch.cuda.is_available():
        #     try:
        #         print("Attempting to load Stable Diffusion model...")
        #         self.image_generation_pipe = StableDiffusionPipeline.from_pretrained(
        #             "runwayml/stable-diffusion-v1-5",
        #             torch_dtype=torch.float16,
        #             # use_safetensors=True  # If available for the model
        #         )
        #         self.image_generation_pipe.to("cuda")
        #         print("Stable Diffusion model loaded successfully on GPU.")
        #     except Exception as e:
        #         print(f"Error loading Stable Diffusion model: {e}. Will use placeholders.")
        #         self.USE_AI_IMAGE_GENERATION = False  # Fallback
        # elif self.USE_AI_IMAGE_GENERATION:
        #     print("CUDA not available. AI Image generation (Stable Diffusion) disabled. Using placeholders.")
        #     self.USE_AI_IMAGE_GENERATION = False

    def _estimate_text_dimensions(self, text_content):
        """Return a rough (width, height) for ``text_content`` from the font size alone."""
        avg_char_width = self.font_size_pil * 0.6
        height_estimate = self.font_size_pil * 1.2
        return (
            int(len(text_content) * avg_char_width),
            int(height_estimate if height_estimate > 0 else self.font_size_pil),
        )

    def _get_text_dimensions(self, text_content, font_obj):
        """Measure ``text_content`` as rendered with ``font_obj``.

        Uses ``font_obj.getbbox`` when present, otherwise ``font_obj.getsize``,
        otherwise an estimate derived from ``self.font_size_pil``. The same
        estimate is used if measuring raises.

        Args:
            text_content: Text to measure; empty or ``None`` yields width 0.
            font_obj: Font object to measure with.

        Returns:
            A ``(width, height)`` tuple. A non-positive measured height is
            replaced by ``self.font_size_pil``.
        """
        if not text_content:
            return 0, self.font_size_pil
        try:
            if hasattr(font_obj, 'getbbox'):
                left, top, right, bottom = font_obj.getbbox(text_content)
                width, height = right - left, bottom - top
            elif hasattr(font_obj, 'getsize'):
                width, height = font_obj.getsize(text_content)
            else:
                return self._estimate_text_dimensions(text_content)
            return width, height if height > 0 else self.font_size_pil
        except Exception as e:
            print(f"Warning: Error getting text dimensions for '{text_content}': {e}. Using estimates.")
            return self._estimate_text_dimensions(text_content)

    def _truncate_to_width(self, word, max_text_width):
        """Drop trailing characters from ``word`` until it fits ``max_text_width``."""
        while word and self._get_text_dimensions(word, self.font)[0] > max_text_width:
            word = word[:-1]
        return word

    def _wrap_text_to_width(self, text, max_text_width):
        """Greedily wrap ``text`` on whitespace into lines no wider than ``max_text_width``.

        A single word that is wider than the limit gets a line of its own and
        is truncated to fit, regardless of where in the text it occurs.
        """
        lines = []
        current_line = ""
        for word in text.split():
            candidate = f"{current_line} {word}" if current_line else word
            if self._get_text_dimensions(candidate, self.font)[0] <= max_text_width:
                current_line = candidate
                continue
            if current_line:
                lines.append(current_line)
                current_line = ""
            # The word now starts a fresh line; it may still be too wide alone.
            if self._get_text_dimensions(word, self.font)[0] <= max_text_width:
                current_line = word
            else:
                truncated_word = self._truncate_to_width(word, max_text_width)
                if truncated_word:
                    lines.append(truncated_word)
        if current_line:
            lines.append(current_line)
        return lines

    def _create_placeholder_image_content(self, text_description, filename, size=(1024, 576)):  # Common 16:9 aspect
        """Draw ``text_description`` centered on a solid background and save it.

        Args:
            text_description: Text to show; a stock message is used when empty.
            filename: File name (joined onto ``self.output_dir``) to save to.
            size: ``(width, height)`` of the image in pixels.

        Returns:
            The saved file path, or ``None`` if saving failed.
        """
        img = Image.new('RGB', size, color=(30, 30, 60))
        draw = ImageDraw.Draw(img)
        padding = 30
        max_text_width = size[0] - (2 * padding)

        if not text_description:
            text_description = "(No description provided for placeholder)"
        lines = self._wrap_text_to_width(text_description, max_text_width)
        if not lines:
            lines.append("(Text error in placeholder)")

        # "Tg" has both an ascender and a descender, giving a stable line height.
        _, single_line_height = self._get_text_dimensions("Tg", self.font)
        if single_line_height == 0:
            single_line_height = self.font_size_pil
        line_step = single_line_height * 1.3

        # Limit lines displayed on placeholder if too many: the final row
        # becomes "..." and the rest of the text is not drawn.
        is_truncated = len(lines) > self._MAX_PLACEHOLDER_LINES
        visible_lines = lines[:self._MAX_PLACEHOLDER_LINES - 1] if is_truncated else lines
        row_count = len(visible_lines) + (1 if is_truncated else 0)

        # Center the rows that are actually drawn, never starting above the padding.
        y_text = max((size[1] - row_count * line_step) / 2.0, float(padding))

        for line in visible_lines:
            line_width, _ = self._get_text_dimensions(line, self.font)
            x_text = max((size[0] - line_width) / 2.0, float(padding))
            draw.text(xy=(x_text, y_text), text=line, fill=(220, 220, 150), font=self.font)
            y_text += line_step
        if is_truncated:
            draw.text(xy=(float(padding), y_text), text="...", fill=(200, 200, 130), font=self.font)

        filepath = os.path.join(self.output_dir, filename)
        try:
            img.save(filepath)
        except Exception as e:
            print(f"Error saving placeholder image {filepath}: {e}")
            return None
        return filepath

    def generate_image_visual(self, image_prompt_text, scene_identifier_filename):
        """Generate the image for one scene.

        AI generation is not implemented yet (see the commented scaffolding
        below), so this currently always produces a placeholder image.

        Args:
            image_prompt_text: The detailed prompt from Gemini for image
                generation. May be empty or ``None``.
            scene_identifier_filename: A unique filename for this image
                (e.g., "scene_1_visual.png").

        Returns:
            The path of the saved image, or ``None`` if it could not be saved.
        """
        # --- UNCOMMENT AND IMPLEMENT ONE OF THESE SECTIONS WHEN READY ---
        # filepath = os.path.join(self.output_dir, scene_identifier_filename)
        # if self.USE_AI_IMAGE_GENERATION and self.image_generation_pipe:  # Example for Diffusers
        #     try:
        #         print(f"Generating AI image (Diffusers) for: {image_prompt_text[:100]}...")
        #         # Add parameters like negative_prompt, guidance_scale, num_inference_steps
        #         ai_image = self.image_generation_pipe(image_prompt_text).images[0]
        #         ai_image = ai_image.resize((1024, 576))  # Resize to a standard display size
        #         ai_image.save(filepath)
        #         print(f"AI Image (Diffusers) saved: {filepath}")
        #         return filepath
        #     except Exception as e:
        #         print(f"Error generating AI image with Diffusers: {e}. Creating placeholder.")
        #         return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
        # elif self.USE_AI_IMAGE_GENERATION and self.image_api_client:  # Example for an API
        #     try:
        #         print(f"Generating AI image (API) for: {image_prompt_text[:100]}...")
        #         # --- Replace with your actual API call logic ---
        #         # response = self.image_api_client.generate(prompt=image_prompt_text, ...)
        #         # image_data = response.get_image_data()
        #         # with open(filepath, 'wb') as f:
        #         #     f.write(image_data)
        #         # --- End of API call logic ---
        #         print(f"AI Image (API) saved: {filepath}")
        #         return filepath
        #     except Exception as e:
        #         print(f"Error generating AI image with API: {e}. Creating placeholder.")
        #         return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
        # else:  # Fallback to placeholder if AI generation is not enabled or failed initialization

        # A missing prompt must not crash the log line; the placeholder itself
        # already substitutes a stock message for an empty description.
        prompt_preview = (image_prompt_text or "")[:70]
        print(f"AI image generation not enabled/ready. Creating placeholder for: {prompt_preview}...")
        return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)

    def _prepare_image_for_video(self, image_path):
        """Return a path to an image no larger than ``_MAX_VIDEO_IMAGE_SIZE``.

        Images already within the limit are used as-is. Larger ones are
        downscaled (aspect ratio preserved) and saved as a separate
        ``*_video_resized`` copy in ``self.output_dir``; the source file is
        left untouched.
        """
        max_width, max_height = self._MAX_VIDEO_IMAGE_SIZE
        # The context manager releases the file handle even if resizing fails.
        with Image.open(image_path) as img:
            if img.width <= max_width and img.height <= max_height:
                return image_path
            original_format = img.format
            img.thumbnail(self._MAX_VIDEO_IMAGE_SIZE)
            stem, ext = os.path.splitext(os.path.basename(image_path))
            resized_path = os.path.join(self.output_dir, f"{stem}_video_resized{ext}")
            img.save(resized_path, format=original_format)
        return resized_path

    @staticmethod
    def _close_clip_quietly(clip):
        """Close a clip, reporting instead of raising if closing fails."""
        if clip is None or not hasattr(clip, 'close'):
            return
        try:
            clip.close()
        except Exception as e_close:
            print(f"Warning: Error closing clip: {e_close}")

    def create_video_from_images(self, image_paths, output_filename="final_video.mp4", fps=1, duration_per_image=3):
        """Concatenate still images into a video file in ``self.output_dir``.

        Paths that are empty or do not exist are ignored, and an image that
        cannot be turned into a clip is skipped with a message.

        Args:
            image_paths: Iterable of image file paths, in playback order.
            output_filename: File name of the video inside ``self.output_dir``.
            fps: Frames per second passed to the video writer.
            duration_per_image: Seconds each image stays on screen.

        Returns:
            The path of the written video, or ``None`` if there was nothing to
            render or an error occurred.
        """
        if not image_paths:
            print("No images provided to create video.")
            return None
        valid_image_paths = [p for p in image_paths if p and os.path.exists(p)]
        if not valid_image_paths:
            print("No valid image paths found to create video.")
            return None

        print(f"Attempting to create video from {len(valid_image_paths)} images.")
        clips = []
        video_clip = None
        try:
            for m_path in valid_image_paths:
                try:
                    # Ensure image is a reasonable size for video, MoviePy can struggle with huge images
                    clip_source = self._prepare_image_for_video(m_path)
                    clips.append(ImageClip(clip_source).set_duration(duration_per_image))
                except Exception as e_clip:
                    print(f"Error creating ImageClip for {m_path}: {e_clip}. Skipping.")

            if not clips:
                print("Could not create any ImageClips.")
                return None

            video_clip = concatenate_videoclips(clips, method="compose")
            output_path = os.path.join(self.output_dir, output_filename)
            print(f"Writing video to: {output_path}")
            video_clip.write_videofile(
                output_path,
                fps=fps,
                codec='libx264',
                audio_codec='aac',
                temp_audiofile=os.path.join(self.output_dir, f'temp-audio-{os.urandom(4).hex()}.m4a'),
                remove_temp=True,
                threads=os.cpu_count() or 2,
                logger='bar'
            )
            print(f"Video successfully created: {output_path}")
            return output_path
        except Exception as e:
            print(f"Error during video creation: {e}")
            return None
        finally:
            # Release clip resources on every exit path, not only on success.
            for clip_to_close in clips:
                self._close_clip_quietly(clip_to_close)
            self._close_clip_quietly(video_clip)