""" | |
Database Management System for Knowledge Distillation Platform | |
نظام إدارة قواعد البيانات لمنصة تقطير المعرفة | |
""" | |
import json | |
import logging | |
import os | |
from pathlib import Path | |
from typing import Dict, List, Any, Optional | |
from datetime import datetime | |
import asyncio | |
from datasets import load_dataset, Dataset | |
from huggingface_hub import list_datasets | |
logger = logging.getLogger(__name__) | |

class DatabaseManager:
    """
    Comprehensive database management system for the platform
    نظام إدارة قواعد البيانات الشامل للمنصة
    """

    def __init__(self, storage_path: str = "data/databases"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.config_file = self.storage_path / "databases_config.json"
        self.selected_databases_file = self.storage_path / "selected_databases.json"

        # Load existing configuration
        self.databases_config = self._load_config()
        self.selected_databases = self._load_selected_databases()

        logger.info(f"Database Manager initialized with {len(self.databases_config)} configured databases")

    def _load_config(self) -> Dict[str, Any]:
        """Load databases configuration"""
        try:
            if self.config_file.exists():
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            else:
                # Initialize with default medical datasets
                default_config = self._get_default_medical_datasets()
                self._save_config(default_config)
                return default_config
        except Exception as e:
            logger.error(f"Error loading databases config: {e}")
            return {}

    def _save_config(self, config: Dict[str, Any]):
        """Save databases configuration"""
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error saving databases config: {e}")

    def _load_selected_databases(self) -> List[str]:
        """Load selected databases list"""
        try:
            if self.selected_databases_file.exists():
                with open(self.selected_databases_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            else:
                return []
        except Exception as e:
            logger.error(f"Error loading selected databases: {e}")
            return []

    def _save_selected_databases(self):
        """Save selected databases list"""
        try:
            with open(self.selected_databases_file, 'w', encoding='utf-8') as f:
                json.dump(self.selected_databases, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error saving selected databases: {e}")

    def _get_default_medical_datasets(self) -> Dict[str, Any]:
        """Get default medical datasets configuration"""
        return {
            "medical_meadow_medical_flashcards": {
                "name": "Medical Meadow Medical Flashcards",
                "name_ar": "بطاقات تعليمية طبية",
                "dataset_id": "medalpaca/medical_meadow_medical_flashcards",
                "category": "medical",
                "description": "Medical flashcards for educational purposes",
                "description_ar": "بطاقات تعليمية طبية لأغراض التعليم",
                "size": "~50MB",
                "language": "English",
                "modality": "text",
                "license": "Apache 2.0",
                "added_date": datetime.now().isoformat(),
                "status": "available"
            },
            "pubmed_qa": {
                "name": "PubMed QA",
                "name_ar": "أسئلة وأجوبة PubMed",
                "dataset_id": "pubmed_qa",
                "category": "medical",
                "description": "Question answering dataset based on PubMed abstracts",
                "description_ar": "مجموعة بيانات أسئلة وأجوبة مبنية على ملخصات PubMed",
                "size": "~100MB",
                "language": "English",
                "modality": "text",
                "license": "MIT",
                "added_date": datetime.now().isoformat(),
                "status": "available"
            },
            "medical_dialog": {
                "name": "Medical Dialog",
                "name_ar": "حوارات طبية",
                "dataset_id": "medical_dialog",
                "category": "medical",
                "description": "Medical conversation dataset",
                "description_ar": "مجموعة بيانات المحادثات الطبية",
                "size": "~200MB",
                "language": "English/Chinese",
                "modality": "text",
                "license": "CC BY 4.0",
                "added_date": datetime.now().isoformat(),
                "status": "available"
            }
        }

    async def search_huggingface_datasets(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
        """Search for datasets on Hugging Face"""
        try:
            logger.info(f"Searching Hugging Face for datasets: {query}")

            # list_datasets performs blocking HTTP requests; run the search
            # in a worker thread so it does not stall the event loop.
            datasets = await asyncio.to_thread(
                lambda: list(list_datasets(search=query, limit=limit))
            )

            results = []
            for dataset in datasets:
                try:
                    dataset_info = {
                        "id": dataset.id,
                        "name": dataset.id.split('/')[-1],
                        # author may be missing or None on some entries
                        "author": getattr(dataset, 'author', None) or dataset.id.split('/')[0],
                        "description": getattr(dataset, 'description', None) or 'No description available',
                        "tags": getattr(dataset, 'tags', []),
                        "downloads": getattr(dataset, 'downloads', 0),
                        "likes": getattr(dataset, 'likes', 0),
                        "created_at": getattr(dataset, 'created_at', None),
                        "last_modified": getattr(dataset, 'last_modified', None)
                    }
                    results.append(dataset_info)
                except Exception as e:
                    logger.warning(f"Error processing dataset {dataset.id}: {e}")
                    continue

            logger.info(f"Found {len(results)} datasets")
            return results
        except Exception as e:
            logger.error(f"Error searching Hugging Face datasets: {e}")
            return []

    async def add_database(self, database_info: Dict[str, Any]) -> bool:
        """Add a new database to the configuration"""
        try:
            database_id = database_info.get('dataset_id') or database_info.get('id')
            if not database_id:
                raise ValueError("Database ID is required")

            # Validate dataset exists and is accessible
            validation_result = await self.validate_dataset(database_id)
            if not validation_result['valid']:
                raise ValueError(f"Dataset validation failed: {validation_result['error']}")

            # Prepare database configuration
            config = {
                "name": database_info.get('name', database_id.split('/')[-1]),
                "name_ar": database_info.get('name_ar', ''),
                "dataset_id": database_id,
                "category": database_info.get('category', 'general'),
                "description": database_info.get('description', ''),
                "description_ar": database_info.get('description_ar', ''),
                "size": database_info.get('size', 'Unknown'),
                "language": database_info.get('language', 'Unknown'),
                "modality": database_info.get('modality', 'text'),
                "license": database_info.get('license', 'Unknown'),
                "added_date": datetime.now().isoformat(),
                "status": "available",
                "validation": validation_result
            }

            # Add to configuration
            self.databases_config[database_id] = config
            self._save_config(self.databases_config)

            logger.info(f"Added database: {database_id}")
            return True
        except Exception as e:
            logger.error(f"Error adding database: {e}")
            return False

    async def validate_dataset(self, dataset_id: str) -> Dict[str, Any]:
        """Validate that a dataset exists and is accessible"""
        try:
            logger.info(f"Validating dataset: {dataset_id}")

            def _probe():
                # Stream the dataset so validation does not download it in full
                dataset = load_dataset(dataset_id, split="train", streaming=True)
                # Return None on an empty split instead of raising StopIteration
                return next(iter(dataset), None)

            # load_dataset does blocking network I/O; keep it off the event loop
            sample = await asyncio.to_thread(_probe)
            features = list(sample.keys()) if sample else []

            return {
                "valid": True,
                "features": features,
                "sample_keys": features,
                "accessible": True,
                "error": None
            }
        except Exception as e:
            logger.warning(f"Dataset validation failed for {dataset_id}: {e}")
            return {
                "valid": False,
                "features": [],
                "sample_keys": [],
                "accessible": False,
                "error": str(e)
            }

    def get_all_databases(self) -> Dict[str, Any]:
        """Get all configured databases"""
        return self.databases_config

    def get_selected_databases(self) -> List[str]:
        """Get list of selected database IDs"""
        return self.selected_databases

    def select_database(self, database_id: str) -> bool:
        """Select a database for use"""
        try:
            if database_id not in self.databases_config:
                raise ValueError(f"Database {database_id} not found in configuration")

            if database_id not in self.selected_databases:
                self.selected_databases.append(database_id)
                self._save_selected_databases()
                logger.info(f"Selected database: {database_id}")

            return True
        except Exception as e:
            logger.error(f"Error selecting database: {e}")
            return False

    def deselect_database(self, database_id: str) -> bool:
        """Deselect a database"""
        try:
            if database_id in self.selected_databases:
                self.selected_databases.remove(database_id)
                self._save_selected_databases()
                logger.info(f"Deselected database: {database_id}")

            return True
        except Exception as e:
            logger.error(f"Error deselecting database: {e}")
            return False

    def remove_database(self, database_id: str) -> bool:
        """Remove a database from configuration"""
        try:
            if database_id not in self.databases_config:
                logger.warning(f"Database {database_id} not found in configuration")
                return False

            del self.databases_config[database_id]
            self._save_config(self.databases_config)

            if database_id in self.selected_databases:
                self.selected_databases.remove(database_id)
                self._save_selected_databases()

            logger.info(f"Removed database: {database_id}")
            return True
        except Exception as e:
            logger.error(f"Error removing database: {e}")
            return False

    def get_database_info(self, database_id: str) -> Optional[Dict[str, Any]]:
        """Get detailed information about a specific database"""
        return self.databases_config.get(database_id)

    def get_databases_by_category(self, category: str) -> Dict[str, Any]:
        """Get databases filtered by category"""
        return {
            db_id: db_info
            for db_id, db_info in self.databases_config.items()
            if db_info.get('category') == category
        }

    async def load_selected_datasets(self, max_samples: int = 1000) -> Dict[str, Any]:
        """Load data from selected datasets"""
        loaded_datasets = {}

        for database_id in self.selected_databases:
            try:
                logger.info(f"Loading dataset: {database_id}")

                # Stream the dataset and take only the first max_samples records;
                # the blocking download runs in a worker thread.
                samples = await asyncio.to_thread(
                    lambda db=database_id: list(
                        load_dataset(db, split="train", streaming=True).take(max_samples)
                    )
                )

                loaded_datasets[database_id] = {
                    "samples": samples,
                    "count": len(samples),
                    "info": self.databases_config.get(database_id, {})
                }
                logger.info(f"Loaded {len(samples)} samples from {database_id}")
            except Exception as e:
                logger.error(f"Error loading dataset {database_id}: {e}")
                loaded_datasets[database_id] = {
                    "samples": [],
                    "count": 0,
                    "error": str(e),
                    "info": self.databases_config.get(database_id, {})
                }

        return loaded_datasets
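

# Minimal usage sketch (an assumed demo entry point, not part of the original
# module): searches the Hub for a hypothetical "medical" query, registers the
# first hit, selects it, and streams a handful of samples from it.
if __name__ == "__main__":
    async def _demo():
        manager = DatabaseManager()
        hits = await manager.search_huggingface_datasets("medical", limit=5)
        if hits and await manager.add_database(hits[0]):
            manager.select_database(hits[0]["id"])
            loaded = await manager.load_selected_datasets(max_samples=10)
            for db_id, payload in loaded.items():
                print(db_id, payload["count"])

    logging.basicConfig(level=logging.INFO)
    asyncio.run(_demo())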