Spaces:
Running
Running
""" | |
Enhanced Tag Storage System for Tag Collector Game | |
This module provides functions for efficient storage and retrieval of tag data and game state, | |
reducing reliance on Streamlit's session state for better performance. | |
""" | |
import os | |
import json | |
import time | |
import streamlit as st | |
from typing import Dict, Any, Set, Optional, Tuple, List | |
# Define constants | |
DEFAULT_TAGS_FILE = "tag_collection.json" | |
DEFAULT_MOSAIC_DATA_FILE = "mosaic_data.json" | |
DEFAULT_GAME_STATE_FILE = "game_state.json" | |
DEFAULT_LIBRARY_STATE_FILE = "library_state.json" | |
DEFAULT_ESSENCE_STATE_FILE = "essence_state.json" | |
TAG_STORAGE_VERSION = "1.2.0" # Updated version for new functionality | |
def ensure_storage_directory(directory: str = "storage") -> str: | |
"""Ensure the storage directory exists and return its path""" | |
if not os.path.exists(directory): | |
os.makedirs(directory) | |
return directory | |
def get_storage_path(filename: str, directory: str = "storage") -> str: | |
"""Get the full path for a storage file""" | |
storage_dir = ensure_storage_directory(directory) | |
return os.path.join(storage_dir, filename) | |
def save_tag_collection( | |
collected_tags: Dict[str, Dict[str, Any]], | |
tag_history: List[Dict[str, Any]] = None, | |
processed_tags: Optional[Set[str]] = None, | |
filename: str = DEFAULT_TAGS_FILE | |
) -> bool: | |
""" | |
Save the tag collection and related data to a JSON file | |
Args: | |
collected_tags: Dictionary of collected tags | |
tag_history: List of recent tag discoveries | |
processed_tags: Set of processed tags for mosaic visualization | |
filename: Name of the file to save to | |
Returns: | |
Boolean indicating if the save was successful | |
""" | |
try: | |
filepath = get_storage_path(filename) | |
# Prepare the data to save | |
save_data = { | |
"version": TAG_STORAGE_VERSION, | |
"timestamp": time.time(), | |
"save_date": time.strftime("%Y-%m-%d %H:%M:%S"), | |
"collected_tags": collected_tags, | |
"tag_history": tag_history if tag_history is not None else [] | |
} | |
# Convert processed_tags set to list for JSON serialization | |
if processed_tags is not None: | |
save_data["processed_tags"] = list(processed_tags) | |
# Save to file with pretty formatting | |
with open(filepath, 'w', encoding='utf-8') as f: | |
json.dump(save_data, f, indent=2, ensure_ascii=False) | |
print(f"Saved {len(collected_tags)} tags to {filepath}") | |
return True | |
except Exception as e: | |
print(f"Error saving tag collection: {str(e)}") | |
return False | |
def load_tag_collection(filename: str = DEFAULT_TAGS_FILE) -> Tuple[Optional[Dict[str, Any]], Optional[List[Dict[str, Any]]], Optional[Set[str]]]: | |
""" | |
Load the tag collection from a JSON file | |
Args: | |
filename: Name of the file to load from | |
Returns: | |
Tuple of (collected_tags, tag_history, processed_tags) or (None, None, None) if loading fails | |
""" | |
try: | |
filepath = get_storage_path(filename) | |
if not os.path.exists(filepath): | |
print(f"No tag collection file found at {filepath}") | |
return None, None, None | |
with open(filepath, 'r', encoding='utf-8') as f: | |
data = json.load(f) | |
collected_tags = data.get("collected_tags", {}) | |
tag_history = data.get("tag_history", []) | |
# Convert processed_tags back to a set | |
processed_tags = set(data.get("processed_tags", [])) | |
print(f"Loaded {len(collected_tags)} tags from {filepath}") | |
return collected_tags, tag_history, processed_tags | |
except Exception as e: | |
print(f"Error loading tag collection: {str(e)}") | |
return None, None, None | |
def add_tag_to_collection( | |
collected_tags: Dict[str, Dict[str, Any]], | |
tag: str, | |
rarity: str, | |
category: str, | |
tag_history: List[Dict[str, Any]], | |
is_update_only: bool = False | |
) -> Tuple[Dict[str, Dict[str, Any]], List[Dict[str, Any]], bool, int]: | |
""" | |
Add or update a tag in the collection | |
Args: | |
collected_tags: Dictionary of collected tags | |
tag: The tag name to add | |
rarity: Tag rarity | |
category: Tag category | |
tag_history: Tag history list | |
is_update_only: Only update existing tags, don't add new ones | |
Returns: | |
Tuple of (updated_collection, updated_history, is_new_tag, currency_earned) | |
""" | |
# Initialize return values | |
is_new_tag = False | |
currency_earned = 0 | |
timestamp = time.strftime("%H:%M:%S") | |
# Copy the collection to avoid modifying the original | |
updated_collection = collected_tags.copy() | |
updated_history = tag_history.copy() if tag_history else [] | |
# Check if this is a new tag | |
if tag not in updated_collection: | |
if is_update_only: | |
# Skip adding if this is update-only mode | |
return updated_collection, updated_history, False, 0 | |
# New tag discovery | |
is_new_tag = True | |
# Create new tag entry | |
updated_collection[tag] = { | |
"count": 1, | |
"rarity": rarity, | |
"category": category, | |
"discovery_time": timestamp, | |
"ever_sacrificed": False, # Track if this tag has ever been sacrificed | |
"is_unique": True # All tags are unique when first discovered | |
} | |
# Calculate currency based on rarity (you'll need access to RARITY_LEVELS here) | |
# For now we'll just use a placeholder | |
# currency_earned = RARITY_LEVELS.get(rarity, {}).get("value", 0) | |
# This would need to be implemented based on your game logic | |
# Add to history for new discoveries | |
updated_history.append({ | |
"tag": tag, | |
"rarity": rarity, | |
"value": currency_earned, | |
"time": timestamp, | |
"is_new": True | |
}) | |
else: | |
# Tag already exists, increment count | |
updated_collection[tag]["count"] += 1 | |
# If count was previously 0 (all were sacrificed) and now it's 1, | |
# we should update the is_unique flag | |
if updated_collection[tag]["count"] == 1 and updated_collection[tag].get("is_unique", False) == False: | |
updated_collection[tag]["is_unique"] = True | |
return updated_collection, updated_history, is_new_tag, currency_earned | |
def mark_tag_sacrificed( | |
collected_tags: Dict[str, Dict[str, Any]], | |
tag: str, | |
quantity: int = 1 | |
) -> Dict[str, Dict[str, Any]]: | |
""" | |
Mark a tag as sacrificed | |
Args: | |
collected_tags: Dictionary of collected tags | |
tag: The tag name to sacrifice | |
quantity: How many to sacrifice | |
Returns: | |
Updated collection dictionary | |
""" | |
if tag not in collected_tags: | |
return collected_tags | |
# Copy the collection to avoid modifying the original | |
updated_collection = collected_tags.copy() | |
# Mark as sacrificed at least once | |
updated_collection[tag]["ever_sacrificed"] = True | |
# Reduce the count | |
updated_collection[tag]["count"] = max(0, updated_collection[tag]["count"] - quantity) | |
# If count reaches 0, it's no longer unique in the active collection | |
if updated_collection[tag]["count"] == 0: | |
updated_collection[tag]["is_unique"] = False | |
return updated_collection | |
def save_game_state( | |
st_state, | |
filename: str = DEFAULT_GAME_STATE_FILE | |
) -> bool: | |
""" | |
Save the full game state to a JSON file | |
Args: | |
st_state: Streamlit session state | |
filename: Name of the file to save to | |
Returns: | |
Boolean indicating if the save was successful | |
""" | |
try: | |
filepath = get_storage_path(filename) | |
# Extract core game state from session state | |
game_data = { | |
"version": TAG_STORAGE_VERSION, | |
"timestamp": time.time(), | |
"save_date": time.strftime("%Y-%m-%d %H:%M:%S"), | |
"threshold": getattr(st_state, 'threshold', 0.50), | |
"tag_currency": getattr(st_state, 'tag_currency', 0), | |
"enkephalin": getattr(st_state, 'enkephalin', 0), | |
"purchased_upgrades": getattr(st_state, 'purchased_upgrades', []), | |
"achievements": list(getattr(st_state, 'achievements', set())), | |
"game_stats": getattr(st_state, 'game_stats', {}), | |
"tag_power_bonus": getattr(st_state, 'tag_power_bonus', 0), | |
"coin_multiplier": getattr(st_state, 'coin_multiplier', 1.0), | |
"unlocked_combinations": list(getattr(st_state, 'unlocked_combinations', set())), | |
"combination_bonuses": getattr(st_state, 'combination_bonuses', {"threshold_reduction": 0, "coin_bonus": 0}), | |
"sacrificed_tags": getattr(st_state, 'sacrificed_tags', {}) | |
} | |
# Add mastered categories if they exist | |
if hasattr(st_state, 'mastered_categories'): | |
if isinstance(st_state.mastered_categories, set): | |
game_data["mastered_categories"] = list(st_state.mastered_categories) | |
else: | |
game_data["mastered_categories"] = st_state.mastered_categories | |
# Save to file with pretty formatting | |
with open(filepath, 'w', encoding='utf-8') as f: | |
json.dump(game_data, f, indent=2) | |
print(f"Saved game state to {filepath}") | |
return True | |
except Exception as e: | |
print(f"Error saving game state: {str(e)}") | |
return False | |
def load_game_state( | |
st_state, | |
filename: str = DEFAULT_GAME_STATE_FILE | |
) -> bool: | |
""" | |
Load the game state from a JSON file and update session state | |
Args: | |
st_state: Streamlit session state to update | |
filename: Name of the file to load from | |
Returns: | |
Boolean indicating if the load was successful | |
""" | |
try: | |
filepath = get_storage_path(filename) | |
if not os.path.exists(filepath): | |
print(f"No game state file found at {filepath}") | |
return False | |
with open(filepath, 'r', encoding='utf-8') as f: | |
game_data = json.load(f) | |
# Update session state with loaded data | |
st_state.threshold = game_data.get("threshold", 0.50) | |
st_state.tag_currency = game_data.get("tag_currency", 0) | |
st_state.enkephalin = game_data.get("enkephalin", 0) | |
st_state.purchased_upgrades = game_data.get("purchased_upgrades", []) | |
st_state.achievements = set(game_data.get("achievements", [])) | |
st_state.game_stats = game_data.get("game_stats", {}) | |
st_state.tag_power_bonus = game_data.get("tag_power_bonus", 0) | |
st_state.coin_multiplier = game_data.get("coin_multiplier", 1.0) | |
st_state.unlocked_combinations = set(game_data.get("unlocked_combinations", [])) | |
st_state.combination_bonuses = game_data.get("combination_bonuses", {"threshold_reduction": 0, "coin_bonus": 0}) | |
st_state.sacrificed_tags = game_data.get("sacrificed_tags", {}) | |
# Load mastered categories if available | |
if "mastered_categories" in game_data: | |
if isinstance(game_data["mastered_categories"], list): | |
st_state.mastered_categories = set(game_data["mastered_categories"]) | |
else: | |
st_state.mastered_categories = game_data["mastered_categories"] | |
print(f"Loaded game state from {filepath}") | |
return True | |
except Exception as e: | |
print(f"Error loading game state: {str(e)}") | |
return False | |
def save_mosaic_data( | |
processed_tags: Set[str], | |
filled_cells: Set[Tuple[int, int]], | |
highlighted_tags: List[Tuple[str, int, int, str]], | |
mosaic_name: str = "main", | |
filename: str = DEFAULT_MOSAIC_DATA_FILE | |
) -> bool: | |
""" | |
Save mosaic-specific data to a JSON file | |
Args: | |
processed_tags: Set of tags that have been processed by the mosaic | |
filled_cells: Set of grid coordinates that have been filled | |
highlighted_tags: List of highlighted tags with their positions | |
mosaic_name: Name of the mosaic | |
filename: Name of the file to save to | |
Returns: | |
Boolean indicating if the save was successful | |
""" | |
try: | |
filepath = get_storage_path(filename) | |
# Load existing data if available | |
existing_data = {} | |
if os.path.exists(filepath): | |
with open(filepath, 'r', encoding='utf-8') as f: | |
existing_data = json.load(f) | |
# Convert sets to lists for JSON serialization | |
mosaic_data = { | |
"processed_tags": list(processed_tags), | |
"filled_cells": [list(cell) for cell in filled_cells], | |
"highlighted_tags": highlighted_tags, | |
"last_updated": time.strftime("%Y-%m-%d %H:%M:%S") | |
} | |
# Update just this mosaic's data | |
existing_data[mosaic_name] = mosaic_data | |
# Save to file | |
with open(filepath, 'w', encoding='utf-8') as f: | |
json.dump(existing_data, f, indent=2) | |
print(f"Saved mosaic data for {mosaic_name} with {len(processed_tags)} processed tags") | |
return True | |
except Exception as e: | |
print(f"Error saving mosaic data: {str(e)}") | |
return False | |
def load_mosaic_data( | |
mosaic_name: str = "main", | |
filename: str = DEFAULT_MOSAIC_DATA_FILE | |
) -> Tuple[Optional[Set[str]], Optional[Set[Tuple[int, int]]], Optional[List[Tuple[str, int, int, str]]]]: | |
""" | |
Load mosaic-specific data from a JSON file | |
Args: | |
mosaic_name: Name of the mosaic to load | |
filename: Name of the file to load from | |
Returns: | |
Tuple of (processed_tags, filled_cells, highlighted_tags) or (None, None, None) if loading fails | |
""" | |
try: | |
filepath = get_storage_path(filename) | |
if not os.path.exists(filepath): | |
print(f"No mosaic data file found at {filepath}") | |
return None, None, None | |
with open(filepath, 'r', encoding='utf-8') as f: | |
all_data = json.load(f) | |
# Get this specific mosaic's data | |
if mosaic_name not in all_data: | |
print(f"No data found for mosaic {mosaic_name}") | |
return None, None, None | |
mosaic_data = all_data[mosaic_name] | |
# Convert lists back to sets where needed | |
processed_tags = set(mosaic_data.get("processed_tags", [])) | |
filled_cells = {tuple(cell) for cell in mosaic_data.get("filled_cells", [])} | |
highlighted_tags = mosaic_data.get("highlighted_tags", []) | |
print(f"Loaded mosaic data for {mosaic_name} with {len(processed_tags)} processed tags") | |
return processed_tags, filled_cells, highlighted_tags | |
except Exception as e: | |
print(f"Error loading mosaic data: {str(e)}") | |
return None, None, None | |
def update_tag_storage_from_session(st_state) -> bool: | |
""" | |
Update the tag storage from the current session state | |
Args: | |
st_state: Streamlit session state object | |
Returns: | |
Boolean indicating if the update was successful | |
""" | |
try: | |
# Extract tag data from session state | |
collected_tags = getattr(st_state, 'collected_tags', {}) | |
tag_history = getattr(st_state, 'tag_history', []) | |
# Extract mosaic data if available | |
processed_tags = None | |
if hasattr(st_state, 'tag_mosaic') and hasattr(st_state.tag_mosaic, 'processed_tags'): | |
processed_tags = st_state.tag_mosaic.processed_tags | |
# Save the tag collection | |
tag_save_success = save_tag_collection(collected_tags, tag_history, processed_tags) | |
# Save the game state | |
game_save_success = save_game_state(st_state) | |
return tag_save_success and game_save_success | |
except Exception as e: | |
print(f"Error updating storage from session: {str(e)}") | |
return False | |
def update_session_from_tag_storage(st_state) -> bool: | |
""" | |
Update the session state from the tag storage | |
Args: | |
st_state: Streamlit session state object | |
Returns: | |
Boolean indicating if the update was successful | |
""" | |
try: | |
# Load tag data | |
collected_tags, tag_history, processed_tags = load_tag_collection() | |
if collected_tags is None: | |
# No tag data to load, check if we have game state data at least | |
if not os.path.exists(get_storage_path(DEFAULT_GAME_STATE_FILE)): | |
# No data at all | |
return False | |
else: | |
# Update tag-related session state | |
st_state.collected_tags = collected_tags | |
if tag_history is not None: | |
st_state.tag_history = tag_history | |
# Update mosaic data if available | |
if processed_tags is not None and hasattr(st_state, 'tag_mosaic'): | |
st_state.tag_mosaic.processed_tags = processed_tags | |
# Load game state (even if no tag data was found) | |
game_state_loaded = load_game_state(st_state) | |
return game_state_loaded | |
except Exception as e: | |
print(f"Error updating session from storage: {str(e)}") | |
return False | |
def ensure_tag_tracking_fields(st_state): | |
""" | |
Make sure all tags in the collection have the required tracking fields | |
Args: | |
st_state: Streamlit session state object | |
""" | |
if not hasattr(st_state, 'collected_tags') or not st_state.collected_tags: | |
return | |
for tag, info in st_state.collected_tags.items(): | |
# Add ever_sacrificed field if it doesn't exist | |
if "ever_sacrificed" not in info: | |
info["ever_sacrificed"] = False | |
# Add is_unique field if it doesn't exist | |
if "is_unique" not in info: | |
info["is_unique"] = info["count"] > 0 | |
def periodic_save(st_state, interval_minutes: int = 5) -> None: | |
""" | |
Periodically save tag data based on the last save time | |
Args: | |
st_state: Streamlit session state object | |
interval_minutes: How often to save (in minutes) | |
""" | |
# Initialize last save time if not exists | |
if not hasattr(st_state, 'last_tag_save_time'): | |
st_state.last_tag_save_time = time.time() | |
# Check if it's time to save again | |
current_time = time.time() | |
elapsed_minutes = (current_time - st_state.last_tag_save_time) / 60 | |
if elapsed_minutes >= interval_minutes: | |
if save_game(st_state): | |
# Update the last save time | |
st_state.last_tag_save_time = current_time | |
print(f"Performed periodic save after {elapsed_minutes:.1f} minutes") | |
# ========================= NEW FUNCTIONS BELOW =========================== | |
def save_library_state( | |
library_state: Optional[Dict[str, Any]] = None, | |
session_state = None, | |
filename: str = DEFAULT_LIBRARY_STATE_FILE | |
) -> bool: | |
""" | |
Save the library state to a JSON file | |
Args: | |
library_state: Library state data dictionary (optional) | |
session_state: Streamlit session state to extract from (optional) | |
filename: Name of the file to save to | |
Returns: | |
Boolean indicating if the save was successful | |
""" | |
try: | |
filepath = get_storage_path(filename) | |
# If library_state is not provided but session_state is, extract from session_state | |
if library_state is None and session_state is not None: | |
library_state = { | |
"discovered_tags": getattr(session_state, 'discovered_tags', {}), | |
"library_exploration_history": getattr(session_state, 'library_exploration_history', []), | |
"expedition_results": getattr(session_state, 'expedition_results', []), | |
"library_growth": getattr(session_state, 'library_growth', { | |
"total_discoveries": 0, | |
"last_discovery_time": time.time() | |
}), | |
"library_upgrades": getattr(session_state, 'library_upgrades', { | |
"speed": 1, | |
"capacity": 1, | |
"rarity": 1 | |
}), | |
"last_expedition_time": getattr(session_state, 'last_expedition_time', 0) | |
} | |
# Add explored library tiers if they exist | |
if hasattr(session_state, 'explored_library_tiers'): | |
if isinstance(session_state.explored_library_tiers, set): | |
library_state["explored_library_tiers"] = list(session_state.explored_library_tiers) | |
else: | |
library_state["explored_library_tiers"] = session_state.explored_library_tiers | |
if library_state is None: | |
print("No library state to save.") | |
return False | |
# Add metadata | |
library_state["version"] = TAG_STORAGE_VERSION | |
library_state["timestamp"] = time.time() | |
library_state["save_date"] = time.strftime("%Y-%m-%d %H:%M:%S") | |
# Ensure discovered_tags has proper metadata | |
if "discovered_tags" in library_state: | |
for tag, info in library_state["discovered_tags"].items(): | |
# Add category if missing | |
if "category" not in info: | |
info["category"] = "unknown" | |
# Add discovery_count if missing | |
if "discovery_count" not in info: | |
info["discovery_count"] = 1 | |
# Save to file | |
with open(filepath, 'w', encoding='utf-8') as f: | |
json.dump(library_state, f, indent=2, ensure_ascii=False) | |
print(f"Saved library state to {filepath}") | |
return True | |
except Exception as e: | |
print(f"Error saving library state: {str(e)}") | |
return False | |
def load_library_state( | |
session_state = None, | |
filename: str = DEFAULT_LIBRARY_STATE_FILE | |
) -> Dict[str, Any]: | |
""" | |
Load the library state from a JSON file | |
Args: | |
session_state: Streamlit session state to update (optional) | |
filename: Name of the file to load from | |
Returns: | |
Dictionary of loaded library state or None if loading fails | |
""" | |
try: | |
filepath = get_storage_path(filename) | |
if not os.path.exists(filepath): | |
print(f"No library state file found at {filepath}") | |
return None | |
with open(filepath, 'r', encoding='utf-8') as f: | |
library_state = json.load(f) | |
# Update session state if provided | |
if session_state is not None: | |
# Update discovered tags | |
if "discovered_tags" in library_state: | |
session_state.discovered_tags = library_state["discovered_tags"] | |
# Update library exploration history | |
if "library_exploration_history" in library_state: | |
session_state.library_exploration_history = library_state["library_exploration_history"] | |
# Update expedition results | |
if "expedition_results" in library_state: | |
session_state.expedition_results = library_state["expedition_results"] | |
# Update library growth | |
if "library_growth" in library_state: | |
session_state.library_growth = library_state["library_growth"] | |
# Update library upgrades | |
if "library_upgrades" in library_state: | |
session_state.library_upgrades = library_state["library_upgrades"] | |
# Update last expedition time | |
if "last_expedition_time" in library_state: | |
session_state.last_expedition_time = library_state["last_expedition_time"] | |
# Update explored library tiers | |
if "explored_library_tiers" in library_state: | |
if isinstance(library_state["explored_library_tiers"], list): | |
session_state.explored_library_tiers = set(library_state["explored_library_tiers"]) | |
else: | |
session_state.explored_library_tiers = library_state["explored_library_tiers"] | |
print(f"Loaded library state from {filepath}") | |
return library_state | |
except Exception as e: | |
print(f"Error loading library state: {str(e)}") | |
return None | |
def add_discovered_tag( | |
tag: str, | |
rarity: str, | |
session_state, | |
library_floor: str = None, | |
category: str = "unknown" | |
) -> bool: | |
""" | |
Add a tag to the discovered tags with enriched metadata | |
Args: | |
tag: The tag name | |
rarity: The tag rarity level | |
session_state: Streamlit session state | |
library_floor: The library floor where it was discovered (optional) | |
category: The tag category (optional) | |
Returns: | |
Boolean indicating if it's a new discovery | |
""" | |
# Initialize discovered_tags if not present | |
if not hasattr(session_state, 'discovered_tags'): | |
session_state.discovered_tags = {} | |
# Initialize explored_library_tiers if not present | |
if not hasattr(session_state, 'explored_library_tiers'): | |
session_state.explored_library_tiers = set() | |
# Check if this is a new discovery | |
is_new = tag not in session_state.discovered_tags | |
# Get current time | |
timestamp = time.strftime("%Y-%m-%d %H:%M:%S") | |
# Try to get tag category from metadata | |
if category == "unknown": | |
# First check metadata.tag_to_category | |
if hasattr(session_state, 'metadata') and 'tag_to_category' in session_state.metadata: | |
if tag in session_state.metadata['tag_to_category']: | |
category = session_state.metadata['tag_to_category'][tag] | |
# If not found, check tag_rarity_metadata | |
elif hasattr(session_state, 'tag_rarity_metadata') and session_state.tag_rarity_metadata: | |
if tag in session_state.tag_rarity_metadata: | |
tag_info = session_state.tag_rarity_metadata[tag] | |
if isinstance(tag_info, dict) and "category" in tag_info: | |
category = tag_info["category"] | |
# Get or update tag info | |
if is_new: | |
# Create new tag entry with rich metadata | |
session_state.discovered_tags[tag] = { | |
"rarity": rarity, | |
"discovery_time": timestamp, | |
"category": category, | |
"discovery_count": 1, | |
"last_seen": timestamp | |
} | |
# Add library floor if provided | |
if library_floor: | |
session_state.discovered_tags[tag]["library_floor"] = library_floor | |
# Track exploration of library tiers | |
session_state.explored_library_tiers.add(library_floor) | |
else: | |
# Update existing tag | |
tag_info = session_state.discovered_tags[tag] | |
tag_info["discovery_count"] = tag_info.get("discovery_count", 1) + 1 | |
tag_info["last_seen"] = timestamp | |
# Update category if we found a better one and current is "unknown" | |
if category != "unknown" and tag_info.get("category", "unknown") == "unknown": | |
tag_info["category"] = category | |
# Only update library floor if provided | |
if library_floor: | |
tag_info["library_floor"] = library_floor | |
# Track exploration | |
session_state.explored_library_tiers.add(library_floor) | |
# Save library state after updating | |
save_library_state(session_state=session_state) | |
return is_new | |
def purchase_library_upgrade( | |
upgrade_type: str, | |
session_state, | |
calculate_cost_func = None | |
) -> bool: | |
""" | |
Purchase a library upgrade | |
Args: | |
upgrade_type: The type of upgrade ("speed", "capacity", or "rarity") | |
session_state: Streamlit session state | |
calculate_cost_func: Function to calculate the cost (optional) | |
Returns: | |
Boolean indicating if the purchase was successful | |
""" | |
# Initialize library_upgrades if not present | |
if not hasattr(session_state, 'library_upgrades'): | |
session_state.library_upgrades = { | |
"speed": 1, | |
"capacity": 1, | |
"rarity": 1 | |
} | |
# Get current level | |
current_level = session_state.library_upgrades.get(upgrade_type, 1) | |
# Calculate cost using provided function or default formula | |
if calculate_cost_func: | |
cost = calculate_cost_func(upgrade_type, current_level) | |
else: | |
# Default cost calculation | |
base_cost = { | |
"speed": 50, | |
"capacity": 100, | |
"rarity": 150 | |
}.get(upgrade_type, 100) | |
cost = int(base_cost * (current_level * 1.5)) | |
# Check if player can afford it | |
if not hasattr(session_state, 'tag_currency') or session_state.tag_currency < cost: | |
print(f"Cannot afford {upgrade_type} upgrade: needs {cost}, has {getattr(session_state, 'tag_currency', 0)}") | |
return False | |
# Apply purchase | |
session_state.tag_currency -= cost | |
session_state.library_upgrades[upgrade_type] = current_level + 1 | |
# Update stats | |
if hasattr(session_state, 'game_stats'): | |
if "currency_spent" not in session_state.game_stats: | |
session_state.game_stats["currency_spent"] = 0 | |
session_state.game_stats["currency_spent"] += cost | |
# Save library state after purchasing | |
save_library_state(session_state=session_state) | |
save_game_state(session_state) | |
print(f"Purchased {upgrade_type} upgrade to level {current_level + 1} for {cost}") | |
return True | |
def save_essence_state( | |
essence_state: Optional[Dict[str, Any]] = None, | |
session_state = None, | |
filename: str = DEFAULT_ESSENCE_STATE_FILE | |
) -> bool: | |
""" | |
Save the essence generator state to a JSON file | |
Args: | |
essence_state: Essence state data dictionary (optional) | |
session_state: Streamlit session state to extract from (optional) | |
filename: Name of the file to save to | |
Returns: | |
Boolean indicating if the save was successful | |
""" | |
try: | |
filepath = get_storage_path(filename) | |
# If essence_state is not provided but session_state is, extract from session_state | |
if essence_state is None and session_state is not None: | |
essence_state = { | |
"generated_essences": getattr(session_state, 'generated_essences', {}), | |
"essence_custom_settings": getattr(session_state, 'essence_custom_settings', {}), | |
"manual_tags": getattr(session_state, 'manual_tags', {}) | |
} | |
if essence_state is None: | |
print("No essence state to save.") | |
return False | |
# Add metadata | |
essence_state["version"] = TAG_STORAGE_VERSION | |
essence_state["timestamp"] = time.time() | |
essence_state["save_date"] = time.strftime("%Y-%m-%d %H:%M:%S") | |
# Remove paths from generated_essences to keep only metadata | |
if "generated_essences" in essence_state: | |
for tag, info in essence_state["generated_essences"].items(): | |
if "path" in info: | |
# Store just the filename, not the full path | |
info["path"] = os.path.basename(info["path"]) | |
# Save to file | |
with open(filepath, 'w', encoding='utf-8') as f: | |
json.dump(essence_state, f, indent=2, ensure_ascii=False) | |
print(f"Saved essence state to {filepath}") | |
return True | |
except Exception as e: | |
print(f"Error saving essence state: {str(e)}") | |
return False | |
def load_essence_state( | |
session_state = None, | |
filename: str = DEFAULT_ESSENCE_STATE_FILE | |
) -> Dict[str, Any]: | |
""" | |
Load the essence generator state from a JSON file | |
Args: | |
session_state: Streamlit session state to update (optional) | |
filename: Name of the file to load from | |
Returns: | |
Dictionary of loaded essence state or None if loading fails | |
""" | |
try: | |
filepath = get_storage_path(filename) | |
if not os.path.exists(filepath): | |
print(f"No essence state file found at {filepath}") | |
return None | |
with open(filepath, 'r', encoding='utf-8') as f: | |
essence_state = json.load(f) | |
# Update session state if provided | |
if session_state is not None: | |
# Update generated essences | |
if "generated_essences" in essence_state: | |
generated_essences = essence_state["generated_essences"] | |
# Restore full paths for essence images | |
essence_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "game_data", "essences") | |
for tag, info in generated_essences.items(): | |
if "path" in info and not os.path.isabs(info["path"]): | |
info["path"] = os.path.join(essence_dir, os.path.basename(info["path"])) | |
session_state.generated_essences = generated_essences | |
# Update essence custom settings | |
if "essence_custom_settings" in essence_state: | |
session_state.essence_custom_settings = essence_state["essence_custom_settings"] | |
# Update manual tags | |
if "manual_tags" in essence_state: | |
session_state.manual_tags = essence_state["manual_tags"] | |
print(f"Loaded essence state from {filepath}") | |
return essence_state | |
except Exception as e: | |
print(f"Error loading essence state: {str(e)}") | |
return None | |
def save_game(st_state) -> bool: | |
""" | |
Save all game data including tags, game state, library state, and essence state | |
Args: | |
st_state: Streamlit session state object | |
Returns: | |
Boolean indicating if the save was successful | |
""" | |
try: | |
# First, make sure any existing tags have the new fields for tracking | |
ensure_tag_tracking_fields(st_state) | |
# Save all components | |
tag_success = update_tag_storage_from_session(st_state) | |
library_success = save_library_state(session_state=st_state) | |
essence_success = save_essence_state(session_state=st_state) | |
# Log results | |
if tag_success and library_success and essence_success: | |
print("Successfully saved all game components") | |
else: | |
print(f"Partial save - Tags: {tag_success}, Library: {library_success}, Essence: {essence_success}") | |
return tag_success | |
except Exception as e: | |
print(f"Error in full game save: {str(e)}") | |
return False | |
def load_game(st_state) -> bool: | |
""" | |
Load all game data including tags, game state, library state, and essence state | |
Args: | |
st_state: Streamlit session state object | |
Returns: | |
Boolean indicating if the load was successful | |
""" | |
try: | |
# Load all components | |
tag_success = update_session_from_tag_storage(st_state) | |
# Try to load library and essence data even if tag load failed | |
library_success = load_library_state(st_state) is not None | |
essence_success = load_essence_state(st_state) is not None | |
# Ensure tag tracking fields exist after loading | |
if tag_success: | |
ensure_tag_tracking_fields(st_state) | |
# Log results | |
if tag_success and library_success and essence_success: | |
print("Successfully loaded all game components") | |
else: | |
print(f"Partial load - Tags: {tag_success}, Library: {library_success}, Essence: {essence_success}") | |
return tag_success or library_success or essence_success | |
except Exception as e: | |
print(f"Error in full game load: {str(e)}") | |
return False | |
# Automatic save when imported (useful for debugging) | |
if __name__ == "__main__": | |
print("Enhanced tag storage module loaded. Run directly for testing.") |