File size: 5,136 Bytes
0a40afa
 
54478a0
26b3eb7
 
0a40afa
54478a0
0a40afa
 
 
 
 
 
f98e92f
26b3eb7
f98e92f
 
0a40afa
f98e92f
 
924cb7d
f98e92f
924cb7d
f98e92f
 
 
924cb7d
f98e92f
 
924cb7d
f98e92f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
924cb7d
0a40afa
 
 
 
f98e92f
 
54478a0
 
924cb7d
54478a0
 
f98e92f
924cb7d
f98e92f
 
 
 
 
 
 
924cb7d
f98e92f
924cb7d
54478a0
f98e92f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0a40afa
924cb7d
f98e92f
 
924cb7d
0a40afa
 
f98e92f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""Real wrapper goes here – currently not used by stub agents."""
import logging
import json
from datetime import datetime
from pathlib import Path
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import DocumentContentFormat
from azure.core.credentials import AzureKeyCredential

class AzureDIService:
    """Wrapper around Azure Document Intelligence layout analysis.

    Runs the ``prebuilt-layout`` model on a document, renders each detected
    table as an HTML ``<table>``, and writes a debugging copy of those tables
    into ``logs/di_content``.
    """

    def __init__(self, endpoint: str, key: str):
        """Create the Azure client and ensure the debug log directory exists.

        Args:
            endpoint: Document Intelligence endpoint handed to the client.
            key: API key, wrapped in an ``AzureKeyCredential``.
        """
        self.client = DocumentIntelligenceClient(endpoint=endpoint, credential=AzureKeyCredential(key))
        self.logger = logging.getLogger(__name__)
        # Resolved against the working directory at construction time.
        self.log_dir = Path("logs/di_content").absolute()
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.logger.info("Log directory created at: %s", self.log_dir)
        self.logger.info("Absolute path: %s", self.log_dir.absolute())

    def _expand_table(self, table) -> list:
        """Return the table as a dense, row-major grid of cell contents.

        Every cell's content is copied into each grid position covered by its
        row/column span. Positions that no cell covers stay ``None``.

        Args:
            table: Object exposing ``cells``; each cell needs ``row_index``,
                ``column_index`` and ``content``, and may carry ``row_span`` /
                ``column_span`` (missing or ``None`` spans count as 1).

        Returns:
            A list of rows (lists), or ``[]`` when the table has no cells.
        """
        cells = list(getattr(table, "cells", None) or [])
        if not cells:
            # Guards the max() calls below, which reject empty input.
            return []

        # Resolve each cell's placement once so dimensions and fill agree.
        placed = [
            (
                cell.row_index,
                cell.column_index,
                getattr(cell, "row_span", 1) or 1,
                getattr(cell, "column_span", 1) or 1,
                cell.content,
            )
            for cell in cells
        ]
        n_rows = max(r0 + r_span for r0, _, r_span, _, _ in placed)
        n_cols = max(c0 + c_span for _, c0, _, c_span, _ in placed)

        grid = [[None] * n_cols for _ in range(n_rows)]
        for r0, c0, r_span, c_span, content in placed:
            for r in range(r0, r0 + r_span):
                for c in range(c0, c0 + c_span):
                    grid[r][c] = content
        return grid

    def _get_original_html_table(self, table):
        """Generate HTML for the original table structure.

        Spanning cells are repeated in every position they cover, so the
        output is a plain rectangular ``<table>`` without rowspan/colspan.

        Args:
            table: Table object as accepted by ``_expand_table``.

        Returns:
            The HTML markup, one tag per line, or ``""`` when the table has
            no cells.
        """
        # Function-scope import: the local module imports do not include html.
        from html import escape

        grid = self._expand_table(table)
        if not grid:
            return ""

        parts = ['<table border="1">']
        for row in grid:
            parts.append('<tr>')
            parts.extend(
                # Cell text comes from the analyzed document, so it must not
                # be able to inject markup into the generated page.
                '<td></td>' if content is None
                else f'<td>{escape(str(content), quote=False)}</td>'
                for content in row
            )
            parts.append('</tr>')
        parts.append('</table>')
        return '\n'.join(parts)

    def _save_tables_html(self, tables):
        """Write the rendered tables to a timestamped HTML file for debugging.

        Args:
            tables: List of dicts, each holding an ``"original_html"`` string.

        Returns:
            Path of the file that was written inside ``self.log_dir``.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        html_path = self.log_dir / f"di_content_{timestamp}_tables.html"
        with open(html_path, "w", encoding="utf-8") as f:
            f.write("""<!DOCTYPE html>
<html>
<head>
    <title>Azure DI Tables</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .table-container { margin-bottom: 40px; }
        h2 { color: #333; }
        table { border-collapse: collapse; width: 100%; margin-bottom: 10px; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #f5f5f5; }
        hr { border: none; border-top: 2px solid #ccc; margin: 20px 0; }
    </style>
</head>
<body>
    <h1>Azure Document Intelligence Tables</h1>
""")

            for i, entry in enumerate(tables, 1):
                f.write(f"""
    <div class="table-container">
        <h2>Table {i}</h2>
        {entry['original_html']}
        <hr>
    </div>
""")

            f.write("</body></html>")
        return html_path

    def extract_tables(self, pdf_bytes: bytes):
        """Analyze a document and return its text plus HTML-rendered tables.

        Args:
            pdf_bytes: Raw document bytes sent to the ``prebuilt-layout`` model.

        Returns:
            ``{"text": <document content, "" if absent>,
            "tables": [{"original_html": <html>}, ...]}``.

        Raises:
            Exception: Any error from the service call or from writing the
                debug file is logged with its traceback and re-raised.
        """
        try:
            self.logger.info("Starting document analysis with Azure Document Intelligence")

            # Get document analysis
            poller = self.client.begin_analyze_document(
                "prebuilt-layout",
                body=pdf_bytes,
                content_type="application/octet-stream",
                output_content_format=DocumentContentFormat.MARKDOWN
            )
            result = poller.result()

            # ``tables`` may be present but None when the document has no
            # tables, so fall back to an empty list instead of iterating None.
            tables = [
                {"original_html": self._get_original_html_table(table)}
                for table in (getattr(result, "tables", None) or [])
            ]

            # Save tables for debugging
            html_path = self._save_tables_html(tables)
            self.logger.info("Saved HTML tables to: %s", html_path.absolute())

            return {
                "text": getattr(result, "content", None) or "",
                "tables": tables
            }

        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            self.logger.exception("Error extracting tables: %s", e)
            raise

    def _table_to_markdown(self, table) -> str:
        """Convert a table to markdown format.

        The first grid row is treated as the header and followed by a
        ``---`` separator row.

        Args:
            table: Table object as accepted by ``_expand_table``.

        Returns:
            The markdown table, or ``""`` when the table has no cells.
        """
        grid = self._expand_table(table)
        if not grid:
            return ""

        def render(row):
            # Pipes and line breaks inside a cell would otherwise be read as
            # column/row boundaries by markdown renderers.
            texts = (
                "" if content is None
                else str(content).replace("|", "\\|").replace("\n", " ")
                for content in row
            )
            return "| " + " | ".join(texts) + " |"

        md_rows = [render(row) for row in grid]
        # Add header separator
        md_rows.insert(1, "| " + " | ".join(["---"] * len(grid[0])) + " |")
        return "\n".join(md_rows)