Spaces:
Running
Running
import runpod | |
import tempfile | |
import os | |
import sys | |
import json | |
import base64 | |
from pathlib import Path | |
from loguru import logger | |
# Add current directory to path | |
sys.path.append(os.path.dirname(os.path.abspath(__file__))) | |
# Import MinerU converter | |
from pdf_converter_mineru import PdfConverter | |
# Shared converter instance; stays None until initialize_converter() builds it
CONVERTER = None


def initialize_converter():
    """Build the module-level PDF converter if it does not exist yet.

    The model directory is read from the ``MINERU_MODEL_PATH`` environment
    variable (default ``/app/models``). The device is ``"cuda"`` when
    ``/dev/nvidia0`` exists and ``"cpu"`` otherwise. Calling this again after
    the converter has been created does nothing.
    """
    global CONVERTER
    if CONVERTER is not None:
        return

    logger.info("Initializing MinerU converter...")

    model_root = os.environ.get('MINERU_MODEL_PATH', '/app/models')
    # Presence of the first NVIDIA device node is used as the GPU check
    device = "cuda" if os.path.exists('/dev/nvidia0') else "cpu"

    settings = {
        "model_dir": model_root,
        "output_dir": "/tmp/mineru_output",
        "device": device,
        "parse_method": "auto",
        "debug": False,
    }
    CONVERTER = PdfConverter(settings)

    logger.info("MinerU converter initialized successfully")
def _read_conversion_output(output_dir):
    """Return the text of the first converted file found under *output_dir*.

    Markdown files anywhere in the tree are preferred; if there are none, the
    first ``.txt`` file inside a ``txt`` directory is used. When neither
    exists a placeholder Markdown heading is returned.
    """
    root = Path(output_dir)
    for pattern in ("**/*.md", "**/txt/*.txt"):
        # Only the first match is needed, so stop the glob as soon as it yields
        first_match = next(root.glob(pattern), None)
        if first_match is not None:
            return first_match.read_text(encoding='utf-8')
    return "# Conversion completed but no markdown found"


def handler(job):
    """
    RunPod serverless handler for PDF to Markdown conversion.

    Args:
        job: Job payload. ``job["input"]`` is read for ``pdf_base64`` (the
            base64-encoded PDF, required) and ``filename`` (optional,
            defaults to ``"document.pdf"``).

    Returns:
        On success, a dict with ``markdown``, ``filename``,
        ``status="success"`` and ``pages`` (a rough count obtained by
        splitting the Markdown on ``---`` separator lines). On failure, a
        dict with ``error`` and ``status="failed"``; conversion failures
        also include ``filename``. Exceptions are reported in the returned
        dict rather than raised.
    """
    try:
        # Initialize converter on first run
        initialize_converter()

        job_input = job["input"]

        # Get PDF data from base64
        pdf_base64 = job_input.get("pdf_base64")
        filename = job_input.get("filename", "document.pdf")

        if not pdf_base64:
            return {"error": "No PDF data provided", "status": "failed"}

        # Decode base64 PDF
        pdf_data = base64.b64decode(pdf_base64)

        pdf_path = None
        try:
            # delete=False keeps the file on disk after the handle is closed so
            # the converter can open it by path; it is removed in `finally`.
            with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_file:
                pdf_path = tmp_file.name
                tmp_file.write(pdf_data)

            logger.info(f"Processing PDF: {filename} ({len(pdf_data)} bytes)")

            # Convert PDF to Markdown using MinerU
            try:
                output_dir = CONVERTER.convert_single_pdf(pdf_path)
                markdown_content = _read_conversion_output(output_dir)
            except Exception as conv_error:
                logger.error(f"Conversion error: {str(conv_error)}")
                return {
                    "error": f"Conversion failed: {str(conv_error)}",
                    "filename": filename,
                    "status": "failed"
                }

            return {
                "markdown": markdown_content,
                "filename": filename,
                "status": "success",
                "pages": len(markdown_content.split('\n---\n'))  # Rough page count
            }
        finally:
            # Remove the temporary PDF on every path (success or failure) so
            # failed conversions do not accumulate files in the temp directory.
            if pdf_path is not None:
                try:
                    os.unlink(pdf_path)
                except OSError as cleanup_error:
                    # A cleanup problem must not turn a finished job into a failure
                    logger.warning(f"Could not remove temporary file {pdf_path}: {cleanup_error}")

    except Exception as e:
        logger.error(f"Handler error: {str(e)}")
        return {
            "error": str(e),
            "status": "failed"
        }
# Entrypoint: start the RunPod serverless worker with `handler` registered
# under the "handler" key. This runs at module load, as in the original script.
runpod.serverless.start(
    {"handler": handler}
)