import gradio as gr import pandas as pd import threading from datetime import datetime import os import json import sqlite3 import time from dotenv import load_dotenv DEMO_MODE = os.getenv("DEMO_MODE", "False").lower() == 'true' load_dotenv() try: from datasets import load_dataset, Dataset, DatasetDict, Features, Value HF_DATASETS_AVAILABLE = True except ImportError: HF_DATASETS_AVAILABLE = False Features, Value = None, None STORAGE_BACKEND_CONFIG = os.getenv("STORAGE_BACKEND", "JSON").upper() HF_DATASET_REPO = os.getenv("HF_DATASET_REPO") HF_TOKEN = os.getenv("HF_TOKEN") DB_FILE_JSON = "social_data_unified.json" # Changed filename to avoid conflicts DB_FILE_SQLITE = "social_data_unified.db" # Changed filename db_lock = threading.Lock() HF_BACKUP_THRESHOLD = int(os.getenv("HF_BACKUP_THRESHOLD", 10)) dirty_operations_count = 0 # --- New Global Data Structure --- users_db = {} entries_df = pd.DataFrame() post_id_counter = 0 # Single counter for all entries # Define the schema for the unified entries table ENTRY_SCHEMA = { "post_id": "Int64", # Use nullable integer "reply_to_id": "Int64", # Use nullable integer, None for top-level posts "username": "object", "content": "object", "timestamp": "object", "type": "object" # 'post' or 'comment' } def force_persist_data(): global dirty_operations_count with db_lock: storage_backend = STORAGE_BACKEND_CONFIG print(f"Attempting to persist data to {storage_backend}") if storage_backend == "RAM": print("RAM backend. No persistence.") return True, "RAM backend. No persistence." elif storage_backend == "SQLITE": try: with sqlite3.connect(DB_FILE_SQLITE) as conn: cursor = conn.cursor() # Users table cursor.execute("CREATE TABLE IF NOT EXISTS users (username TEXT PRIMARY KEY, password TEXT NOT NULL)") # Entries table - new schema cursor.execute("CREATE TABLE IF NOT EXISTS entries (post_id INTEGER PRIMARY KEY, reply_to_id INTEGER, username TEXT, content TEXT, timestamp TEXT, type TEXT)") # Save users users_to_save = [(u, p) for u, p in users_db.items()] if users_to_save: # Avoid executing with empty list conn.executemany("INSERT OR REPLACE INTO users (username, password) VALUES (?, ?)", users_to_save) # Save entries (replace existing data) # Ensure Int64 columns are correctly handled as nullable integers for SQL entries_to_save = entries_df.copy() entries_to_save['reply_to_id'] = entries_to_save['reply_to_id'].astype('object').where(entries_to_save['reply_to_id'].notna(), None) entries_to_save.to_sql('entries', conn, if_exists='replace', index=False) conn.commit() print("Successfully saved to SQLite.") return True, "Successfully saved to SQLite." except Exception as e: print(f"Error saving to SQLite: {e}") return False, f"Error saving to SQLite: {e}" elif storage_backend == "JSON": try: data_to_save = { "users": users_db, "entries": entries_df.to_dict('records') } with open(DB_FILE_JSON, "w") as f: json.dump(data_to_save, f, indent=2) print("Successfully saved to JSON file.") return True, "Successfully saved to JSON file." except Exception as e: print(f"Error saving to JSON: {e}") return False, f"Error saving to JSON: {e}" elif storage_backend == "HF_DATASET": if not all([HF_DATASETS_AVAILABLE, HF_TOKEN, HF_DATASET_REPO]): print("HF_DATASET backend is not configured correctly.") return False, "HF_DATASET backend is not configured correctly." try: print("Pushing data to Hugging Face Hub...") # Convert nullable Int64 columns to standard int/float for dataset entries_for_hf = entries_df.copy() # Hugging Face datasets typically handle None/null correctly for integer types # Ensure type hints are correct or handle potential type issues entries_for_hf['post_id'] = entries_for_hf['post_id'].astype('int64') # Non-nullable ID entries_for_hf['reply_to_id'] = entries_for_hf['reply_to_id'].astype('float64') # Use float for nullable integer in HF datasets user_dataset = Dataset.from_pandas(pd.DataFrame(list(users_db.items()), columns=['username', 'password'])) entries_dataset = Dataset.from_pandas(entries_for_hf) dataset_dict = DatasetDict({ 'users': user_dataset, 'entries': entries_dataset, }) # Define features explicitly for nullable types if needed, though pandas conversion often works # user_features = Features({'username': Value('string'), 'password': Value('string')}) # entry_features = Features({'post_id': Value('int64'), 'reply_to_id': Value('int64'), 'username': Value('string'), 'content': Value('string'), 'timestamp': Value('string'), 'type': Value('string')}) # Pass features to from_pandas or push_to_hub if needed, but auto-detection is often sufficient for basic types dataset_dict.push_to_hub(HF_DATASET_REPO, token=HF_TOKEN, private=True) dirty_operations_count = 0 print(f"Successfully pushed data to {HF_DATASET_REPO}.") return True, f"Successfully pushed data to {HF_DATASET_REPO}." except Exception as e: print(f"Error pushing to Hugging Face Hub: {e}") return False, f"Error pushing to Hugging Face Hub: {e}" print("Unknown backend.") return False, "Unknown backend." def handle_persistence_after_change(): global dirty_operations_count storage_backend = STORAGE_BACKEND_CONFIG if storage_backend in ["JSON", "SQLITE"]: force_persist_data() elif storage_backend == "HF_DATASET": with db_lock: dirty_operations_count += 1 if dirty_operations_count >= HF_BACKUP_THRESHOLD: force_persist_data() def load_data(): global STORAGE_BACKEND_CONFIG, users_db, entries_df, post_id_counter storage_backend = STORAGE_BACKEND_CONFIG with db_lock: users = {"admin": "password"} # Initialize entries DataFrame with the correct schema entries = pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()}) if storage_backend == "SQLITE": try: with sqlite3.connect(DB_FILE_SQLITE) as conn: cursor = conn.cursor() # Create tables if they don't exist cursor.execute("CREATE TABLE IF NOT EXISTS users (username TEXT PRIMARY KEY, password TEXT NOT NULL)") cursor.execute("CREATE TABLE IF NOT EXISTS entries (post_id INTEGER PRIMARY KEY, reply_to_id INTEGER, username TEXT, content TEXT, timestamp TEXT, type TEXT)") # Add default admin user if not exists cursor.execute("INSERT OR IGNORE INTO users (username, password) VALUES (?, ?)", ("admin", "password")) conn.commit() # Load data users = dict(conn.execute("SELECT username, password FROM users").fetchall()) entries = pd.read_sql_query("SELECT * FROM entries", conn) # Ensure correct dtypes, especially for nullable integers for col, dtype in ENTRY_SCHEMA.items(): if col in entries.columns: try: entries[col] = entries[col].astype(dtype) except Exception as e: print(f"Warning: Could not convert column {col} to {dtype} from SQLite. {e}") print(f"Successfully loaded data from SQLite: {DB_FILE_SQLITE}") except Exception as e: print(f"CRITICAL: Failed to use SQLite. Falling back to RAM. Error: {e}") STORAGE_BACKEND_CONFIG = "RAM" elif storage_backend == "JSON": if os.path.exists(DB_FILE_JSON): try: with open(DB_FILE_JSON, "r") as f: data = json.load(f) users = data.get("users", users) loaded_entries_list = data.get("entries", []) entries = pd.DataFrame(loaded_entries_list) # Ensure correct dtypes after loading from JSON if not entries.empty: for col, dtype in ENTRY_SCHEMA.items(): if col in entries.columns: try: entries[col] = entries[col].astype(dtype) except Exception as e: print(f"Warning: Could not convert column {col} to {dtype} from JSON. {e}") else: # If JSON was empty or missing entries key, ensure empty DF has schema entries = pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()}) except (json.JSONDecodeError, KeyError, Exception) as e: print(f"Error loading JSON data: {e}. Initializing with empty data.") users = {"admin":"password"} # Reset users on load error? Or keep default? Let's keep default. entries = pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()}) elif storage_backend == "HF_DATASET": if all([HF_DATASETS_AVAILABLE, HF_TOKEN, HF_DATASET_REPO]): try: print(f"Attempting to load from HF Dataset '{HF_DATASET_REPO}'...") ds_dict = load_dataset(HF_DATASET_REPO, token=HF_TOKEN, trust_remote_code=True) if ds_dict and 'users' in ds_dict and 'entries' in ds_dict: # Load users if ds_dict['users'].num_rows > 0: users = dict(zip(ds_dict['users']['username'], ds_dict['users']['password'])) else: users = {"admin":"password"} # Default admin if no users # Load entries entries = ds_dict['entries'].to_pandas() # Ensure correct dtypes, especially for nullable integers if not entries.empty: for col, dtype in ENTRY_SCHEMA.items(): if col in entries.columns: try: # HF datasets might load Int64 as float or object, convert explicitly if dtype == "Int64": # Pandas nullable integer entries[col] = pd.to_numeric(entries[col], errors='coerce').astype(dtype) else: entries[col] = entries[col].astype(dtype) except Exception as e: print(f"Warning: Could not convert column {col} to {dtype} from HF Dataset. {e}") else: # If entries dataset is empty, ensure empty DF has schema entries = pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()}) print("Successfully loaded data from HF Dataset.") else: raise ValueError("Dataset dictionary is empty or malformed (missing 'users' or 'entries').") except Exception as e: print(f"Could not load from HF Dataset '{HF_DATASET_REPO}'. Attempting to initialize. Error: {e}") try: # Define features including nullable types if possible, or rely on pandas conversion user_features = Features({'username': Value('string'), 'password': Value('string')}) # Use float64 for nullable int in HF Features as a common workaround entry_features = Features({ 'post_id': Value('int64'), 'reply_to_id': Value('float64'), # HF datasets often use float for nullable int 'username': Value('string'), 'content': Value('string'), 'timestamp': Value('string'), 'type': Value('string') }) initial_users_df = pd.DataFrame(list(users.items()), columns=['username', 'password']) # Ensure initial empty entries DF conforms to the HF features expected types initial_entries_df = pd.DataFrame({k: pd.Series(dtype='float64' if k in ['post_id', 'reply_to_id'] else 'object') for k in ENTRY_SCHEMA.keys()}) dataset_dict = DatasetDict({ 'users': Dataset.from_pandas(initial_users_df, features=user_features), 'entries': Dataset.from_pandas(initial_entries_df, features=entry_features) # Use initial empty with HF types }) dataset_dict.push_to_hub(HF_DATASET_REPO, token=HF_TOKEN, private=True) print(f"Successfully initialized new empty HF Dataset at {HF_DATASET_REPO}.") # After initializing, reset entries_df to pandas schema entries = pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()}) except Exception as e_push: print(f"CRITICAL: Failed to create new HF Dataset. Falling back to RAM. Push Error: {e_push}") STORAGE_BACKEND_CONFIG = "RAM" else: print("HF_DATASET backend not fully configured. Falling back to RAM.") STORAGE_BACKEND_CONFIG = "RAM" else: # RAM backend or fallback print("Using RAM backend.") # Initialize global variables after loading/initializing users_db = users entries_df = entries # Calculate the next post_id counter value post_id_counter = int(entries_df['post_id'].max()) if not entries_df.empty and entries_df['post_id'].notna().any() else 0 print(f"Loaded data. Users: {len(users_db)}, Entries: {len(entries_df)}. Next Post ID: {post_id_counter + 1}") # --- Load Data Initially --- load_data() # --- API Functions (adapted for unified structure) --- def api_register(username, password): if not username or not password: return "Failed: Username/password cannot be empty." with db_lock: if username in users_db: return f"Failed: Username '{username}' already exists." users_db[username] = password handle_persistence_after_change() return f"Success: User '{username}' registered." def api_login(username, password): # Simulate authentication token (basic user:pass string) # In a real app, use proper token/session management return f"{username}:{password}" if users_db.get(username) == password else "Failed: Invalid credentials." def _get_user_from_token(token): if not token or ':' not in token: return None user, pwd = token.split(':', 1) with db_lock: # Access users_db requires lock return user if users_db.get(user) == pwd else None def api_create_post(auth_token, content): """Creates a top-level post entry.""" global entries_df, post_id_counter username = _get_user_from_token(auth_token) if not username: return "Failed: Invalid auth token." if not content: return "Failed: Content cannot be empty." with db_lock: post_id_counter += 1 new_entry = pd.DataFrame([{ "post_id": post_id_counter, "reply_to_id": pd.NA, # Use pandas NA for nullable integer "username": username, "content": content, "timestamp": datetime.utcnow().isoformat(), "type": "post" }]).astype(ENTRY_SCHEMA) # Ensure correct dtypes entries_df = pd.concat([entries_df, new_entry], ignore_index=True) handle_persistence_after_change() return f"Success: Post {post_id_counter} created." def api_create_comment(auth_token, reply_to_id, content): """Creates a comment/reply entry.""" global entries_df, post_id_counter username = _get_user_from_token(auth_token) if not username: return "Failed: Invalid auth token." if not content: return "Failed: Content cannot be empty." if reply_to_id is None: return "Failed: Reply to ID cannot be empty for a comment/reply." try: reply_to_id = int(reply_to_id) # Ensure it's an integer except (ValueError, TypeError): return "Failed: Invalid Reply To ID." with db_lock: # Check if the entry being replied to exists if reply_to_id not in entries_df['post_id'].values: return f"Failed: Entry with ID {reply_to_id} not found." post_id_counter += 1 new_entry = pd.DataFrame([{ "post_id": post_id_counter, "reply_to_id": reply_to_id, "username": username, "content": content, "timestamp": datetime.utcnow().isoformat(), "type": "comment" # All replies are 'comment' type in this scheme }]).astype(ENTRY_SCHEMA) # Ensure correct dtypes entries_df = pd.concat([entries_df, new_entry], ignore_index=True) handle_persistence_after_change() return f"Success: Comment/Reply {post_id_counter} created (replying to {reply_to_id})." def api_get_feed(): """Retrieves all entries sorted by timestamp.""" with db_lock: # Return a copy to prevent external modifications feed_data = entries_df.copy() if feed_data.empty: # Return empty DataFrame with expected columns return pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()}) # Ensure timestamp is datetime for sorting, handle potential errors try: feed_data['timestamp'] = pd.to_datetime(feed_data['timestamp']) except Exception as e: print(f"Warning: Could not convert timestamp column to datetime: {e}") # If conversion fails, sort by post_id or keep unsorted as fallback # Let's skip sorting by timestamp if conversion fails pass # Sort (prefer timestamp, fallback to post_id if timestamp fails or is identical) if 'timestamp' in feed_data.columns and pd.api.types.is_datetime64_any_dtype(feed_data['timestamp']): feed_data = feed_data.sort_values(by=['timestamp', 'post_id'], ascending=[False, False]) else: feed_data = feed_data.sort_values(by='post_id', ascending=False) # Select and rename/reorder columns for display if necessary # The current schema matches well, just need to ensure all columns are present display_columns = list(ENTRY_SCHEMA.keys()) # Use all columns in the schema feed_data = feed_data.reindex(columns=display_columns) # Fill NaN/NA for display purposes (optional, but can make table cleaner) # Convert nullable Int64 NA to empty string or specific placeholder for display for col in ['post_id', 'reply_to_id']: if col in feed_data.columns: feed_data[col] = feed_data[col].apply(lambda x: '' if pd.isna(x) else int(x)) # Display int without .0 return feed_data # --- UI Functions (adapted for unified structure) --- def ui_manual_post(username, password, content): auth_token = api_login(username, password) if "Failed" in auth_token: return "Login failed.", api_get_feed() return api_create_post(auth_token, content), api_get_feed() def ui_manual_comment(username, password, reply_to_id, content): auth_token = api_login(username, password) if "Failed" in auth_token: return "Login failed.", api_get_feed() return api_create_comment(auth_token, reply_to_id, content), api_get_feed() def ui_save_to_json(): # Call the general persistence function targeting JSON success, message = force_persist_data() # Modify message to indicate JSON specifically if needed, or keep general if "Successfully saved to JSON file." in message: return f"Successfully saved current state to {DB_FILE_JSON}." else: return message # Return the error message from persistence # --- Gradio UI --- with gr.Blocks(theme=gr.themes.Soft(), title="Social App") as demo: gr.Markdown("# Social Media Server for iLearn Agent") gr.Markdown(f"This app provides an API for iLearn agents to interact with. **Storage Backend: `{STORAGE_BACKEND_CONFIG}`**") with gr.Tabs(): with gr.TabItem("Live Feed"): # Define DataFrame columns based on the new schema feed_columns = [(col, "number" if "id" in col else "text") for col in ENTRY_SCHEMA.keys()] feed_df_display = gr.DataFrame(label="Feed", interactive=False, wrap=True, headers=list(ENTRY_SCHEMA.keys())) refresh_btn = gr.Button("Refresh Feed") with gr.TabItem("Manual Actions"): manual_action_status = gr.Textbox(label="Action Status", interactive=False) with gr.Row(): with gr.Group(): gr.Markdown("### Create Post") post_user = gr.Textbox(label="User", value="admin") post_pass = gr.Textbox(label="Pass", type="password", value="password") post_content = gr.Textbox(label="Content", lines=3) post_button = gr.Button("Submit Post", variant="primary") with gr.Group(): gr.Markdown("### Create Comment / Reply") comment_user = gr.Textbox(label="User", value="admin") comment_pass = gr.Textbox(label="Pass", type="password", value="password") # Updated UI field for the single Reply To ID comment_reply_to_id = gr.Number(label="Reply To Entry ID (Post or Comment ID)", precision=0) # precision=0 for integer input comment_content = gr.Textbox(label="Content", lines=2) comment_button = gr.Button("Submit Comment", variant="primary") with gr.Group(): gr.Markdown("### Data Management") save_json_button = gr.Button("Save Current State to JSON") # Button label kept simple, func calls general persistence # --- UI Actions --- # Post button now calls ui_manual_post which calls api_create_post post_button.click(ui_manual_post, [post_user, post_pass, post_content], [manual_action_status, feed_df_display]) # Comment button calls ui_manual_comment with the single reply_to_id field comment_button.click(ui_manual_comment, [comment_user, comment_pass, comment_reply_to_id, comment_content], [manual_action_status, feed_df_display]) save_json_button.click(ui_save_to_json, None, [manual_action_status]) refresh_btn.click(api_get_feed, None, feed_df_display) # Load feed on startup demo.load(api_get_feed, None, feed_df_display) # --- Gradio API Endpoints (adapted for unified structure) --- # Ensure API names match the expected iLearn agent interactions with gr.Column(visible=False): # Hide API interfaces in the main UI gr.Interface(api_register, ["text", "text"], "text", api_name="register") gr.Interface(api_login, ["text", "text"], "text", api_name="login") # api_create_post: token, content gr.Interface(api_create_post, ["text", "text"], "text", api_name="create_post") # api_create_comment: token, reply_to_id, content # Note: Gradio interface infers types; Number will be float unless precision=0 and converted gr.Interface(api_create_comment, ["text", "number", "text"], "text", api_name="create_comment") # api_get_feed: no input, returns dataframe gr.Interface(api_get_feed, None, "dataframe", api_name="get_feed") if __name__ == "__main__": # Ensure initial persistence happens on first run if not loading data if not os.path.exists(DB_FILE_JSON) and not os.path.exists(DB_FILE_SQLITE) and STORAGE_BACKEND_CONFIG != "HF_DATASET": print("No existing data files found. Performing initial save.") force_persist_data() # Persist the initial admin user and empty tables demo.queue().launch(server_name="0.0.0.0", server_port=7860, share=False)