import os
import time
import logging
import json
from dataclasses import dataclass
from typing import Optional

# Don't import DocumentConverter at module level to prevent early initialization
# from docling.document_converter import DocumentConverter
from processing.sections import SectionExtractor
from utils.cost_tracker import cost_tracker

# Remove global converter initialization - will be done lazily
# _docling_converter = DocumentConverter()

logger = logging.getLogger(__name__)  # Logger for this module


@dataclass
class DocumentResult:
    """Holds processed results for a document."""
    file_path: str
    structured_markdown: str
    structured_json: dict
    structured_yaml: str  # YAML rendering of the structured JSON
    redacted_markdown: str
    redacted_json: dict
    raw_text: str  # Raw text without preprocessing


@dataclass
class ProcessingResult:
    """Simple result for Jupyter notebook usage."""
    original_document_md: str
    redacted_document_md: str
    original_document_json: dict
    original_document_yaml: str  # YAML rendering of the structured JSON
    redacted_document_json: dict
    raw_text: str  # Raw text without preprocessing
    removed_indices: list  # Indices of the sections that were removed
    input_tokens: int
    output_tokens: int
    cost: float


def process_document_with_redaction(
    file_path: str,
    endpoint: str,
    api_key: str,
    api_version: str,
    deployment: str,
    section_extractor: Optional[SectionExtractor] = None,
) -> ProcessingResult:
    """
    Process a document and return the results as a single ProcessingResult.

    Args:
        file_path: Path to the PDF file to process
        endpoint: Azure OpenAI endpoint
        api_key: Azure OpenAI API key
        api_version: Azure OpenAI API version
        deployment: Azure OpenAI deployment name
        section_extractor: Optional custom section extractor

    Returns:
        ProcessingResult with the original and redacted documents (markdown,
        JSON, and YAML), the raw text, the removed section indices, and the
        session's token usage and cost.
    """
    logger.info(f"Processing document: {file_path}")

    # Reset cost tracker for this processing session
    cost_tracker.reset_session()

    # Create section extractor if not provided
    if section_extractor is None:
        from processing.sections import ReasoningSectionExtractor
        section_extractor = ReasoningSectionExtractor(
            endpoint=endpoint,
            api_key=api_key,
            api_version=api_version,
            deployment=deployment,
        )

    # Process the document
    processor = DocumentProcessor(section_extractor=section_extractor)
    result = processor.process(file_path)

    # Get the actual removed indices from the section extractor.
    # Note: this calls the extractor again on the structured JSON rather than
    # reusing the extraction already performed inside process().
    removed_indices = []
    if section_extractor:
        extraction_result = section_extractor.llm_extractor.extract_medication_sections(
            result.structured_json
        )
        removed_indices = extraction_result.get("indices_to_remove", [])

    # Get cost summary
    cost_summary = cost_tracker.get_session_summary()
    total_tokens = cost_summary.get("total_tokens", 0)
    total_cost = cost_summary.get("total_cost", 0.0)

    # Output tokens are reported per model; sum them across the breakdown
    total_output_tokens = 0
    for model_stats in cost_summary.get("model_breakdown", {}).values():
        total_output_tokens += model_stats.get("output_tokens", 0)

    # Input tokens are the remainder of the session total
    total_input_tokens = total_tokens - total_output_tokens

    logger.info(
        f"Processing complete - Input: {total_input_tokens}, "
        f"Output: {total_output_tokens}, Cost: ${total_cost:.4f}"
    )

    return ProcessingResult(
        original_document_md=result.structured_markdown,
        redacted_document_md=result.redacted_markdown,
        original_document_json=result.structured_json,
        original_document_yaml=result.structured_yaml,
        redacted_document_json=result.redacted_json,
        raw_text=result.raw_text,
        removed_indices=removed_indices,
        input_tokens=total_input_tokens,
        output_tokens=total_output_tokens,
        cost=total_cost,
    )
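
# Example usage (sketch): how a notebook might call this helper. The endpoint,
# deployment, API version, and file path below are placeholders, not values
# taken from this project.
#
#     result = process_document_with_redaction(
#         file_path="sample_report.pdf",
#         endpoint="https://<your-resource>.openai.azure.com/",
#         api_key=os.environ["AZURE_OPENAI_API_KEY"],
#         api_version="2024-02-01",
#         deployment="gpt-4o",
#     )
#     print(result.redacted_document_md)
#     print(f"Tokens in/out: {result.input_tokens}/{result.output_tokens}, "
#           f"cost: ${result.cost:.4f}")
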
class DocumentProcessor:
    """Handles parsing of documents with Docling and redacting specified sections."""

    def __init__(self, section_extractor: Optional[SectionExtractor] = None):
        """
        Initialize with an optional SectionExtractor for removing specific sections.
        If None, no redaction will be performed (original structure only).
        The Docling DocumentConverter will be initialized lazily when needed.
        """
        self.section_extractor = section_extractor
        self._converter = None  # Lazy initialization

    @property
    def converter(self):
        """Lazy initialization of DocumentConverter to prevent early Hugging Face Hub initialization."""
        if self._converter is None:
            # Import here to ensure environment variables are set first
            from docling.document_converter import DocumentConverter
            logger.info("Initializing Docling DocumentConverter...")
            self._converter = DocumentConverter()
            logger.info("Docling DocumentConverter initialized successfully")
        return self._converter

    def process(self, file_path: str) -> DocumentResult:
        """Parse the document and optionally remove specified sections. Returns a DocumentResult."""
        logger.info(f"Starting processing for file: {file_path}")
        start_time = time.time()

        # Ensure cache directories exist before processing
        self._ensure_cache_directories()

        # Convert the document using Docling
        conv_result = self.converter.convert(file_path)
        elapsed = time.time() - start_time
        logger.info(f"Docling conversion completed in {elapsed:.2f} seconds")

        # Export results from Docling
        structured_md = conv_result.document.export_to_markdown()
        structured_text = conv_result.document.export_to_text()
        doc_json = conv_result.document.export_to_dict()

        # Convert JSON to YAML for display
        import yaml
        doc_yaml = yaml.dump(doc_json, default_flow_style=False, allow_unicode=True, sort_keys=False)

        logger.info(f"Extracted document content (text length {len(structured_text)} characters)")

        # Use SectionExtractor to remove target sections if provided
        if self.section_extractor:
            # Use the JSON-based approach for better section removal
            redacted_json = self.section_extractor.remove_sections_from_json(doc_json)
            # Convert the redacted JSON back to markdown using Docling's export method
            redacted_md = self._export_redacted_markdown(conv_result.document, redacted_json)
            logger.info("Applied section redaction to remove specified sections")
        else:
            redacted_md = structured_md  # No redaction, use original
            redacted_json = doc_json  # No redaction, use original
            logger.info("No section redaction applied (showing original structure)")

        # Persist outputs to files (JSON and redacted text) for auditing
        base_name = os.path.splitext(os.path.basename(file_path))[0]

        # Use the same temp directory as the main application
        temp_dir = os.environ.get('TEMP_DIR', '/tmp/docling_temp')
        try:
            os.makedirs(temp_dir, exist_ok=True)
        except PermissionError:
            # Fall back to the system temp directory if we can't create the main temp dir
            import tempfile
            temp_dir = os.path.join(tempfile.gettempdir(), "docling_temp_files")
            os.makedirs(temp_dir, exist_ok=True)

        json_path = os.path.join(temp_dir, f"{base_name}_structured.json")
        redacted_path = os.path.join(temp_dir, f"{base_name}_redacted.txt")
        redacted_json_path = os.path.join(temp_dir, f"{base_name}_redacted.json")

        try:
            with open(json_path, "w", encoding="utf-8") as jf:
                json.dump(doc_json, jf, ensure_ascii=False, indent=2)
            with open(redacted_path, "w", encoding="utf-8") as tf:
                tf.write(redacted_md)
            with open(redacted_json_path, "w", encoding="utf-8") as jf:
                json.dump(redacted_json, jf, ensure_ascii=False, indent=2)
            logger.info(
                f"Saved structured JSON to {json_path}, redacted text to {redacted_path}, "
                f"and redacted JSON to {redacted_json_path}"
            )
        except Exception as e:
            logger.error(f"Error saving outputs to files: {e}")

        # Prepare result object
        result = DocumentResult(
            file_path=file_path,
            structured_markdown=structured_md,
            structured_json=doc_json,
            structured_yaml=doc_yaml,
            redacted_markdown=redacted_md,
            redacted_json=redacted_json,
            raw_text=structured_text,  # Include the raw text
        )
        logger.info(f"Finished processing for file: {file_path}")
        return result
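    # Note (sketch): the lazy `converter` property above matters because cache
    # locations such as HF_HOME are read by Hugging Face libraries at
    # initialization time, so they must be set before the first process() call.
    # The path below is a placeholder:
    #
    #     os.environ["HF_HOME"] = "/tmp/docling_temp/huggingface"
    #     processor = DocumentProcessor()
    #     result = processor.process("sample_report.pdf")  # converter built here
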
    def _ensure_cache_directories(self):
        """Ensure all necessary cache directories exist before processing."""
        cache_dirs = [
            os.environ.get('HF_HOME', '/tmp/docling_temp/huggingface'),
            os.environ.get('HF_CACHE_HOME', '/tmp/docling_temp/huggingface_cache'),
            os.environ.get('HF_HUB_CACHE', '/tmp/docling_temp/huggingface_cache'),
            os.environ.get('TRANSFORMERS_CACHE', '/tmp/docling_temp/transformers_cache'),
            os.environ.get('HF_DATASETS_CACHE', '/tmp/docling_temp/datasets_cache'),
            os.environ.get('DIFFUSERS_CACHE', '/tmp/docling_temp/diffusers_cache'),
            os.environ.get('ACCELERATE_CACHE', '/tmp/docling_temp/accelerate_cache'),
            os.environ.get('TORCH_HOME', '/tmp/docling_temp/torch'),
            os.environ.get('TENSORFLOW_HOME', '/tmp/docling_temp/tensorflow'),
            os.environ.get('KERAS_HOME', '/tmp/docling_temp/keras'),
        ]
        for cache_dir in cache_dirs:
            try:
                os.makedirs(cache_dir, exist_ok=True)
                logger.debug(f"Ensured cache directory exists: {cache_dir}")
            except Exception as e:
                logger.warning(f"Could not create cache directory {cache_dir}: {e}")

    def _export_redacted_markdown(self, document, redacted_json):
        """Export redacted markdown using Docling's Document class for proper formatting."""
        try:
            # The Document class has moved between Docling versions;
            # try the known import paths in order
            try:
                from docling.document import Document
            except ImportError:
                try:
                    from docling import Document
                except ImportError:
                    try:
                        from docling.core import Document
                    except ImportError:
                        # If all imports fail, use the fallback method
                        logger.warning("Could not import Docling Document class from any known location")
                        raise ImportError("Docling Document class not found")

            # Create a new Document from the redacted JSON
            redacted_document = Document.from_dict(redacted_json)

            # Use Docling's export method for proper markdown formatting
            redacted_md = redacted_document.export_to_markdown()
            logger.info("Successfully generated redacted markdown using Docling Document class")
            return redacted_md
        except Exception as e:
            logger.warning(f"Failed to create Docling Document from redacted JSON: {e}")
            logger.info("Falling back to manual JSON-to-markdown conversion")
            # Fall back to the manual method if Docling Document creation fails
            return self._json_to_markdown(redacted_json)
    def generate_redacted_pdf(self, redacted_json: dict, output_path: str) -> bool:
        """
        Generate a redacted PDF from the redacted JSON structure.

        Args:
            redacted_json: The redacted document JSON structure
            output_path: Path where the PDF should be saved

        Returns:
            bool: True if PDF generation was successful, False otherwise
        """
        try:
            # Import required libraries (reportlab is an optional dependency)
            from reportlab.lib.pagesizes import A4
            from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
            from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
            from reportlab.lib import colors

            logger.info(f"Generating redacted PDF: {output_path}")

            # Create PDF document
            doc = SimpleDocTemplate(output_path, pagesize=A4)
            story = []

            # Get styles
            styles = getSampleStyleSheet()
            normal_style = styles['Normal']
            heading_style = styles['Heading1']

            # Custom style for table-like text rendered as a plain paragraph
            table_style = ParagraphStyle(
                'TableStyle',
                parent=normal_style,
                fontName='Courier',
                fontSize=9,
                spaceAfter=6,
            )

            # Process text elements from JSON, grouping consecutive
            # table-like elements together
            texts = redacted_json.get("texts", [])
            i = 0
            while i < len(texts):
                text_elem = texts[i]
                text_content = text_elem.get("text", "").strip()
                label = text_elem.get("label", "")
                level = text_elem.get("level", 0)

                if not text_content:
                    i += 1
                    continue

                # Handle different content types
                if label == "section_header":
                    # Create header with appropriate level
                    if level == 1:
                        story.append(Paragraph(text_content, heading_style))
                    else:
                        # Create sub-heading style
                        sub_heading_style = ParagraphStyle(
                            f'Heading{min(level, 3)}',
                            parent=normal_style,
                            fontSize=14 - level,
                            spaceAfter=12,
                            spaceBefore=12,
                            textColor=colors.darkblue,
                        )
                        story.append(Paragraph(text_content, sub_heading_style))
                elif label == "list_item":
                    # Handle list items, preserving the original marker
                    marker = text_elem.get("marker", "•")
                    list_text = f"{marker} {text_content}"
                    story.append(Paragraph(list_text, normal_style))
                elif '|' in text_content and text_content.count('|') > 1:
                    # Handle table-like content - collect consecutive table rows
                    table_rows = []

                    # Add the current row
                    cells = [cell.strip() for cell in text_content.split('|') if cell.strip()]
                    if cells:
                        table_rows.append(cells)

                    # Look ahead for consecutive table rows
                    j = i + 1
                    while j < len(texts):
                        next_text = texts[j].get("text", "").strip()
                        if '|' in next_text and next_text.count('|') > 1:
                            next_cells = [cell.strip() for cell in next_text.split('|') if cell.strip()]
                            if next_cells:
                                table_rows.append(next_cells)
                            j += 1
                        else:
                            break

                    # Create table if we have rows
                    if table_rows:
                        table = Table(table_rows)
                        table.setStyle(TableStyle([
                            ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                            ('FONTNAME', (0, 0), (-1, -1), 'Courier'),
                            ('FONTSIZE', (0, 0), (-1, -1), 9),
                            ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
                            ('TOPPADDING', (0, 0), (-1, -1), 3),
                            ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
                            ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),  # Header row
                        ]))
                        story.append(table)
                        story.append(Spacer(1, 6))

                        # Skip the rows we've already processed
                        i = j - 1
                    else:
                        # Single row or no valid cells
                        story.append(Paragraph(text_content, table_style))
                else:
                    # Regular text content
                    story.append(Paragraph(text_content, normal_style))

                # Add small spacing between elements
                story.append(Spacer(1, 3))
                i += 1

            # Build PDF
            doc.build(story)
            logger.info(f"Successfully generated redacted PDF: {output_path}")
            return True

        except ImportError as e:
            logger.error(f"Required PDF generation libraries not available: {e}")
            logger.info("Install reportlab with: pip install reportlab")
            return False
        except Exception as e:
            logger.error(f"Error generating redacted PDF: {e}")
            return False
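    # Example usage (sketch): chaining PDF generation onto processing; the
    # extractor and file paths are placeholders.
    #
    #     processor = DocumentProcessor(section_extractor=extractor)
    #     result = processor.process("sample_report.pdf")
    #     ok = processor.generate_redacted_pdf(
    #         result.redacted_json, "/tmp/docling_temp/sample_redacted.pdf"
    #     )
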
    def _json_to_markdown(self, json_data: dict) -> str:
        """Convert a JSON document structure back to markdown format using Docling's structure."""
        markdown_lines = []

        # Get all text elements from the JSON
        texts = json_data.get("texts", [])

        for text_elem in texts:
            text_content = text_elem.get("text", "")
            label = text_elem.get("label", "")
            level = text_elem.get("level", 0)

            if not text_content.strip():
                continue

            # Format based on the label and level (following Docling's structure)
            if label == "section_header":
                # Add appropriate markdown headers
                if level == 1:
                    markdown_lines.append(f"# {text_content}")
                elif level == 2:
                    markdown_lines.append(f"## {text_content}")
                elif level == 3:
                    markdown_lines.append(f"### {text_content}")
                else:
                    markdown_lines.append(f"#### {text_content}")
            elif label == "list_item":
                # Handle list items, preserving the original marker
                marker = text_elem.get("marker", "-")
                markdown_lines.append(f"{marker} {text_content}")
            else:
                # Regular text content (and any unrecognized label) is preserved as-is
                markdown_lines.append(text_content)

        # Join without extra spacing to match Docling's formatting
        return "\n".join(markdown_lines)
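

# Minimal, self-contained sketch for manually exercising the markdown fallback.
# The dict below is a hypothetical, hand-built stand-in for Docling's exported
# structure, not real converter output.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sample_json = {
        "texts": [
            {"text": "Discharge Summary", "label": "section_header", "level": 1},
            {"text": "Patient is stable.", "label": "text"},
            {"text": "Follow up in two weeks", "label": "list_item", "marker": "-"},
        ]
    }
    processor = DocumentProcessor()  # No extractor: conversion helpers only
    print(processor._json_to_markdown(sample_json))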