Spaces:
Running
Running
File size: 7,667 Bytes
bd161ec |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
"""
Celery tasks for document export functionality.
"""
import asyncio
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
import logging
import os
import json
from docx import Document
from docx.shared import Inches
from tasks.celery_app import celery_app
from database import AsyncSessionLocal
from models import ExportTask, Project, Phase, ExportStatus, ExportFormat
from services.export_service import ExportService
logger = logging.getLogger(__name__)
@celery_app.task(bind=True)
def generate_export(self, export_task_id: str):
"""Generate export file for a project."""
try:
# Run async function in event loop
return asyncio.run(_generate_export_async(export_task_id, self.request.id))
except Exception as e:
logger.error(f"Export task {export_task_id} failed: {e}")
# Update task status to failed
asyncio.run(_mark_export_failed(export_task_id, str(e)))
raise
async def generate_export(export_task_id: str):
"""Simple export function that can be called directly without Celery."""
try:
return await _generate_export_async(export_task_id, None)
except Exception as e:
logger.error(f"Export task {export_task_id} failed: {e}")
await _mark_export_failed(export_task_id, str(e))
raise
async def _generate_export_async(export_task_id: str, celery_task_id: str = None):
"""Async function to generate export."""
async with AsyncSessionLocal() as db:
try:
# Get export task
result = await db.execute(
select(ExportTask).where(ExportTask.id == export_task_id)
)
export_task = result.scalar_one_or_none()
if not export_task:
raise Exception("Export task not found")
# Update status to in progress
await db.execute(
update(ExportTask)
.where(ExportTask.id == export_task_id)
.values(
status=ExportStatus.IN_PROGRESS,
celery_task_id=celery_task_id
)
)
await db.commit()
# Get project data manually to avoid selectinload issues
project_result = await db.execute(
select(Project).where(Project.id == export_task.project_id)
)
project = project_result.scalar_one()
if not project:
raise Exception("Project not found")
# Get project owner
owner_result = await db.execute(
select(Project.owner).where(Project.id == project.id)
)
owner = owner_result.scalar_one_or_none()
# Get project phases
phases_result = await db.execute(
select(Phase).where(Phase.project_id == project.id).order_by(Phase.phase_number)
)
phases = phases_result.scalars().all()
# Get drafts for each phase
for phase in phases:
drafts_result = await db.execute(
select(Phase.drafts).where(Phase.id == phase.id)
)
phase.drafts = drafts_result.scalars().all()
# Prepare project data for export
project_data = await _prepare_project_data(project, owner, phases)
# Generate file path
file_path = ExportService.get_export_file_path(
export_task_id,
export_task.format.value
)
# Export based on format
if export_task.format == ExportFormat.PDF:
await ExportService.export_to_pdf(project_data, file_path)
elif export_task.format == ExportFormat.WORD:
await ExportService.export_to_word(project_data, file_path)
elif export_task.format == ExportFormat.JSON:
await ExportService.export_to_json(project_data, file_path)
else:
raise Exception(f"Unsupported export format: {export_task.format}")
# Update task as completed
await db.execute(
update(ExportTask)
.where(ExportTask.id == export_task_id)
.values(
status=ExportStatus.COMPLETED,
file_path=file_path,
completed_at=datetime.now(timezone.utc)
)
)
await db.commit()
logger.info(f"Export task {export_task_id} completed successfully")
return {"status": "completed", "file_path": file_path}
except Exception as e:
logger.error(f"Export generation error: {e}")
await _mark_export_failed_in_session(db, export_task_id, str(e))
raise
async def _prepare_project_data(project: Project, owner, phases) -> dict:
"""Prepare project data for export."""
# Sort phases by phase number
sorted_phases = sorted(phases, key=lambda p: p.phase_number)
# Prepare phases data
phases_data = []
for phase in sorted_phases:
phase_data = {
"phase_number": phase.phase_number,
"title": phase.title,
"description": phase.description,
"user_input": phase.user_input,
"ai_response": phase.ai_response,
"status": phase.status.value,
"created_at": phase.created_at.isoformat() if phase.created_at else None,
"updated_at": phase.updated_at.isoformat() if phase.updated_at else None,
"drafts": [
{
"version": draft.version,
"content": draft.content,
"created_at": draft.created_at.isoformat() if draft.created_at else None
}
for draft in sorted(phase.drafts, key=lambda d: d.version, reverse=True)
]
}
phases_data.append(phase_data)
# Prepare project data
project_data = {
"id": project.id,
"title": project.title,
"description": project.description,
"created_at": project.created_at.isoformat() if project.created_at else None,
"updated_at": project.updated_at.isoformat() if project.updated_at else None,
"owner": {
"id": owner.id,
"full_name": owner.full_name,
"email": owner.email
} if owner else None,
"phases": phases_data,
"total_phases": len(phases_data),
"completed_phases": len([p for p in phases_data if p["status"] == "completed"])
}
return project_data
async def _mark_export_failed(export_task_id: str, error_message: str):
"""Mark export task as failed."""
async with AsyncSessionLocal() as db:
await _mark_export_failed_in_session(db, export_task_id, error_message)
async def _mark_export_failed_in_session(db: AsyncSession, export_task_id: str, error_message: str):
"""Mark export task as failed within existing session."""
try:
await db.execute(
update(ExportTask)
.where(ExportTask.id == export_task_id)
.values(
status=ExportStatus.FAILED,
error_message=error_message,
completed_at=datetime.now(timezone.utc)
)
)
await db.commit()
except Exception as e:
logger.error(f"Failed to mark export as failed: {e}")
|