File size: 1,437 Bytes
c3bf538
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from celery_worker import celery
from core.database import SessionLocal
from models.analysis_job import AnalysisJob
from uuid import UUID

@celery.task(bind=True)
def coordinator_task(self, results, job_id: str):
    """
    This task receives the results from all previous tasks, assembles the
    final result, and saves it to the database ONCE.
    """
    print(f"Coordinator task started for job {job_id}...")
    with SessionLocal() as db:
        job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first()
        if not job:
            print(f"Job {job_id} not found in coordinator.")
            return

        try:
            # results[0] is from get_data_task
            # results[1] is from get_intelligence_task
            # results[2] is from get_llm_analysis_task
            
            final_result = {
                **results[0], # Unpack the dictionary from the data task
                "intelligence_briefing": results[1],
                "llm_analysis": results[2],
            }

            job.result = final_result
            job.status = "SUCCESS"
            db.commit()
            print(f"Coordinator task for job {job_id} successfully saved final result.")
        except Exception as e:
            print(f"Error in coordinator for job {job_id}: {e}")
            job.status = "FAILED"
            job.result = {"error": f"Final assembly failed: {str(e)}"}
            db.commit()