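# NOTE: "Spaces: Building / Building" appears to be web-page residue captured
# together with this file; it is not Python and is kept only as a comment.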
import asyncio | |
import os | |
import time | |
import signal | |
import sys | |
from datetime import datetime | |
import traceback | |
import logging | |
from sqlalchemy.future import select | |
from sqlalchemy.ext.asyncio import AsyncSession | |
from sqlalchemy.exc import SQLAlchemyError | |
from app.database import AsyncSessionLocal, init_db, close_db | |
from app.models import VideoUpload | |
from app.utils import whisper_llm, pdf, s3, lightweight_agentic | |
# Root logging configuration: INFO and above goes both to stdout and to a
# UTF-8 encoded "worker.log" file (explicit encoding for Windows compatibility).
logging.basicConfig(
    handlers=[
        logging.StreamHandler(stream=sys.stdout),
        logging.FileHandler(filename='worker.log', encoding='utf-8'),
    ],
    format='[%(asctime)s] %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,
)

# Logger used by every function in this module.
logger = logging.getLogger("worker.daemon")

# Set by the signal handler; polled by the worker loop and between videos.
# NOTE(review): the event is created at import time, before asyncio.run()
# starts a loop. On Python < 3.10 asyncio primitives bind to the loop that is
# current at construction -- confirm the minimum supported Python version.
SHUTDOWN_EVENT = asyncio.Event()

# Seconds to wait between polling cycles.
POLL_INTERVAL = 60
def signal_handler(signum, frame):
    """Request a graceful shutdown of the worker.

    Logs the number of the received signal and sets ``SHUTDOWN_EVENT``.

    Args:
        signum: Number of the signal that was delivered.
        frame: Stack frame passed by the signal machinery; not used.
    """
    message = f"Received signal {signum}, initiating graceful shutdown..."
    logger.info(message)
    SHUTDOWN_EVENT.set()
async def _mark_video_failed(session, video, video_id):
    """Persist status ``"failed"`` for *video*, recovering from a broken transaction.

    The first attempt commits on the session as it is. If that raises a
    ``SQLAlchemyError`` the session is rolled back and the update is retried
    once on a clean transaction. A second failure is logged and swallowed so
    the caller can continue with the next video.

    Args:
        session: The session that loaded *video*.
        video: The ``VideoUpload`` row to update.
        video_id: Primary key captured earlier; used only for logging so that
            no ORM attribute has to be read after a rollback.
    """
    for attempt in (1, 2):
        try:
            video.status = "failed"
            video.updated_at = datetime.utcnow()
            await session.commit()
            return
        except SQLAlchemyError as db_error:
            if attempt == 2:
                logger.error(f"Could not mark video {video_id} as failed: {db_error}")
                logger.debug(traceback.format_exc())
            # A session whose flush/commit failed rejects further work until
            # it has been rolled back.
            await session.rollback()


async def process_pending_videos():
    """Process all pending video uploads.

    Selects every ``VideoUpload`` whose status is ``"pending"`` and, for each
    one in turn: marks it ``"processing"``, runs the lightweight agentic
    analysis (falling back to ``whisper_llm.analyze`` if that raises),
    generates a PDF, uploads it under ``pdfs/<id>.pdf`` and marks the row
    ``"completed"`` with the resulting URL. A failure in any step marks the
    row ``"failed"`` and moves on to the next video. Processing stops early
    when ``SHUTDOWN_EVENT`` is set.

    Errors are logged, never raised: database errors and unexpected
    exceptions outside the per-video steps end the current batch.
    """
    async with AsyncSessionLocal() as session:
        try:
            # Query for pending videos
            result = await session.execute(
                select(VideoUpload).where(VideoUpload.status == "pending")
            )
            pending_videos = result.scalars().all()

            if not pending_videos:
                logger.info("No pending videos found")
                return

            logger.info(f"Found {len(pending_videos)} pending videos to process")

            # Snapshot the scalar fields before the first commit. A commit
            # (depending on session configuration) or a rollback expires ORM
            # attributes, and reading an expired attribute afterwards would
            # need an implicit lazy load.
            jobs = [
                (video, video.id, video.user_id, video.video_url)
                for video in pending_videos
            ]

            for video, video_id, user_id, video_url in jobs:
                if SHUTDOWN_EVENT.is_set():
                    logger.info("Shutdown requested, stopping video processing")
                    break

                logger.info(f"Processing video ID {video_id} for user {user_id}")

                try:
                    # Update status to processing
                    video.status = "processing"
                    video.updated_at = datetime.utcnow()
                    await session.commit()

                    # Process with Lightweight Agentic Analysis (Groq + Llama3)
                    try:
                        transcription, summary = await lightweight_agentic.analyze_with_lightweight_agentic(
                            video_url=video_url,
                            user_id=user_id,
                            db=session
                        )
                        logger.info(f"Lightweight agentic analysis completed for video {video_id}")
                    except Exception as agentic_error:
                        logger.warning(f"Lightweight agentic analysis failed, falling back to basic Whisper: {agentic_error}")
                        transcription, summary = await whisper_llm.analyze(
                            video_url=video_url,
                            user_id=user_id,
                            db=session
                        )
                        logger.info(f"Basic Whisper analysis completed for video {video_id}")
                except Exception as e:
                    logger.error(f"Whisper failed for video {video_id}: {e}")
                    logger.debug(traceback.format_exc())
                    await _mark_video_failed(session, video, video_id)
                    continue

                try:
                    # Generate PDF
                    pdf_bytes = pdf.generate(transcription, summary)
                    logger.info(f"PDF generation completed for video {video_id}")
                except Exception as e:
                    logger.error(f"PDF generation failed for video {video_id}: {e}")
                    logger.debug(traceback.format_exc())
                    await _mark_video_failed(session, video, video_id)
                    continue

                try:
                    # Upload to S3
                    pdf_key = f"pdfs/{video_id}.pdf"
                    pdf_url = s3.upload_pdf_bytes(pdf_bytes, pdf_key)
                    logger.info(f"S3 upload completed for video {video_id}")
                except Exception as e:
                    logger.error(f"Upload to S3 failed for video {video_id}: {e}")
                    logger.debug(traceback.format_exc())
                    await _mark_video_failed(session, video, video_id)
                    continue

                try:
                    # Mark as completed
                    video.status = "completed"
                    video.pdf_url = pdf_url
                    video.updated_at = datetime.utcnow()
                    await session.commit()
                    logger.info(f"Successfully completed video {video_id}")
                except SQLAlchemyError as e:
                    logger.error(f"DB commit failed for video {video_id}: {e}")
                    logger.debug(traceback.format_exc())
                    await session.rollback()
                    # Without this the row would stay "processing", which the
                    # "pending" query above never selects again.
                    await _mark_video_failed(session, video, video_id)

        except SQLAlchemyError as e:
            logger.error(f"Database error: {e}")
            logger.debug(traceback.format_exc())
        except Exception as e:
            logger.error(f"Unexpected error in process_pending_videos: {e}")
            logger.debug(traceback.format_exc())
async def run_worker():
    """Main worker loop.

    Initialises the database, then repeatedly calls
    ``process_pending_videos()`` and waits up to ``POLL_INTERVAL`` seconds (or
    until ``SHUTDOWN_EVENT`` is set) before the next cycle. If database
    initialisation fails the function logs the error and returns without
    starting the loop.

    ``close_db()`` runs in a ``finally`` block, so it is also called when the
    loop is left through cancellation or another unexpected exception.
    """
    logger.info("Async worker daemon started...")

    # Initialize database
    try:
        await init_db()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize database: {e}")
        return

    cycle_count = 0
    try:
        while not SHUTDOWN_EVENT.is_set():
            cycle_count += 1
            logger.info(f"Worker cycle {cycle_count} - Checking for pending videos...")

            try:
                await process_pending_videos()
            except Exception as e:
                logger.error(f"Worker loop error: {e}")
                logger.debug(traceback.format_exc())

            # Wait for next cycle or shutdown
            try:
                await asyncio.wait_for(SHUTDOWN_EVENT.wait(), timeout=POLL_INTERVAL)
            except asyncio.TimeoutError:
                # Normal timeout, continue to next cycle
                pass
            except Exception as e:
                logger.error(f"Error in worker wait: {e}")
                break
    finally:
        logger.info("Worker loop stopped, cleaning up...")

        # Cleanup
        try:
            await close_db()
            logger.info("Database connections closed")
        except Exception as e:
            logger.error(f"Error during cleanup: {e}")
async def main():
    """Main entry point with signal handling.

    Installs SIGINT/SIGTERM handlers that call ``signal_handler`` from inside
    the running event loop, then runs the worker until it returns.

    Handlers registered with plain ``signal.signal()`` run outside the event
    loop, so setting an ``asyncio.Event`` from them does not wake a loop that
    is blocked waiting for I/O; shutdown could then be delayed until the poll
    timeout expires. ``loop.add_signal_handler`` avoids that. Where the loop
    does not implement it, the handler is handed to the loop with
    ``call_soon_threadsafe``, which also wakes the loop.
    """
    loop = asyncio.get_running_loop()

    def _deferred_handler(signum, frame):
        # Runs in the interpreter's signal context: only schedule the real
        # handler on the loop instead of touching asyncio objects here.
        loop.call_soon_threadsafe(signal_handler, signum, frame)

    # Setup signal handlers
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # No frame object is available here; signal_handler ignores it.
            loop.add_signal_handler(sig, signal_handler, sig, None)
        except NotImplementedError:
            signal.signal(sig, _deferred_handler)

    try:
        await run_worker()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    except Exception as e:
        logger.error(f"Fatal error in main: {e}")
        logger.debug(traceback.format_exc())
    finally:
        logger.info("Worker daemon shutdown complete")
if __name__ == "__main__":
    # Script entry point: run the daemon; exit with status 1 on any error
    # other than a user interrupt.
    exit_status = 0
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Worker daemon interrupted by user")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        exit_status = 1
    if exit_status:
        sys.exit(exit_status)