# app/tasks/dataset_tasks.py
import logging
import time
import asyncio
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional, Tuple
from celery import Task, shared_task
from app.core.celery_app import get_celery_app
from app.services.hf_datasets import (
    determine_impact_level_by_criteria,
    get_hf_token,
    get_dataset_size,
    refresh_datasets_cache,
    fetch_and_cache_all_datasets,
)
from app.services.redis_client import sync_cache_set, sync_cache_get, generate_cache_key
from app.core.config import settings
import requests
import os

# Configure logging
logger = logging.getLogger(__name__)

# Get Celery app instance
celery_app = get_celery_app()

# Constants
DATASET_CACHE_TTL = 60 * 60 * 24 * 30  # 30 days
BATCH_PROGRESS_CACHE_TTL = 60 * 60 * 24 * 7  # 7 days for batch progress
DATASET_SIZE_CACHE_TTL = 60 * 60 * 24 * 30  # 30 days for size info
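
# Illustrative sketch, not from the original module: how the TTLs above might
# pair with the Redis helpers imported at the top. The exact signatures of
# generate_cache_key and sync_cache_set are assumptions here (key parts in,
# key out; key/value/TTL in).
#
#     key = generate_cache_key("dataset_size", "user/some-dataset")
#     sync_cache_set(key, {"size_bytes": 123456}, ttl=DATASET_SIZE_CACHE_TTL)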


@celery_app.task(name="app.tasks.dataset_tasks.refresh_hf_datasets_cache")
def refresh_hf_datasets_cache():
    """Celery task to refresh the HuggingFace datasets cache in Redis."""
    logger.info("Starting refresh of HuggingFace datasets cache via Celery task.")
    try:
        refresh_datasets_cache()
        logger.info("Successfully refreshed HuggingFace datasets cache.")
        return {"status": "success"}
    except Exception as e:
        logger.error(f"Failed to refresh HuggingFace datasets cache: {e}", exc_info=True)
        return {"status": "error", "error": str(e)}


@shared_task(bind=True, max_retries=3, default_retry_delay=10)
def fetch_datasets_page(self, offset, limit):
    """
    Celery task to fetch and cache a single page of datasets from Hugging Face.
    Retries on failure.
    """
    logger.info(f"[fetch_datasets_page] ENTRY: offset={offset}, limit={limit}")
    try:
        from app.services.hf_datasets import process_datasets_page

        logger.info(f"[fetch_datasets_page] Calling process_datasets_page with offset={offset}, limit={limit}")
        result = process_datasets_page(offset, limit)
        logger.info(f"[fetch_datasets_page] SUCCESS: offset={offset}, limit={limit}, result={result}")
        return result
    except Exception as exc:
        logger.error(f"[fetch_datasets_page] ERROR: offset={offset}, limit={limit}, exc={exc}", exc_info=True)
        raise self.retry(exc=exc)
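

# Illustrative helper, an assumption rather than part of the original file:
# fanning the paged fetch out across the worker pool with a Celery group.
# `total_count` and `page_size` are hypothetical parameters; a real caller
# would get the total from the Hugging Face API or the cached listing.
def enqueue_all_dataset_pages(total_count: int, page_size: int = 100):
    from celery import group

    job = group(
        fetch_datasets_page.s(offset, page_size)
        for offset in range(0, total_count, page_size)
    )
    # apply_async returns a GroupResult that can be polled or joined.
    return job.apply_async()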


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def refresh_hf_datasets_full_cache(self):
    """Celery task to fetch and cache the complete Hugging Face datasets listing. Retries on failure."""
    logger.info("[refresh_hf_datasets_full_cache] Starting full Hugging Face datasets cache refresh.")
    try:
        token = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
        if not token:
            logger.error("[refresh_hf_datasets_full_cache] HUGGINGFACEHUB_API_TOKEN not set.")
            return {"status": "error", "error": "HUGGINGFACEHUB_API_TOKEN not set"}
        count = fetch_and_cache_all_datasets(token)
        logger.info(f"[refresh_hf_datasets_full_cache] Cached {count} datasets.")
        return {"status": "ok", "cached": count}
    except Exception as exc:
        logger.error(f"[refresh_hf_datasets_full_cache] ERROR: {exc}", exc_info=True)
        raise self.retry(exc=exc)
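

# Usage sketch (assumed invocation, not in the original file): either refresh
# task can be enqueued ad hoc from a shell or an API handler, e.g.:
#
#     refresh_hf_datasets_full_cache.delay()
#     refresh_hf_datasets_cache.apply_async(countdown=30)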