File size: 7,782 Bytes
9145e48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import logging
import tempfile
import os
from pathlib import Path
from typing import Optional, Dict, Any
import asyncio

# Document processing libraries
import PyPDF2
from docx import Document as DocxDocument
from PIL import Image
import pytesseract

from .models import Document, DocumentType
import config

logger = logging.getLogger(__name__)

class DocumentParser:
    def __init__(self):
        self.config = config.config
    
    async def parse_document(self, file_path: str, filename: str) -> Document:
        """Parse a document and extract its content"""
        try:
            file_ext = Path(filename).suffix.lower()
            file_size = os.path.getsize(file_path)
            
            # Determine document type and parse accordingly
            if file_ext == '.pdf':
                content = await self._parse_pdf(file_path)
                doc_type = DocumentType.PDF
            elif file_ext == '.txt':
                content = await self._parse_text(file_path)
                doc_type = DocumentType.TEXT
            elif file_ext == '.docx':
                content = await self._parse_docx(file_path)
                doc_type = DocumentType.DOCX
            elif file_ext in ['.png', '.jpg', '.jpeg', '.bmp', '.tiff']:
                content = await self._parse_image(file_path)
                doc_type = DocumentType.IMAGE
            else:
                raise ValueError(f"Unsupported file type: {file_ext}")
            
            # Create document object
            document = Document(
                id=self._generate_document_id(),
                filename=filename,
                content=content,
                doc_type=doc_type,
                file_size=file_size,
                metadata={
                    "file_extension": file_ext,
                    "content_length": len(content),
                    "word_count": len(content.split()) if content else 0
                }
            )
            
            logger.info(f"Successfully parsed document: {filename}")
            return document
            
        except Exception as e:
            logger.error(f"Error parsing document {filename}: {str(e)}")
            raise
    
    async def _parse_pdf(self, file_path: str) -> str:
        """Extract text from PDF file"""
        try:
            content = ""
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page_num, page in enumerate(pdf_reader.pages):
                    try:
                        page_text = page.extract_text()
                        if page_text.strip():
                            content += f"\n--- Page {page_num + 1} ---\n"
                            content += page_text + "\n"
                    except Exception as e:
                        logger.warning(f"Error extracting text from page {page_num + 1}: {str(e)}")
                        continue
            
            return content.strip()
        except Exception as e:
            logger.error(f"Error parsing PDF: {str(e)}")
            raise
    
    async def _parse_text(self, file_path: str) -> str:
        """Read plain text file"""
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
                content = file.read()
            return content.strip()
        except Exception as e:
            logger.error(f"Error parsing text file: {str(e)}")
            raise
    
    async def _parse_docx(self, file_path: str) -> str:
        """Extract text from DOCX file"""
        try:
            doc = DocxDocument(file_path)
            content = ""
            
            for paragraph in doc.paragraphs:
                if paragraph.text.strip():
                    content += paragraph.text + "\n"
            
            # Extract text from tables
            for table in doc.tables:
                for row in table.rows:
                    row_text = []
                    for cell in row.cells:
                        if cell.text.strip():
                            row_text.append(cell.text.strip())
                    if row_text:
                        content += " | ".join(row_text) + "\n"
            
            return content.strip()
        except Exception as e:
            logger.error(f"Error parsing DOCX file: {str(e)}")
            raise
    
    async def _parse_image(self, file_path: str) -> str:
        """Extract text from image using OCR"""
        try:
            # First try with OCR service if available
            if hasattr(self, 'ocr_service') and self.ocr_service:
                logger.info(f"Using OCR service for image: {file_path}")
                text = await self.ocr_service.extract_text_from_image(file_path)
                if text:
                    return text
            
            # Fallback to direct pytesseract
            logger.info(f"Using direct pytesseract for image: {file_path}")
            image = Image.open(file_path)
            
            # Perform OCR
            content = pytesseract.image_to_string(
                image,
                lang=self.config.OCR_LANGUAGE,
                config='--psm 6'  # Assume a single uniform block of text
            )
            
            return content.strip()
        except Exception as e:
            logger.error(f"Error performing OCR on image: {str(e)}")
            # Return empty string if OCR fails
            return ""
    
    def _generate_document_id(self) -> str:
        """Generate a unique document ID"""
        import uuid
        return str(uuid.uuid4())
    
    async def extract_metadata(self, file_path: str, content: str) -> Dict[str, Any]:
        """Extract additional metadata from the document"""
        try:
            metadata = {}
            
            # Basic statistics
            metadata["content_length"] = len(content)
            metadata["word_count"] = len(content.split()) if content else 0
            metadata["line_count"] = len(content.splitlines()) if content else 0
            
            # File information
            file_stat = os.stat(file_path)
            metadata["file_size"] = file_stat.st_size
            metadata["created_time"] = file_stat.st_ctime
            metadata["modified_time"] = file_stat.st_mtime
            
            # Content analysis
            if content:
                # Language detection (simple heuristic)
                metadata["estimated_language"] = self._detect_language(content)
                
                # Reading time estimation (average 200 words per minute)
                metadata["estimated_reading_time_minutes"] = max(1, metadata["word_count"] // 200)
            
            return metadata
        except Exception as e:
            logger.error(f"Error extracting metadata: {str(e)}")
            return {}
    
    def _detect_language(self, content: str) -> str:
        """Simple language detection based on character patterns"""
        # This is a very basic implementation
        # In production, you might want to use a proper language detection library
        if not content:
            return "unknown"
        
        # Count common English words
        english_words = ["the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by", "from", "as", "is", "was", "are", "were", "be", "been", "have", "has", "had", "do", "does", "did", "will", "would", "could", "should", "may", "might", "can", "this", "that", "these", "those"]
        
        words = content.lower().split()
        english_count = sum(1 for word in words if word in english_words)
        
        if len(words) > 0 and english_count / len(words) > 0.1:
            return "en"
        else:
            return "unknown"