Spaces:
Sleeping
Sleeping
"""Real wrapper goes here – currently not used by stub agents.""" | |
import logging | |
import json | |
from datetime import datetime | |
from pathlib import Path | |
from azure.ai.documentintelligence import DocumentIntelligenceClient | |
from azure.ai.documentintelligence.models import DocumentContentFormat | |
from azure.core.credentials import AzureKeyCredential | |
class AzureDIService: | |
def __init__(self, endpoint: str, key: str):
    """Set up the Azure client, a logger and the debug-output directory.

    Args:
        endpoint: Endpoint passed to ``DocumentIntelligenceClient``.
        key: API key, wrapped in an ``AzureKeyCredential``.

    Side effects:
        Creates ``logs/di_content`` (relative to the current working
        directory) if it does not exist and logs its location.
    """
    credential = AzureKeyCredential(key)
    self.client = DocumentIntelligenceClient(endpoint=endpoint, credential=credential)
    self.logger = logging.getLogger(__name__)

    # Debug HTML dumps written by extract_tables() land in this directory.
    debug_dir = Path("logs/di_content").absolute()
    self.log_dir = debug_dir
    debug_dir.mkdir(parents=True, exist_ok=True)

    log = self.logger
    log.info(f"Log directory created at: {debug_dir}")
    log.info(f"Absolute path: {debug_dir.absolute()}")
def _get_original_html_table(self, table): | |
"""Generate HTML for the original table structure.""" | |
if not hasattr(table, 'cells'): | |
return "" | |
# Get dimensions | |
rows = max(cell.row_index + (getattr(cell, 'row_span', 1) or 1) - 1 for cell in table.cells) + 1 | |
cols = max(cell.column_index + (getattr(cell, 'column_span', 1) or 1) - 1 for cell in table.cells) + 1 | |
# Create matrix | |
matrix = [[None for _ in range(cols)] for _ in range(rows)] | |
for cell in table.cells: | |
r0 = cell.row_index | |
c0 = cell.column_index | |
r_span = getattr(cell, 'row_span', 1) or 1 | |
c_span = getattr(cell, 'column_span', 1) or 1 | |
for dr in range(r_span): | |
for dc in range(c_span): | |
matrix[r0 + dr][c0 + dc] = cell.content | |
# Generate HTML | |
html = ['<table border="1">'] | |
for row in matrix: | |
html.append('<tr>') | |
for cell in row: | |
if cell is not None: | |
html.append(f'<td>{cell}</td>') | |
else: | |
html.append('<td></td>') | |
html.append('</tr>') | |
html.append('</table>') | |
return '\n'.join(html) | |
def extract_tables(self, pdf_bytes: bytes):
    """Analyze a document with the ``prebuilt-layout`` model and return its text and tables.

    The bytes are sent to ``self.client.begin_analyze_document`` with
    Markdown requested as the output content format. Every table in the
    result is rendered to HTML via ``_get_original_html_table``, and all
    tables are also dumped to a timestamped HTML file in ``self.log_dir``
    for debugging.

    Args:
        pdf_bytes: Raw document bytes, sent as ``application/octet-stream``.

    Returns:
        A dict with:
            ``"text"``: ``result.content``, or ``""`` if missing / ``None``.
            ``"tables"``: list of ``{"original_html": <html string>}``,
            one entry per table (empty when the result has no tables).

    Raises:
        Exception: Any error raised during analysis or while writing the
            debug file is logged (with traceback) and re-raised unchanged.
    """
    try:
        self.logger.info("Starting document analysis with Azure Document Intelligence")

        poller = self.client.begin_analyze_document(
            "prebuilt-layout",
            body=pdf_bytes,
            content_type="application/octet-stream",
            output_content_format=DocumentContentFormat.MARKDOWN,
        )
        result = poller.result()

        # getattr's default only applies when the attribute is missing; a
        # present-but-None ``tables`` would make the loop raise TypeError,
        # so normalise both cases to an empty list.
        tables = [
            {"original_html": self._get_original_html_table(table)}
            for table in (getattr(result, "tables", None) or [])
        ]

        # Save tables for debugging (one HTML file per call; the name has
        # one-second resolution).
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        html_path = self.log_dir / f"di_content_{timestamp}_tables.html"
        with open(html_path, "w", encoding="utf-8") as f:
            f.write("""<!DOCTYPE html>
<html>
<head>
<title>Azure DI Tables</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.table-container { margin-bottom: 40px; }
h2 { color: #333; }
table { border-collapse: collapse; width: 100%; margin-bottom: 10px; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f5f5f5; }
hr { border: none; border-top: 2px solid #ccc; margin: 20px 0; }
</style>
</head>
<body>
<h1>Azure Document Intelligence Tables</h1>
""")
            for i, table in enumerate(tables, 1):
                f.write(f"""
<div class="table-container">
<h2>Table {i}</h2>
{table['original_html']}
<hr>
</div>
""")
            f.write("</body></html>")
        self.logger.info(f"Saved HTML tables to: {html_path.absolute()}")

        return {
            # Same normalisation as for tables: never hand callers None.
            "text": getattr(result, "content", None) or "",
            "tables": tables,
        }
    except Exception as e:
        # logger.exception keeps the original message and adds the traceback.
        self.logger.exception("Error extracting tables: %s", e)
        raise
def _table_to_markdown(self, table) -> str: | |
"""Convert a table to markdown format.""" | |
expanded = self._expand_table(table) | |
if not expanded: | |
return "" | |
# Convert to markdown | |
md_rows = [] | |
for row in expanded: | |
md_row = "| " + " | ".join(str(cell if cell is not None else "") for cell in row) + " |" | |
md_rows.append(md_row) | |
# Add header separator | |
if md_rows: | |
header = md_rows[0] | |
separator = "| " + " | ".join(["---"] * len(expanded[0])) + " |" | |
md_rows.insert(1, separator) | |
return "\n".join(md_rows) |