""" Chat Storage System for Alice ============================ Handles saving and managing chat sessions with incremental saving, session management, and organized file structure. Now includes HuggingFace dataset sync for persistent cloud storage. """ import os import json import uuid from datetime import datetime from typing import Dict, List, Tuple, Optional import traceback from huggingface_hub import HfApi, upload_file, create_repo, repo_exists, hf_hub_download from huggingface_hub.utils import RepositoryNotFoundError class HuggingFaceChatSync: """Handles synchronization of chat files with HuggingFace dataset repository.""" def __init__(self, org_name: str, dataset_name: str, hf_token: str = None): """ Initialize HuggingFace sync. Args: org_name: HuggingFace organization name dataset_name: Dataset repository name hf_token: HuggingFace token (or from environment) """ self.hf_token = hf_token or os.getenv("HF_TOKEN") self.org_name = org_name self.dataset_name = dataset_name self.repo_id = f"{org_name}/{dataset_name}" self.api = HfApi(token=self.hf_token) if self.hf_token else None self.enabled = bool(self.hf_token) if self.enabled: self._ensure_dataset_exists() else: print("⚠️ HuggingFace sync disabled: No HF_TOKEN found") def _ensure_dataset_exists(self): """Create the dataset repository if it doesn't exist.""" if not self.enabled: return try: # Check if repository exists if not repo_exists(self.repo_id, repo_type="dataset", token=self.hf_token): print(f"📁 Creating HuggingFace dataset: {self.repo_id}") create_repo( repo_id=self.repo_id, repo_type="dataset", token=self.hf_token, private=False, # Set to True if you want private dataset exist_ok=True ) print(f"✅ Dataset repository created: https://huggingface.co/datasets/{self.repo_id}") else: print(f"✅ Using existing HuggingFace dataset: {self.repo_id}") except Exception as e: print(f"❌ Error setting up HuggingFace dataset: {e}") self.enabled = False def sync_chat_file(self, local_filepath: str, filename: str) -> bool: """ Sync a chat file to HuggingFace dataset. Args: local_filepath: Path to the local chat file filename: Name for the file in the dataset Returns: bool: True if successful, False otherwise """ if not self.enabled: return False try: # Upload file to dataset repository upload_file( path_or_fileobj=local_filepath, path_in_repo=f"chats/{filename}", repo_id=self.repo_id, repo_type="dataset", token=self.hf_token, commit_message=f"Update chat: {filename}" ) print(f"☁️ Synced to HuggingFace: {filename}") return True except Exception as e: print(f"❌ HuggingFace sync failed for {filename}: {e}") return False def list_remote_chats(self) -> List[str]: """List all chat files in the HuggingFace dataset.""" if not self.enabled: return [] try: repo_files = self.api.list_repo_files( repo_id=self.repo_id, repo_type="dataset" ) # Filter for chat files chat_files = [f for f in repo_files if f.startswith("chats/") and f.endswith(".txt")] return [f.replace("chats/", "") for f in chat_files] except Exception as e: print(f"❌ Error listing remote chats: {e}") return [] class ChatStorage: """Manages chat session storage with incremental saving and session management.""" def __init__(self, base_dir: str = None, enable_hf_sync: bool = True): """ Initialize chat storage system. Args: base_dir: Base directory for storing chat files enable_hf_sync: Whether to enable HuggingFace dataset sync """ # Try multiple directories in order of preference if base_dir is None: # Try persistent storage first, then fallback to temporary possible_dirs = ["/data/chats", "/tmp/chats", "./chats"] base_dir = self._find_writable_directory(possible_dirs) self.base_dir = base_dir self.current_session_id = None self.current_chat_file = None self.ensure_directory_structure() # Initialize HuggingFace sync self.hf_sync = None if enable_hf_sync: try: # Get HF token from environment variable only hf_token = os.getenv("HF_TOKEN") if hf_token: self.hf_sync = HuggingFaceChatSync( org_name="alice-restoration-project", dataset_name="alice-chat-storage", hf_token=hf_token ) print("🤗 HuggingFace dataset sync enabled") else: print("⚠️ HF_TOKEN environment variable not found - HuggingFace sync disabled") except Exception as e: print(f"⚠️ HuggingFace sync initialization failed: {e}") self.hf_sync = None def _find_writable_directory(self, possible_dirs): """Find the first writable directory from a list of options.""" for dir_path in possible_dirs: try: # Try to create the directory os.makedirs(dir_path, exist_ok=True) # Test if we can write to it test_file = os.path.join(dir_path, "write_test.tmp") with open(test_file, 'w') as f: f.write("test") os.remove(test_file) print(f"✅ Using writable directory: {dir_path}") return dir_path except (PermissionError, OSError, FileNotFoundError) as e: print(f"⚠️ Cannot use {dir_path}: {e}") continue # If all fail, use current directory as last resort fallback = "./chats" print(f"🔄 Falling back to: {fallback}") return fallback def ensure_directory_structure(self): """Create the chats directory structure if it doesn't exist.""" try: os.makedirs(self.base_dir, exist_ok=True) print(f"✅ Chat storage directory ready: {self.base_dir}") except Exception as e: print(f"❌ Error creating chat storage directory: {e}") def start_new_session(self, initial_settings: Optional[Dict] = None) -> str: """ Start a new chat session with unique ID. Args: initial_settings: Optional dictionary of initial settings to save Returns: session_id: Unique session identifier """ try: # Generate unique session ID self.current_session_id = str(uuid.uuid4())[:8] # Create filename with timestamp and session ID (now .txt format) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"chat_{timestamp}_{self.current_session_id}.txt" self.current_chat_file = os.path.join(self.base_dir, filename) # Initialize chat file with header header = f"""Chat Session: {self.current_session_id} Started: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} Settings: {json.dumps(initial_settings or {}, indent=2) if initial_settings else "Default"} === CONVERSATION === """ # Save initial session file with open(self.current_chat_file, 'w', encoding='utf-8') as f: f.write(header) print(f"🆕 New chat session started: {self.current_session_id}") print(f"📁 Chat file: {self.current_chat_file}") # Sync initial session to HuggingFace dataset if enabled if self.hf_sync and self.hf_sync.enabled: filename = os.path.basename(self.current_chat_file) self.hf_sync.sync_chat_file(self.current_chat_file, filename) return self.current_session_id except Exception as e: print(f"❌ Error starting new session: {e}") traceback.print_exc() return None def save_turn(self, user_message: str, alice_response: str, additional_data: Optional[Dict] = None) -> bool: """ Save a single conversation turn (user message + Alice response). Args: user_message: The user's message alice_response: Alice's response additional_data: Optional additional data to save with this turn Returns: bool: True if saved successfully, False otherwise """ if not self.current_session_id or not self.current_chat_file: print("❌ No active session. Call start_new_session() first.") return False try: # Count current turns by reading the file turn_number = self._count_turns() + 1 # Format the conversation turn in User/Alice format turn_text = f"User: {user_message}\nAlice: {alice_response}\n\n" # Append to the chat file with open(self.current_chat_file, 'a', encoding='utf-8') as f: f.write(turn_text) print(f"💾 Turn {turn_number} saved to session {self.current_session_id}") # Sync to HuggingFace dataset if enabled if self.hf_sync and self.hf_sync.enabled: filename = os.path.basename(self.current_chat_file) self.hf_sync.sync_chat_file(self.current_chat_file, filename) return True except Exception as e: print(f"❌ Error saving turn: {e}") traceback.print_exc() return False def _count_turns(self) -> int: """Count the number of conversation turns in the current chat file.""" if not self.current_chat_file or not os.path.exists(self.current_chat_file): return 0 try: with open(self.current_chat_file, 'r', encoding='utf-8') as f: content = f.read() # Count occurrences of "User:" which indicates the start of each turn return content.count("User:") except Exception as e: print(f"❌ Error counting turns: {e}") return 0 def get_session_info(self) -> Optional[Dict]: """Get information about the current session.""" if not self.current_session_id or not self.current_chat_file: return None try: # Extract creation time from filename filename = os.path.basename(self.current_chat_file) timestamp_part = filename.split('_')[1] + '_' + filename.split('_')[2] created_at = datetime.strptime(timestamp_part, "%Y%m%d_%H%M%S").isoformat() # Get file modification time as last updated last_updated = datetime.fromtimestamp(os.path.getmtime(self.current_chat_file)).isoformat() # Count turns total_turns = self._count_turns() return { "session_id": self.current_session_id, "chat_file": self.current_chat_file, "created_at": created_at, "last_updated": last_updated, "total_turns": total_turns, "settings": {} # Settings are in the file header but not easily parsed } except Exception as e: print(f"❌ Error getting session info: {e}") return None def list_chat_files(self) -> List[Dict]: """ List all chat files in the storage directory. Returns: List of dictionaries with chat file information """ chat_files = [] try: if not os.path.exists(self.base_dir): return chat_files for filename in os.listdir(self.base_dir): if filename.startswith("chat_") and filename.endswith(".txt"): filepath = os.path.join(self.base_dir, filename) try: # Extract session ID from filename session_id = filename.split('_')[-1].replace('.txt', '') # Extract creation time from filename timestamp_part = filename.split('_')[1] + '_' + filename.split('_')[2] created_at = datetime.strptime(timestamp_part, "%Y%m%d_%H%M%S").isoformat() # Get file modification time as last updated last_updated = datetime.fromtimestamp(os.path.getmtime(filepath)).isoformat() # Count turns in the file with open(filepath, 'r', encoding='utf-8') as f: content = f.read() total_turns = content.count("User:") chat_files.append({ "filename": filename, "filepath": filepath, "session_id": session_id, "created_at": created_at, "last_updated": last_updated, "total_turns": total_turns }) except Exception as e: print(f"⚠️ Error reading chat file {filename}: {e}") # Sort by creation time, newest first chat_files.sort(key=lambda x: x.get("created_at", ""), reverse=True) except Exception as e: print(f"❌ Error listing chat files: {e}") return chat_files def load_chat_session(self, session_id: str) -> Optional[str]: """ Load a specific chat session by session ID. Args: session_id: The session ID to load Returns: Chat content as string or None if not found """ try: chat_files = self.list_chat_files() for chat_file in chat_files: if chat_file["session_id"] == session_id: with open(chat_file["filepath"], 'r', encoding='utf-8') as f: return f.read() print(f"❌ Chat session {session_id} not found") return None except Exception as e: print(f"❌ Error loading chat session {session_id}: {e}") return None def restore_from_huggingface(self) -> bool: """ Restore chat files from HuggingFace dataset when local storage is empty. Useful for recovering after Space restarts. Returns: bool: True if any files were restored, False otherwise """ if not self.hf_sync or not self.hf_sync.enabled: print("⚠️ HuggingFace sync not available for restore") return False try: # Check if local storage is empty or needs restoration local_files = self.list_chat_files() remote_files = self.hf_sync.list_remote_chats() if not remote_files: print("📂 No chat files found in HuggingFace dataset") return False restored_count = 0 for remote_filename in remote_files: # Check if file already exists locally local_exists = any(f["filename"] == remote_filename for f in local_files) if not local_exists: try: # Download file from HuggingFace downloaded_path = hf_hub_download( repo_id=self.hf_sync.repo_id, filename=f"chats/{remote_filename}", repo_type="dataset", token=self.hf_sync.hf_token, local_dir=self.base_dir, local_dir_use_symlinks=False ) # Move file to correct location local_filepath = os.path.join(self.base_dir, remote_filename) if downloaded_path != local_filepath: import shutil shutil.move(downloaded_path, local_filepath) print(f"📥 Restored from HuggingFace: {remote_filename}") restored_count += 1 except Exception as e: print(f"❌ Failed to restore {remote_filename}: {e}") if restored_count > 0: print(f"✅ Restored {restored_count} chat files from HuggingFace") return True else: print("📂 All chat files already exist locally") return False except Exception as e: print(f"❌ Error during HuggingFace restore: {e}") return False # Global chat storage instance chat_storage = ChatStorage() def initialize_chat_storage(): """Initialize the chat storage system.""" print("🗄️ Initializing chat storage system...") # Try to restore from HuggingFace if local storage is empty local_files = chat_storage.list_chat_files() if not local_files: print("📂 No local chat files found, attempting HuggingFace restore...") chat_storage.restore_from_huggingface() # Create initial session from config import MODEL_SETTINGS, CONVERSATION_SETTINGS initial_settings = { "model_settings": MODEL_SETTINGS, "conversation_settings": CONVERSATION_SETTINGS } session_id = chat_storage.start_new_session(initial_settings) if session_id: print(f"✅ Chat storage initialized with session: {session_id}") return True else: print("❌ Failed to initialize chat storage") return False def save_chat_turn(user_message: str, alice_response: str, additional_data: Optional[Dict] = None) -> bool: """ Convenience function to save a chat turn. Args: user_message: The user's message alice_response: Alice's response additional_data: Optional additional data Returns: bool: True if saved successfully """ return chat_storage.save_turn(user_message, alice_response, additional_data) def get_current_session_info() -> Optional[Dict]: """Get information about the current chat session.""" return chat_storage.get_session_info() def list_all_chats() -> List[Dict]: """List all saved chat sessions.""" return chat_storage.list_chat_files() def restore_chats_from_huggingface() -> bool: """Convenience function to restore chats from HuggingFace dataset.""" return chat_storage.restore_from_huggingface()