# dataset-tool/app/core/celery_app.py
"""Celery configuration for task processing."""

import logging

from celery import Celery
from celery.signals import task_failure, task_success, worker_ready, worker_shutdown

from app.core.config import settings

# Configure logging
logger = logging.getLogger(__name__)

# Celery application: Redis serves as both the broker and the result backend
celery_app = Celery(
    "dataset_impacts",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
)
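# NOTE: settings.REDIS_URL is assumed to be a standard Redis DSN such as
# "redis://localhost:6379/0"; the same URL serves both roles here.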

# Configure Celery settings
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    worker_concurrency=settings.WORKER_CONCURRENCY,
    task_acks_late=True,  # Tasks are acknowledged only after execution
    task_reject_on_worker_lost=True,  # Re-queue tasks whose worker process dies mid-execution
    task_time_limit=3600,  # Hard 1-hour timeout per task
    task_soft_time_limit=3000,  # Soft timeout (50 minutes) allows graceful cleanup before the hard limit
    worker_prefetch_multiplier=1,  # Prefetch one task at a time for fairer distribution
    broker_connection_retry=True,
    broker_connection_retry_on_startup=True,
    broker_connection_max_retries=10,
    broker_pool_limit=10,  # Connection pool size
    result_expires=60 * 60 * 24,  # Results expire after 24 hours
    task_track_started=True,  # Track when tasks are started
)
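# With task_acks_late and task_reject_on_worker_lost, delivery is effectively
# at-least-once: a task interrupted by a worker crash is re-queued and may run
# twice, so tasks on these queues should be written to be idempotent.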

# Set up task routes for different task types
celery_app.conf.task_routes = {
    "app.tasks.dataset_tasks.*": {"queue": "dataset_impacts"},
    "app.tasks.maintenance.*": {"queue": "maintenance"},
}
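# Workers only consume these queues if started with them explicitly, e.g.
# (module path inferred from this file's location):
#   celery -A app.core.celery_app worker -Q dataset_impacts,maintenance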

# Configure retry settings
celery_app.conf.task_default_retry_delay = 30  # 30 seconds
celery_app.conf.task_max_retries = 3
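# Depending on the Celery version, these two keys may not be applied globally;
# retry behaviour can always be set per task instead (illustrative only):
#   @celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
#   def some_task(self): ...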

# Beat schedule for periodic tasks
celery_app.conf.beat_schedule = {
    "cleanup-stale-tasks": {
        "task": "app.tasks.maintenance.cleanup_stale_tasks",
        "schedule": 3600.0,  # Run every hour
    },
    "health-check": {
        "task": "app.tasks.maintenance.health_check",
        "schedule": 300.0,  # Run every 5 minutes
    },
    "refresh-hf-datasets-cache": {
        "task": "app.tasks.dataset_tasks.refresh_hf_datasets_cache",
        "schedule": 3600.0,  # Run every hour
    },
}
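# These entries only fire if a beat process runs alongside the workers, e.g.:
#   celery -A app.core.celery_app beat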

# Signal handlers for monitoring and logging
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, **kwargs):
    """Log failed tasks."""
    task_name = sender.name if sender else "Unknown"
    logger.error(f"Task {task_id} ({task_name}) failed: {exception}")

@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    """Log successful tasks."""
    task_name = sender.name if sender else "Unknown"
    logger.info(f"Task {task_name} completed successfully")

@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    """Log when worker is ready."""
    hostname = getattr(sender, "hostname", None) or kwargs.get("hostname")
    logger.info(f"Celery worker ready: {hostname}")

@worker_shutdown.connect
def worker_shutdown_handler(sender=None, **kwargs):
    """Log when worker is shutting down."""
    hostname = getattr(sender, "hostname", None) or kwargs.get("hostname")
    logger.info(f"Celery worker shutting down: {hostname}")

def get_celery_app():
    """Get the Celery app instance."""
    # Import all tasks so they are registered with the app before use
    try:
        # app.tasks imports every task module, registering each task with celery_app
        import app.tasks  # noqa: F401
        logger.info(f"Tasks successfully imported; registered {len(celery_app.tasks)} tasks")
    except ImportError as e:
        logger.error(f"Error importing tasks: {e}")
    return celery_app
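
# Sketch of how a task module under app/tasks/ would pick up the routing above;
# the file, task name, and body are illustrative only (app/tasks is not part of
# this module):
#
#   # app/tasks/dataset_tasks.py
#   from app.core.celery_app import celery_app
#
#   @celery_app.task(bind=True, name="app.tasks.dataset_tasks.compute_impacts")
#   def compute_impacts(self, dataset_id: str):
#       ...  # routed to the "dataset_impacts" queue by task_routes above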