# Source: doctorecord/src/services/azure_di_service.py
# Last commit (levalencia, f98e92f):
#   feat: update unique indices combinator to return array of objects
"""Azure Document Intelligence wrapper service – currently not used by the stub agents."""
import logging
import json
from datetime import datetime
from pathlib import Path
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import DocumentContentFormat
from azure.core.credentials import AzureKeyCredential
class AzureDIService:
def __init__(self, endpoint: str, key: str):
    """Create the Document Intelligence client, the logger and the debug log directory.

    Args:
        endpoint: Endpoint handed to ``DocumentIntelligenceClient``.
        key: API key, wrapped in an ``AzureKeyCredential`` for the client.
    """
    credential = AzureKeyCredential(key)
    self.client = DocumentIntelligenceClient(endpoint=endpoint, credential=credential)
    self.logger = logging.getLogger(__name__)

    # Debug dumps are written below "logs/di_content", made absolute once here
    # so later log messages and file writes all refer to the same location.
    debug_dir = Path("logs/di_content").absolute()
    self.log_dir = debug_dir
    debug_dir.mkdir(parents=True, exist_ok=True)

    self.logger.info(f"Log directory created at: {self.log_dir}")
    self.logger.info(f"Absolute path: {self.log_dir.absolute()}")
def _get_original_html_table(self, table):
"""Generate HTML for the original table structure."""
if not hasattr(table, 'cells'):
return ""
# Get dimensions
rows = max(cell.row_index + (getattr(cell, 'row_span', 1) or 1) - 1 for cell in table.cells) + 1
cols = max(cell.column_index + (getattr(cell, 'column_span', 1) or 1) - 1 for cell in table.cells) + 1
# Create matrix
matrix = [[None for _ in range(cols)] for _ in range(rows)]
for cell in table.cells:
r0 = cell.row_index
c0 = cell.column_index
r_span = getattr(cell, 'row_span', 1) or 1
c_span = getattr(cell, 'column_span', 1) or 1
for dr in range(r_span):
for dc in range(c_span):
matrix[r0 + dr][c0 + dc] = cell.content
# Generate HTML
html = ['<table border="1">']
for row in matrix:
html.append('<tr>')
for cell in row:
if cell is not None:
html.append(f'<td>{cell}</td>')
else:
html.append('<td></td>')
html.append('</tr>')
html.append('</table>')
return '\n'.join(html)
def extract_tables(self, pdf_bytes: bytes):
    """Analyze a document with the ``prebuilt-layout`` model and return its text and tables.

    The raw bytes are sent to Azure Document Intelligence with Markdown
    requested as the content format. Every table in the result is rendered
    to HTML with ``_get_original_html_table``, and all rendered tables are
    also written to a timestamped ``di_content_<timestamp>_tables.html``
    file inside ``self.log_dir`` for debugging.

    Args:
        pdf_bytes: Raw bytes of the document to analyze.

    Returns:
        A dict with two keys: ``"text"`` (the result's ``content``, or ``""``
        when the result has no such attribute) and ``"tables"`` (a list of
        ``{"original_html": <html string>}`` dicts, empty when the document
        has no tables).

    Raises:
        Exception: Any error raised by the service call or by writing the
            debug file is logged with its traceback and re-raised unchanged.
    """
    try:
        self.logger.info("Starting document analysis with Azure Document Intelligence")
        poller = self.client.begin_analyze_document(
            "prebuilt-layout",
            body=pdf_bytes,
            content_type="application/octet-stream",
            output_content_format=DocumentContentFormat.MARKDOWN,
        )
        result = poller.result()

        # ``tables`` is optional on the analysis result: for a document
        # without tables the attribute can be present but None, which a
        # getattr default alone would not guard against.
        detected_tables = getattr(result, "tables", None) or []
        tables = [
            {"original_html": self._get_original_html_table(table)}
            for table in detected_tables
        ]

        # Dump every rendered table into one HTML page for debugging.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        html_path = self.log_dir / f"di_content_{timestamp}_tables.html"
        with open(html_path, "w", encoding="utf-8") as f:
            f.write("""<!DOCTYPE html>
<html>
<head>
<title>Azure DI Tables</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.table-container { margin-bottom: 40px; }
h2 { color: #333; }
table { border-collapse: collapse; width: 100%; margin-bottom: 10px; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f5f5f5; }
hr { border: none; border-top: 2px solid #ccc; margin: 20px 0; }
</style>
</head>
<body>
<h1>Azure Document Intelligence Tables</h1>
""")
            for i, table in enumerate(tables, 1):
                f.write(f"""
<div class="table-container">
<h2>Table {i}</h2>
{table['original_html']}
<hr>
</div>
""")
            f.write("</body></html>")
        self.logger.info(f"Saved HTML tables to: {html_path.absolute()}")

        return {
            "text": getattr(result, "content", ""),
            "tables": tables,
        }
    except Exception as e:
        # logger.exception keeps the same message but also records the
        # traceback, which the plain error() call dropped.
        self.logger.exception("Error extracting tables: %s", e)
        raise
def _table_to_markdown(self, table) -> str:
"""Convert a table to markdown format."""
expanded = self._expand_table(table)
if not expanded:
return ""
# Convert to markdown
md_rows = []
for row in expanded:
md_row = "| " + " | ".join(str(cell if cell is not None else "") for cell in row) + " |"
md_rows.append(md_row)
# Add header separator
if md_rows:
header = md_rows[0]
separator = "| " + " | ".join(["---"] * len(expanded[0])) + " |"
md_rows.insert(1, separator)
return "\n".join(md_rows)