# src/processing/document_processor.py
import os
import time
import logging
import json
from dataclasses import dataclass
from typing import Optional
# Don't import DocumentConverter at module level to prevent early initialization
# from docling.document_converter import DocumentConverter
from processing.sections import SectionExtractor
from utils.cost_tracker import cost_tracker
# Remove global converter initialization - will be done lazily
# _docling_converter = DocumentConverter()
logger = logging.getLogger(__name__) # Logger for this module
@dataclass
class DocumentResult:
"""Holds processed results for a document."""
file_path: str
structured_markdown: str
structured_json: dict
    structured_yaml: str  # YAML rendering of the structured JSON
redacted_markdown: str
redacted_json: dict
    raw_text: str  # Raw document text, without preprocessing
@dataclass
class ProcessingResult:
"""Simple result for Jupyter notebook usage."""
original_document_md: str
redacted_document_md: str
original_document_json: dict
    original_document_yaml: str  # YAML rendering of the original JSON
    redacted_document_json: dict
    raw_text: str  # Raw document text, without preprocessing
    removed_indices: list  # Indices of the elements removed by redaction
input_tokens: int
output_tokens: int
cost: float
def process_document_with_redaction(
file_path: str,
endpoint: str,
api_key: str,
api_version: str,
deployment: str,
section_extractor: Optional[SectionExtractor] = None
) -> ProcessingResult:
"""
    Process a document and return a ProcessingResult with the outputs and cost summary.
Args:
file_path: Path to the PDF file to process
endpoint: Azure OpenAI endpoint
api_key: Azure OpenAI API key
api_version: Azure OpenAI API version
deployment: Azure OpenAI deployment name
section_extractor: Optional custom section extractor
    Returns:
        ProcessingResult with the original and redacted documents (markdown,
        JSON, and YAML), the raw text, the removed indices, and token/cost totals
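    Example (illustrative values; the endpoint and deployment names are placeholders):
        result = process_document_with_redaction(
            file_path="discharge_summary.pdf",
            endpoint="https://<resource>.openai.azure.com/",
            api_key=os.environ["AZURE_OPENAI_API_KEY"],
            api_version="2024-02-01",
            deployment="gpt-4o",
        )
        print(result.redacted_document_md)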
"""
logger.info(f"Processing document: {file_path}")
# Reset cost tracker for this processing session
cost_tracker.reset_session()
# Create section extractor if not provided
if section_extractor is None:
from processing.sections import ReasoningSectionExtractor
section_extractor = ReasoningSectionExtractor(
endpoint=endpoint,
api_key=api_key,
api_version=api_version,
deployment=deployment,
)
# Process the document
processor = DocumentProcessor(section_extractor=section_extractor)
result = processor.process(file_path)
    # Collect the indices that were actually removed. Note that this calls the
    # LLM extractor again on the structured JSON, so its tokens are also
    # included in the session cost summary below.
    removed_indices = []
    if section_extractor:
        extraction_result = section_extractor.llm_extractor.extract_medication_sections(result.structured_json)
        removed_indices = extraction_result.get("indices_to_remove", [])
    # Get cost summary for the session
    cost_summary = cost_tracker.get_session_summary()
    total_tokens = cost_summary.get("total_tokens", 0)
    total_cost = cost_summary.get("total_cost", 0.0)
    # Sum output tokens across the per-model breakdown
    total_output_tokens = 0
    for model_stats in cost_summary.get("model_breakdown", {}).values():
        total_output_tokens += model_stats.get("output_tokens", 0)
    # Input tokens are whatever remains of the session total
    total_input_tokens = total_tokens - total_output_tokens
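    # e.g. a session total of 1,200 tokens with 200 output tokens
    # yields 1,000 input tokens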
logger.info(f"Processing complete - Input: {total_input_tokens}, Output: {total_output_tokens}, Cost: ${total_cost:.4f}")
return ProcessingResult(
original_document_md=result.structured_markdown,
redacted_document_md=result.redacted_markdown,
original_document_json=result.structured_json,
original_document_yaml=result.structured_yaml,
redacted_document_json=result.redacted_json,
raw_text=result.raw_text,
removed_indices=removed_indices,
input_tokens=total_input_tokens,
output_tokens=total_output_tokens,
cost=total_cost
)
class DocumentProcessor:
"""Handles parsing of documents with Docling and redacting specified sections."""
def __init__(self, section_extractor: Optional[SectionExtractor] = None):
"""
Initialize with an optional SectionExtractor for removing specific sections.
If None, no redaction will be performed (original structure only).
The Docling DocumentConverter will be initialized lazily when needed.
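        Example (sketch):
            processor = DocumentProcessor()          # parse only, no redaction
            processor = DocumentProcessor(extractor) # parse and redact sections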
"""
self.section_extractor = section_extractor
self._converter = None # Lazy initialization
@property
def converter(self):
"""Lazy initialization of DocumentConverter to prevent early Hugging Face Hub initialization."""
if self._converter is None:
# Import here to ensure environment variables are set first
from docling.document_converter import DocumentConverter
logger.info("Initializing Docling DocumentConverter...")
self._converter = DocumentConverter()
logger.info("Docling DocumentConverter initialized successfully")
return self._converter
def process(self, file_path: str) -> DocumentResult:
"""Parse the document and optionally remove specified sections. Returns a DocumentResult."""
logger.info(f"Starting processing for file: {file_path}")
start_time = time.time()
# Ensure environment variables are set before processing
self._ensure_cache_directories()
# Convert the document using Docling
conv_result = self.converter.convert(file_path)
elapsed = time.time() - start_time
logger.info(f"Docling conversion completed in {elapsed:.2f} seconds")
# Export results from Docling
structured_md = conv_result.document.export_to_markdown()
structured_text = conv_result.document.export_to_text()
doc_json = conv_result.document.export_to_dict()
# Convert JSON to YAML for display
import yaml
doc_yaml = yaml.dump(doc_json, default_flow_style=False, allow_unicode=True, sort_keys=False)
logger.info(f"Extracted document content (text length {len(structured_text)} characters)")
# Use SectionExtractor to remove target sections if provided
if self.section_extractor:
# Use the new JSON-based approach for better section removal
redacted_json = self.section_extractor.remove_sections_from_json(doc_json)
# Convert the redacted JSON back to markdown using Docling's export method
# Create a modified document structure for proper markdown export
redacted_md = self._export_redacted_markdown(conv_result.document, redacted_json)
logger.info("Applied section redaction to remove specified sections")
else:
redacted_md = structured_md # No redaction, use original
redacted_json = doc_json # No redaction, use original
logger.info("No section redaction applied (showing original structure)")
# Persist outputs to files (JSON and redacted text) for auditing
base_name = os.path.splitext(os.path.basename(file_path))[0]
# Use the same temp directory as the main application
temp_dir = os.environ.get('TEMP_DIR', '/tmp/docling_temp')
try:
os.makedirs(temp_dir, exist_ok=True)
except PermissionError:
# Fallback to system temp directory if we can't create in the main temp dir
import tempfile
temp_dir = os.path.join(tempfile.gettempdir(), "docling_temp_files")
os.makedirs(temp_dir, exist_ok=True)
json_path = os.path.join(temp_dir, f"{base_name}_structured.json")
redacted_path = os.path.join(temp_dir, f"{base_name}_redacted.txt")
redacted_json_path = os.path.join(temp_dir, f"{base_name}_redacted.json")
try:
with open(json_path, "w", encoding="utf-8") as jf:
json.dump(doc_json, jf, ensure_ascii=False, indent=2)
with open(redacted_path, "w", encoding="utf-8") as tf:
tf.write(redacted_md)
with open(redacted_json_path, "w", encoding="utf-8") as jf:
json.dump(redacted_json, jf, ensure_ascii=False, indent=2)
logger.info(f"Saved structured JSON to {json_path}, redacted text to {redacted_path}, and redacted JSON to {redacted_json_path}")
except Exception as e:
logger.error(f"Error saving outputs to files: {e}")
# Prepare result object
result = DocumentResult(
file_path=file_path,
structured_markdown=structured_md,
structured_json=doc_json,
structured_yaml=doc_yaml,
redacted_markdown=redacted_md,
redacted_json=redacted_json,
raw_text=structured_text # Include the raw text
)
logger.info(f"Finished processing for file: {file_path}")
return result
def _ensure_cache_directories(self):
"""Ensure all necessary cache directories exist before processing."""
cache_dirs = [
os.environ.get('HF_HOME', '/tmp/docling_temp/huggingface'),
os.environ.get('HF_CACHE_HOME', '/tmp/docling_temp/huggingface_cache'),
os.environ.get('HF_HUB_CACHE', '/tmp/docling_temp/huggingface_cache'),
os.environ.get('TRANSFORMERS_CACHE', '/tmp/docling_temp/transformers_cache'),
os.environ.get('HF_DATASETS_CACHE', '/tmp/docling_temp/datasets_cache'),
os.environ.get('DIFFUSERS_CACHE', '/tmp/docling_temp/diffusers_cache'),
os.environ.get('ACCELERATE_CACHE', '/tmp/docling_temp/accelerate_cache'),
os.environ.get('TORCH_HOME', '/tmp/docling_temp/torch'),
os.environ.get('TENSORFLOW_HOME', '/tmp/docling_temp/tensorflow'),
os.environ.get('KERAS_HOME', '/tmp/docling_temp/keras'),
]
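        # e.g. exporting HF_HOME=/data/hf before launch redirects the
        # Hugging Face cache to persistent storage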
for cache_dir in cache_dirs:
try:
os.makedirs(cache_dir, exist_ok=True)
logger.debug(f"Ensured cache directory exists: {cache_dir}")
except Exception as e:
logger.warning(f"Could not create cache directory {cache_dir}: {e}")
    def _export_redacted_markdown(self, document, redacted_json):
        """Export redacted markdown using Docling's document model for proper formatting."""
        try:
            # Try the import paths used by different Docling releases; recent
            # versions expose the model as DoclingDocument in docling-core
            try:
                from docling_core.types.doc import DoclingDocument as Document
            except ImportError:
                try:
                    from docling.document import Document
                except ImportError:
                    try:
                        from docling.core import Document
                    except ImportError:
                        logger.warning("Could not import a Docling document class from any known location")
                        raise ImportError("Docling document class not found")
            # Rebuild a document from the redacted JSON. DoclingDocument is a
            # pydantic model, so prefer model_validate; keep from_dict as a
            # fallback for older APIs that provide it.
            if hasattr(Document, "model_validate"):
                redacted_document = Document.model_validate(redacted_json)
            else:
                redacted_document = Document.from_dict(redacted_json)
            # Use Docling's export method for proper markdown formatting
            redacted_md = redacted_document.export_to_markdown()
            logger.info("Successfully generated redacted markdown using Docling's document class")
            return redacted_md
        except Exception as e:
            logger.warning(f"Failed to create Docling document from redacted JSON: {e}")
            logger.info("Falling back to manual JSON-to-markdown conversion")
            return self._json_to_markdown(redacted_json)
def generate_redacted_pdf(self, redacted_json: dict, output_path: str) -> bool:
"""
Generate a redacted PDF from the redacted JSON structure.
Args:
redacted_json: The redacted document JSON structure
output_path: Path where the PDF should be saved
Returns:
bool: True if PDF generation was successful, False otherwise
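        Example (sketch; `result` is a DocumentResult returned by process()):
            ok = processor.generate_redacted_pdf(result.redacted_json, "/tmp/redacted.pdf")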
"""
        try:
            # Import required libraries (reportlab is an optional dependency)
            from reportlab.lib.pagesizes import A4
            from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
            from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
            from reportlab.lib import colors
            from xml.sax.saxutils import escape
logger.info(f"Generating redacted PDF: {output_path}")
# Create PDF document
doc = SimpleDocTemplate(output_path, pagesize=A4)
story = []
# Get styles
styles = getSampleStyleSheet()
normal_style = styles['Normal']
heading_style = styles['Heading1']
# Create custom styles for better formatting
table_style = ParagraphStyle(
'TableStyle',
parent=normal_style,
fontName='Courier',
fontSize=9,
spaceAfter=6
)
# Process text elements from JSON
texts = redacted_json.get("texts", [])
# Group consecutive table-like elements together
i = 0
while i < len(texts):
                text_elem = texts[i]
                text_content = text_elem.get("text", "").strip()
                label = text_elem.get("label", "")
                level = text_elem.get("level", 0)
                if not text_content:
                    i += 1
                    continue
                # reportlab Paragraphs parse their text as mini-HTML, so escape
                # XML special characters before rendering as a Paragraph
                # (raw table cells below keep the original text)
                safe_text = escape(text_content)
# Handle different content types
if label == "section_header":
# Create header with appropriate level
if level == 1:
                        story.append(Paragraph(safe_text, heading_style))
else:
# Create sub-heading style
sub_heading_style = ParagraphStyle(
f'Heading{min(level, 3)}',
parent=normal_style,
fontSize=14 - level,
spaceAfter=12,
spaceBefore=12,
textColor=colors.darkblue
)
                        story.append(Paragraph(safe_text, sub_heading_style))
elif label == "list_item":
# Handle list items
marker = text_elem.get("marker", "•")
                    list_text = f"{marker} {safe_text}"
story.append(Paragraph(list_text, normal_style))
elif '|' in text_content and text_content.count('|') > 1:
# Handle table-like content - collect consecutive table rows
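                    # e.g. "Drug | Dose | Route" splits into ["Drug", "Dose", "Route"]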
table_rows = []
# Add the current row
cells = [cell.strip() for cell in text_content.split('|') if cell.strip()]
if cells:
table_rows.append(cells)
# Look ahead for consecutive table rows
j = i + 1
while j < len(texts):
next_text = texts[j].get("text", "").strip()
if '|' in next_text and next_text.count('|') > 1:
next_cells = [cell.strip() for cell in next_text.split('|') if cell.strip()]
if next_cells:
table_rows.append(next_cells)
j += 1
else:
break
# Create table if we have rows
if table_rows:
table = Table(table_rows)
table.setStyle(TableStyle([
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (-1, -1), 'Courier'),
('FONTSIZE', (0, 0), (-1, -1), 9),
('BOTTOMPADDING', (0, 0), (-1, -1), 3),
('TOPPADDING', (0, 0), (-1, -1), 3),
('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), # Header row
]))
story.append(table)
story.append(Spacer(1, 6))
# Skip the rows we've already processed
i = j - 1
else:
# Single row or no valid cells
                        story.append(Paragraph(safe_text, table_style))
else:
# Regular text content
                    story.append(Paragraph(safe_text, normal_style))
# Add small spacing between elements
story.append(Spacer(1, 3))
i += 1
# Build PDF
doc.build(story)
logger.info(f"Successfully generated redacted PDF: {output_path}")
return True
except ImportError as e:
logger.error(f"Required PDF generation libraries not available: {e}")
logger.info("Install reportlab with: pip install reportlab")
return False
except Exception as e:
logger.error(f"Error generating redacted PDF: {e}")
return False
def _json_to_markdown(self, json_data: dict) -> str:
"""Convert JSON document structure back to markdown format using Docling's structure."""
markdown_lines = []
# Get all text elements from the JSON
texts = json_data.get("texts", [])
for text_elem in texts:
text_content = text_elem.get("text", "")
label = text_elem.get("label", "")
level = text_elem.get("level", 0)
if not text_content.strip():
continue
# Format based on the label and level (following Docling's structure)
if label == "section_header":
# Add appropriate markdown headers
if level == 1:
markdown_lines.append(f"# {text_content}")
elif level == 2:
markdown_lines.append(f"## {text_content}")
elif level == 3:
markdown_lines.append(f"### {text_content}")
else:
markdown_lines.append(f"#### {text_content}")
elif label == "list_item":
# Handle list items - preserve the original marker
marker = text_elem.get("marker", "-")
markdown_lines.append(f"{marker} {text_content}")
elif label == "text":
# Regular text content - preserve as-is
markdown_lines.append(text_content)
else:
# Default to regular text
markdown_lines.append(text_content)
        # Join with blank lines between blocks so the result renders as
        # markdown paragraphs, matching Docling's own markdown export
        return "\n\n".join(markdown_lines)