import logging import json from typing import Any, List, Optional, Dict, Tuple import requests from huggingface_hub import HfApi from app.core.config import settings from app.schemas.dataset_common import ImpactLevel from app.services.redis_client import sync_cache_set, sync_cache_get, generate_cache_key, get_redis_sync import time import asyncio import redis import gzip from datetime import datetime, timezone import os from app.schemas.dataset import ImpactAssessment from app.schemas.dataset_common import DatasetMetrics import httpx import redis.asyncio as aioredis log = logging.getLogger(__name__) api = HfApi() redis_client = redis.Redis(host="redis", port=6379, decode_responses=True) # Thresholds for impact categorization SIZE_THRESHOLD_LOW = 100 * 1024 * 1024 # 100 MB SIZE_THRESHOLD_MEDIUM = 1024 * 1024 * 1024 # 1 GB DOWNLOADS_THRESHOLD_LOW = 1000 DOWNLOADS_THRESHOLD_MEDIUM = 10000 LIKES_THRESHOLD_LOW = 10 LIKES_THRESHOLD_MEDIUM = 100 HF_API_URL = "https://huggingface.co/api/datasets" DATASET_CACHE_TTL = 60 * 60 # 1 hour # Redis and HuggingFace API setup REDIS_KEY = "hf:datasets:all:compressed" REDIS_META_KEY = "hf:datasets:meta" REDIS_TTL = 60 * 60 # 1 hour # Impact thresholds (in bytes) SIZE_LOW = 100 * 1024 * 1024 SIZE_MEDIUM = 1024 * 1024 * 1024 def get_hf_token(): token = os.environ.get("HUGGINGFACEHUB_API_TOKEN") if not token: raise RuntimeError("HUGGINGFACEHUB_API_TOKEN environment variable is not set. Please set it securely.") return token def get_dataset_commits(dataset_id: str, limit: int = 20): from huggingface_hub import HfApi import logging log = logging.getLogger(__name__) api = HfApi() log.info(f"[get_dataset_commits] Fetching commits for dataset_id={dataset_id}") try: commits = api.list_repo_commits(repo_id=dataset_id, repo_type="dataset") log.info(f"[get_dataset_commits] Received {len(commits)} commits for {dataset_id}") except Exception as e: log.error(f"[get_dataset_commits] Error fetching commits for {dataset_id}: {e}", exc_info=True) raise # Let the API layer catch and handle this result = [] for c in commits[:limit]: try: commit_id = getattr(c, "commit_id", "") title = getattr(c, "title", "") message = getattr(c, "message", title) authors = getattr(c, "authors", []) author_name = authors[0] if authors and isinstance(authors, list) else "" created_at = getattr(c, "created_at", None) if created_at: if hasattr(created_at, "isoformat"): date = created_at.isoformat() else: date = str(created_at) else: date = "" result.append({ "id": commit_id or "", "title": title or message or "", "message": message or title or "", "author": {"name": author_name, "email": ""}, "date": date, }) except Exception as e: log.error(f"[get_dataset_commits] Error parsing commit: {e} | Commit: {getattr(c, '__dict__', str(c))}", exc_info=True) log.info(f"[get_dataset_commits] Returning {len(result)} parsed commits for {dataset_id}") return result def get_dataset_files(dataset_id: str) -> List[str]: return api.list_repo_files(repo_id=dataset_id, repo_type="dataset") def get_file_url(dataset_id: str, filename: str, revision: Optional[str] = None) -> str: from huggingface_hub import hf_hub_url return hf_hub_url(repo_id=dataset_id, filename=filename, repo_type="dataset", revision=revision) def get_datasets_page_from_zset(offset: int = 0, limit: int = 10, search: str = None) -> dict: import redis import json redis_client = redis.Redis(host="redis", port=6379, db=0, decode_responses=True) zset_key = "hf:datasets:all:zset" hash_key = "hf:datasets:all:hash" # Get total count total = redis_client.zcard(zset_key) # Get dataset IDs for the page ids = redis_client.zrange(zset_key, offset, offset + limit - 1) # Fetch metadata for those IDs if not ids: return {"items": [], "count": total} items = redis_client.hmget(hash_key, ids) # Parse JSON and filter/search if needed parsed = [] for raw in items: if not raw: continue try: item = json.loads(raw) parsed.append(item) except Exception: continue if search: parsed = [d for d in parsed if search.lower() in (d.get("id") or "").lower()] return {"items": parsed, "count": total} async def _fetch_size(session: httpx.AsyncClient, dataset_id: str) -> Optional[int]: """Fetch dataset size from the datasets server asynchronously.""" url = f"https://datasets-server.huggingface.co/size?dataset={dataset_id}" try: resp = await session.get(url, timeout=30) if resp.status_code == 200: data = resp.json() return data.get("size", {}).get("dataset", {}).get("num_bytes_original_files") except Exception as e: log.warning(f"Could not fetch size for {dataset_id}: {e}") return None async def _fetch_sizes(dataset_ids: List[str]) -> Dict[str, Optional[int]]: """Fetch dataset sizes in parallel.""" results: Dict[str, Optional[int]] = {} async with httpx.AsyncClient() as session: tasks = {dataset_id: asyncio.create_task(_fetch_size(session, dataset_id)) for dataset_id in dataset_ids} for dataset_id, task in tasks.items(): results[dataset_id] = await task return results def process_datasets_page(offset, limit): """ Fetch and process a single page of datasets from Hugging Face and cache them in Redis. """ import redis import os import json import asyncio log = logging.getLogger(__name__) log.info(f"[process_datasets_page] ENTRY: offset={offset}, limit={limit}") token = os.environ.get("HUGGINGFACEHUB_API_TOKEN") if not token: log.error("[process_datasets_page] HUGGINGFACEHUB_API_TOKEN environment variable is not set.") raise RuntimeError("HUGGINGFACEHUB_API_TOKEN environment variable is not set. Please set it securely.") headers = { "Authorization": f"Bearer {token}", "User-Agent": "Mozilla/5.0 (compatible; CollinearTool/1.0; +https://yourdomain.com)" } params = {"limit": limit, "offset": offset, "full": "True"} redis_client = redis.Redis(host="redis", port=6379, db=0, decode_responses=True) stream_key = "hf:datasets:all:stream" zset_key = "hf:datasets:all:zset" hash_key = "hf:datasets:all:hash" try: log.info(f"[process_datasets_page] Requesting {HF_API_URL} with params={params}") response = requests.get(HF_API_URL, headers=headers, params=params, timeout=120) response.raise_for_status() page_items = response.json() log.info(f"[process_datasets_page] Received {len(page_items)} datasets at offset {offset}") dataset_ids = [ds.get("id") for ds in page_items] size_map = asyncio.run(_fetch_sizes(dataset_ids)) for ds in page_items: dataset_id = ds.get("id") size_bytes = size_map.get(dataset_id) downloads = ds.get("downloads") likes = ds.get("likes") impact_level, assessment_method = determine_impact_level_by_criteria(size_bytes, downloads, likes) metrics = DatasetMetrics(size_bytes=size_bytes, downloads=downloads, likes=likes) thresholds = { "size_bytes": { "low": str(100 * 1024 * 1024), "medium": str(1 * 1024 * 1024 * 1024), "high": str(10 * 1024 * 1024 * 1024) } } impact_assessment = ImpactAssessment( dataset_id=dataset_id, impact_level=impact_level, assessment_method=assessment_method, metrics=metrics, thresholds=thresholds ).model_dump() item = { "id": dataset_id, "name": ds.get("name"), "description": ds.get("description"), "size_bytes": size_bytes, "impact_level": impact_level.value if isinstance(impact_level, ImpactLevel) else impact_level, "downloads": downloads, "likes": likes, "tags": ds.get("tags", []), "impact_assessment": json.dumps(impact_assessment) } final_item = {} for k, v in item.items(): if isinstance(v, list) or isinstance(v, dict): final_item[k] = json.dumps(v) elif v is None: final_item[k] = 'null' else: final_item[k] = str(v) redis_client.xadd(stream_key, final_item) redis_client.zadd(zset_key, {dataset_id: offset}) redis_client.hset(hash_key, dataset_id, json.dumps(item)) log.info(f"[process_datasets_page] EXIT: Cached {len(page_items)} datasets at offset {offset}") return len(page_items) except Exception as exc: log.error(f"[process_datasets_page] ERROR: offset={offset}, limit={limit}, exc={exc}", exc_info=True) raise def refresh_datasets_cache(): """ Orchestrator: Enqueue Celery tasks to fetch all Hugging Face datasets in parallel. Uses direct calls to HF API. """ import requests log.info("[refresh_datasets_cache] Orchestrating dataset fetch tasks using direct HF API calls.") token = os.environ.get("HUGGINGFACEHUB_API_TOKEN") if not token: log.error("[refresh_datasets_cache] HUGGINGFACEHUB_API_TOKEN environment variable is not set.") raise RuntimeError("HUGGINGFACEHUB_API_TOKEN environment variable is not set. Please set it securely.") headers = { "Authorization": f"Bearer {token}", "User-Agent": "Mozilla/5.0 (compatible; CollinearTool/1.0; +https://yourdomain.com)" } limit = 500 params = {"limit": 1, "offset": 0} try: response = requests.get(HF_API_URL, headers=headers, params=params, timeout=120) response.raise_for_status() total_str = response.headers.get('X-Total-Count') if not total_str: log.error("[refresh_datasets_cache] 'X-Total-Count' header not found in HF API response.") raise ValueError("'X-Total-Count' header missing from Hugging Face API response.") total = int(total_str) log.info(f"[refresh_datasets_cache] Total datasets reported by HF API: {total}") except requests.RequestException as e: log.error(f"[refresh_datasets_cache] Error fetching total dataset count from HF API: {e}") raise except ValueError as e: log.error(f"[refresh_datasets_cache] Error parsing total dataset count: {e}") raise num_pages = (total + limit - 1) // limit from app.tasks.dataset_tasks import fetch_datasets_page from celery import group tasks = [] for page_num in range(num_pages): offset = page_num * limit tasks.append(fetch_datasets_page.s(offset, limit)) log.info(f"[refresh_datasets_cache] Scheduled page at offset {offset}, limit {limit}.") if tasks: group(tasks).apply_async() log.info(f"[refresh_datasets_cache] Enqueued {len(tasks)} fetch tasks.") else: log.warning("[refresh_datasets_cache] No dataset pages found to schedule.") def determine_impact_level_by_criteria(size_bytes, downloads=None, likes=None): try: size = int(size_bytes) if size_bytes not in (None, 'null') else 0 except Exception: size = 0 # Prefer size_bytes if available if size >= 10 * 1024 * 1024 * 1024: return ("high", "large_size") elif size >= 1 * 1024 * 1024 * 1024: return ("medium", "medium_size") elif size >= 100 * 1024 * 1024: return ("low", "small_size") # Fallback to downloads if size_bytes is missing or too small if downloads is not None: try: downloads = int(downloads) if downloads >= 100000: return ("high", "downloads") elif downloads >= 10000: return ("medium", "downloads") elif downloads >= 1000: return ("low", "downloads") except Exception: pass # Fallback to likes if downloads is missing if likes is not None: try: likes = int(likes) if likes >= 1000: return ("high", "likes") elif likes >= 100: return ("medium", "likes") elif likes >= 10: return ("low", "likes") except Exception: pass return ("not_available", "size_and_downloads_and_likes_unknown") def get_dataset_size(dataset: dict, dataset_id: str = None): """ Extract the size in bytes from a dataset dictionary. Tries multiple locations based on possible HuggingFace API responses. """ # Try top-level key size_bytes = dataset.get("size_bytes") if size_bytes not in (None, 'null'): return size_bytes # Try nested structure from the size API size_bytes = ( dataset.get("size", {}) .get("dataset", {}) .get("num_bytes_original_files") ) if size_bytes not in (None, 'null'): return size_bytes # Try metrics or info sub-dictionaries if present for key in ["metrics", "info"]: sub = dataset.get(key, {}) if isinstance(sub, dict): size_bytes = sub.get("size_bytes") if size_bytes not in (None, 'null'): return size_bytes # Not found return None async def get_datasets_page_from_zset_async(offset: int = 0, limit: int = 10, search: str = None) -> dict: redis_client = aioredis.Redis(host="redis", port=6379, db=0, decode_responses=True) zset_key = "hf:datasets:all:zset" hash_key = "hf:datasets:all:hash" total = await redis_client.zcard(zset_key) ids = await redis_client.zrange(zset_key, offset, offset + limit - 1) if not ids: return {"items": [], "count": total} items = await redis_client.hmget(hash_key, ids) parsed = [] for raw in items: if not raw: continue try: item = json.loads(raw) parsed.append(item) except Exception: continue if search: parsed = [d for d in parsed if search.lower() in (d.get("id") or "").lower()] return {"items": parsed, "count": total} async def get_dataset_commits_async(dataset_id: str, limit: int = 20): from huggingface_hub import HfApi import logging log = logging.getLogger(__name__) api = HfApi() log.info(f"[get_dataset_commits_async] Fetching commits for dataset_id={dataset_id}") try: # huggingface_hub is sync, so run in threadpool import anyio commits = await anyio.to_thread.run_sync(api.list_repo_commits, repo_id=dataset_id, repo_type="dataset") log.info(f"[get_dataset_commits_async] Received {len(commits)} commits for {dataset_id}") except Exception as e: log.error(f"[get_dataset_commits_async] Error fetching commits for {dataset_id}: {e}", exc_info=True) raise result = [] for c in commits[:limit]: try: commit_id = getattr(c, "commit_id", "") title = getattr(c, "title", "") message = getattr(c, "message", title) authors = getattr(c, "authors", []) author_name = authors[0] if authors and isinstance(authors, list) else "" created_at = getattr(c, "created_at", None) if created_at: if hasattr(created_at, "isoformat"): date = created_at.isoformat() else: date = str(created_at) else: date = "" result.append({ "id": commit_id or "", "title": title or message or "", "message": message or title or "", "author": {"name": author_name, "email": ""}, "date": date, }) except Exception as e: log.error(f"[get_dataset_commits_async] Error parsing commit: {e} | Commit: {getattr(c, '__dict__', str(c))}", exc_info=True) log.info(f"[get_dataset_commits_async] Returning {len(result)} parsed commits for {dataset_id}") return result async def get_dataset_files_async(dataset_id: str) -> List[str]: from huggingface_hub import HfApi import anyio api = HfApi() # huggingface_hub is sync, so run in threadpool return await anyio.to_thread.run_sync(api.list_repo_files, repo_id=dataset_id, repo_type="dataset") async def get_file_url_async(dataset_id: str, filename: str, revision: Optional[str] = None) -> str: from huggingface_hub import hf_hub_url import anyio # huggingface_hub is sync, so run in threadpool return await anyio.to_thread.run_sync(hf_hub_url, repo_id=dataset_id, filename=filename, repo_type="dataset", revision=revision) # Fetch and cache all datasets class EnhancedJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): return obj.isoformat() return super().default(obj) async def fetch_size(session, dataset_id, token=None): url = f"https://datasets-server.huggingface.co/size?dataset={dataset_id}" headers = {"Authorization": f"Bearer {token}"} if token else {} try: resp = await session.get(url, headers=headers, timeout=30) if resp.status_code == 200: data = resp.json() return dataset_id, data.get("size", {}).get("dataset", {}).get("num_bytes_original_files") except Exception as e: log.warning(f"Could not fetch size for {dataset_id}: {e}") return dataset_id, None async def fetch_all_sizes(dataset_ids, token=None, batch_size=50): results = {} async with httpx.AsyncClient() as session: for i in range(0, len(dataset_ids), batch_size): batch = dataset_ids[i:i+batch_size] tasks = [fetch_size(session, ds_id, token) for ds_id in batch] batch_results = await asyncio.gather(*tasks) for ds_id, size in batch_results: results[ds_id] = size return results def fetch_and_cache_all_datasets(token: str): api = HfApi(token=token) log.info("Fetching all datasets from Hugging Face Hub...") all_datasets = list(api.list_datasets()) all_datasets_dicts = [] dataset_ids = [d.id for d in all_datasets] # Fetch all sizes in batches sizes = asyncio.run(fetch_all_sizes(dataset_ids, token=token, batch_size=50)) for d in all_datasets: data = d.__dict__ size_bytes = sizes.get(d.id) downloads = data.get("downloads") likes = data.get("likes") data["size_bytes"] = size_bytes impact_level, _ = determine_impact_level_by_criteria(size_bytes, downloads, likes) data["impact_level"] = impact_level all_datasets_dicts.append(data) compressed = gzip.compress(json.dumps(all_datasets_dicts, cls=EnhancedJSONEncoder).encode("utf-8")) r = redis.Redis(host="redis", port=6379, decode_responses=False) r.set(REDIS_KEY, compressed) log.info(f"Cached {len(all_datasets_dicts)} datasets in Redis under {REDIS_KEY}") return len(all_datasets_dicts) # Native pagination from cache def get_datasets_page_from_cache(limit: int, offset: int): r = redis.Redis(host="redis", port=6379, decode_responses=False) compressed = r.get(REDIS_KEY) if not compressed: return {"error": "Cache not found. Please refresh datasets."}, 404 all_datasets = json.loads(gzip.decompress(compressed).decode("utf-8")) total = len(all_datasets) if offset < 0 or offset >= total: return {"error": "Offset out of range.", "total": total}, 400 page = all_datasets[offset:offset+limit] total_pages = (total + limit - 1) // limit current_page = (offset // limit) + 1 next_page = current_page + 1 if offset + limit < total else None prev_page = current_page - 1 if current_page > 1 else None return { "total": total, "current_page": current_page, "total_pages": total_pages, "next_page": next_page, "prev_page": prev_page, "items": page }, 200