import gradio as gr
import pandas as pd
import threading
from datetime import datetime
import os
import json
import sqlite3
import time
from dotenv import load_dotenv

# --- Load Environment & Configuration ---
load_dotenv()

# Read DEMO_MODE only after load_dotenv() so a value set in .env is honoured.
DEMO_MODE = os.getenv("DEMO_MODE", "True").lower() == "true"

try:
    from datasets import load_dataset, Dataset, DatasetDict, Features, Value
    HF_DATASETS_AVAILABLE = True
except ImportError:
    HF_DATASETS_AVAILABLE = False
    load_dataset = Dataset = DatasetDict = Features = Value = None

STORAGE_BACKEND_CONFIG = os.getenv("STORAGE_BACKEND", "HF_DATASET").upper()
HF_DATASET_REPO = os.getenv("HF_DATASET_REPO")
HF_TOKEN = os.getenv("HF_TOKEN")
HF_BACKUP_THRESHOLD = int(os.getenv("HF_BACKUP_THRESHOLD", 10))

DB_FILE_JSON = "social_data.json"
DB_FILE_SQLITE = "social_data.db"

db_lock = threading.Lock()
dirty_operations_count = 0


# --- Database Initialization and Persistence ---
def force_persist_data():
    global dirty_operations_count
    with db_lock:
        storage_backend = STORAGE_BACKEND_CONFIG
        if storage_backend == "RAM":
            return True, "RAM backend. No persistence."
        elif storage_backend == "SQLITE":
            with sqlite3.connect(DB_FILE_SQLITE) as conn:
                users_df = pd.DataFrame(list(users_db.items()), columns=['username', 'password'])
                users_df.to_sql('users', conn, if_exists='replace', index=False)
                posts_df.to_sql('posts', conn, if_exists='replace', index=False)
                comments_df.to_sql('comments', conn, if_exists='replace', index=False)
            return True, "Successfully saved to SQLite."
        elif storage_backend == "JSON":
            with open(DB_FILE_JSON, "w") as f:
                json.dump({"users": users_db, "posts": posts_df.to_dict('records'), "comments": comments_df.to_dict('records')}, f, indent=2)
            return True, "Successfully saved to JSON file."
        elif storage_backend == "HF_DATASET":
            if not all([HF_DATASETS_AVAILABLE, HF_TOKEN, HF_DATASET_REPO]):
                return False, "HF_DATASET backend is not configured correctly."
            try:
                print("Pushing data to Hugging Face Hub...")
                dataset_dict = DatasetDict({
                    'users': Dataset.from_pandas(pd.DataFrame(list(users_db.items()), columns=['username', 'password'])),
                    'posts': Dataset.from_pandas(posts_df),
                    'comments': Dataset.from_pandas(comments_df)
                })
                dataset_dict.push_to_hub(HF_DATASET_REPO, token=HF_TOKEN, private=True)
                dirty_operations_count = 0
                return True, f"Successfully pushed data to {HF_DATASET_REPO}."
            except Exception as e:
                return False, f"Error pushing to Hugging Face Hub: {e}"
        return False, "Unknown backend."


def handle_persistence_after_change():
    global dirty_operations_count
    storage_backend = STORAGE_BACKEND_CONFIG
    if storage_backend in ["JSON", "SQLITE"]:
        force_persist_data()
    elif storage_backend == "HF_DATASET":
        should_backup = False
        with db_lock:
            dirty_operations_count += 1
            print(f"HF_DATASET: {dirty_operations_count}/{HF_BACKUP_THRESHOLD} operations until next auto-backup.")
            if dirty_operations_count >= HF_BACKUP_THRESHOLD:
                print(f"Threshold of {HF_BACKUP_THRESHOLD} reached. Triggering auto-backup.")
                should_backup = True
        # Push outside the lock: force_persist_data() re-acquires db_lock, and
        # threading.Lock is not re-entrant, so calling it while still holding the
        # lock would deadlock.
        if should_backup:
            force_persist_data()
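
# For reference, a minimal sketch of a .env file for this app. All values below are
# placeholders (not real tokens or repos); only the variable names are taken from the
# os.getenv() calls in this module:
#
#   STORAGE_BACKEND=HF_DATASET        # one of RAM, JSON, SQLITE, HF_DATASET
#   HF_DATASET_REPO=your-username/ilearnhub-data
#   HF_TOKEN=hf_xxxxxxxxxxxxxxxx
#   HF_BACKUP_THRESHOLD=10            # write operations between automatic Hub pushes
#   DEMO_MODE=True                    # hides the manual post/comment controls
#   GRADIO_PORT=7860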

def load_data():
    global STORAGE_BACKEND_CONFIG
    storage_backend = STORAGE_BACKEND_CONFIG
    with db_lock:
        users = {"admin": "password"}
        posts = pd.DataFrame(columns=["post_id", "username", "content", "timestamp"])
        comments = pd.DataFrame(columns=["comment_id", "post_id", "username", "content", "timestamp", "reply_to_comment_id"])
        if storage_backend == "SQLITE":
            try:
                with sqlite3.connect(DB_FILE_SQLITE) as conn:
                    cursor = conn.cursor()
                    cursor.execute("CREATE TABLE IF NOT EXISTS users (username TEXT PRIMARY KEY, password TEXT NOT NULL)")
                    cursor.execute("CREATE TABLE IF NOT EXISTS posts (post_id INTEGER PRIMARY KEY, username TEXT, content TEXT, timestamp TEXT)")
                    cursor.execute("CREATE TABLE IF NOT EXISTS comments (comment_id INTEGER PRIMARY KEY, post_id INTEGER, username TEXT, content TEXT, timestamp TEXT, reply_to_comment_id INTEGER)")
                    cursor.execute("INSERT OR IGNORE INTO users (username, password) VALUES (?, ?)", ("admin", "password"))
                    conn.commit()
                    users = dict(conn.execute("SELECT username, password FROM users").fetchall())
                    posts = pd.read_sql_query("SELECT * FROM posts", conn)
                    comments = pd.read_sql_query("SELECT * FROM comments", conn)
            except Exception as e:
                print(f"CRITICAL: Failed to load or create SQLite DB at '{DB_FILE_SQLITE}'. Falling back to RAM. Error: {e}")
                STORAGE_BACKEND_CONFIG = "RAM"
        elif storage_backend == "JSON":
            if os.path.exists(DB_FILE_JSON):
                try:
                    with open(DB_FILE_JSON, "r") as f:
                        data = json.load(f)
                    users = data.get("users", users)
                    posts = pd.DataFrame(data.get("posts", []))
                    comments = pd.DataFrame(data.get("comments", []))
                except (json.JSONDecodeError, KeyError):
                    print(f"Warning: JSON file '{DB_FILE_JSON}' is corrupted or empty. Starting with fresh data.")
            else:
                print(f"JSON file '{DB_FILE_JSON}' not found. Will be created on first change.")
        elif storage_backend == "HF_DATASET":
            if all([HF_DATASETS_AVAILABLE, HF_TOKEN, HF_DATASET_REPO]):
                try:
                    print(f"Attempting to load data from HF Dataset: {HF_DATASET_REPO}")
                    ds_dict = load_dataset(HF_DATASET_REPO, token=HF_TOKEN, trust_remote_code=True)
                    users = dict(zip(ds_dict['users']['username'], ds_dict['users']['password']))
                    posts = ds_dict['posts'].to_pandas()
                    comments = ds_dict['comments'].to_pandas()
                    print("Successfully loaded data from HF Dataset.")
                except Exception as e:
                    print(f"Could not load from HF Dataset '{HF_DATASET_REPO}'. Attempting to initialize a new one. Error: {e}")
                    try:
                        user_features = Features({'username': Value('string'), 'password': Value('string')})
                        post_features = Features({'post_id': Value('int64'), 'username': Value('string'), 'content': Value('string'), 'timestamp': Value('string')})
                        comment_features = Features({'comment_id': Value('int64'), 'post_id': Value('int64'), 'username': Value('string'), 'content': Value('string'), 'timestamp': Value('string'), 'reply_to_comment_id': Value('int64')})
                        dataset_dict = DatasetDict({
                            'users': Dataset.from_pandas(pd.DataFrame(list(users.items()), columns=['username', 'password']), features=user_features),
                            'posts': Dataset.from_pandas(posts, features=post_features),
                            'comments': Dataset.from_pandas(comments, features=comment_features)
                        })
                        dataset_dict.push_to_hub(HF_DATASET_REPO, token=HF_TOKEN, private=True)
                        print(f"Successfully initialized new empty HF Dataset at {HF_DATASET_REPO}.")
                    except Exception as e_push:
                        print(f"CRITICAL: Failed to create new HF Dataset. Falling back to RAM for this session. Push Error: {e_push}")
                        STORAGE_BACKEND_CONFIG = "RAM"
            else:
                print("HF_DATASET backend not fully configured (check env vars and library install). Falling back to RAM for this session.")
                STORAGE_BACKEND_CONFIG = "RAM"

        # Normalise loaded frames: older data may lack the reply column, and empty
        # JSON/HF payloads can come back without any columns at all.
        if posts.empty and "post_id" not in posts.columns:
            posts = pd.DataFrame(columns=["post_id", "username", "content", "timestamp"])
        if comments.empty and "comment_id" not in comments.columns:
            comments = pd.DataFrame(columns=["comment_id", "post_id", "username", "content", "timestamp", "reply_to_comment_id"])
        if "reply_to_comment_id" not in comments.columns:
            comments["reply_to_comment_id"] = None

        post_counter = int(posts['post_id'].max()) if not posts.empty else 0
        comment_counter = int(comments['comment_id'].max()) if not comments.empty else 0
        return users, posts, comments, post_counter, comment_counter
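
# Sketch of the JSON backend's on-disk layout, matching the shape written by
# force_persist_data() above (the records shown are hypothetical; top-level comments
# carry no reply id):
#
#   {
#     "users": {"admin": "password"},
#     "posts": [{"post_id": 1, "username": "admin", "content": "Hello", "timestamp": "2024-01-01 12:00:00"}],
#     "comments": [{"comment_id": 1, "post_id": 1, "username": "admin", "content": "First!",
#                   "timestamp": "2024-01-01 12:05:00", "reply_to_comment_id": null}]
#   }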

users_db, posts_df, comments_df, post_counter, comment_counter = load_data()


# --- API Functions ---
def api_register(username, password):
    if not username or not password:
        return "[Auth API] Failed: Username/password cannot be empty."
    with db_lock:
        if username in users_db:
            return f"[Auth API] Failed: Username '{username}' already exists."
        users_db[username] = password
    # Persist outside the lock; the persistence helpers acquire db_lock themselves.
    handle_persistence_after_change()
    return f"[Auth API] Success: User '{username}' registered."


def api_login(username, password):
    return f"{username}:{password}" if username in users_db and users_db.get(username) == password else "[Auth API] Failed: Invalid credentials."


def _get_user_from_token(auth_token):
    if not auth_token or ':' not in auth_token:
        return None
    try:
        username, password = auth_token.split(':', 1)
        return username if username in users_db and users_db.get(username) == password else None
    except (ValueError, TypeError):
        return None


def api_create_post(auth_token, content):
    global posts_df, post_counter
    username = _get_user_from_token(auth_token)
    if not username:
        return "[Post API] Failed: Invalid auth token."
    if not content or not content.strip():
        return "[Post API] Failed: Post content cannot be empty."
    with db_lock:
        post_counter += 1
        new_post = pd.DataFrame([{"post_id": post_counter, "username": username, "content": content, "timestamp": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")}])
        posts_df = pd.concat([posts_df, new_post], ignore_index=True)
    handle_persistence_after_change()
    return f"[Post API] Success: Post created with ID {post_counter}."


def api_create_comment(auth_token, post_id, content, reply_to_comment_id=None):
    global comments_df, comment_counter
    username = _get_user_from_token(auth_token)
    if not username:
        return "[Comment API] Failed: Invalid auth token."
    if not content or not content.strip():
        return "[Comment API] Failed: Comment content cannot be empty."
    with db_lock:
        try:
            target_post_id = int(post_id)
        except (ValueError, TypeError):
            return "[Comment API] Failed: Post ID must be a number."
        if target_post_id not in posts_df['post_id'].values:
            return f"[Comment API] Failed: Post with ID {post_id} not found."
        target_reply_id = None
        if reply_to_comment_id is not None:
            try:
                target_reply_id = int(reply_to_comment_id)
            except (ValueError, TypeError):
                return "[Comment API] Failed: Reply ID must be a number."
            if target_reply_id not in comments_df['comment_id'].values:
                return f"[Comment API] Failed: Comment to reply to (ID {target_reply_id}) not found."
        comment_counter += 1
        new_comment_data = {"comment_id": comment_counter, "post_id": target_post_id, "username": username, "content": content, "timestamp": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), "reply_to_comment_id": target_reply_id}
        new_comment = pd.DataFrame([new_comment_data])
        comments_df = pd.concat([comments_df, new_comment], ignore_index=True)
    handle_persistence_after_change()
    return f"[Comment API] Success: Comment created on post {post_id}."

def _format_comments_threaded(post_id, all_comments_df, parent_id=None, depth=0):
    thread = []
    # Match NaN correctly for top-level comments
    if parent_id is None:
        children = all_comments_df[(all_comments_df['post_id'] == post_id) & (all_comments_df['reply_to_comment_id'].isna())]
    else:
        children = all_comments_df[all_comments_df['reply_to_comment_id'] == parent_id]
    for _, comment in children.iterrows():
        indent = " " * depth
        thread.append(f"{indent} - (ID: {comment['comment_id']}) @{comment['username']}: {comment['content']}")
        thread.extend(_format_comments_threaded(post_id, all_comments_df, parent_id=comment['comment_id'], depth=depth + 1))
    return thread


def api_get_feed(search_query: str = None):
    with db_lock:
        current_posts, current_comments = posts_df.copy(), comments_df.copy()
    if current_posts.empty:
        return pd.DataFrame(columns=["post_id", "username", "content", "timestamp", "comments"])
    display_posts = current_posts[current_posts['content'].str.contains(search_query, case=False, na=False)] if search_query and not search_query.isspace() else current_posts
    sorted_posts = display_posts.sort_values(by="timestamp", ascending=False)
    feed_data = []
    for _, post in sorted_posts.iterrows():
        threaded_comments = _format_comments_threaded(post['post_id'], current_comments)
        feed_data.append({"post_id": post['post_id'], "username": post['username'], "content": post['content'], "timestamp": post['timestamp'], "comments": "\n".join(threaded_comments)})
    return pd.DataFrame(feed_data) if feed_data else pd.DataFrame(columns=["post_id", "username", "content", "timestamp", "comments"])


# --- UI Helper Functions ---
def ui_manual_post(username, password, content):
    if not username or not password:
        return "Username and password are required.", api_get_feed()
    auth_token = api_login(username, password)
    if "Failed" in auth_token:
        return "Login failed. Check credentials.", api_get_feed()
    result = api_create_post(auth_token, content)
    return result, api_get_feed()


def ui_manual_comment(username, password, post_id, reply_id, content):
    if not username or not password:
        return "Username and password are required.", api_get_feed()
    auth_token = api_login(username, password)
    if "Failed" in auth_token:
        return "Login failed. Check credentials.", api_get_feed()
    result = api_create_comment(auth_token, post_id, content, reply_to_comment_id=reply_id)
    return result, api_get_feed()
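
# For illustration, _format_comments_threaded() above flattens a reply tree into indented
# lines, so the "comments" cell of a feed row might look like this (hypothetical data):
#
#    - (ID: 1) @admin: Welcome to iLearnHub!
#      - (ID: 2) @alice: Thanks for setting this up.
#        - (ID: 3) @admin: You're welcome.
#    - (ID: 4) @bob: First post?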

with gr.Blocks(theme=gr.themes.Soft(), title="Social App") as demo:
    gr.Markdown("# iLearnHub")
    gr.Markdown(f"This app provides an API for iLearn agents to interact with. **Storage Backend: `{STORAGE_BACKEND_CONFIG}`**")
    gr.Markdown("Server address: https://broadfield-dev-ilearnhub.hf.space")

    with gr.Tabs():
        with gr.TabItem("Live Feed"):
            feed_df_display = gr.DataFrame(label="Feed", headers=["post_id", "username", "content", "timestamp", "comments"], interactive=False, wrap=True)
            refresh_btn = gr.Button("Refresh Feed")
        with gr.TabItem("Manual Actions & Settings"):
            manual_action_status = gr.Textbox(label="Action Status", interactive=False)
            gr.Markdown("## DEMO_MODE", visible=DEMO_MODE)
            with gr.Row(visible=not DEMO_MODE):
                with gr.Group():
                    gr.Markdown("### Manually Create Post")
                    post_user = gr.Textbox(label="Username", value="admin")
                    post_pass = gr.Textbox(label="Password", type="password", value="password")
                    post_content = gr.Textbox(label="Post Content", lines=3, placeholder="What's on your mind?")
                    post_button = gr.Button("Submit Post", variant="primary")
                with gr.Group():
                    gr.Markdown("### Manually Create Comment")
                    comment_user = gr.Textbox(label="Username", value="admin")
                    comment_pass = gr.Textbox(label="Password", type="password", value="password")
                    comment_post_id = gr.Number(label="Target Post ID", precision=0)
                    comment_reply_id = gr.Number(label="Reply to Comment ID (optional)", precision=0)
                    comment_content = gr.Textbox(label="Comment Content", lines=2, placeholder="Add a comment...")
                    comment_button = gr.Button("Submit Comment", variant="primary")
                with gr.Group():
                    gr.Markdown("### Settings")
                    feed_refresh_interval_slider = gr.Slider(minimum=5, maximum=120, value=15, step=5, label="Feed Refresh Interval (seconds)")
        with gr.TabItem("Admin", visible=(STORAGE_BACKEND_CONFIG == "HF_DATASET")):
            gr.Markdown("### Hugging Face Dataset Control")
            backup_btn = gr.Button("Force Backup to Hugging Face Hub", visible=not DEMO_MODE)
            backup_status = gr.Textbox(label="Backup Status", interactive=False)

    # Event Handlers
    post_button.click(
        fn=ui_manual_post,
        inputs=[post_user, post_pass, post_content],
        outputs=[manual_action_status, feed_df_display]
    )
    comment_button.click(
        fn=ui_manual_comment,
        inputs=[comment_user, comment_pass, comment_post_id, comment_reply_id, comment_content],
        outputs=[manual_action_status, feed_df_display]
    )

    last_refresh_time = time.time()

    def timed_feed_refresh(interval):
        global last_refresh_time
        if time.time() - last_refresh_time > interval:
            last_refresh_time = time.time()
            return api_get_feed()
        return gr.update()

    gr.Timer(1).tick(
        fn=timed_feed_refresh,
        inputs=[feed_refresh_interval_slider],
        outputs=[feed_df_display]
    )

    refresh_btn.click(api_get_feed, None, feed_df_display)

    def admin_backup_handler():
        success, message = force_persist_data()
        return message

    if STORAGE_BACKEND_CONFIG == "HF_DATASET":
        backup_btn.click(admin_backup_handler, None, backup_status)

    demo.load(api_get_feed, None, feed_df_display)

    with gr.Column(visible=not DEMO_MODE):
        gr.Interface(api_register, ["text", gr.Textbox(type="password")], "text", api_name="register", allow_flagging="never")
        gr.Interface(api_login, ["text", gr.Textbox(type="password")], "text", api_name="login", allow_flagging="never")
        gr.Interface(api_create_post, ["text", "text"], "text", api_name="create_post", allow_flagging="never")
        gr.Interface(api_create_comment, ["text", "number", "text", "number"], "text", api_name="create_comment", allow_flagging="never")
        gr.Interface(api_get_feed, ["text"], "dataframe", api_name="get_feed", allow_flagging="never")


if __name__ == "__main__":
    print(f"Starting Social Media App server with {STORAGE_BACKEND_CONFIG} backend.")
    if STORAGE_BACKEND_CONFIG == "HF_DATASET" and not HF_DATASETS_AVAILABLE:
        print("\nWARNING: 'datasets' library not found. Please run `pip install datasets huggingface_hub` to use the HF_DATASET backend.\n")
    app_port = int(os.getenv("GRADIO_PORT", 7860))
    demo.queue().launch(server_name="0.0.0.0", server_port=app_port, share=True, mcp_server=True, debug=True)
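
# Hypothetical client-side usage: a minimal sketch assuming the `gradio_client` package is
# installed and the server is reachable at the address shown in the UI. The username and
# password are placeholders; the api_name routes match the gr.Interface definitions above:
#
#   from gradio_client import Client
#   client = Client("https://broadfield-dev-ilearnhub.hf.space")
#   client.predict("alice", "secret", api_name="/register")
#   token = client.predict("alice", "secret", api_name="/login")
#   client.predict(token, "Hello from an iLearn agent!", api_name="/create_post")
#   feed = client.predict("", api_name="/get_feed")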