""" | |
Advanced Token Manager for Hugging Face authentication | |
Supports persistent storage with encryption and multiple token types | |
""" | |
import os | |
import sqlite3 | |
import logging | |
import json | |
from typing import Dict, Any, List, Optional | |
from pathlib import Path | |
from cryptography.fernet import Fernet | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | |
import base64 | |
from datetime import datetime | |
logger = logging.getLogger(__name__) | |
class TokenManager:
    """
    Advanced token manager with encryption and persistent storage
    """

    def __init__(self, db_path: str = "database/tokens.db"):
        """
        Initialize token manager

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        # Initialize encryption
        self.encryption_key = self._get_or_create_encryption_key()
        self.cipher = Fernet(self.encryption_key)

        # Initialize database
        self._init_database()

        # Token type definitions. These must be set before _load_env_tokens()
        # runs, because save_token() validates token_type against this dict.
        self.token_types = {
            'read': {
                'name': 'Read Token',
                'description': 'Read-only token for repositories',
                'permissions': ['read_public_repos', 'read_private_repos_with_access',
                                'download_models', 'download_datasets'],
                'restrictions': ['cannot_upload', 'cannot_create_repos', 'cannot_modify_content'],
                'use_cases': ['Downloading models for training', 'Accessing private data',
                              'Development and testing'],
                'security_level': 'medium',
                'recommended_for': 'development'
            },
            'write': {
                'name': 'Write Token',
                'description': 'Token with full read and write access',
                'permissions': ['all_read_permissions', 'upload_files', 'create_repositories',
                                'modify_content', 'manage_repo_settings', 'delete_files'],
                'restrictions': ['limited_by_account_permissions'],
                'use_cases': ['Uploading trained models', 'Sharing results with the community',
                              'Managing personal projects'],
                'security_level': 'high',
                'recommended_for': 'production'
            },
            'fine_grained': {
                'name': 'Fine-grained Token',
                'description': 'Token with custom, narrowly scoped permissions',
                'permissions': ['custom_per_repository', 'granular_access_control',
                                'time_limited_access', 'ip_restricted_access'],
                'restrictions': ['repository_specific', 'time_limited', 'ip_restricted'],
                'use_cases': ['Commercial projects', 'Sensitive data', 'Large teams'],
                'security_level': 'very_high',
                'recommended_for': 'enterprise'
            }
        }

        # Load tokens from environment variables (uses self.token_types above)
        self._load_env_tokens()

        logger.info("Token Manager initialized")
    def _load_env_tokens(self):
        """Load tokens from environment variables"""
        env_tokens = {
            'read_token': {
                'token': os.getenv('HF_TOKEN_READ'),
                'type': 'read',
                'description': 'Read token from environment variables - for development and learning'
            },
            'write_token': {
                'token': os.getenv('HF_TOKEN_WRITE'),
                'type': 'write',
                'description': 'Write token from environment variables - for sharing models'
            },
            'fine_grained_token': {
                'token': os.getenv('HF_TOKEN_FINE_GRAINED'),
                'type': 'fine_grained',
                'description': 'Fine-grained token from environment variables - for commercial projects'
            }
        }

        # Save tokens from the environment if they exist and are not already stored
        for name, token_info in env_tokens.items():
            if token_info['token']:
                existing_token = self.get_token(name)
                if not existing_token:
                    success = self.save_token(
                        name=name,
                        token=token_info['token'],
                        token_type=token_info['type'],
                        description=token_info['description'],
                        is_default=(token_info['type'] == 'read')  # read token is the default
                    )
                    if success:
                        logger.info(f"Loaded {token_info['type']} token from environment")
    def get_token_for_task(self, task_type: str = 'read') -> Optional[str]:
        """
        Get an appropriate token for a specific task

        Args:
            task_type: Type of task (read, write, medical, private, upload,
                download, commercial, enterprise)

        Returns:
            Appropriate token for the task, or None if none is available
        """
        # Map task types to token preferences, most preferred first
        task_token_map = {
            'read': ['read_token', 'fine_grained_token', 'write_token'],
            'download': ['read_token', 'fine_grained_token', 'write_token'],
            'write': ['write_token', 'fine_grained_token'],
            'upload': ['write_token', 'fine_grained_token'],
            'medical': ['fine_grained_token', 'write_token', 'read_token'],
            'private': ['fine_grained_token', 'write_token'],
            'commercial': ['fine_grained_token'],
            'enterprise': ['fine_grained_token']
        }

        # Try stored tokens in order of preference for this task
        preferred_tokens = task_token_map.get(task_type, ['read_token'])
        for token_name in preferred_tokens:
            token = self.get_token(token_name)
            if token:
                logger.debug(f"Using {token_name} for task: {task_type}")
                return token

        # Fall back to the default token
        default_token = self.get_token()
        if default_token:
            logger.debug(f"Using default token for task: {task_type}")
            return default_token

        # Last resort: try environment variables directly
        env_fallbacks = {
            'read': 'HF_TOKEN_READ',
            'write': 'HF_TOKEN_WRITE',
            'medical': 'HF_TOKEN_FINE_GRAINED',
            'private': 'HF_TOKEN_FINE_GRAINED'
        }
        env_var = env_fallbacks.get(task_type, 'HF_TOKEN')
        env_token = os.getenv(env_var)
        if env_token:
            logger.debug(f"Using environment token {env_var} for task: {task_type}")
            return env_token

        logger.warning(f"No suitable token found for task: {task_type}")
        return None
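
    # Illustrative use (token names match those registered by _load_env_tokens):
    #   manager = TokenManager()
    #   token = manager.get_token_for_task('upload')
    #   # tries write_token, then fine_grained_token, then the default token,
    #   # then the HF_TOKEN environment variable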
    def _get_or_create_encryption_key(self) -> bytes:
        """Get or create the encryption key for token storage"""
        key_file = self.db_path.parent / ".token_key"

        if key_file.exists():
            with open(key_file, 'rb') as f:
                return f.read()

        # Generate a new key. The password and salt are both random and are
        # never persisted; only the derived key is stored, so PBKDF2 here is
        # effectively a random key generator (equivalent to Fernet.generate_key()).
        password = os.urandom(32)
        salt = os.urandom(16)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(password))

        # Save the key with owner-only permissions
        with open(key_file, 'wb') as f:
            f.write(key)
        os.chmod(key_file, 0o600)

        logger.info("Created new encryption key")
        return key
    def _init_database(self):
        """Initialize SQLite database"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS tokens (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE NOT NULL,
                    token_type TEXT NOT NULL,
                    encrypted_token TEXT NOT NULL,
                    is_default BOOLEAN DEFAULT FALSE,
                    description TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_used TIMESTAMP,
                    usage_count INTEGER DEFAULT 0,
                    is_active BOOLEAN DEFAULT TRUE
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS token_usage_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    token_name TEXT NOT NULL,
                    operation TEXT NOT NULL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    success BOOLEAN,
                    error_message TEXT
                )
            ''')
            conn.commit()
        logger.info("Database initialized")
    def save_token(self, name: str, token: str, token_type: str = 'read',
                   description: str = '', is_default: bool = False) -> bool:
        """
        Save encrypted token to database

        Args:
            name: Token name/identifier
            token: HF token string
            token_type: Type of token (read/write/fine_grained)
            description: Optional description
            is_default: Whether this should be the default token

        Returns:
            Success status
        """
        try:
            # Validate token type
            if token_type not in self.token_types:
                raise ValueError(f"Invalid token type: {token_type}")

            # Encrypt token
            encrypted_token = self.cipher.encrypt(token.encode()).decode()

            with sqlite3.connect(self.db_path) as conn:
                # If setting as default, unset other defaults
                if is_default:
                    conn.execute('UPDATE tokens SET is_default = FALSE')

                # Insert or update the token. Note that INSERT OR REPLACE
                # rewrites the whole row, so usage statistics are reset when
                # an existing token is overwritten.
                conn.execute('''
                    INSERT OR REPLACE INTO tokens
                    (name, token_type, encrypted_token, is_default, description, created_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (name, token_type, encrypted_token, is_default, description, datetime.now()))
                conn.commit()

            logger.info(f"Saved token '{name}' of type '{token_type}'")
            return True
        except Exception as e:
            logger.error(f"Failed to save token '{name}': {e}")
            return False
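
    # Illustrative use (the token value is a placeholder, not a real token):
    #   manager.save_token('ci_read', 'hf_xxxxxxxx', token_type='read',
    #                      description='CI read-only token', is_default=True)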
    def get_token(self, name: Optional[str] = None) -> Optional[str]:
        """
        Get decrypted token by name, or the default token

        Args:
            name: Token name (if None, returns the default token)

        Returns:
            Decrypted token string or None
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                if name:
                    cursor = conn.execute(
                        'SELECT encrypted_token, name FROM tokens WHERE name = ? AND is_active = TRUE',
                        (name,)
                    )
                else:
                    cursor = conn.execute(
                        'SELECT encrypted_token, name FROM tokens WHERE is_default = TRUE AND is_active = TRUE'
                    )

                result = cursor.fetchone()
                if result:
                    encrypted_token, token_name = result

                    # Decrypt the token and record the access
                    decrypted_token = self.cipher.decrypt(encrypted_token.encode()).decode()
                    self._update_token_usage(token_name)
                    return decrypted_token

                return None
        except Exception as e:
            logger.error(f"Failed to get token '{name}': {e}")
            return None
    def list_tokens(self) -> List[Dict[str, Any]]:
        """
        List all saved tokens (without decrypting them)

        Returns:
            List of token information
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('''
                    SELECT name, token_type, is_default, description, created_at,
                           last_used, usage_count, is_active
                    FROM tokens
                    ORDER BY is_default DESC, created_at DESC
                ''')

                tokens = []
                for row in cursor.fetchall():
                    tokens.append({
                        'name': row[0],
                        'type': row[1],
                        'type_info': self.token_types.get(row[1], {}),
                        'is_default': bool(row[2]),
                        'description': row[3],
                        'created_at': row[4],
                        'last_used': row[5],
                        'usage_count': row[6],
                        'is_active': bool(row[7])
                    })
                return tokens
        except Exception as e:
            logger.error(f"Failed to list tokens: {e}")
            return []
    def delete_token(self, name: str) -> bool:
        """
        Delete token from database

        Args:
            name: Token name to delete

        Returns:
            Success status
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM tokens WHERE name = ?', (name,))
                if cursor.rowcount > 0:
                    conn.commit()
                    logger.info(f"Deleted token '{name}'")
                    return True
                logger.warning(f"Token '{name}' not found")
                return False
        except Exception as e:
            logger.error(f"Failed to delete token '{name}': {e}")
            return False
    def set_default_token(self, name: str) -> bool:
        """
        Set a token as the default

        Args:
            name: Token name to set as default

        Returns:
            Success status
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                # Check if the token exists
                cursor = conn.execute('SELECT id FROM tokens WHERE name = ?', (name,))
                if not cursor.fetchone():
                    logger.error(f"Token '{name}' not found")
                    return False

                # Unset all defaults, then set the new one
                conn.execute('UPDATE tokens SET is_default = FALSE')
                conn.execute('UPDATE tokens SET is_default = TRUE WHERE name = ?', (name,))
                conn.commit()

            logger.info(f"Set '{name}' as default token")
            return True
        except Exception as e:
            logger.error(f"Failed to set default token '{name}': {e}")
            return False
    def validate_token(self, token: str) -> Dict[str, Any]:
        """
        Validate an HF token by testing API access

        Args:
            token: Token to validate

        Returns:
            Validation result
        """
        try:
            from huggingface_hub import HfApi

            api = HfApi(token=token)
            user_info = api.whoami()
            return {
                'valid': True,
                'username': user_info.get('name', 'unknown'),
                'email': user_info.get('email', ''),
                'plan': user_info.get('plan', 'free'),
                'message': 'Token is valid and working'
            }
        except Exception as e:
            return {
                'valid': False,
                'error': str(e),
                'message': 'Token validation failed'
            }
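
    # Illustrative use (requires huggingface_hub and network access):
    #   result = manager.validate_token(token)
    #   if result['valid']:
    #       print(f"Authenticated as {result['username']}")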
    def _update_token_usage(self, token_name: str):
        """Update token usage statistics"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('''
                    UPDATE tokens
                    SET last_used = ?, usage_count = usage_count + 1
                    WHERE name = ?
                ''', (datetime.now(), token_name))
                conn.commit()
        except Exception as e:
            logger.error(f"Failed to update token usage: {e}")

    def log_token_usage(self, token_name: str, operation: str,
                        success: bool, error_message: str = ''):
        """Log token usage for auditing"""
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('''
                    INSERT INTO token_usage_log
                    (token_name, operation, success, error_message)
                    VALUES (?, ?, ?, ?)
                ''', (token_name, operation, success, error_message))
                conn.commit()
        except Exception as e:
            logger.error(f"Failed to log token usage: {e}")
    def get_token_recommendations(self, intended_use: str) -> Dict[str, Any]:
        """
        Get token type recommendations based on intended use

        Args:
            intended_use: Description of intended use

        Returns:
            Recommendation information
        """
        use_lower = intended_use.lower()

        if any(word in use_lower for word in ['learn', 'study', 'test', 'develop']):
            recommended_type = 'read'
        elif any(word in use_lower for word in ['share', 'upload', 'publish', 'create']):
            recommended_type = 'write'
        elif any(word in use_lower for word in ['commercial', 'enterprise', 'team', 'sensitive']):
            recommended_type = 'fine_grained'
        else:
            recommended_type = 'read'  # Default to read

        return {
            'recommended_type': recommended_type,
            'type_info': self.token_types[recommended_type],
            'explanation': f"Based on your intended use ('{intended_use}'), "
                           f"we recommend a {recommended_type} token."
        }
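

# Minimal usage sketch, assuming HF_TOKEN_READ / HF_TOKEN_WRITE may be set in
# the environment; the token value below is a placeholder, not a real token.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    manager = TokenManager(db_path="database/tokens.db")

    # Register a token manually and make it the default
    manager.save_token(
        name="demo_token",
        token="hf_xxxxxxxxxxxxxxxxxxxx",  # placeholder value
        token_type="read",
        description="Demo read token",
        is_default=True,
    )

    # Inspect stored tokens (metadata only; token values stay encrypted at rest)
    for info in manager.list_tokens():
        print(f"{info['name']} ({info['type']}), default={info['is_default']}")

    # Resolve a token for a task and get a token type recommendation
    token = manager.get_token_for_task("download")
    print("Token resolved:", bool(token))
    print(manager.get_token_recommendations("share a model with the community"))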