# core/visual_engine.py
from PIL import Image, ImageDraw, ImageFont
from moviepy.editor import ImageClip, concatenate_videoclips
import numpy as np  # Used to hand resized PIL images to MoviePy as arrays
import os
# For future AI image generation - uncomment and add to requirements.txt when ready
# import torch
# from diffusers import StableDiffusionPipeline
# import requests # For API-based image generation
class VisualEngine:
    def __init__(self, output_dir="temp_generated_media"):
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

        # --- Font Setup for Placeholders ---
        self.font_filename = "arial.ttf"  # Or your chosen font (e.g., DejaVuSans.ttf)
        self.font_path_in_container = f"/usr/local/share/fonts/truetype/mycustomfonts/{self.font_filename}"
        self.font_size_pil = 24
        try:
            self.font = ImageFont.truetype(self.font_path_in_container, self.font_size_pil)
            print(f"Successfully loaded font: {self.font_path_in_container} for placeholders.")
        except IOError:
            print(f"Warning: Could not load font from '{self.font_path_in_container}'. "
                  f"Placeholders will use default font.")
            self.font = ImageFont.load_default()
            self.font_size_pil = 11

        # --- AI Image Generation Model/Client (Conceptual) ---
        # self.image_generation_pipe = None   # For diffusers
        # self.image_api_client = None        # For API clients
        # self.USE_AI_IMAGE_GENERATION = False  # Set to True when implemented

        # Example: Initialize Stable Diffusion (uncomment and configure when ready)
        # if self.USE_AI_IMAGE_GENERATION and torch.cuda.is_available():
        #     try:
        #         print("Attempting to load Stable Diffusion model...")
        #         self.image_generation_pipe = StableDiffusionPipeline.from_pretrained(
        #             "runwayml/stable-diffusion-v1-5",
        #             torch_dtype=torch.float16,
        #             # use_safetensors=True  # If available for the model
        #         )
        #         self.image_generation_pipe.to("cuda")
        #         print("Stable Diffusion model loaded successfully on GPU.")
        #     except Exception as e:
        #         print(f"Error loading Stable Diffusion model: {e}. Will use placeholders.")
        #         self.USE_AI_IMAGE_GENERATION = False  # Fallback
        # elif self.USE_AI_IMAGE_GENERATION:
        #     print("CUDA not available. AI image generation (Stable Diffusion) disabled. Using placeholders.")
        #     self.USE_AI_IMAGE_GENERATION = False
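        # Illustrative sketch only (assumption, not part of the current pipeline): one way to
        # back self.image_api_client would be a plain requests.Session pointed at whichever
        # image service you pick. The endpoint and environment-variable names are placeholders.
        # if self.USE_AI_IMAGE_GENERATION and self.image_api_client is None:
        #     import requests
        #     self.image_api_client = requests.Session()
        #     self.image_api_client.headers.update(
        #         {"Authorization": f"Bearer {os.getenv('IMAGE_API_KEY', '')}"}
        #     )
        #     self.image_api_base_url = os.getenv("IMAGE_API_URL", "https://example.com/v1/images")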

    def _get_text_dimensions(self, text_content, font_obj):
        if text_content == "" or text_content is None:
            return 0, self.font_size_pil
        try:
            if hasattr(font_obj, 'getbbox'):
                bbox = font_obj.getbbox(text_content)
                width = bbox[2] - bbox[0]
                height = bbox[3] - bbox[1]
                return width, height if height > 0 else self.font_size_pil
            elif hasattr(font_obj, 'getsize'):
                width, height = font_obj.getsize(text_content)
                return width, height if height > 0 else self.font_size_pil
            else:
                avg_char_width = self.font_size_pil * 0.6
                height_estimate = self.font_size_pil * 1.2
                return int(len(text_content) * avg_char_width), int(height_estimate if height_estimate > 0 else self.font_size_pil)
        except Exception as e:
            print(f"Warning: Error getting text dimensions for '{text_content}': {e}. Using estimates.")
            avg_char_width = self.font_size_pil * 0.6
            height_estimate = self.font_size_pil * 1.2
            return int(len(text_content) * avg_char_width), int(height_estimate if height_estimate > 0 else self.font_size_pil)

    def _create_placeholder_image_content(self, text_description, filename, size=(1024, 576)):  # Common 16:9 aspect
        # This is the actual placeholder drawing logic, kept separate
        img = Image.new('RGB', size, color=(30, 30, 60))
        draw = ImageDraw.Draw(img)
        padding = 30  # Reduced padding for smaller image
        max_text_width = size[0] - (2 * padding)
        lines = []
        if not text_description:
            text_description = "(No description provided for placeholder)"

        # Simple greedy word wrap against the placeholder width
        words = text_description.split()
        current_line = ""
        for word in words:
            test_line_candidate = current_line + word + " "
            line_width, _ = self._get_text_dimensions(test_line_candidate.strip(), self.font)
            if line_width <= max_text_width:
                current_line = test_line_candidate
            elif current_line != "":
                lines.append(current_line.strip())
                current_line = word + " "
            else:
                # A single word wider than the line: truncate it until it fits
                temp_word = word
                while self._get_text_dimensions(temp_word, self.font)[0] > max_text_width and len(temp_word) > 0:
                    temp_word = temp_word[:-1]
                lines.append(temp_word)
                current_line = ""
        if current_line.strip():
            lines.append(current_line.strip())
        if not lines:
            lines.append("(Text error in placeholder)")

        _, single_line_height = self._get_text_dimensions("Tg", self.font)
        if single_line_height == 0:
            single_line_height = self.font_size_pil
        line_spacing_factor = 1.3
        estimated_line_block_height = len(lines) * single_line_height * line_spacing_factor
        y_text = (size[1] - estimated_line_block_height) / 2.0
        if y_text < padding:
            y_text = float(padding)

        for line_idx, line in enumerate(lines):
            if line_idx >= 7 and len(lines) > 8:  # Limit lines displayed on placeholder if too many
                draw.text(xy=(float(padding), y_text), text="...", fill=(200, 200, 130), font=self.font)
                break
            line_width, _ = self._get_text_dimensions(line, self.font)
            x_text = (size[0] - line_width) / 2.0
            if x_text < padding:
                x_text = float(padding)
            draw.text(xy=(x_text, y_text), text=line, fill=(220, 220, 150), font=self.font)
            y_text += single_line_height * line_spacing_factor

        filepath = os.path.join(self.output_dir, filename)
        try:
            img.save(filepath)
        except Exception as e:
            print(f"Error saving placeholder image {filepath}: {e}")
            return None
        return filepath

    def generate_image_visual(self, image_prompt_text, scene_identifier_filename):
        """
        Generates an image visual. Tries AI generation if enabled,
        otherwise falls back to a placeholder.

        image_prompt_text: The detailed prompt from Gemini for image generation.
        scene_identifier_filename: A unique filename for this image (e.g., "scene_1_visual.png").
        """
        filepath = os.path.join(self.output_dir, scene_identifier_filename)

        # --- UNCOMMENT AND IMPLEMENT ONE OF THESE SECTIONS WHEN READY ---
        # if self.USE_AI_IMAGE_GENERATION and self.image_generation_pipe:  # Example for Diffusers
        #     try:
        #         print(f"Generating AI image (Diffusers) for: {image_prompt_text[:100]}...")
        #         # Add parameters like negative_prompt, guidance_scale, num_inference_steps
        #         ai_image = self.image_generation_pipe(image_prompt_text).images[0]
        #         ai_image = ai_image.resize((1024, 576))  # Resize to a standard display size
        #         ai_image.save(filepath)
        #         print(f"AI Image (Diffusers) saved: {filepath}")
        #         return filepath
        #     except Exception as e:
        #         print(f"Error generating AI image with Diffusers: {e}. Creating placeholder.")
        #         return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
        # elif self.USE_AI_IMAGE_GENERATION and self.image_api_client:  # Example for an API
        #     try:
        #         print(f"Generating AI image (API) for: {image_prompt_text[:100]}...")
        #         # --- Replace with your actual API call logic ---
        #         # response = self.image_api_client.generate(prompt=image_prompt_text, ...)
        #         # image_data = response.get_image_data()
        #         # with open(filepath, 'wb') as f:
        #         #     f.write(image_data)
        #         # --- End of API call logic ---
        #         print(f"AI Image (API) saved: {filepath}")
        #         return filepath
        #     except Exception as e:
        #         print(f"Error generating AI image with API: {e}. Creating placeholder.")
        #         return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
        # else:  # Fallback to placeholder if AI generation is not enabled or failed initialization

        # AI generation is not wired up yet, so always create a placeholder for now.
        print(f"AI image generation not enabled/ready. Creating placeholder for: {image_prompt_text[:70]}...")
        return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)
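
    # Illustrative sketch only (not used yet): roughly what the Diffusers call mentioned above
    # could look like once USE_AI_IMAGE_GENERATION is enabled. The negative_prompt text and the
    # guidance/step values are assumptions for illustration, not tuned defaults.
    # def _generate_with_diffusers_sketch(self, image_prompt_text, filepath):
    #     ai_image = self.image_generation_pipe(
    #         image_prompt_text,
    #         negative_prompt="blurry, low quality, watermark, text",
    #         guidance_scale=7.5,
    #         num_inference_steps=30,
    #     ).images[0]
    #     ai_image = ai_image.resize((1024, 576))
    #     ai_image.save(filepath)
    #     return filepath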

    def create_video_from_images(self, image_paths, output_filename="final_video.mp4", fps=1, duration_per_image=3):
        if not image_paths:
            print("No images provided to create video.")
            return None
        valid_image_paths = [p for p in image_paths if p and os.path.exists(p)]
        if not valid_image_paths:
            print("No valid image paths found to create video.")
            return None
        print(f"Attempting to create video from {len(valid_image_paths)} images.")
        try:
            clips = []
            for m_path in valid_image_paths:
                try:
                    # Ensure image is a reasonable size for video; MoviePy can struggle with huge images.
                    with Image.open(m_path) as img_for_clip:
                        if img_for_clip.width > 1920 or img_for_clip.height > 1080:
                            img_for_clip.thumbnail((1920, 1080))  # Resize in place if too large
                            clip = ImageClip(np.array(img_for_clip.convert("RGB"))).set_duration(duration_per_image)
                        else:
                            clip = ImageClip(m_path).set_duration(duration_per_image)
                    clips.append(clip)
                except Exception as e_clip:
                    print(f"Error creating ImageClip for {m_path}: {e_clip}. Skipping.")
            if not clips:
                print("Could not create any ImageClips.")
                return None
            video_clip = concatenate_videoclips(clips, method="compose")
            output_path = os.path.join(self.output_dir, output_filename)
            print(f"Writing video to: {output_path}")
            video_clip.write_videofile(
                output_path, fps=fps, codec='libx264', audio_codec='aac',
                temp_audiofile=os.path.join(self.output_dir, f'temp-audio-{os.urandom(4).hex()}.m4a'),
                remove_temp=True, threads=os.cpu_count() or 2, logger='bar'
            )
            for clip_to_close in clips:
                clip_to_close.close()
            if hasattr(video_clip, 'close'):
                video_clip.close()
            print(f"Video successfully created: {output_path}")
            return output_path
        except Exception as e:
            print(f"Error during video creation: {e}")
            return None
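

# Minimal usage sketch (assumption: run directly as a quick local smoke test; the prompts and
# filenames below are made up for illustration). It only exercises the placeholder path and
# the image-to-video assembly defined above.
if __name__ == "__main__":
    engine = VisualEngine(output_dir="temp_generated_media")
    sample_prompts = [
        "A quiet coastal town at dawn, wide establishing shot",
        "Close-up of an old lighthouse keeper reading a letter",
    ]
    generated_paths = []
    for idx, prompt in enumerate(sample_prompts, start=1):
        path = engine.generate_image_visual(prompt, f"scene_{idx}_visual.png")
        if path:
            generated_paths.append(path)
    video_path = engine.create_video_from_images(
        generated_paths, output_filename="demo_video.mp4", duration_per_image=3
    )
    print(f"Demo video path: {video_path}")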