import os
import time
import logging
import json
from dataclasses import dataclass
from typing import Optional

# Don't import DocumentConverter at module level to prevent early initialization
# from docling.document_converter import DocumentConverter
from processing.sections import SectionExtractor
from utils.cost_tracker import cost_tracker

# Remove global converter initialization - will be done lazily
# _docling_converter = DocumentConverter()

logger = logging.getLogger(__name__)  # Logger for this module

@dataclass
class DocumentResult:
    """Holds processed results for a document."""
    file_path: str
    structured_markdown: str
    structured_json: dict
    structured_yaml: str  # YAML rendering of the structured JSON
    redacted_markdown: str
    redacted_json: dict
    raw_text: str  # Raw text without preprocessing


@dataclass
class ProcessingResult:
    """Simple result object for Jupyter notebook usage."""
    original_document_md: str
    redacted_document_md: str
    original_document_json: dict
    original_document_yaml: str  # YAML rendering of the structured JSON
    redacted_document_json: dict
    raw_text: str  # Raw text without preprocessing
    removed_indices: list  # Indices of the sections that were actually removed
    input_tokens: int
    output_tokens: int
    cost: float

def process_document_with_redaction(
    file_path: str,
    endpoint: str,
    api_key: str,
    api_version: str,
    deployment: str,
    section_extractor: Optional[SectionExtractor] = None
) -> ProcessingResult:
    """
    Process a document and return its original and redacted forms.

    Args:
        file_path: Path to the PDF file to process
        endpoint: Azure OpenAI endpoint
        api_key: Azure OpenAI API key
        api_version: Azure OpenAI API version
        deployment: Azure OpenAI deployment name
        section_extractor: Optional custom section extractor

    Returns:
        ProcessingResult with the original and redacted documents (markdown,
        JSON, and YAML), the removed section indices, and token/cost totals.
    """
logger.info(f"Processing document: {file_path}") | |
# Reset cost tracker for this processing session | |
cost_tracker.reset_session() | |
# Create section extractor if not provided | |
if section_extractor is None: | |
from processing.sections import ReasoningSectionExtractor | |
section_extractor = ReasoningSectionExtractor( | |
endpoint=endpoint, | |
api_key=api_key, | |
api_version=api_version, | |
deployment=deployment, | |
) | |
# Process the document | |
processor = DocumentProcessor(section_extractor=section_extractor) | |
result = processor.process(file_path) | |
# Get the actual removed indices from the section extractor | |
removed_indices = [] | |
if section_extractor: | |
# Extract the removed indices from the LLM response | |
extraction_result = section_extractor.llm_extractor.extract_medication_sections(result.structured_json) | |
removed_indices = extraction_result.get("indices_to_remove", []) | |
# Get cost summary | |
cost_summary = cost_tracker.get_session_summary() | |
total_input_tokens = cost_summary.get("total_tokens", 0) | |
total_output_tokens = 0 # We'll calculate this from the breakdown | |
total_cost = cost_summary.get("total_cost", 0.0) | |
# Calculate output tokens from model breakdown | |
for model_stats in cost_summary.get("model_breakdown", {}).values(): | |
total_output_tokens += model_stats.get("output_tokens", 0) | |
# Calculate input tokens (total - output) | |
total_input_tokens = total_input_tokens - total_output_tokens | |
logger.info(f"Processing complete - Input: {total_input_tokens}, Output: {total_output_tokens}, Cost: ${total_cost:.4f}") | |
return ProcessingResult( | |
original_document_md=result.structured_markdown, | |
redacted_document_md=result.redacted_markdown, | |
original_document_json=result.structured_json, | |
original_document_yaml=result.structured_yaml, | |
redacted_document_json=result.redacted_json, | |
raw_text=result.raw_text, | |
removed_indices=removed_indices, | |
input_tokens=total_input_tokens, | |
output_tokens=total_output_tokens, | |
cost=total_cost | |
) | |
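
# Example usage (a minimal sketch for a notebook cell; the endpoint, key, and
# deployment values below are placeholders, not values defined in this module):
#
#     result = process_document_with_redaction(
#         file_path="sample.pdf",
#         endpoint="https://<resource>.openai.azure.com/",
#         api_key="<api-key>",
#         api_version="2024-02-01",
#         deployment="<deployment-name>",
#     )
#     print(result.redacted_document_md)
#     print(f"Cost: ${result.cost:.4f} ({result.input_tokens} in / {result.output_tokens} out)")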

class DocumentProcessor:
    """Handles parsing of documents with Docling and redacting specified sections."""

    def __init__(self, section_extractor: Optional[SectionExtractor] = None):
        """
        Initialize with an optional SectionExtractor for removing specific sections.
        If None, no redaction will be performed (original structure only).
        The Docling DocumentConverter will be initialized lazily when needed.
        """
        self.section_extractor = section_extractor
        self._converter = None  # Lazy initialization

    @property
    def converter(self):
        """Lazily initialize the DocumentConverter to prevent early Hugging Face Hub initialization."""
        if self._converter is None:
            # Import here to ensure environment variables are set first
            from docling.document_converter import DocumentConverter
            logger.info("Initializing Docling DocumentConverter...")
            self._converter = DocumentConverter()
            logger.info("Docling DocumentConverter initialized successfully")
        return self._converter

    def process(self, file_path: str) -> DocumentResult:
        """Parse the document and optionally remove specified sections. Returns a DocumentResult."""
        logger.info(f"Starting processing for file: {file_path}")
        start_time = time.time()

        # Ensure environment variables are set before processing
        self._ensure_cache_directories()

        # Convert the document using Docling
        conv_result = self.converter.convert(file_path)
        elapsed = time.time() - start_time
        logger.info(f"Docling conversion completed in {elapsed:.2f} seconds")

        # Export results from Docling
        structured_md = conv_result.document.export_to_markdown()
        structured_text = conv_result.document.export_to_text()
        doc_json = conv_result.document.export_to_dict()

        # Convert JSON to YAML for display
        import yaml
        doc_yaml = yaml.dump(doc_json, default_flow_style=False, allow_unicode=True, sort_keys=False)
        logger.info(f"Extracted document content (text length {len(structured_text)} characters)")

        # Use the SectionExtractor to remove target sections if provided
        if self.section_extractor:
            # Use the JSON-based approach for better section removal
            redacted_json = self.section_extractor.remove_sections_from_json(doc_json)
            # Convert the redacted JSON back to markdown using Docling's export method
            redacted_md = self._export_redacted_markdown(conv_result.document, redacted_json)
            logger.info("Applied section redaction to remove specified sections")
        else:
            redacted_md = structured_md  # No redaction, use original
            redacted_json = doc_json  # No redaction, use original
            logger.info("No section redaction applied (showing original structure)")

        # Persist outputs to files (JSON and redacted text) for auditing
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        # Use the same temp directory as the main application
        temp_dir = os.environ.get('TEMP_DIR', '/tmp/docling_temp')
        try:
            os.makedirs(temp_dir, exist_ok=True)
        except PermissionError:
            # Fall back to the system temp directory if the main temp dir is not writable
            import tempfile
            temp_dir = os.path.join(tempfile.gettempdir(), "docling_temp_files")
            os.makedirs(temp_dir, exist_ok=True)

        json_path = os.path.join(temp_dir, f"{base_name}_structured.json")
        redacted_path = os.path.join(temp_dir, f"{base_name}_redacted.txt")
        redacted_json_path = os.path.join(temp_dir, f"{base_name}_redacted.json")
        try:
            with open(json_path, "w", encoding="utf-8") as jf:
                json.dump(doc_json, jf, ensure_ascii=False, indent=2)
            with open(redacted_path, "w", encoding="utf-8") as tf:
                tf.write(redacted_md)
            with open(redacted_json_path, "w", encoding="utf-8") as jf:
                json.dump(redacted_json, jf, ensure_ascii=False, indent=2)
            logger.info(f"Saved structured JSON to {json_path}, redacted text to {redacted_path}, and redacted JSON to {redacted_json_path}")
        except Exception as e:
            logger.error(f"Error saving outputs to files: {e}")

        # Prepare result object
        result = DocumentResult(
            file_path=file_path,
            structured_markdown=structured_md,
            structured_json=doc_json,
            structured_yaml=doc_yaml,
            redacted_markdown=redacted_md,
            redacted_json=redacted_json,
            raw_text=structured_text  # Include the raw text
        )
        logger.info(f"Finished processing for file: {file_path}")
        return result

    def _ensure_cache_directories(self):
        """Ensure all necessary cache directories exist before processing."""
        cache_dirs = [
            os.environ.get('HF_HOME', '/tmp/docling_temp/huggingface'),
            os.environ.get('HF_CACHE_HOME', '/tmp/docling_temp/huggingface_cache'),
            os.environ.get('HF_HUB_CACHE', '/tmp/docling_temp/huggingface_cache'),
            os.environ.get('TRANSFORMERS_CACHE', '/tmp/docling_temp/transformers_cache'),
            os.environ.get('HF_DATASETS_CACHE', '/tmp/docling_temp/datasets_cache'),
            os.environ.get('DIFFUSERS_CACHE', '/tmp/docling_temp/diffusers_cache'),
            os.environ.get('ACCELERATE_CACHE', '/tmp/docling_temp/accelerate_cache'),
            os.environ.get('TORCH_HOME', '/tmp/docling_temp/torch'),
            os.environ.get('TENSORFLOW_HOME', '/tmp/docling_temp/tensorflow'),
            os.environ.get('KERAS_HOME', '/tmp/docling_temp/keras'),
        ]
        for cache_dir in cache_dirs:
            try:
                os.makedirs(cache_dir, exist_ok=True)
                logger.debug(f"Ensured cache directory exists: {cache_dir}")
            except Exception as e:
                logger.warning(f"Could not create cache directory {cache_dir}: {e}")

    def _export_redacted_markdown(self, document, redacted_json):
        """Export redacted markdown using Docling's Document class for proper formatting."""
        try:
            # Try different possible import paths for the Docling Document class
            try:
                from docling.document import Document
            except ImportError:
                try:
                    from docling import Document
                except ImportError:
                    try:
                        from docling.core import Document
                    except ImportError:
                        # If all imports fail, use the fallback method
                        logger.warning("Could not import Docling Document class from any known location")
                        raise ImportError("Docling Document class not found")

            # Create a new Document from the redacted JSON
            redacted_document = Document.from_dict(redacted_json)
            # Use Docling's export method for proper markdown formatting
            redacted_md = redacted_document.export_to_markdown()
            logger.info("Successfully generated redacted markdown using Docling Document class")
            return redacted_md
        except Exception as e:
            logger.warning(f"Failed to create Docling Document from redacted JSON: {e}")
            logger.info("Falling back to manual JSON-to-markdown conversion")
            # Fall back to the manual method if Docling Document creation fails
            return self._json_to_markdown(redacted_json)

    def generate_redacted_pdf(self, redacted_json: dict, output_path: str) -> bool:
        """
        Generate a redacted PDF from the redacted JSON structure.

        Args:
            redacted_json: The redacted document JSON structure
            output_path: Path where the PDF should be saved

        Returns:
            bool: True if PDF generation was successful, False otherwise
        """
        try:
            # Import required libraries
            from reportlab.lib.pagesizes import A4
            from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
            from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
            from reportlab.lib import colors

            logger.info(f"Generating redacted PDF: {output_path}")

            # Create PDF document
            doc = SimpleDocTemplate(output_path, pagesize=A4)
            story = []

            # Get styles
            styles = getSampleStyleSheet()
            normal_style = styles['Normal']
            heading_style = styles['Heading1']

            # Create a custom style for table-like text
            table_style = ParagraphStyle(
                'TableStyle',
                parent=normal_style,
                fontName='Courier',
                fontSize=9,
                spaceAfter=6
            )

            # Process text elements from JSON
            texts = redacted_json.get("texts", [])

            # Group consecutive table-like elements together
            i = 0
            while i < len(texts):
                text_elem = texts[i]
                text_content = text_elem.get("text", "").strip()
                label = text_elem.get("label", "")
                level = text_elem.get("level", 0)

                if not text_content:
                    i += 1
                    continue

                # Handle different content types
                if label == "section_header":
                    # Create header with appropriate level
                    if level == 1:
                        story.append(Paragraph(text_content, heading_style))
                    else:
                        # Create sub-heading style
                        sub_heading_style = ParagraphStyle(
                            f'Heading{min(level, 3)}',
                            parent=normal_style,
                            fontSize=14 - level,
                            spaceAfter=12,
                            spaceBefore=12,
                            textColor=colors.darkblue
                        )
                        story.append(Paragraph(text_content, sub_heading_style))
                elif label == "list_item":
                    # Handle list items
                    marker = text_elem.get("marker", "•")
                    list_text = f"{marker} {text_content}"
                    story.append(Paragraph(list_text, normal_style))
                elif '|' in text_content and text_content.count('|') > 1:
                    # Handle table-like content - collect consecutive table rows
                    table_rows = []
                    # Add the current row
                    cells = [cell.strip() for cell in text_content.split('|') if cell.strip()]
                    if cells:
                        table_rows.append(cells)
                    # Look ahead for consecutive table rows
                    j = i + 1
                    while j < len(texts):
                        next_text = texts[j].get("text", "").strip()
                        if '|' in next_text and next_text.count('|') > 1:
                            next_cells = [cell.strip() for cell in next_text.split('|') if cell.strip()]
                            if next_cells:
                                table_rows.append(next_cells)
                            j += 1
                        else:
                            break
                    # Create table if we have rows
                    if table_rows:
                        table = Table(table_rows)
                        table.setStyle(TableStyle([
                            ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                            ('FONTNAME', (0, 0), (-1, -1), 'Courier'),
                            ('FONTSIZE', (0, 0), (-1, -1), 9),
                            ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
                            ('TOPPADDING', (0, 0), (-1, -1), 3),
                            ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
                            ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),  # Header row
                        ]))
                        story.append(table)
                        story.append(Spacer(1, 6))
                        # Skip the rows we've already processed
                        i = j - 1
                    else:
                        # Single row or no valid cells
                        story.append(Paragraph(text_content, table_style))
                else:
                    # Regular text content
                    story.append(Paragraph(text_content, normal_style))

                # Add small spacing between elements
                story.append(Spacer(1, 3))
                i += 1

            # Build PDF
            doc.build(story)
            logger.info(f"Successfully generated redacted PDF: {output_path}")
            return True

        except ImportError as e:
            logger.error(f"Required PDF generation libraries not available: {e}")
            logger.info("Install reportlab with: pip install reportlab")
            return False
        except Exception as e:
            logger.error(f"Error generating redacted PDF: {e}")
            return False
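
    # Example usage (a minimal sketch; "sample.pdf" and the output path below are
    # placeholder values, and passing section_extractor=None skips redaction):
    #
    #     processor = DocumentProcessor(section_extractor=None)
    #     doc_result = processor.process("sample.pdf")
    #     ok = processor.generate_redacted_pdf(doc_result.redacted_json, "/tmp/redacted.pdf")
    #     if not ok:
    #         print("PDF generation failed - check that reportlab is installed")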

    def _json_to_markdown(self, json_data: dict) -> str:
        """Convert a JSON document structure back to markdown, following Docling's structure."""
        markdown_lines = []

        # Get all text elements from the JSON
        texts = json_data.get("texts", [])
        for text_elem in texts:
            text_content = text_elem.get("text", "")
            label = text_elem.get("label", "")
            level = text_elem.get("level", 0)

            if not text_content.strip():
                continue

            # Format based on the label and level (following Docling's structure)
            if label == "section_header":
                # Add appropriate markdown headers
                if level == 1:
                    markdown_lines.append(f"# {text_content}")
                elif level == 2:
                    markdown_lines.append(f"## {text_content}")
                elif level == 3:
                    markdown_lines.append(f"### {text_content}")
                else:
                    markdown_lines.append(f"#### {text_content}")
            elif label == "list_item":
                # Handle list items - preserve the original marker
                marker = text_elem.get("marker", "-")
                markdown_lines.append(f"{marker} {text_content}")
            elif label == "text":
                # Regular text content - preserve as-is
                markdown_lines.append(text_content)
            else:
                # Default to regular text
                markdown_lines.append(text_content)

        # Join without extra spacing to match Docling's formatting
        return "\n".join(markdown_lines)