""" | |
Document processing service layer. | |
""" | |
import tempfile | |
import logging | |
import time | |
import os | |
import threading | |
from pathlib import Path | |
from typing import Optional, Tuple, Any, List | |
from src.core.config import config | |
from src.core.exceptions import ( | |
DocumentProcessingError, | |
FileSizeLimitError, | |
UnsupportedFileTypeError, | |
ConversionError | |
) | |
from src.core.parser_factory import ParserFactory | |
from src.core.latex_to_markdown_converter import convert_latex_to_markdown | |
class DocumentService:
    """Service for handling document processing operations."""

    def __init__(self):
        self._conversion_in_progress = False
        self._cancellation_flag: Optional[threading.Event] = None

    def set_cancellation_flag(self, flag: threading.Event) -> None:
        """Set the cancellation flag for this service."""
        self._cancellation_flag = flag

    def is_conversion_in_progress(self) -> bool:
        """Check if conversion is currently in progress."""
        return self._conversion_in_progress

    def _check_cancellation(self) -> bool:
        """Check if cancellation has been requested."""
        if self._cancellation_flag and self._cancellation_flag.is_set():
            logging.info("Cancellation detected in document service")
            return True
        return False

    def _safe_delete_file(self, file_path: Optional[str]) -> None:
        """Safely delete a file with error handling."""
        if file_path and os.path.exists(file_path):
            try:
                os.unlink(file_path)
            except Exception as e:
                logging.error(f"Error cleaning up temp file {file_path}: {e}")

    def _validate_file(self, file_path: str) -> None:
        """Validate file size and type."""
        if not os.path.exists(file_path):
            raise DocumentProcessingError(f"File not found: {file_path}")

        # Check file size
        file_size = os.path.getsize(file_path)
        if file_size > config.app.max_file_size:
            raise FileSizeLimitError(
                f"File size ({file_size} bytes) exceeds maximum allowed size "
                f"({config.app.max_file_size} bytes)"
            )

        # Check file extension
        file_ext = Path(file_path).suffix.lower()
        if file_ext not in config.app.allowed_extensions:
            raise UnsupportedFileTypeError(
                f"File type '{file_ext}' is not supported. "
                f"Allowed types: {', '.join(config.app.allowed_extensions)}"
            )

    def _create_temp_file(self, original_path: str) -> str:
        """Copy the input to a temporary file with an ASCII-safe (English) name."""
        original_ext = Path(original_path).suffix
        with tempfile.NamedTemporaryFile(suffix=original_ext, delete=False) as temp_file:
            temp_path = temp_file.name
            # Copy content in chunks with cancellation checks
            with open(original_path, 'rb') as original:
                chunk_size = 1024 * 1024  # 1MB chunks
                while True:
                    if self._check_cancellation():
                        self._safe_delete_file(temp_path)
                        raise ConversionError("Conversion cancelled during file copy")
                    chunk = original.read(chunk_size)
                    if not chunk:
                        break
                    temp_file.write(chunk)
        return temp_path

    def _process_latex_content(self, content: str, parser_name: str, ocr_method_name: str) -> str:
        """Process LaTeX content; for GOT-OCR, return raw LaTeX without conversion."""
        # For GOT-OCR, skip LLM conversion and return raw LaTeX
        if parser_name == "GOT-OCR (jpg,png only)":
            logging.info("GOT-OCR detected: returning raw LaTeX output (no LLM conversion)")
            return content

        # For other parsers with LaTeX content, process as before
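        # Cheap detection heuristic: environment markers or "$" suggest LaTeX.
        # Note that plain text containing a dollar sign also matches and will
        # be sent through the converter.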
        if (content and
                ("\\begin" in content or "\\end" in content or "$" in content) and
                config.api.google_api_key):
            logging.info("Converting LaTeX output to Markdown using Gemini API")
            start_convert = time.time()
            if self._check_cancellation():
                raise ConversionError("Conversion cancelled before LaTeX conversion")
            try:
                markdown_content = convert_latex_to_markdown(content)
                if markdown_content:
                    logging.info(f"LaTeX conversion completed in {time.time() - start_convert:.2f} seconds")
                    return markdown_content
                else:
                    logging.warning("LaTeX to Markdown conversion failed, using raw LaTeX output")
            except Exception as e:
                logging.error(f"Error converting LaTeX to Markdown: {str(e)}")
                # Continue with original content on error
        return content

    def _create_output_file(self, content: str, output_format: str, original_file_path: Optional[str] = None, parser_name: Optional[str] = None) -> str:
        """Create output file with proper extension and preserved filename."""
        # Determine file extension based on parser and format
        if parser_name == "GOT-OCR (jpg,png only)":
            # For GOT-OCR, use .tex extension
            ext = ".tex"
        else:
            # For other parsers, use format-based extensions
            format_extensions = {
                "markdown": ".md",
                "json": ".json",
                "text": ".txt",
                "document tags": ".doctags"
            }
            ext = format_extensions.get(output_format.lower(), ".txt")

        if self._check_cancellation():
            raise ConversionError("Conversion cancelled before output file creation")

        # Create output filename based on original filename if provided
        if original_file_path:
            original_name = Path(original_file_path).stem  # Filename without extension
            # Keep only filesystem-safe characters while preserving spaces
            clean_name = "".join(c for c in original_name if c.isalnum() or c in (' ', '-', '_', '.', '(', ')')).strip()
            # Collapse runs of whitespace into single spaces
            clean_name = ' '.join(clean_name.split())
            if not clean_name:  # Fallback if cleaning removes everything
                clean_name = "converted_document"
            # Create output file in temp directory with the preserved name
            temp_dir = tempfile.gettempdir()
            tmp_path = os.path.join(temp_dir, f"{clean_name}{ext}")
            # Handle filename conflicts by adding a numeric suffix
            counter = 1
            while os.path.exists(tmp_path):
                tmp_path = os.path.join(temp_dir, f"{clean_name}_{counter}{ext}")
                counter += 1
        else:
            # Fall back to a random temporary file
            with tempfile.NamedTemporaryFile(mode="w", suffix=ext, delete=False, encoding="utf-8") as tmp:
                tmp_path = tmp.name

        # Write content to file
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                # Write in chunks with cancellation checks
                chunk_size = 10000  # characters
                for i in range(0, len(content), chunk_size):
                    if self._check_cancellation():
                        self._safe_delete_file(tmp_path)
                        raise ConversionError("Conversion cancelled during output file writing")
                    f.write(content[i:i + chunk_size])
        except ConversionError:
            # Don't re-wrap the cancellation error raised above
            raise
        except Exception as e:
            self._safe_delete_file(tmp_path)
            raise ConversionError(f"Failed to write output file: {str(e)}")

        return tmp_path

    def convert_document(
        self,
        file_path: str,
        parser_name: str,
        ocr_method_name: str,
        output_format: str
    ) -> Tuple[str, Optional[str]]:
        """
        Convert a document using the specified parser and OCR method.

        Args:
            file_path: Path to the input file
            parser_name: Name of the parser to use
            ocr_method_name: Name of the OCR method to use
            output_format: Output format (Markdown, JSON, Text, Document Tags)

        Returns:
            Tuple of (content, output_file_path)

        Raises:
            DocumentProcessingError: For general processing errors
            FileSizeLimitError: When the file is too large
            UnsupportedFileTypeError: For unsupported file types
            ConversionError: When conversion fails or is cancelled
        """
        if not file_path:
            raise DocumentProcessingError("No file provided")

        self._conversion_in_progress = True
        temp_input = None
        output_path = None

        try:
            # Validate input file
            self._validate_file(file_path)

            if self._check_cancellation():
                raise ConversionError("Conversion cancelled")

            # Create temporary file with English name
            temp_input = self._create_temp_file(file_path)

            if self._check_cancellation():
                raise ConversionError("Conversion cancelled")

            # Process document using parser factory
            start_time = time.time()
            content = ParserFactory.parse_document(
                file_path=temp_input,
                parser_name=parser_name,
                ocr_method_name=ocr_method_name,
                output_format=output_format.lower(),
                cancellation_flag=self._cancellation_flag
            )

            if content == "Conversion cancelled.":
                raise ConversionError("Conversion cancelled by parser")

            duration = time.time() - start_time
            logging.info(f"Document processed in {duration:.2f} seconds")

            if self._check_cancellation():
                raise ConversionError("Conversion cancelled")

            # Process LaTeX content if needed
            content = self._process_latex_content(content, parser_name, ocr_method_name)

            if self._check_cancellation():
                raise ConversionError("Conversion cancelled")

            # Create output file
            output_path = self._create_output_file(content, output_format, file_path, parser_name)
            return content, output_path

        except (DocumentProcessingError, FileSizeLimitError, UnsupportedFileTypeError, ConversionError):
            # Re-raise our custom exceptions
            self._safe_delete_file(temp_input)
            self._safe_delete_file(output_path)
            raise
        except Exception as e:
            # Wrap unexpected exceptions
            self._safe_delete_file(temp_input)
            self._safe_delete_file(output_path)
            raise DocumentProcessingError(f"Unexpected error during conversion: {str(e)}")
        finally:
            # Clean up temp input file
            self._safe_delete_file(temp_input)
            # Clean up output file if cancelled
            if self._check_cancellation() and output_path:
                self._safe_delete_file(output_path)
            self._conversion_in_progress = False

    def convert_documents(
        self,
        file_paths: List[str],
        parser_name: str,
        ocr_method_name: str,
        output_format: str,
        processing_type: str = "combined"
    ) -> Tuple[str, Optional[str]]:
        """
        Unified method to convert single or multiple documents.

        Args:
            file_paths: List of paths to input files (can be a single file)
            parser_name: Name of the parser to use
            ocr_method_name: Name of the OCR method to use
            output_format: Output format (Markdown, JSON, Text, Document Tags)
            processing_type: Type of multi-document processing (combined, individual, summary, comparison)

        Returns:
            Tuple of (content, output_file_path)

        Raises:
            DocumentProcessingError: For general processing errors
            FileSizeLimitError: When file(s) are too large
            UnsupportedFileTypeError: For unsupported file types
            ConversionError: When conversion fails or is cancelled
        """
        if not file_paths:
            raise DocumentProcessingError("No files provided")

        # Route to the appropriate processing method
        if len(file_paths) == 1:
            # Single file processing - use existing method
            return self.convert_document(
                file_paths[0], parser_name, ocr_method_name, output_format
            )
        else:
            # Multi-file processing - use batch method
            return self._convert_multiple_documents(
                file_paths, parser_name, ocr_method_name, output_format, processing_type
            )

    def _convert_multiple_documents(
        self,
        file_paths: List[str],
        parser_name: str,
        ocr_method_name: str,
        output_format: str,
        processing_type: str
    ) -> Tuple[str, Optional[str]]:
        """
        Convert multiple documents using batch processing.

        Args:
            file_paths: List of paths to input files
            parser_name: Name of the parser to use
            ocr_method_name: Name of the OCR method to use
            output_format: Output format (Markdown, JSON, Text, Document Tags)
            processing_type: Type of multi-document processing

        Returns:
            Tuple of (content, output_file_path)
        """
        self._conversion_in_progress = True
        temp_inputs = []
        output_path = None
        self._original_file_paths = file_paths  # Store original paths for filename reference

        try:
            # Validate all files first
            for file_path in file_paths:
                self._validate_file(file_path)

            if self._check_cancellation():
                raise ConversionError("Conversion cancelled")

            # Create temporary files with English names
            for original_path in file_paths:
                temp_path = self._create_temp_file(original_path)
                temp_inputs.append(temp_path)
                if self._check_cancellation():
                    raise ConversionError("Conversion cancelled during file preparation")

            # Process documents using parser factory with multi-document support
            start_time = time.time()
            content = self._process_multiple_with_parser(
                temp_inputs, parser_name, ocr_method_name, output_format, processing_type
            )

            if content == "Conversion cancelled.":
                raise ConversionError("Conversion cancelled by parser")

            duration = time.time() - start_time
            logging.info(f"Multiple documents processed in {duration:.2f} seconds")

            if self._check_cancellation():
                raise ConversionError("Conversion cancelled")

            # Create output file with batch naming
            output_path = self._create_batch_output_file(
                content, output_format, file_paths, processing_type
            )
            return content, output_path

        except (DocumentProcessingError, FileSizeLimitError, UnsupportedFileTypeError, ConversionError):
            # Re-raise our custom exceptions
            for temp_path in temp_inputs:
                self._safe_delete_file(temp_path)
            self._safe_delete_file(output_path)
            raise
        except Exception as e:
            # Wrap unexpected exceptions
            for temp_path in temp_inputs:
                self._safe_delete_file(temp_path)
            self._safe_delete_file(output_path)
            raise DocumentProcessingError(f"Unexpected error during batch conversion: {str(e)}")
        finally:
            # Clean up temp input files
            for temp_path in temp_inputs:
                self._safe_delete_file(temp_path)
            # Clean up output file if cancelled
            if self._check_cancellation() and output_path:
                self._safe_delete_file(output_path)
            self._conversion_in_progress = False

    def _process_multiple_with_parser(
        self,
        temp_file_paths: List[str],
        parser_name: str,
        ocr_method_name: str,
        output_format: str,
        processing_type: str
    ) -> str:
        """Process multiple documents using the parser factory."""
        try:
            # Get parser instance
            from src.parsers.parser_registry import ParserRegistry
            parser_class = ParserRegistry.get_parser_class(parser_name)
            if not parser_class:
                raise DocumentProcessingError(f"Parser '{parser_name}' not found")

            parser_instance = parser_class()
            parser_instance.set_cancellation_flag(self._cancellation_flag)

            # Check if the parser supports multi-document processing
            if hasattr(parser_instance, 'parse_multiple'):
                # Use multi-document parsing with original filenames for reference
                return parser_instance.parse_multiple(
                    file_paths=temp_file_paths,
                    processing_type=processing_type,
                    ocr_method=ocr_method_name,
                    output_format=output_format.lower(),
                    original_filenames=[Path(fp).name for fp in self._original_file_paths]
                )
            else:
                # Fallback: process individually and combine
                results = []
                for i, file_path in enumerate(temp_file_paths):
                    if self._check_cancellation():
                        return "Conversion cancelled."
                    result = parser_instance.parse(
                        file_path=file_path,
                        ocr_method=ocr_method_name
                    )
                    # Add a section header using the original filename
                    original_filename = Path(self._original_file_paths[i]).name
                    results.append(f"# Document {i + 1}: {original_filename}\n\n{result}")
                return "\n\n---\n\n".join(results)
        except DocumentProcessingError:
            # Don't re-wrap the "parser not found" error raised above
            raise
        except Exception as e:
            raise DocumentProcessingError(f"Error processing multiple documents: {str(e)}")

    def _create_batch_output_file(
        self,
        content: str,
        output_format: str,
        original_file_paths: List[str],
        processing_type: str
    ) -> str:
        """Create output file for batch processing with descriptive naming."""
        # Determine file extension
        format_extensions = {
            "markdown": ".md",
            "json": ".json",
            "text": ".txt",
            "document tags": ".doctags"
        }
        ext = format_extensions.get(output_format.lower(), ".txt")

        if self._check_cancellation():
            raise ConversionError("Conversion cancelled before output file creation")

        # Create a descriptive filename for batch processing
        file_count = len(original_file_paths)
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        if processing_type == "combined":
            filename = f"Combined_{file_count}_Documents_{timestamp}{ext}"
        elif processing_type == "individual":
            filename = f"Individual_Sections_{file_count}_Files_{timestamp}{ext}"
        elif processing_type == "summary":
            filename = f"Summary_Analysis_{file_count}_Files_{timestamp}{ext}"
        elif processing_type == "comparison":
            filename = f"Comparison_Analysis_{file_count}_Files_{timestamp}{ext}"
        else:
            filename = f"Batch_Processing_{file_count}_Files_{timestamp}{ext}"

        # Create output file in temp directory
        temp_dir = tempfile.gettempdir()
        tmp_path = os.path.join(temp_dir, filename)

        # Handle filename conflicts by adding a numeric suffix
        stem = filename[:-len(ext)]
        counter = 1
        while os.path.exists(tmp_path):
            tmp_path = os.path.join(temp_dir, f"{stem}_{counter}{ext}")
            counter += 1

        # Write content to file
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                # Write in chunks with cancellation checks
                chunk_size = 10000  # characters
                for i in range(0, len(content), chunk_size):
                    if self._check_cancellation():
                        self._safe_delete_file(tmp_path)
                        raise ConversionError("Conversion cancelled during output file writing")
                    f.write(content[i:i + chunk_size])
        except ConversionError:
            # Don't re-wrap the cancellation error raised above
            raise
        except Exception as e:
            self._safe_delete_file(tmp_path)
            raise ConversionError(f"Failed to write batch output file: {str(e)}")

        return tmp_path
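

# Minimal usage sketch (illustrative only, not part of the service): the file
# path, parser name, and OCR method name below are hypothetical; valid names
# depend on what the parser registry actually provides at runtime.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    service = DocumentService()
    cancel_event = threading.Event()  # set() from another thread to cancel
    service.set_cancellation_flag(cancel_event)

    try:
        content, output_path = service.convert_document(
            file_path="sample.pdf",       # hypothetical input file
            parser_name="Docling",        # hypothetical parser name
            ocr_method_name="default",    # hypothetical OCR method
            output_format="Markdown",
        )
        print(f"Wrote {len(content)} characters to {output_path}")
    except (DocumentProcessingError, FileSizeLimitError,
            UnsupportedFileTypeError, ConversionError) as e:
        logging.error(f"Conversion failed: {e}")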