Spaces:
Sleeping
Sleeping
File size: 5,136 Bytes
0a40afa 54478a0 26b3eb7 0a40afa 54478a0 0a40afa f98e92f 26b3eb7 f98e92f 0a40afa f98e92f 924cb7d f98e92f 924cb7d f98e92f 924cb7d f98e92f 924cb7d f98e92f 924cb7d 0a40afa f98e92f 54478a0 924cb7d 54478a0 f98e92f 924cb7d f98e92f 924cb7d f98e92f 924cb7d 54478a0 f98e92f 0a40afa 924cb7d f98e92f 924cb7d 0a40afa f98e92f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
"""Real wrapper goes here – currently not used by stub agents."""
import html
import json
import logging
from datetime import datetime
from pathlib import Path

from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import DocumentContentFormat
from azure.core.credentials import AzureKeyCredential
class AzureDIService:
    """Wrapper around Azure Document Intelligence layout analysis.

    Runs the ``prebuilt-layout`` model on a document, renders every detected
    table as an HTML ``<table>``, and writes those tables to a timestamped
    debug file under ``logs/di_content``.
    """

    # Static head of the debug HTML page written by ``_save_tables_html``.
    _HTML_PAGE_HEAD = """<!DOCTYPE html>
<html>
<head>
<title>Azure DI Tables</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.table-container { margin-bottom: 40px; }
h2 { color: #333; }
table { border-collapse: collapse; width: 100%; margin-bottom: 10px; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f5f5f5; }
hr { border: none; border-top: 2px solid #ccc; margin: 20px 0; }
</style>
</head>
<body>
<h1>Azure Document Intelligence Tables</h1>
"""

    def __init__(self, endpoint: str, key: str):
        """Create the Azure client and ensure the debug log directory exists.

        Args:
            endpoint: Endpoint passed to ``DocumentIntelligenceClient``.
            key: API key, wrapped in an ``AzureKeyCredential``.
        """
        self.client = DocumentIntelligenceClient(
            endpoint=endpoint, credential=AzureKeyCredential(key)
        )
        self.logger = logging.getLogger(__name__)
        # Resolved against the working directory at construction time.
        self.log_dir = Path("logs/di_content").absolute()
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.logger.info(f"Log directory created at: {self.log_dir}")
        self.logger.info(f"Absolute path: {self.log_dir.absolute()}")

    def _expand_table(self, table) -> list:
        """Lay the table's cells out on a dense row-by-column grid.

        A cell that spans several rows or columns has its content repeated in
        every grid position it covers; positions no cell covers hold ``None``.

        Args:
            table: Object exposing ``cells``; each cell provides ``row_index``,
                ``column_index``, ``content`` and optionally ``row_span`` /
                ``column_span``.

        Returns:
            A list of rows (each a list of cell contents), or an empty list
            when ``table`` has no ``cells`` attribute or the cells are
            empty/``None``.
        """
        cells = getattr(table, "cells", None)
        if not cells:
            # Also guards the max() calls below against an empty sequence.
            return []

        def span(cell, attr):
            # A span that is missing, None or 0 counts as one row/column.
            return getattr(cell, attr, 1) or 1

        n_rows = max(c.row_index + span(c, "row_span") for c in cells)
        n_cols = max(c.column_index + span(c, "column_span") for c in cells)

        grid = [[None] * n_cols for _ in range(n_rows)]
        for c in cells:
            row_end = c.row_index + span(c, "row_span")
            col_end = c.column_index + span(c, "column_span")
            for r in range(c.row_index, row_end):
                for col in range(c.column_index, col_end):
                    grid[r][col] = c.content
        return grid

    def _get_original_html_table(self, table):
        """Render the table as an HTML ``<table>`` string.

        Spanning cells are expanded (their text is repeated in each covered
        position) rather than emitted with ``rowspan``/``colspan``. Cell text
        is HTML-escaped so characters such as ``<`` or ``&`` cannot corrupt
        the markup.

        Returns:
            The HTML, one tag per line, or ``""`` when the table has no cells.
        """
        grid = self._expand_table(table)
        if not grid:
            return ""

        parts = ['<table border="1">']
        for row in grid:
            parts.append('<tr>')
            for content in row:
                if content is None:
                    parts.append('<td></td>')
                else:
                    parts.append(f'<td>{html.escape(str(content))}</td>')
            parts.append('</tr>')
        parts.append('</table>')
        return '\n'.join(parts)

    def _save_tables_html(self, tables) -> Path:
        """Write the rendered tables to a timestamped HTML file for debugging.

        Args:
            tables: List of dicts carrying an ``"original_html"`` entry.

        Returns:
            Path of the file that was written.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        html_path = self.log_dir / f"di_content_{timestamp}_tables.html"
        with open(html_path, "w", encoding="utf-8") as f:
            f.write(self._HTML_PAGE_HEAD)
            for i, entry in enumerate(tables, 1):
                f.write(f"""
<div class="table-container">
<h2>Table {i}</h2>
{entry['original_html']}
<hr>
</div>
""")
            f.write("</body></html>")
        self.logger.info(f"Saved HTML tables to: {html_path.absolute()}")
        return html_path

    def extract_tables(self, pdf_bytes: bytes):
        """Analyze a document and return its text plus HTML-rendered tables.

        Args:
            pdf_bytes: Raw document bytes sent as ``application/octet-stream``.

        Returns:
            ``{"text": <result content or "">, "tables": [{"original_html":
            <html>}, ...]}``.

        Raises:
            Exception: Any error from the analysis call or from writing the
                debug file is logged and re-raised unchanged.
        """
        try:
            self.logger.info("Starting document analysis with Azure Document Intelligence")
            poller = self.client.begin_analyze_document(
                "prebuilt-layout",
                body=pdf_bytes,
                content_type="application/octet-stream",
                output_content_format=DocumentContentFormat.MARKDOWN,
            )
            result = poller.result()

            # The attribute may exist yet be None (e.g. no tables detected),
            # so a plain getattr default is not enough.
            detected = getattr(result, "tables", None) or []
            tables = [
                {"original_html": self._get_original_html_table(table)}
                for table in detected
            ]

            self._save_tables_html(tables)

            return {
                "text": getattr(result, "content", None) or "",
                "tables": tables,
            }
        except Exception as e:
            self.logger.error(f"Error extracting tables: {str(e)}")
            raise

    @staticmethod
    def _markdown_cell(value) -> str:
        """Return cell text that is safe inside a single Markdown table cell."""
        if value is None:
            return ""
        # A raw pipe would start a new column and a raw newline would end the
        # row, so escape the former and turn the latter into an HTML break.
        return str(value).replace("|", "\\|").replace("\n", "<br>")

    def _table_to_markdown(self, table) -> str:
        """Convert a table to a Markdown table, using the first row as header.

        Returns:
            The Markdown text, or ``""`` when the table has no cells.
        """
        expanded = self._expand_table(table)
        if not expanded:
            return ""

        md_rows = [
            "| " + " | ".join(self._markdown_cell(cell) for cell in row) + " |"
            for row in expanded
        ]
        # The header separator goes right after the first row.
        separator = "| " + " | ".join(["---"] * len(expanded[0])) + " |"
        md_rows.insert(1, separator)
        return "\n".join(md_rows)