CingenAI / core /visual_engine.py
mgbam's picture
Update core/visual_engine.py
41b47a8 verified
raw
history blame
11.8 kB
# core/visual_engine.py
from PIL import Image, ImageDraw, ImageFont
from moviepy.editor import ImageClip, concatenate_videoclips
import os
# For future AI image generation - uncomment and add to requirements.txt when ready
# import torch
# from diffusers import StableDiffusionPipeline
# import requests # For API-based image generation
class VisualEngine:
def __init__(self, output_dir="temp_generated_media"):
self.output_dir = output_dir
os.makedirs(self.output_dir, exist_ok=True)
# --- Font Setup for Placeholders ---
self.font_filename = "arial.ttf" # Or your chosen font (e.g., DejaVuSans.ttf)
self.font_path_in_container = f"/usr/local/share/fonts/truetype/mycustomfonts/{self.font_filename}"
self.font_size_pil = 24
try:
self.font = ImageFont.truetype(self.font_path_in_container, self.font_size_pil)
print(f"Successfully loaded font: {self.font_path_in_container} for placeholders.")
except IOError:
print(f"Warning: Could not load font from '{self.font_path_in_container}'. "
f"Placeholders will use default font.")
self.font = ImageFont.load_default()
self.font_size_pil = 11
# --- AI Image Generation Model/Client (Conceptual) ---
# self.image_generation_pipe = None # For diffusers
# self.image_api_client = None # For API clients
# self.USE_AI_IMAGE_GENERATION = False # Set to True when implemented
# Example: Initialize Stable Diffusion (uncomment and configure when ready)
# if self.USE_AI_IMAGE_GENERATION and torch.cuda.is_available():
# try:
# print("Attempting to load Stable Diffusion model...")
# self.image_generation_pipe = StableDiffusionPipeline.from_pretrained(
# "runwayml/stable-diffusion-v1-5",
# torch_dtype=torch.float16,
# # use_safetensors=True # If available for the model
# )
# self.image_generation_pipe.to("cuda")
# print("Stable Diffusion model loaded successfully on GPU.")
# except Exception as e:
# print(f"Error loading Stable Diffusion model: {e}. Will use placeholders.")
# self.USE_AI_IMAGE_GENERATION = False # Fallback
# elif self.USE_AI_IMAGE_GENERATION:
# print("CUDA not available. AI Image generation (Stable Diffusion) disabled. Using placeholders.")
# self.USE_AI_IMAGE_GENERATION = False
def _get_text_dimensions(self, text_content, font_obj):
if text_content == "" or text_content is None:
return 0, self.font_size_pil
try:
if hasattr(font_obj, 'getbbox'):
bbox = font_obj.getbbox(text_content)
width = bbox[2] - bbox[0]
height = bbox[3] - bbox[1]
return width, height if height > 0 else self.font_size_pil
elif hasattr(font_obj, 'getsize'):
width, height = font_obj.getsize(text_content)
return width, height if height > 0 else self.font_size_pil
else:
avg_char_width = self.font_size_pil * 0.6
height_estimate = self.font_size_pil * 1.2
return int(len(text_content) * avg_char_width), int(height_estimate if height_estimate > 0 else self.font_size_pil)
except Exception as e:
print(f"Warning: Error getting text dimensions for '{text_content}': {e}. Using estimates.")
avg_char_width = self.font_size_pil * 0.6
height_estimate = self.font_size_pil * 1.2
return int(len(text_content) * avg_char_width), int(height_estimate if height_estimate > 0 else self.font_size_pil)
def _create_placeholder_image_content(self, text_description, filename, size=(1024, 576)): # Common 16:9 aspect
# This is the actual placeholder drawing logic, kept separate
img = Image.new('RGB', size, color=(30, 30, 60))
draw = ImageDraw.Draw(img)
padding = 30 # Reduced padding for smaller image
max_text_width = size[0] - (2 * padding)
lines = []
if not text_description: text_description = "(No description provided for placeholder)"
words = text_description.split()
current_line = ""
for word in words:
test_line_candidate = current_line + word + " "
line_width, _ = self._get_text_dimensions(test_line_candidate.strip(), self.font)
if line_width <= max_text_width and current_line != "": current_line = test_line_candidate
elif line_width <= max_text_width and current_line == "": current_line = test_line_candidate
elif current_line != "":
lines.append(current_line.strip())
current_line = word + " "
else:
temp_word = word
while self._get_text_dimensions(temp_word, self.font)[0] > max_text_width and len(temp_word) > 0: temp_word = temp_word[:-1]
lines.append(temp_word)
current_line = ""
if current_line.strip(): lines.append(current_line.strip())
if not lines: lines.append("(Text error in placeholder)")
_, single_line_height = self._get_text_dimensions("Tg", self.font)
if single_line_height == 0: single_line_height = self.font_size_pil
line_spacing_factor = 1.3
estimated_line_block_height = len(lines) * single_line_height * line_spacing_factor
y_text = (size[1] - estimated_line_block_height) / 2.0
if y_text < padding: y_text = float(padding)
for line_idx, line in enumerate(lines):
if line_idx >= 7 and len(lines) > 8: # Limit lines displayed on placeholder if too many
draw.text(xy=(float(padding), y_text), text="...", fill=(200, 200, 130), font=self.font)
break
line_width, _ = self._get_text_dimensions(line, self.font)
x_text = (size[0] - line_width) / 2.0
if x_text < padding: x_text = float(padding)
draw.text(xy=(x_text, y_text), text=line, fill=(220, 220, 150), font=self.font)
y_text += single_line_height * line_spacing_factor
filepath = os.path.join(self.output_dir, filename)
try:
img.save(filepath)
except Exception as e:
print(f"Error saving placeholder image {filepath}: {e}")
return None
return filepath
def generate_image_visual(self, image_prompt_text, scene_identifier_filename):
"""
Generates an image visual. Tries AI generation if enabled,
otherwise falls back to a placeholder.
image_prompt_text: The detailed prompt from Gemini for image generation.
scene_identifier_filename: A unique filename for this image (e.g., "scene_1_visual.png").
"""
filepath = os.path.join(self.output_dir, scene_identifier_filename)
# --- UNCOMMENT AND IMPLEMENT ONE OF THESE SECTIONS WHEN READY ---
# if self.USE_AI_IMAGE_GENERATION and self.image_generation_pipe: # Example for Diffusers
# try:
# print(f"Generating AI image (Diffusers) for: {image_prompt_text[:100]}...")
# # Add parameters like negative_prompt, guidance_scale, num_inference_steps
# ai_image = self.image_generation_pipe(image_prompt_text).images[0]
# ai_image = ai_image.resize((1024, 576)) # Resize to a standard display size
# ai_image.save(filepath)
# print(f"AI Image (Diffusers) saved: {filepath}")
# return filepath
# except Exception as e:
# print(f"Error generating AI image with Diffusers: {e}. Creating placeholder.")
# return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
# elif self.USE_AI_IMAGE_GENERATION and self.image_api_client: # Example for an API
# try:
# print(f"Generating AI image (API) for: {image_prompt_text[:100]}...")
# # --- Replace with your actual API call logic ---
# # response = self.image_api_client.generate(prompt=image_prompt_text, ...)
# # image_data = response.get_image_data()
# # with open(filepath, 'wb') as f:
# # f.write(image_data)
# # --- End of API call logic ---
# print(f"AI Image (API) saved: {filepath}")
# return filepath
# except Exception as e:
# print(f"Error generating AI image with API: {e}. Creating placeholder.")
# return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
# else: # Fallback to placeholder if AI generation is not enabled or failed initialization
print(f"AI image generation not enabled/ready. Creating placeholder for: {image_prompt_text[:70]}...")
return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
def create_video_from_images(self, image_paths, output_filename="final_video.mp4", fps=1, duration_per_image=3):
if not image_paths:
print("No images provided to create video.")
return None
valid_image_paths = [p for p in image_paths if p and os.path.exists(p)]
if not valid_image_paths:
print("No valid image paths found to create video.")
return None
print(f"Attempting to create video from {len(valid_image_paths)} images.")
try:
clips = []
for m_path in valid_image_paths:
try:
# Ensure image is a reasonable size for video, MoviePy can struggle with huge images
img_for_clip = Image.open(m_path)
if img_for_clip.width > 1920 or img_for_clip.height > 1080:
img_for_clip.thumbnail((1920, 1080)) # Resize if too large
# Save back to a temp path or directly use the PIL image object if ImageClip supports it well
# For simplicity, let's assume ImageClip handles PIL Image objects
clip = ImageClip(m_path).set_duration(duration_per_image) # Using path directly for now
# clip = ImageClip(np.array(img_for_clip)).set_duration(duration_per_image) # If using PIL image
clips.append(clip)
except Exception as e_clip:
print(f"Error creating ImageClip for {m_path}: {e_clip}. Skipping.")
if not clips:
print("Could not create any ImageClips.")
return None
video_clip = concatenate_videoclips(clips, method="compose")
output_path = os.path.join(self.output_dir, output_filename)
print(f"Writing video to: {output_path}")
video_clip.write_videofile(
output_path, fps=fps, codec='libx264', audio_codec='aac',
temp_audiofile=os.path.join(self.output_dir, f'temp-audio-{os.urandom(4).hex()}.m4a'),
remove_temp=True, threads=os.cpu_count() or 2, logger='bar'
)
for clip_to_close in clips: clip_to_close.close()
if hasattr(video_clip, 'close'): video_clip.close()
print(f"Video successfully created: {output_path}")
return output_path
except Exception as e:
print(f"Error during video creation: {e}")
return None