from celery_worker import celery
from core.database import SessionLocal
from models.analysis_job import AnalysisJob
from uuid import UUID


# bind=True so Celery passes the task instance as `self`, matching the signature below.
@celery.task(bind=True)
def coordinator_task(self, results, job_id: str):
    """
    This task receives the results from all previous tasks, assembles the
    final result, and saves it to the database ONCE.
    """
print(f"Coordinator task started for job {job_id}...") | |
with SessionLocal() as db: | |
job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first() | |
if not job: | |
print(f"Job {job_id} not found in coordinator.") | |
return | |
try: | |
# results[0] is from get_data_task | |
# results[1] is from get_intelligence_task | |
# results[2] is from get_llm_analysis_task | |
final_result = { | |
**results[0], # Unpack the dictionary from the data task | |
"intelligence_briefing": results[1], | |
"llm_analysis": results[2], | |
} | |
job.result = final_result | |
job.status = "SUCCESS" | |
db.commit() | |
print(f"Coordinator task for job {job_id} successfully saved final result.") | |
        except Exception as e:
            print(f"Error in coordinator for job {job_id}: {e}")
            # Roll back any partially applied changes (e.g. a failed commit)
            # before recording the failure status.
            db.rollback()
            job.status = "FAILED"
            job.result = {"error": f"Final assembly failed: {str(e)}"}
            db.commit()
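

# --- Usage sketch (an assumption, not taken from the original source) ---
# A coordinator like this is typically the callback of a Celery chord: the three
# upstream tasks run in parallel, and the chord hands their return values to this
# task as the `results` list, in the same order the tasks appear in the header.
# The `tasks` module path and the `dispatch_analysis` helper are hypothetical;
# point the import at wherever get_data_task, get_intelligence_task, and
# get_llm_analysis_task actually live in this project.
from celery import chord

from tasks import get_data_task, get_intelligence_task, get_llm_analysis_task


def dispatch_analysis(job_id: str):
    # Fan out the three sub-tasks, then funnel their results into the coordinator,
    # which assembles and persists the final result exactly once.
    chord(
        [
            get_data_task.s(job_id),
            get_intelligence_task.s(job_id),
            get_llm_analysis_task.s(job_id),
        ]
    )(coordinator_task.s(job_id=job_id))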