import os
import logging
import tempfile
import uuid
from typing import Optional
from urllib.parse import urlparse

import httpx
from fastapi import FastAPI, HTTPException, UploadFile, File, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from mangum import Mangum
from pydantic import BaseModel, HttpUrl

from dockling_parser import DocumentParser
from dockling_parser.types import ParsedDocument
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(
    title="Document Parser API",
    description="A scalable API for parsing various document formats",
    version="1.0.0",
)
# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
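# NOTE: allowing all origins is convenient for a demo deployment; restrict
# allow_origins to known frontends in production.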
# Initialize document parser
parser = DocumentParser()
class URLInput(BaseModel):
    url: HttpUrl
    callback_url: Optional[HttpUrl] = None
class ErrorResponse(BaseModel):
    error: str
    detail: Optional[str] = None
    code: str
class ParseResponse(BaseModel):
    job_id: str
    status: str
    result: Optional[ParsedDocument] = None
    error: Optional[str] = None
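# Example serialized ParseResponse (illustrative values):
#   {"job_id": "job_8f3c...", "status": "completed", "result": {...}, "error": null}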
# In-memory job storage (replace with Redis/DB in production)
jobs = {}
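# Each job record follows this shape (inferred from the handlers below):
#   {"status": "queued" | "processing" | "completed" | "failed",
#    "result": ParsedDocument,  # present once completed
#    "error": str}              # present on failure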
async def process_document_async(job_id: str, file_path: str, callback_url: Optional[str] = None):
    """Process document asynchronously"""
    try:
        # Update job status
        jobs[job_id] = {"status": "processing"}
        # Parse document
        result = parser.parse(file_path)
        # Update job with result
        jobs[job_id] = {
            "status": "completed",
            "result": result,
        }
        # Call callback URL if provided
        if callback_url:
            try:
                await notify_callback(callback_url, job_id, result)
            except Exception as e:
                logger.error(f"Failed to notify callback URL: {str(e)}")
    except Exception as e:
        logger.error(f"Error processing document: {str(e)}")
        jobs[job_id] = {
            "status": "failed",
            "error": str(e),
        }
    finally:
        # Clean up temporary file
        try:
            if os.path.exists(file_path):
                os.unlink(file_path)
        except Exception as e:
            logger.error(f"Error cleaning up file: {str(e)}")
async def notify_callback(callback_url: str, job_id: str, result: ParsedDocument):
    """Notify callback URL with results"""
    try:
        async with httpx.AsyncClient() as client:
            await client.post(
                callback_url,
                json={
                    "job_id": job_id,
                    "result": result.dict(),
                },
            )
    except Exception as e:
        logger.error(f"Failed to send callback: {str(e)}")
@app.post("/parse/file", response_model=ParseResponse)  # route path assumed; adjust to your routing
async def parse_file(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    callback_url: Optional[HttpUrl] = None,
):
    """
    Parse a document from file upload
    """
    try:
        # Create temporary file in /tmp for Vercel
        suffix = os.path.splitext(file.filename or "")[1]
        tmp_dir = "/tmp" if os.path.exists("/tmp") else tempfile.gettempdir()
        tmp_path = os.path.join(tmp_dir, f"upload_{os.urandom(8).hex()}{suffix}")
        content = await file.read()
        with open(tmp_path, "wb") as f:
            f.write(content)
        # Generate a collision-safe job ID (a len(jobs)-based counter races
        # under concurrent requests)
        job_id = f"job_{uuid.uuid4().hex}"
        # Register the job before queuing so status checks don't 404 while queued
        jobs[job_id] = {"status": "queued"}
        # Start background processing
        background_tasks.add_task(
            process_document_async,
            job_id,
            tmp_path,
            str(callback_url) if callback_url else None,
        )
        return ParseResponse(
            job_id=job_id,
            status="queued",
        )
    except Exception as e:
        logger.error(f"Error handling file upload: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=str(e),
        )
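# Example upload (assuming the route above):
#   curl -F "file=@report.pdf" https://<host>/parse/file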
@app.post("/parse/url", response_model=ParseResponse)  # route path assumed; adjust to your routing
async def parse_url(input_data: URLInput, background_tasks: BackgroundTasks):
    """
    Parse a document from URL
    """
    try:
        # Download file
        async with httpx.AsyncClient() as client:
            response = await client.get(str(input_data.url), follow_redirects=True)
            response.raise_for_status()
        # Get filename from URL or use default
        filename = os.path.basename(urlparse(str(input_data.url)).path)
        if not filename:
            filename = "document.pdf"
        # Save to temporary file in /tmp for Vercel
        tmp_dir = "/tmp" if os.path.exists("/tmp") else tempfile.gettempdir()
        tmp_path = os.path.join(tmp_dir, f"download_{os.urandom(8).hex()}{os.path.splitext(filename)[1]}")
        with open(tmp_path, "wb") as f:
            f.write(response.content)
        # Generate a collision-safe job ID
        job_id = f"job_{uuid.uuid4().hex}"
        # Register the job before queuing so status checks don't 404 while queued
        jobs[job_id] = {"status": "queued"}
        # Start background processing
        background_tasks.add_task(
            process_document_async,
            job_id,
            tmp_path,
            str(input_data.callback_url) if input_data.callback_url else None,
        )
        return ParseResponse(
            job_id=job_id,
            status="queued",
        )
    except httpx.HTTPError as e:
        # Covers both transport errors (RequestError) and non-2xx responses
        # surfaced by raise_for_status() (HTTPStatusError)
        logger.error(f"Error downloading file: {str(e)}")
        raise HTTPException(
            status_code=400,
            detail=f"Error downloading file: {str(e)}",
        )
    except Exception as e:
        logger.error(f"Error processing URL: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=str(e),
        )
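# Example request body for the URL endpoint:
#   {"url": "https://example.com/report.pdf",
#    "callback_url": "https://example.com/webhook"}  # callback_url is optional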
@app.get("/status/{job_id}", response_model=ParseResponse)  # route path assumed
async def get_status(job_id: str):
    """
    Get the status of a parsing job
    """
    if job_id not in jobs:
        raise HTTPException(
            status_code=404,
            detail="Job not found",
        )
    job = jobs[job_id]
    return ParseResponse(
        job_id=job_id,
        status=job["status"],
        result=job.get("result"),
        error=job.get("error"),
    )
@app.get("/health")  # route path assumed
async def health_check():
    """
    Health check endpoint
    """
    return {"status": "healthy"}
# Handler for Vercel
handler = Mangum(app, lifespan="off")
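# Local development entry point (a sketch; assumes uvicorn is installed).
# On Vercel the Mangum `handler` above is invoked instead of this block.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)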