# src/core/token_manager.py
"""
Advanced Token Manager for Hugging Face authentication
Supports persistent storage with encryption and multiple token types
"""
import os
import sqlite3
import logging
import json
from typing import Dict, Any, List, Optional
from pathlib import Path
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
from datetime import datetime
logger = logging.getLogger(__name__)
class TokenManager:
"""
Advanced token manager with encryption and persistent storage
"""
def __init__(self, db_path: str = "database/tokens.db"):
"""
Initialize token manager
Args:
db_path: Path to SQLite database file
"""
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
# Initialize encryption
self.encryption_key = self._get_or_create_encryption_key()
self.cipher = Fernet(self.encryption_key)
# Initialize database
self._init_database()
# Token type definitions
self.token_types = {
'read': {
'name': 'Read Token',
                'description': 'Read-only token for repositories',
'permissions': ['read_public_repos', 'read_private_repos_with_access',
'download_models', 'download_datasets'],
'restrictions': ['cannot_upload', 'cannot_create_repos', 'cannot_modify_content'],
                'use_cases': ['Downloading models for training', 'Accessing private data', 'Development and testing'],
'security_level': 'medium',
'recommended_for': 'development'
},
'write': {
'name': 'Write Token',
                'description': 'Token with full read and write access',
'permissions': ['all_read_permissions', 'upload_files', 'create_repositories',
'modify_content', 'manage_repo_settings', 'delete_files'],
'restrictions': ['limited_by_account_permissions'],
                'use_cases': ['Uploading trained models', 'Sharing results with the community', 'Managing personal projects'],
'security_level': 'high',
'recommended_for': 'production'
},
'fine_grained': {
'name': 'Fine-grained Token',
                'description': 'Token with custom, scoped permissions',
'permissions': ['custom_per_repository', 'granular_access_control',
'time_limited_access', 'ip_restricted_access'],
'restrictions': ['repository_specific', 'time_limited', 'ip_restricted'],
                'use_cases': ['Commercial projects', 'Sensitive data', 'Large teams'],
'security_level': 'very_high',
'recommended_for': 'enterprise'
}
}
logger.info("Token Manager initialized")
def _load_env_tokens(self):
"""Load tokens from environment variables"""
env_tokens = {
'read_token': {
'token': os.getenv('HF_TOKEN_READ'),
'type': 'read',
                'description': 'Read token from environment variables - for development and learning'
},
'write_token': {
'token': os.getenv('HF_TOKEN_WRITE'),
'type': 'write',
                'description': 'Write token from environment variables - for sharing models'
},
'fine_grained_token': {
'token': os.getenv('HF_TOKEN_FINE_GRAINED'),
'type': 'fine_grained',
                'description': 'Fine-grained token from environment variables - for commercial projects'
}
}
# Save tokens from environment if they exist
for name, token_info in env_tokens.items():
if token_info['token']:
# Check if token already exists
existing_token = self.get_token(name)
if not existing_token:
success = self.save_token(
name=name,
token=token_info['token'],
token_type=token_info['type'],
description=token_info['description'],
is_default=(token_info['type'] == 'read') # Set read as default
)
if success:
logger.info(f"Loaded {token_info['type']} token from environment")
def get_token_for_task(self, task_type: str = 'read') -> Optional[str]:
"""
Get appropriate token for specific task
Args:
            task_type: Type of task (read, download, write, upload, medical, private, commercial, enterprise)
Returns:
Appropriate token for the task
"""
# Map task types to token preferences
task_token_map = {
'read': ['read_token', 'fine_grained_token', 'write_token'],
'download': ['read_token', 'fine_grained_token', 'write_token'],
'write': ['write_token', 'fine_grained_token'],
'upload': ['write_token', 'fine_grained_token'],
'medical': ['fine_grained_token', 'write_token', 'read_token'],
'private': ['fine_grained_token', 'write_token'],
'commercial': ['fine_grained_token'],
'enterprise': ['fine_grained_token']
}
# Get preferred token order for task
preferred_tokens = task_token_map.get(task_type, ['read_token'])
# Try to get tokens in order of preference
for token_name in preferred_tokens:
token = self.get_token(token_name)
if token:
logger.debug(f"Using {token_name} for task: {task_type}")
return token
# Fallback to default token
default_token = self.get_token()
if default_token:
logger.debug(f"Using default token for task: {task_type}")
return default_token
# Last resort: try environment variables directly
env_fallbacks = {
'read': 'HF_TOKEN_READ',
'write': 'HF_TOKEN_WRITE',
'medical': 'HF_TOKEN_FINE_GRAINED',
'private': 'HF_TOKEN_FINE_GRAINED'
}
env_var = env_fallbacks.get(task_type, 'HF_TOKEN')
env_token = os.getenv(env_var)
if env_token:
logger.debug(f"Using environment token {env_var} for task: {task_type}")
return env_token
logger.warning(f"No suitable token found for task: {task_type}")
return None
def _get_or_create_encryption_key(self) -> bytes:
"""Get or create encryption key for token storage"""
key_file = self.db_path.parent / ".token_key"
if key_file.exists():
with open(key_file, 'rb') as f:
return f.read()
else:
# Generate new key
password = os.urandom(32) # Random password
salt = os.urandom(16)
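            # Note: both the password and the salt are random one-off values that are never
            # persisted, so the derived key is effectively a random Fernet key;
            # Fernet.generate_key() would be an equivalent, simpler choice.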
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(password))
# Save key securely
with open(key_file, 'wb') as f:
f.write(key)
# Set restrictive permissions
os.chmod(key_file, 0o600)
logger.info("Created new encryption key")
return key
def _init_database(self):
"""Initialize SQLite database"""
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
token_type TEXT NOT NULL,
encrypted_token TEXT NOT NULL,
is_default BOOLEAN DEFAULT FALSE,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_used TIMESTAMP,
usage_count INTEGER DEFAULT 0,
is_active BOOLEAN DEFAULT TRUE
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS token_usage_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
token_name TEXT NOT NULL,
operation TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
success BOOLEAN,
error_message TEXT
)
''')
conn.commit()
logger.info("Database initialized")
def save_token(self, name: str, token: str, token_type: str = 'read',
description: str = '', is_default: bool = False) -> bool:
"""
Save encrypted token to database
Args:
name: Token name/identifier
token: HF token string
token_type: Type of token (read/write/fine_grained)
description: Optional description
is_default: Whether this should be the default token
Returns:
Success status
"""
try:
# Validate token type
if token_type not in self.token_types:
raise ValueError(f"Invalid token type: {token_type}")
# Encrypt token
encrypted_token = self.cipher.encrypt(token.encode()).decode()
with sqlite3.connect(self.db_path) as conn:
# If setting as default, unset other defaults
if is_default:
conn.execute('UPDATE tokens SET is_default = FALSE')
# Insert or update token
conn.execute('''
INSERT OR REPLACE INTO tokens
(name, token_type, encrypted_token, is_default, description, created_at)
VALUES (?, ?, ?, ?, ?, ?)
                ''', (name, token_type, encrypted_token, is_default, description,
                      datetime.now().isoformat()))  # store as ISO text; sqlite's datetime adapter is deprecated in Python 3.12
conn.commit()
logger.info(f"Saved token '{name}' of type '{token_type}'")
return True
except Exception as e:
logger.error(f"Failed to save token '{name}': {e}")
return False
def get_token(self, name: Optional[str] = None) -> Optional[str]:
"""
Get decrypted token by name or default token
Args:
name: Token name (if None, returns default token)
Returns:
Decrypted token string or None
"""
try:
with sqlite3.connect(self.db_path) as conn:
if name:
cursor = conn.execute(
'SELECT encrypted_token FROM tokens WHERE name = ? AND is_active = TRUE',
(name,)
)
else:
cursor = conn.execute(
'SELECT encrypted_token, name FROM tokens WHERE is_default = TRUE AND is_active = TRUE'
)
result = cursor.fetchone()
if result:
encrypted_token = result[0]
token_name = result[1] if not name else name
# Decrypt token
decrypted_token = self.cipher.decrypt(encrypted_token.encode()).decode()
# Update usage statistics
self._update_token_usage(token_name)
return decrypted_token
return None
except Exception as e:
logger.error(f"Failed to get token '{name}': {e}")
return None
def list_tokens(self) -> List[Dict[str, Any]]:
"""
List all saved tokens (without decrypting them)
Returns:
List of token information
"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute('''
SELECT name, token_type, is_default, description, created_at,
last_used, usage_count, is_active
FROM tokens
ORDER BY is_default DESC, created_at DESC
''')
tokens = []
for row in cursor.fetchall():
token_info = {
'name': row[0],
'type': row[1],
'type_info': self.token_types.get(row[1], {}),
'is_default': bool(row[2]),
'description': row[3],
'created_at': row[4],
'last_used': row[5],
'usage_count': row[6],
'is_active': bool(row[7])
}
tokens.append(token_info)
return tokens
except Exception as e:
logger.error(f"Failed to list tokens: {e}")
return []
def delete_token(self, name: str) -> bool:
"""
Delete token from database
Args:
name: Token name to delete
Returns:
Success status
"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute('DELETE FROM tokens WHERE name = ?', (name,))
if cursor.rowcount > 0:
conn.commit()
logger.info(f"Deleted token '{name}'")
return True
else:
logger.warning(f"Token '{name}' not found")
return False
except Exception as e:
logger.error(f"Failed to delete token '{name}': {e}")
return False
def set_default_token(self, name: str) -> bool:
"""
Set a token as the default
Args:
name: Token name to set as default
Returns:
Success status
"""
try:
with sqlite3.connect(self.db_path) as conn:
# Check if token exists
cursor = conn.execute('SELECT id FROM tokens WHERE name = ?', (name,))
if not cursor.fetchone():
logger.error(f"Token '{name}' not found")
return False
# Unset all defaults
conn.execute('UPDATE tokens SET is_default = FALSE')
# Set new default
conn.execute('UPDATE tokens SET is_default = TRUE WHERE name = ?', (name,))
conn.commit()
logger.info(f"Set '{name}' as default token")
return True
except Exception as e:
logger.error(f"Failed to set default token '{name}': {e}")
return False
def validate_token(self, token: str) -> Dict[str, Any]:
"""
Validate HF token by testing API access
Args:
token: Token to validate
Returns:
Validation result
"""
try:
from huggingface_hub import HfApi
api = HfApi(token=token)
user_info = api.whoami()
return {
'valid': True,
'username': user_info.get('name', 'unknown'),
'email': user_info.get('email', ''),
'plan': user_info.get('plan', 'free'),
'message': 'Token is valid and working'
}
except Exception as e:
return {
'valid': False,
'error': str(e),
'message': 'Token validation failed'
}
def _update_token_usage(self, token_name: str):
"""Update token usage statistics"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
UPDATE tokens
SET last_used = ?, usage_count = usage_count + 1
WHERE name = ?
                ''', (datetime.now().isoformat(), token_name))  # store as ISO text; sqlite's datetime adapter is deprecated in Python 3.12
conn.commit()
except Exception as e:
logger.error(f"Failed to update token usage: {e}")
def log_token_usage(self, token_name: str, operation: str,
success: bool, error_message: str = ''):
"""Log token usage for auditing"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
INSERT INTO token_usage_log
(token_name, operation, success, error_message)
VALUES (?, ?, ?, ?)
''', (token_name, operation, success, error_message))
conn.commit()
except Exception as e:
logger.error(f"Failed to log token usage: {e}")
def get_token_recommendations(self, intended_use: str) -> Dict[str, Any]:
"""
Get token type recommendations based on intended use
Args:
intended_use: Description of intended use
Returns:
Recommendation information
"""
use_lower = intended_use.lower()
if any(word in use_lower for word in ['learn', 'study', 'test', 'develop']):
recommended_type = 'read'
elif any(word in use_lower for word in ['share', 'upload', 'publish', 'create']):
recommended_type = 'write'
elif any(word in use_lower for word in ['commercial', 'enterprise', 'team', 'sensitive']):
recommended_type = 'fine_grained'
else:
recommended_type = 'read' # Default to read
return {
'recommended_type': recommended_type,
'type_info': self.token_types[recommended_type],
'explanation': f"Based on your intended use ('{intended_use}'), we recommend a {recommended_type} token."
}
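

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original module).
# It demonstrates saving, listing, and selecting tokens; the token value below
# is a hypothetical placeholder, not a real credential.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    manager = TokenManager(db_path="database/tokens.db")

    # Save a (hypothetical) read token and mark it as the default
    manager.save_token(
        name="example_read_token",
        token="hf_xxxxxxxxxxxxxxxxxxxx",  # placeholder value
        token_type="read",
        description="Example token for local development",
        is_default=True,
    )

    # List stored tokens without decrypting them
    for info in manager.list_tokens():
        print(info["name"], info["type"], "default" if info["is_default"] else "")

    # Pick the most appropriate token for a download task
    token = manager.get_token_for_task("download")
    print("Token available for download task:", bool(token))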