Spaces:
Running
Running
""" | |
Progressive Image Reveal Mosaic System for Tag Collector Game | |
This module provides a completely different approach to the tag mosaic visualization, | |
treating it as a progressive image reveal where each tag discovery unveils portions | |
of a template image, with rarer tags revealing more pixels. | |
""" | |
import os | |
import hashlib | |
import streamlit as st | |
import numpy as np | |
import math | |
import io | |
import time | |
import random | |
from PIL import Image, ImageDraw, ImageFilter | |
from game_constants import RARITY_LEVELS, ENKEPHALIN_CURRENCY_NAME, ENKEPHALIN_ICON | |
# Default paths | |
DEFAULT_TEMPLATES_DIR = "mosaics/templates" | |
DEFAULT_MOSAICS_DIR = "mosaics" | |
def ensure_directories(): | |
"""Ensure all required directories exist""" | |
# Create mosaics directory if it doesn't exist | |
if not os.path.exists(DEFAULT_MOSAICS_DIR): | |
os.makedirs(DEFAULT_MOSAICS_DIR) | |
# Create templates directory if it doesn't exist | |
if not os.path.exists(DEFAULT_TEMPLATES_DIR): | |
os.makedirs(DEFAULT_TEMPLATES_DIR) | |
def initialize_mosaic(mosaic_name="main", total_tags=100): | |
""" | |
Initialize the appropriate mosaic type based on the template file. | |
Args: | |
mosaic_name: Name of the mosaic (used for file paths) | |
total_tags: Total number of tags expected for this mosaic | |
Returns: | |
An instance of RevealMosaic, AnimatedRevealMosaic, or VideoRevealMosaic | |
""" | |
# Check for GIF template | |
gif_path = os.path.join(DEFAULT_TEMPLATES_DIR, f"{mosaic_name}_template.gif") | |
if os.path.exists(gif_path): | |
# Try to open as GIF and check if animated | |
try: | |
img = Image.open(gif_path) | |
is_animated = hasattr(img, 'n_frames') and img.n_frames > 1 | |
if is_animated: | |
print(f"Found animated GIF template: {gif_path}") | |
return AnimatedRevealMosaic( | |
total_tags=total_tags, | |
template_path=gif_path, | |
mosaic_name=mosaic_name | |
) | |
except Exception as e: | |
print(f"Error checking animation: {e}") | |
# Fallback to standard reveal mosaic | |
return RevealMosaic( | |
total_tags=total_tags, | |
mosaic_name=mosaic_name | |
) | |
class RevealMosaic: | |
"""Manages the progressive revealing of an image as tags are discovered""" | |
def __init__(self, | |
total_tags=100, | |
template_path=None, | |
mosaic_name="main", | |
save_path=None, | |
mask_color=(0, 0, 0)): | |
""" | |
Initialize the reveal mosaic | |
Args: | |
total_tags: Total number of tags expected for this mosaic | |
template_path: Path to the template image. If None, generated from mosaic_name | |
mosaic_name: Name of the mosaic for storage | |
save_path: Path to save the mosaic mask. If None, generated from mosaic_name | |
mask_color: Color to use for the mask (default: black) | |
""" | |
# Store basic parameters | |
self.total_tags = max(1, total_tags) # Ensure at least 1 tag | |
self.mosaic_name = mosaic_name | |
self.mask_color = mask_color | |
# Ensure directories exist | |
ensure_directories() | |
# Generate paths if not provided | |
if save_path is None: | |
self.save_path = os.path.join(DEFAULT_MOSAICS_DIR, f"{mosaic_name}_mosaic_mask.png") | |
else: | |
self.save_path = save_path | |
if template_path is None: | |
self.template_path = os.path.join(DEFAULT_TEMPLATES_DIR, f"{mosaic_name}_template.png") | |
else: | |
self.template_path = template_path | |
# Initialize tracking sets | |
self.processed_tags = set() # Tags we've already processed | |
self.revealed_pixels = set() # Pixels that have been revealed | |
self.highlighted_tags = [] # Recently added tags | |
# Load or create template image | |
self.template_image = self.load_template_image() | |
# Get image dimensions | |
self.width, self.height = self.template_image.size | |
self.total_pixels = self.width * self.height | |
# Calculate how many pixels each tag should reveal on average | |
self.base_pixels_per_tag = self.total_pixels / self.total_tags | |
print(f"Base pixels per tag: {self.base_pixels_per_tag:.1f} (Total: {self.total_pixels} pixels, {self.total_tags} tags)") | |
# Create or load the mask | |
self.mask = self.load_or_create_mask() | |
# Create an image-wide priority map once (for consistent reveals) | |
# Only create this when needed to save memory and startup time | |
self._priority_map = None | |
# Track last update time | |
self.last_update_time = time.time() | |
# Flag to indicate if an update is needed | |
self.needs_update = False | |
# Cache for the final image | |
self._cached_image = None | |
# Flag for whether the cache is valid | |
self._cache_valid = False | |
def priority_map(self): | |
"""Lazy-load the priority map only when needed""" | |
if self._priority_map is None: | |
print(f"Generating priority map for {self.mosaic_name}...") | |
self._priority_map = self.create_priority_map() | |
return self._priority_map | |
def load_template_image(self): | |
"""Load the template image or create a default one""" | |
if os.path.exists(self.template_path): | |
try: | |
img = Image.open(self.template_path).convert('RGB') | |
print(f"Loaded template image from {self.template_path}") | |
return img | |
except Exception as e: | |
print(f"Error loading template image: {e}") | |
# If no image exists or there was an error, create a default | |
return self.create_default_template() | |
def load_or_create_mask(self): | |
"""Load an existing mask or create a new one""" | |
if os.path.exists(self.save_path): | |
try: | |
mask = Image.open(self.save_path).convert('L') | |
# Ensure mask size matches template | |
if mask.size != self.template_image.size: | |
mask = mask.resize(self.template_image.size) | |
print(f"Loaded mask from {self.save_path}") | |
# Count revealed pixels (where mask is 0/transparent) | |
revealed_count = 0 | |
mask_data = mask.getdata() | |
for i, pixel in enumerate(mask_data): | |
if pixel == 0: | |
x = i % self.width | |
y = i // self.width | |
self.revealed_pixels.add((x, y)) | |
revealed_count += 1 | |
print(f"Mask has {revealed_count} revealed pixels out of {self.total_pixels}") | |
return mask | |
except Exception as e: | |
print(f"Error loading mask: {e}") | |
# Create a new fully opaque mask (nothing revealed) | |
mask = Image.new('L', self.template_image.size, 255) | |
return mask | |
def has_new_tags(self, collected_tags): | |
""" | |
Check if there are new tags that haven't been processed yet. | |
Args: | |
collected_tags: Dictionary of {tag_name: info} of collected tags | |
Returns: | |
Tuple of (has_new_tags, count_of_new_tags) | |
""" | |
new_tag_count = 0 | |
# Check for tags that aren't in processed_tags | |
for tag, info in collected_tags.items(): | |
# Only include tags with count > 0 | |
if info.get("count", 0) <= 0: | |
continue | |
# Count tags we haven't processed yet | |
if tag not in self.processed_tags: | |
new_tag_count += 1 | |
return (new_tag_count > 0, new_tag_count) | |
def update_with_tags(self, collected_tags, metadata=None, force_update=False): | |
""" | |
Update the mosaic with newly collected tags. | |
Args: | |
collected_tags: Dictionary of {tag_name: info} of collected tags | |
metadata: Optional metadata (not used in this implementation) | |
force_update: Force update even if there are no new tags | |
Returns: | |
Number of newly revealed pixels | |
""" | |
# If no force update, quick check for new tags | |
has_new, new_count = self.has_new_tags(collected_tags) | |
if not has_new and not force_update: | |
# No new tags to process | |
return 0 | |
# Get start time for performance tracking | |
start_time = time.time() | |
# Clear previous highlights | |
self.highlighted_tags = [] | |
# Track if we need to update the mask | |
self.needs_update = False | |
total_newly_revealed = 0 | |
# Process all tags in the collection if this is first run or force_update | |
# Otherwise just process new tags | |
known_tags = set(self.processed_tags) | |
all_tags_to_process = [] | |
# First pass: determine which tags to process | |
for tag, info in collected_tags.items(): | |
# Only include tags with count > 0 | |
if info.get("count", 0) <= 0: | |
continue | |
# Mark new tags for processing | |
if tag not in known_tags: | |
# Get the tag's rarity | |
rarity = info.get("rarity", "Canard") # Default to lowest rarity | |
all_tags_to_process.append((tag, rarity, True)) # True = new tag | |
# If no newly discovered tags, but on first run, process all existing tags | |
if not all_tags_to_process and (len(self.processed_tags) == 0 or force_update): | |
for tag, info in collected_tags.items(): | |
if info.get("count", 0) <= 0: | |
continue | |
rarity = info.get("rarity", "Canard") | |
all_tags_to_process.append((tag, rarity, False)) # False = not new | |
# If nothing to process, return early | |
if not all_tags_to_process: | |
return 0 | |
# Sort tags by rarity (process rarest first) | |
rarity_order = ["Impuritas Civitas", "Star of the City", "Urban Nightmare", | |
"Urban Plague", "Urban Legend", "Urban Myth", "Canard"] | |
def get_rarity_rank(tag_tuple): | |
_, rarity, _ = tag_tuple | |
if rarity in rarity_order: | |
return rarity_order.index(rarity) | |
return len(rarity_order) # Unknown rarity goes at the end | |
all_tags_to_process.sort(key=get_rarity_rank) | |
# Process tags | |
for tag, rarity, is_new in all_tags_to_process: | |
# Reveal pixels for this tag | |
newly_revealed = self.reveal_pixels_for_tag(tag, rarity) | |
total_newly_revealed += newly_revealed | |
# Mark this tag as processed | |
self.processed_tags.add(tag) | |
# Update and save the mask if we revealed new pixels | |
if self.needs_update: | |
self.update_mask() | |
# Invalidate the image cache | |
self._cache_valid = False | |
# Update last update time | |
self.last_update_time = time.time() | |
# Report performance | |
end_time = time.time() | |
print(f"Mosaic update processed {len(all_tags_to_process)} tags, revealed {total_newly_revealed} pixels in {end_time - start_time:.3f}s") | |
return total_newly_revealed | |
def reveal_pixels_for_tag(self, tag, rarity): | |
""" | |
Reveal pixels for a newly discovered tag. | |
Args: | |
tag: The tag name | |
rarity: The tag's rarity | |
Returns: | |
Number of newly revealed pixels | |
""" | |
# Calculate how many pixels to reveal | |
pixels_to_reveal = self.calculate_pixels_to_reveal(rarity) | |
# Get next set of pixels to reveal from priority map | |
newly_revealed = [] | |
for pixel in self.priority_map: | |
if pixel not in self.revealed_pixels: | |
newly_revealed.append(pixel) | |
self.revealed_pixels.add(pixel) | |
if len(newly_revealed) >= pixels_to_reveal: | |
break | |
# No need to update if no new pixels were revealed | |
if not newly_revealed: | |
return 0 | |
# Add tag to highlighted tags | |
if newly_revealed: | |
# Add a random revealed pixel to highlight | |
highlight_pixel = random.choice(newly_revealed) | |
self.highlighted_tags.append((tag, highlight_pixel[0], highlight_pixel[1], rarity)) | |
# Flag that we need to update the mask | |
self.needs_update = True | |
# Return count of newly revealed pixels | |
return len(newly_revealed) | |
def update_mask(self): | |
"""Update and save the mask based on current revealed pixels""" | |
# Create a new mask image | |
new_mask = Image.new('L', (self.width, self.height), 255) | |
# Reveal pixels (set to 0/transparent where we want the image to show) | |
pixels_updated = 0 | |
for x, y in self.revealed_pixels: | |
if 0 <= x < self.width and 0 <= y < self.height: | |
new_mask.putpixel((x, y), 0) | |
pixels_updated += 1 | |
print(f"Updated mask with {pixels_updated} revealed pixels") | |
# Save the updated mask | |
self.mask = new_mask | |
try: | |
self.mask.save(self.save_path) | |
print(f"Saved mask to {self.save_path}") | |
except Exception as e: | |
print(f"Error saving mask: {e}") | |
# Invalidate the image cache since the mask has changed | |
self._cache_valid = False | |
def get_image(self, force_refresh=False): | |
""" | |
Get the current mosaic image (template with mask applied). | |
Uses caching to avoid regenerating the image unless needed. | |
Args: | |
force_refresh: Force regeneration of the image even if cached | |
Returns: | |
PIL Image of the current state | |
""" | |
# Check if we need to regenerate the image | |
if force_refresh or not self._cache_valid or self._cached_image is None: | |
print(f"Regenerating mosaic image for {self.mosaic_name}") | |
# If the mask file exists but differs from our current mask, reload it | |
if os.path.exists(self.save_path): | |
try: | |
file_mask = Image.open(self.save_path).convert('L') | |
if file_mask.size == self.mask.size: | |
# Check if the masks are different | |
diff = 0 | |
file_data = file_mask.getdata() | |
mask_data = self.mask.getdata() | |
for i, (f_pixel, m_pixel) in enumerate(zip(file_data, mask_data)): | |
if f_pixel != m_pixel: | |
diff += 1 | |
if diff > 0: | |
print(f"Mask file differs from memory by {diff} pixels, reloading") | |
self.mask = file_mask | |
except Exception as e: | |
print(f"Error comparing masks: {e}") | |
# Create a copy of the template | |
result = self.template_image.copy() | |
# Create a solid color image for unrevealed areas | |
mask_color_img = Image.new('RGB', result.size, self.mask_color) | |
# Apply the mask (0=transparent, 255=opaque) | |
result.paste(mask_color_img, (0, 0), self.mask) | |
# Cache the result | |
self._cached_image = result | |
self._cache_valid = True | |
return result | |
else: | |
# Return cached image | |
return self._cached_image | |
def load_or_create_mask(self): | |
"""Load an existing mask or create a new one""" | |
if os.path.exists(self.save_path): | |
try: | |
mask = Image.open(self.save_path).convert('L') | |
# Ensure mask size matches template | |
if mask.size != self.template_image.size: | |
print(f"Resizing mask from {mask.size} to {self.template_image.size}") | |
mask = mask.resize(self.template_image.size) | |
# Save the resized mask | |
mask.save(self.save_path) | |
print(f"Loaded mask from {self.save_path}") | |
# Count revealed pixels (where mask is 0/transparent) | |
revealed_count = 0 | |
mask_data = mask.getdata() | |
for i, pixel in enumerate(mask_data): | |
if pixel == 0: # Fully transparent | |
x = i % self.width | |
y = i // self.width | |
self.revealed_pixels.add((x, y)) | |
revealed_count += 1 | |
print(f"Loaded mask has {revealed_count} revealed pixels out of {self.total_pixels}") | |
return mask | |
except Exception as e: | |
print(f"Error loading mask: {e}") | |
# Create a new fully opaque mask (nothing revealed) | |
print(f"Creating new mask for {self.mosaic_name}") | |
mask = Image.new('L', self.template_image.size, 255) | |
# Save the empty mask | |
mask.save(self.save_path) | |
return mask | |
def get_stats(self): | |
""" | |
Get statistics about the mosaic completion. | |
Returns: | |
Dictionary with completion statistics | |
""" | |
# Calculate completion percentage | |
revealed_count = len(self.revealed_pixels) | |
completion_percentage = min(100, (revealed_count / self.total_pixels) * 100) | |
return { | |
"revealed_pixels": revealed_count, | |
"total_pixels": self.total_pixels, | |
"completion_percentage": completion_percentage, | |
"completion_pattern": get_completion_pattern(completion_percentage), | |
"newly_highlighted": len(self.highlighted_tags), | |
"has_new_tags": len(self.highlighted_tags) > 0 | |
} | |
def calculate_pixels_to_reveal(self, rarity): | |
""" | |
Calculate how many pixels to reveal based on tag rarity and total tags. | |
Args: | |
rarity: The rarity of the discovered tag | |
Returns: | |
Number of pixels to reveal | |
""" | |
# Get total tags to distribute pixels among | |
# This value should be set correctly during initialization | |
total_tags_expected = max(1, self.total_tags) | |
# Calculate base pixels per tag (equal distribution) | |
base_pixels_per_tag = self.total_pixels / total_tags_expected | |
# Apply rarity multiplier | |
rarity_multiplier = 1.0 | |
if rarity == "Canard": | |
rarity_multiplier = 0.5 # Half the average | |
elif rarity == "Urban Myth": | |
rarity_multiplier = 0.8 # Slightly below average | |
elif rarity == "Urban Legend": | |
rarity_multiplier = 1.0 # Average | |
elif rarity == "Urban Plague": | |
rarity_multiplier = 1.5 # Above average | |
elif rarity == "Urban Nightmare": | |
rarity_multiplier = 2.0 # Double | |
elif rarity == "Star of the City": | |
rarity_multiplier = 3.0 # Triple | |
elif rarity == "Impuritas Civitas": | |
rarity_multiplier = 5.0 # Five times | |
# Calculate pixels to reveal, ensuring at least 100 pixels | |
pixels_to_reveal = max(100, int(base_pixels_per_tag * rarity_multiplier)) | |
# Don't reveal too many at once for visual smoothness | |
max_at_once = min(100000, int(self.total_pixels * 0.1)) # 10% of total or 100k, whichever is less | |
pixels_to_reveal = min(pixels_to_reveal, max_at_once) | |
# Make sure we don't try to reveal more than what's left | |
unrevealed_pixels = self.total_pixels - len(self.revealed_pixels) | |
pixels_to_reveal = min(pixels_to_reveal, unrevealed_pixels) | |
return pixels_to_reveal | |
def create_priority_map(self): | |
""" | |
Create a priority map for pixel reveal order. | |
Returns: | |
List of pixel coordinates in priority order (highest to lowest) | |
""" | |
# Convert the image to grayscale for brightness analysis | |
gray_img = self.template_image.convert('L') | |
# Get brightness values | |
brightness_map = {} | |
width, height = gray_img.size | |
# Create center point and max distance for normalization | |
center_x, center_y = width // 2, height // 2 | |
max_dist = math.sqrt(center_x**2 + center_y**2) | |
# Calculate priority based on: | |
# 1. Distance from center (higher = closer to center) | |
# 2. Brightness (higher = brighter parts of image) | |
for y in range(height): | |
for x in range(width): | |
# Distance from center (normalized 0-1) | |
dx, dy = x - center_x, y - center_y | |
distance = math.sqrt(dx**2 + dy**2) | |
distance_factor = 1.0 - (distance / max_dist) | |
# Brightness (normalized 0-1) | |
brightness = gray_img.getpixel((x, y)) / 255.0 | |
# Edge detection factor - look for neighboring pixels with different brightness | |
# This helps reveal edges of objects first | |
edge_factor = 0.0 | |
if x > 0 and x < width-1 and y > 0 and y < height-1: | |
# Check surrounding pixels | |
neighbors = [ | |
gray_img.getpixel((x-1, y)), | |
gray_img.getpixel((x+1, y)), | |
gray_img.getpixel((x, y-1)), | |
gray_img.getpixel((x, y+1)) | |
] | |
current = gray_img.getpixel((x, y)) | |
# Calculate average difference with neighbors | |
diff_sum = sum(abs(current - n) for n in neighbors) | |
edge_factor = min(1.0, diff_sum / (4 * 255.0)) | |
# Calculate priority - weight factors according to importance | |
# 40% distance from center, 40% brightness, 20% edge detection | |
priority = (distance_factor * 0.4) + (brightness * 0.4) + (edge_factor * 0.2) | |
# Store in map with a small random factor to prevent exact ties | |
random_factor = random.random() * 0.01 # 1% randomness | |
brightness_map[(x, y)] = priority + random_factor | |
# Sort by priority (highest to lowest) | |
sorted_pixels = sorted(brightness_map.items(), key=lambda x: x[1], reverse=True) | |
# Return just the pixel coordinates in order | |
return [pixel for pixel, _ in sorted_pixels] | |
def create_default_template(self, width=1024, height=1024): | |
"""Create a default colorful template image""" | |
# Create a new black canvas | |
img = Image.new('RGB', (width, height), (0, 0, 0)) | |
draw = ImageDraw.Draw(img) | |
# Draw a colorful circular pattern | |
center_x, center_y = width // 2, height // 2 | |
max_radius = min(width, height) // 2 - 10 | |
# Draw colorful background gradients | |
for y in range(height): | |
for x in range(width): | |
# Calculate distance from center | |
dx, dy = x - center_x, y - center_y | |
distance = math.sqrt(dx*dx + dy*dy) | |
# Calculate angle | |
angle = math.atan2(dy, dx) | |
# Normalize distance | |
norm_distance = min(1.0, distance / max_radius) | |
# Create color based on angle and distance | |
# This creates a colorful cosmic-like background | |
hue = (math.degrees(angle) % 360) / 360.0 | |
saturation = 0.7 - 0.3 * norm_distance | |
value = 0.2 + 0.3 * (1 - norm_distance) | |
# Convert HSV to RGB | |
h = hue * 6 | |
i = int(h) | |
f = h - i | |
p = value * (1 - saturation) | |
q = value * (1 - saturation * f) | |
t = value * (1 - saturation * (1 - f)) | |
if i == 0: | |
r, g, b = value, t, p | |
elif i == 1: | |
r, g, b = q, value, p | |
elif i == 2: | |
r, g, b = p, value, t | |
elif i == 3: | |
r, g, b = p, q, value | |
elif i == 4: | |
r, g, b = t, p, value | |
else: | |
r, g, b = value, p, q | |
r, g, b = int(r * 255), int(g * 255), int(b * 255) | |
# Make the center brighter | |
if distance < max_radius * 0.2: | |
# Central brightness | |
brightness = 1.0 - (distance / (max_radius * 0.2)) | |
r = min(255, r + int(brightness * (255 - r))) | |
g = min(255, g + int(brightness * (255 - g))) | |
b = min(255, b + int(brightness * (255 - b))) | |
# Add a bright spot at the center | |
img.putpixel((x, y), (r, g, b)) | |
# Draw a central bright spot | |
for r in range(50, 0, -1): | |
brightness = 1.0 - (r / 50) | |
color = ( | |
min(255, int(200 + brightness * 55)), | |
min(255, int(150 + brightness * 105)), | |
min(255, int(100 + brightness * 155)) | |
) | |
draw.ellipse((center_x - r, center_y - r, center_x + r, center_y + r), fill=color) | |
# Apply a slight blur for a smoother appearance | |
img = img.filter(ImageFilter.GaussianBlur(radius=1.5)) | |
# Save the template | |
img.save(self.template_path) | |
print(f"Created default template at {self.template_path}") | |
return img | |
def get_completion_pattern(completion_percentage): | |
""" | |
Get a description of the completion pattern based on percentage. | |
Args: | |
completion_percentage: Percentage of completion (0-100) | |
Returns: | |
String description of what's visible | |
""" | |
if completion_percentage < 1: | |
return "the first glimpses of a hidden image" | |
elif completion_percentage < 5: | |
return "emerging fragments of a mysterious picture" | |
elif completion_percentage < 15: | |
return "a partial revelation of the concealed artwork" | |
elif completion_percentage < 30: | |
return "a quarter of the image taking shape" | |
elif completion_percentage < 50: | |
return "half of the picture becoming clear" | |
elif completion_percentage < 75: | |
return "most of the image revealed" | |
elif completion_percentage < 95: | |
return "nearly complete image with just a few hidden details" | |
else: | |
return "fully revealed artwork" | |
class AnimatedRevealMosaic(RevealMosaic): | |
"""Manages the progressive revealing of an animated GIF or video as tags are discovered""" | |
def __init__(self, | |
total_tags=100, | |
template_path=None, | |
mosaic_name="main", | |
save_path=None, | |
mask_color=(0, 0, 0)): | |
"""Initialize with support for animated GIFs""" | |
# Store animation-specific attributes | |
self.is_animated = False | |
self.frames = [] | |
self.frame_durations = [] | |
# Call parent initializer | |
super().__init__( | |
total_tags=total_tags, | |
template_path=template_path, | |
mosaic_name=mosaic_name, | |
save_path=save_path, | |
mask_color=mask_color | |
) | |
def load_template_image(self): | |
"""Load the template image with support for animated GIFs""" | |
if os.path.exists(self.template_path): | |
try: | |
# Open the image | |
img = Image.open(self.template_path) | |
# Check if it's an animated GIF | |
try: | |
# Get number of frames | |
self.is_animated = hasattr(img, 'n_frames') and img.n_frames > 1 | |
if self.is_animated: | |
print(f"Loading animated GIF with {img.n_frames} frames") | |
# Store all frames | |
self.frames = [] | |
self.frame_durations = [] | |
for frame_idx in range(img.n_frames): | |
img.seek(frame_idx) | |
# Store frame duration | |
self.frame_durations.append(img.info.get('duration', 100)) # Default 100ms | |
# Store frame as RGB | |
self.frames.append(img.convert('RGB').copy()) | |
# Return the first frame as the template_image | |
return self.frames[0] | |
else: | |
# Not animated, treat as regular image | |
print(f"Loaded static template image from {self.template_path}") | |
return img.convert('RGB') | |
except Exception as e: | |
print(f"Error processing animation: {e}") | |
# Fallback to static image | |
return img.convert('RGB') | |
except Exception as e: | |
print(f"Error loading template image: {e}") | |
# If no image exists or there was an error, create a default | |
return self.create_default_template() | |
def get_image(self, force_refresh=False): | |
""" | |
Get the current mosaic image with support for animation. | |
Args: | |
force_refresh: Force regeneration of the image even if cached | |
Returns: | |
PIL Image or list of PIL Images for animated GIFs | |
""" | |
# If not animated, use parent method | |
if not self.is_animated or not self.frames: | |
return super().get_image(force_refresh) | |
# Check if we need to regenerate the image | |
if force_refresh or not self._cache_valid or self._cached_image is None: | |
print(f"Regenerating animated mosaic image for {self.mosaic_name}") | |
# Create masked frames | |
masked_frames = [] | |
# Create a solid color image for unrevealed areas | |
mask_color_img = Image.new('RGB', self.frames[0].size, self.mask_color) | |
# Apply mask to each frame | |
for frame in self.frames: | |
# Create a copy of the frame | |
result_frame = frame.copy() | |
# Apply the mask (0=transparent, 255=opaque) | |
result_frame.paste(mask_color_img, (0, 0), self.mask) | |
# Add to masked frames | |
masked_frames.append(result_frame) | |
# Store the frames and mark cache as valid | |
self._cached_image = masked_frames | |
self._cache_valid = True | |
return masked_frames | |
else: | |
# Return cached frames | |
return self._cached_image | |
def display_tag_mosaic(): | |
"""Display the tag mosaic in the game UI with progressive image reveal""" | |
import streamlit as st | |
# Create a container for the mosaic display | |
with st.container(): | |
st.subheader("π§© Tag Collection Mosaic") | |
# Add loading warning | |
st.info("β³ Note: Initial loading or switching templates may take some time for high-resolution images due to pixel processing.") | |
# Add an expander for advanced settings | |
with st.expander("Mosaic Settings", expanded=False): | |
# Only render the uploader if the expander is open | |
uploaded_file = st.file_uploader("Upload a custom template image", type=["png", "jpg", "jpeg", "gif"]) | |
if uploaded_file is not None: | |
try: | |
# Process the uploaded image | |
image = Image.open(uploaded_file) | |
# Check if it's an animated GIF | |
is_animated = hasattr(image, 'n_frames') and image.n_frames > 1 | |
if is_animated: | |
st.info(f"Animated GIF detected with {image.n_frames} frames. Processing may take longer.") | |
elif image.width * image.height > 2000000: # More than 2 million pixels | |
st.warning(f"Large image detected ({image.width}x{image.height}). Processing may take longer.") | |
# Create templates directory if it doesn't exist | |
ensure_directories() | |
# Save as template | |
template_path = os.path.join(DEFAULT_TEMPLATES_DIR, "main_template.gif" if is_animated else "main_template.png") | |
image.save(template_path) | |
st.success("Template updated! Tags will now progressively reveal this image.") | |
# Clear the current mask to start fresh | |
mask_path = os.path.join(DEFAULT_MOSAICS_DIR, "main_mosaic_mask.png") | |
if os.path.exists(mask_path): | |
os.remove(mask_path) | |
# Reinitialize the mosaic | |
if 'tag_mosaic' in st.session_state: | |
del st.session_state.tag_mosaic | |
except Exception as e: | |
st.error(f"Error processing image: {e}") | |
# Initialize the mosaic if not already in session state | |
if 'tag_mosaic' not in st.session_state: | |
# Try to load the total tags count from metadata if available | |
total_tags = 70527 # Default | |
try: | |
if hasattr(st.session_state, 'model') and hasattr(st.session_state.model, 'dataset'): | |
if hasattr(st.session_state.model.dataset, 'tag_to_idx'): | |
total_tags = len(st.session_state.model.dataset.tag_to_idx) | |
except Exception as e: | |
print(f"Error getting tag count from metadata: {e}") | |
# Check for animated template and use appropriate class | |
template_path = os.path.join(DEFAULT_TEMPLATES_DIR, "main_template.gif") | |
if os.path.exists(template_path): | |
# Try to open as GIF and check if animated | |
try: | |
img = Image.open(template_path) | |
is_animated = hasattr(img, 'n_frames') and img.n_frames > 1 | |
if is_animated: | |
# Create the animated reveal mosaic | |
st.session_state.tag_mosaic = AnimatedRevealMosaic( | |
total_tags=total_tags, | |
template_path=template_path, | |
mosaic_name="main" | |
) | |
print("Using AnimatedRevealMosaic") | |
else: | |
# Fallback to standard reveal mosaic | |
st.session_state.tag_mosaic = RevealMosaic( | |
total_tags=total_tags, | |
mosaic_name="main" | |
) | |
except Exception as e: | |
print(f"Error checking animation: {e}") | |
# Fallback to standard reveal mosaic | |
st.session_state.tag_mosaic = RevealMosaic( | |
total_tags=total_tags, | |
mosaic_name="main" | |
) | |
else: | |
# No animated GIF template, use standard reveal mosaic | |
st.session_state.tag_mosaic = RevealMosaic( | |
total_tags=total_tags, | |
mosaic_name="main" | |
) | |
# Get the mosaic from session state | |
mosaic = st.session_state.tag_mosaic | |
# Make sure processed_tags is initialized | |
if not hasattr(mosaic, 'processed_tags'): | |
mosaic.processed_tags = set() | |
# Display milestone tracker for main collection at the top | |
if hasattr(st.session_state, 'collected_tags'): | |
# Import the milestone tracker display function if needed | |
from series_mosaics import display_milestone_tracker | |
display_milestone_tracker("main", st.session_state.collected_tags, mosaic.total_tags) | |
# Add update button | |
update_requested = st.button("π Update Mosaic") | |
# Display the manually update message | |
if not update_requested: | |
st.info("Click the 'Update Mosaic' button to process new tag discoveries and update the image.") | |
# Update the mosaic with the latest collected tags only if requested | |
newly_revealed = 0 | |
if update_requested and hasattr(st.session_state, 'collected_tags'): | |
# Show processing spinner | |
with st.spinner("Processing tag discoveries and updating mosaic..."): | |
# Get optional metadata if available | |
metadata = st.session_state.model.dataset if hasattr(st.session_state, 'model') else None | |
newly_revealed = mosaic.update_with_tags(st.session_state.collected_tags, metadata, force_update=True) | |
# Check for milestone rewards after updating | |
from series_mosaics import check_and_award_milestone_rewards # Import the reward function | |
# For main collection, use total model tags as the total (if available) | |
total_main_tags = mosaic.total_tags | |
milestone, reward = check_and_award_milestone_rewards("main", st.session_state.collected_tags, total_main_tags) | |
# Show appropriate messages based on update results | |
if milestone is not None: | |
# Show milestone achievement message with celebration | |
st.balloons() | |
st.success(f"π MILESTONE ACHIEVED! {milestone}% Completion of Main Collection!") | |
st.success(f"Rewarded with {reward} {ENKEPHALIN_ICON} {ENKEPHALIN_CURRENCY_NAME}!") | |
# Force a rerun to update the UI with new enkephalin | |
st.rerun() | |
elif newly_revealed > 0: | |
st.success(f"Successfully updated! Revealed {newly_revealed} new pixels.") | |
else: | |
st.info("No new pixels to reveal.") | |
# Get mosaic stats | |
stats = mosaic.get_stats() | |
# Show completion stats | |
col1, col2 = st.columns(2) | |
with col1: | |
st.write(f"**Completion:** {stats['completion_percentage']:.2f}%") | |
st.write(f"**Pixels Revealed:** {stats['revealed_pixels']} / {stats['total_pixels']}") | |
with col2: | |
st.write(f"**Status:** {stats['completion_pattern']}") | |
if newly_revealed > 0: | |
st.write(f"**Newly Revealed:** {newly_revealed} pixels") | |
# Display the mosaic image | |
mosaic_img = mosaic.get_image() | |
# Check if we have an animated mosaic | |
if hasattr(mosaic, 'is_animated') and mosaic.is_animated and isinstance(mosaic_img, list): | |
# Convert animated frames to GIF for display | |
img_bytes = io.BytesIO() | |
# Save as animated GIF | |
mosaic_img[0].save( | |
img_bytes, | |
format='GIF', | |
save_all=True, | |
append_images=mosaic_img[1:], | |
duration=mosaic.frame_durations, | |
loop=0 | |
) | |
img_bytes.seek(0) | |
st.image(img_bytes, caption="Your Tag Collection Mosaic - Each discovery reveals more of the hidden image", | |
use_container_width=True) | |
else: | |
# Handle static image as before | |
img_bytes = io.BytesIO() | |
mosaic_img.save(img_bytes, format='PNG') | |
img_bytes.seek(0) | |
st.image(img_bytes, caption="Your Tag Collection Mosaic - Each discovery reveals more of the hidden image", | |
use_container_width=True) | |
# Show legend for rarities | |
st.write("**Rarity Legend:**") | |
cols = st.columns(len(RARITY_LEVELS)) | |
for i, (rarity, info) in enumerate(RARITY_LEVELS.items()): | |
with cols[i]: | |
st.markdown( | |
f"<div style='background-color:{info['color']};height:20px;width:20px;display:inline-block;margin-right:5px;'></div> {rarity}", | |
unsafe_allow_html=True | |
) | |
# Show recently added tags | |
if mosaic.highlighted_tags: | |
with st.expander("Recently Added Tags", expanded=False): | |
for tag, _, _, rarity in mosaic.highlighted_tags: | |
color = RARITY_LEVELS.get(rarity, {}).get("color", "#AAAAAA") | |
st.markdown( | |
f"<span style='color:{color};font-weight:bold;'>{tag}</span>", | |
unsafe_allow_html=True | |
) |