""" Advanced Token Manager for Hugging Face authentication Supports persistent storage with encryption and multiple token types """ import os import sqlite3 import logging import json from typing import Dict, Any, List, Optional from pathlib import Path from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import base64 from datetime import datetime logger = logging.getLogger(__name__) class TokenManager: """ Advanced token manager with encryption and persistent storage """ def __init__(self, db_path: str = "database/tokens.db"): """ Initialize token manager Args: db_path: Path to SQLite database file """ self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) # Initialize encryption self.encryption_key = self._get_or_create_encryption_key() self.cipher = Fernet(self.encryption_key) # Initialize database self._init_database() # Load tokens from environment variables self._load_env_tokens() # Token type definitions self.token_types = { 'read': { 'name': 'Read Token', 'description': 'رمز للقراءة فقط من المستودعات', 'permissions': ['read_public_repos', 'read_private_repos_with_access', 'download_models', 'download_datasets'], 'restrictions': ['cannot_upload', 'cannot_create_repos', 'cannot_modify_content'], 'use_cases': ['تحميل النماذج للتدريب', 'الوصول للبيانات الخاصة', 'التطوير والاختبار'], 'security_level': 'medium', 'recommended_for': 'development' }, 'write': { 'name': 'Write Token', 'description': 'رمز للقراءة والكتابة الكاملة', 'permissions': ['all_read_permissions', 'upload_files', 'create_repositories', 'modify_content', 'manage_repo_settings', 'delete_files'], 'restrictions': ['limited_by_account_permissions'], 'use_cases': ['رفع النماذج المدربة', 'مشاركة النتائج مع المجتمع', 'إدارة المشاريع الشخصية'], 'security_level': 'high', 'recommended_for': 'production' }, 'fine_grained': { 'name': 'Fine-grained Token', 'description': 'رمز بأذونات مخصصة ومحددة', 'permissions': ['custom_per_repository', 'granular_access_control', 'time_limited_access', 'ip_restricted_access'], 'restrictions': ['repository_specific', 'time_limited', 'ip_restricted'], 'use_cases': ['المشاريع التجارية', 'البيانات الحساسة', 'فرق العمل الكبيرة'], 'security_level': 'very_high', 'recommended_for': 'enterprise' } } logger.info("Token Manager initialized") def _load_env_tokens(self): """Load tokens from environment variables""" env_tokens = { 'read_token': { 'token': os.getenv('HF_TOKEN_READ'), 'type': 'read', 'description': 'رمز القراءة من متغيرات البيئة - للتطوير والتعلم' }, 'write_token': { 'token': os.getenv('HF_TOKEN_WRITE'), 'type': 'write', 'description': 'رمز الكتابة من متغيرات البيئة - لمشاركة النماذج' }, 'fine_grained_token': { 'token': os.getenv('HF_TOKEN_FINE_GRAINED'), 'type': 'fine_grained', 'description': 'رمز مخصص من متغيرات البيئة - للمشاريع التجارية' } } # Save tokens from environment if they exist for name, token_info in env_tokens.items(): if token_info['token']: # Check if token already exists existing_token = self.get_token(name) if not existing_token: success = self.save_token( name=name, token=token_info['token'], token_type=token_info['type'], description=token_info['description'], is_default=(token_info['type'] == 'read') # Set read as default ) if success: logger.info(f"Loaded {token_info['type']} token from environment") def get_token_for_task(self, task_type: str = 'read') -> Optional[str]: """ Get appropriate token for specific task Args: task_type: Type of task 
    def get_token_for_task(self, task_type: str = 'read') -> Optional[str]:
        """
        Get appropriate token for specific task

        Args:
            task_type: Type of task (read, write, medical, private, upload,
                download, commercial, or enterprise)

        Returns:
            Appropriate token for the task
        """
        # Map task types to token preferences
        task_token_map = {
            'read': ['read_token', 'fine_grained_token', 'write_token'],
            'download': ['read_token', 'fine_grained_token', 'write_token'],
            'write': ['write_token', 'fine_grained_token'],
            'upload': ['write_token', 'fine_grained_token'],
            'medical': ['fine_grained_token', 'write_token', 'read_token'],
            'private': ['fine_grained_token', 'write_token'],
            'commercial': ['fine_grained_token'],
            'enterprise': ['fine_grained_token']
        }

        # Get preferred token order for task
        preferred_tokens = task_token_map.get(task_type, ['read_token'])

        # Try to get tokens in order of preference
        for token_name in preferred_tokens:
            token = self.get_token(token_name)
            if token:
                logger.debug(f"Using {token_name} for task: {task_type}")
                return token

        # Fallback to default token
        default_token = self.get_token()
        if default_token:
            logger.debug(f"Using default token for task: {task_type}")
            return default_token

        # Last resort: try environment variables directly
        env_fallbacks = {
            'read': 'HF_TOKEN_READ',
            'write': 'HF_TOKEN_WRITE',
            'medical': 'HF_TOKEN_FINE_GRAINED',
            'private': 'HF_TOKEN_FINE_GRAINED'
        }

        env_var = env_fallbacks.get(task_type, 'HF_TOKEN')
        env_token = os.getenv(env_var)
        if env_token:
            logger.debug(f"Using environment token {env_var} for task: {task_type}")
            return env_token

        logger.warning(f"No suitable token found for task: {task_type}")
        return None

    def _get_or_create_encryption_key(self) -> bytes:
        """Get or create encryption key for token storage"""
        key_file = self.db_path.parent / ".token_key"

        if key_file.exists():
            with open(key_file, 'rb') as f:
                return f.read()
        else:
            # Generate new key
            password = os.urandom(32)  # Random password
            salt = os.urandom(16)
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=32,
                salt=salt,
                iterations=100000,
            )
            key = base64.urlsafe_b64encode(kdf.derive(password))

            # Save key securely
            with open(key_file, 'wb') as f:
                f.write(key)

            # Set restrictive permissions
            os.chmod(key_file, 0o600)

            logger.info("Created new encryption key")
            return key
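
    # NOTE on the key scheme above: the PBKDF2 derivation runs over a random,
    # immediately discarded password, so in practice it is equivalent to
    # storing a freshly generated Fernet key. The real secret is therefore
    # the .token_key file itself, which is why it is written with 0o600
    # permissions next to the database.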
    def _init_database(self):
        """Initialize SQLite database"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS tokens (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE NOT NULL,
                    token_type TEXT NOT NULL,
                    encrypted_token TEXT NOT NULL,
                    is_default BOOLEAN DEFAULT FALSE,
                    description TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_used TIMESTAMP,
                    usage_count INTEGER DEFAULT 0,
                    is_active BOOLEAN DEFAULT TRUE
                )
            ''')

            conn.execute('''
                CREATE TABLE IF NOT EXISTS token_usage_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    token_name TEXT NOT NULL,
                    operation TEXT NOT NULL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    success BOOLEAN,
                    error_message TEXT
                )
            ''')

            conn.commit()

        logger.info("Database initialized")

    def save_token(self, name: str, token: str, token_type: str = 'read',
                   description: str = '', is_default: bool = False) -> bool:
        """
        Save encrypted token to database

        Args:
            name: Token name/identifier
            token: HF token string
            token_type: Type of token (read/write/fine_grained)
            description: Optional description
            is_default: Whether this should be the default token

        Returns:
            Success status
        """
        try:
            # Validate token type
            if token_type not in self.token_types:
                raise ValueError(f"Invalid token type: {token_type}")

            # Encrypt token
            encrypted_token = self.cipher.encrypt(token.encode()).decode()

            with sqlite3.connect(self.db_path) as conn:
                # If setting as default, unset other defaults
                if is_default:
                    conn.execute('UPDATE tokens SET is_default = FALSE')

                # Insert or update token
                conn.execute('''
                    INSERT OR REPLACE INTO tokens
                    (name, token_type, encrypted_token, is_default, description, created_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (name, token_type, encrypted_token, is_default, description, datetime.now()))

                conn.commit()

            logger.info(f"Saved token '{name}' of type '{token_type}'")
            return True

        except Exception as e:
            logger.error(f"Failed to save token '{name}': {e}")
            return False

    def get_token(self, name: Optional[str] = None) -> Optional[str]:
        """
        Get decrypted token by name or default token

        Args:
            name: Token name (if None, returns default token)

        Returns:
            Decrypted token string or None
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                if name:
                    cursor = conn.execute(
                        'SELECT encrypted_token FROM tokens WHERE name = ? AND is_active = TRUE',
                        (name,)
                    )
                else:
                    cursor = conn.execute(
                        'SELECT encrypted_token, name FROM tokens WHERE is_default = TRUE AND is_active = TRUE'
                    )

                result = cursor.fetchone()
                if result:
                    encrypted_token = result[0]
                    token_name = result[1] if not name else name

                    # Decrypt token
                    decrypted_token = self.cipher.decrypt(encrypted_token.encode()).decode()

                    # Update usage statistics
                    self._update_token_usage(token_name)

                    return decrypted_token

                return None

        except Exception as e:
            logger.error(f"Failed to get token '{name}': {e}")
            return None

    def list_tokens(self) -> List[Dict[str, Any]]:
        """
        List all saved tokens (without decrypting them)

        Returns:
            List of token information
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('''
                    SELECT name, token_type, is_default, description,
                           created_at, last_used, usage_count, is_active
                    FROM tokens
                    ORDER BY is_default DESC, created_at DESC
                ''')

                tokens = []
                for row in cursor.fetchall():
                    token_info = {
                        'name': row[0],
                        'type': row[1],
                        'type_info': self.token_types.get(row[1], {}),
                        'is_default': bool(row[2]),
                        'description': row[3],
                        'created_at': row[4],
                        'last_used': row[5],
                        'usage_count': row[6],
                        'is_active': bool(row[7])
                    }
                    tokens.append(token_info)

                return tokens

        except Exception as e:
            logger.error(f"Failed to list tokens: {e}")
            return []

    def delete_token(self, name: str) -> bool:
        """
        Delete token from database

        Args:
            name: Token name to delete

        Returns:
            Success status
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM tokens WHERE name = ?', (name,))

                if cursor.rowcount > 0:
                    conn.commit()
                    logger.info(f"Deleted token '{name}'")
                    return True
                else:
                    logger.warning(f"Token '{name}' not found")
                    return False

        except Exception as e:
            logger.error(f"Failed to delete token '{name}': {e}")
            return False

    def set_default_token(self, name: str) -> bool:
        """
        Set a token as the default

        Args:
            name: Token name to set as default

        Returns:
            Success status
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                # Check if token exists
                cursor = conn.execute('SELECT id FROM tokens WHERE name = ?', (name,))
                if not cursor.fetchone():
                    logger.error(f"Token '{name}' not found")
                    return False

                # Unset all defaults
                conn.execute('UPDATE tokens SET is_default = FALSE')

                # Set new default
                conn.execute('UPDATE tokens SET is_default = TRUE WHERE name = ?', (name,))

                conn.commit()

            logger.info(f"Set '{name}' as default token")
            return True

        except Exception as e:
            logger.error(f"Failed to set default token '{name}': {e}")
            return False
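
    # NOTE: validate_token below makes a live call to the Hugging Face Hub
    # via huggingface_hub.HfApi.whoami(), so it requires network access and
    # the huggingface_hub package; failures are reported in the returned
    # dict rather than raised.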
    def validate_token(self, token: str) -> Dict[str, Any]:
        """
        Validate HF token by testing API access

        Args:
            token: Token to validate

        Returns:
            Validation result
        """
        try:
            from huggingface_hub import HfApi

            api = HfApi(token=token)
            user_info = api.whoami()

            return {
                'valid': True,
                'username': user_info.get('name', 'unknown'),
                'email': user_info.get('email', ''),
                'plan': user_info.get('plan', 'free'),
                'message': 'Token is valid and working'
            }

        except Exception as e:
            return {
                'valid': False,
                'error': str(e),
                'message': 'Token validation failed'
            }

    def _update_token_usage(self, token_name: str):
        """Update token usage statistics"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('''
                    UPDATE tokens
                    SET last_used = ?, usage_count = usage_count + 1
                    WHERE name = ?
                ''', (datetime.now(), token_name))
                conn.commit()
        except Exception as e:
            logger.error(f"Failed to update token usage: {e}")

    def log_token_usage(self, token_name: str, operation: str, success: bool,
                        error_message: str = ''):
        """Log token usage for auditing"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('''
                    INSERT INTO token_usage_log (token_name, operation, success, error_message)
                    VALUES (?, ?, ?, ?)
                ''', (token_name, operation, success, error_message))
                conn.commit()
        except Exception as e:
            logger.error(f"Failed to log token usage: {e}")

    def get_token_recommendations(self, intended_use: str) -> Dict[str, Any]:
        """
        Get token type recommendations based on intended use

        Args:
            intended_use: Description of intended use

        Returns:
            Recommendation information
        """
        use_lower = intended_use.lower()

        if any(word in use_lower for word in ['learn', 'study', 'test', 'develop']):
            recommended_type = 'read'
        elif any(word in use_lower for word in ['share', 'upload', 'publish', 'create']):
            recommended_type = 'write'
        elif any(word in use_lower for word in ['commercial', 'enterprise', 'team', 'sensitive']):
            recommended_type = 'fine_grained'
        else:
            recommended_type = 'read'  # Default to read

        return {
            'recommended_type': recommended_type,
            'type_info': self.token_types[recommended_type],
            'explanation': f"Based on your intended use ('{intended_use}'), "
                           f"we recommend a {recommended_type} token."
        }
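

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): demonstrates the typical save / retrieve /
# recommend flow. The token value and names below are hypothetical
# placeholders, and the database path is simply the class default.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    manager = TokenManager(db_path="database/tokens.db")

    # Store a placeholder read token and make it the default.
    manager.save_token(
        name="read_token",
        token="hf_xxxxxxxxxxxxxxxxxxxx",  # hypothetical placeholder, not a real token
        token_type="read",
        description="Example read token",
        is_default=True,
    )

    # Pick an appropriate token for a download task (falls back to env vars).
    token = manager.get_token_for_task("download")
    print(f"Token for download: {'found' if token else 'not found'}")

    # Inspect stored tokens without decrypting them.
    for info in manager.list_tokens():
        print(f"- {info['name']} ({info['type']}), default={info['is_default']}")

    # Ask for a recommendation based on an intended-use description.
    rec = manager.get_token_recommendations("share a fine-tuned model")
    print(rec["explanation"])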