|
import logging |
|
import tempfile |
|
import os |
|
from pathlib import Path |
|
from typing import Optional, Dict, Any |
|
import asyncio |
|
|
|
|
|
import PyPDF2 |
|
from docx import Document as DocxDocument |
|
from PIL import Image |
|
import pytesseract |
|
|
|
from .models import Document, DocumentType |
|
import config |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
class DocumentParser:
    """Turn files on disk into ``Document`` records with extracted text.

    Supported inputs are PDF (PyPDF2), plain text, DOCX (python-docx) and
    common raster images (OCR through pytesseract, or through an optional
    ``ocr_service`` attribute when one has been attached to the instance).

    The parsing coroutines hand their blocking file/OCR work to the event
    loop's default executor so a slow parse does not stall other tasks.
    """

    # File extensions that are routed to OCR.
    _IMAGE_EXTENSIONS = frozenset({'.png', '.jpg', '.jpeg', '.bmp', '.tiff'})

    # Common English function words used by the language heuristic.
    _ENGLISH_STOPWORDS = frozenset({
        "the", "and", "or", "but", "in", "on", "at", "to", "for", "of",
        "with", "by", "from", "as", "is", "was", "are", "were", "be",
        "been", "have", "has", "had", "do", "does", "did", "will", "would",
        "could", "should", "may", "might", "can", "this", "that", "these",
        "those",
    })

    def __init__(self):
        # Project settings object; this class reads ``OCR_LANGUAGE`` from it.
        self.config = config.config

    @staticmethod
    async def _run_blocking(func, *args):
        """Run a blocking callable in the loop's default executor and await it."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    async def parse_document(self, file_path: str, filename: str) -> "Document":
        """Parse a file and wrap its extracted text in a ``Document``.

        Args:
            file_path: Location of the file on disk.
            filename: Original file name; its suffix selects the parser.

        Returns:
            A ``Document`` with a fresh id, the extracted content, the
            detected type, the file size and basic content metadata.

        Raises:
            ValueError: If the file extension is not supported.
            Exception: Any parser or filesystem error is logged and re-raised.
        """
        try:
            file_ext = Path(filename).suffix.lower()
            file_size = os.path.getsize(file_path)

            if file_ext == '.pdf':
                content = await self._parse_pdf(file_path)
                doc_type = DocumentType.PDF
            elif file_ext == '.txt':
                content = await self._parse_text(file_path)
                doc_type = DocumentType.TEXT
            elif file_ext == '.docx':
                content = await self._parse_docx(file_path)
                doc_type = DocumentType.DOCX
            elif file_ext in self._IMAGE_EXTENSIONS:
                content = await self._parse_image(file_path)
                doc_type = DocumentType.IMAGE
            else:
                raise ValueError(f"Unsupported file type: {file_ext}")

            document = Document(
                id=self._generate_document_id(),
                filename=filename,
                content=content,
                doc_type=doc_type,
                file_size=file_size,
                metadata={
                    "file_extension": file_ext,
                    "content_length": len(content),
                    "word_count": len(content.split()) if content else 0,
                },
            )

            logger.info("Successfully parsed document: %s", filename)
            return document

        except Exception as e:
            logger.error("Error parsing document %s: %s", filename, e)
            raise

    async def _parse_pdf(self, file_path: str) -> str:
        """Extract text from a PDF file, one labelled section per page.

        Raises:
            Exception: Any failure opening or reading the PDF (logged first).
        """
        try:
            return await self._run_blocking(self._read_pdf, file_path)
        except Exception as e:
            logger.error("Error parsing PDF: %s", e)
            raise

    def _read_pdf(self, file_path: str) -> str:
        """Blocking PDF reader; pages that fail to extract are skipped."""
        sections = []
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page_num, page in enumerate(pdf_reader.pages, start=1):
                try:
                    # extract_text() may yield None for pages without text.
                    page_text = page.extract_text() or ""
                except Exception as e:
                    logger.warning(
                        "Error extracting text from page %s: %s", page_num, e
                    )
                    continue
                if page_text.strip():
                    sections.append(
                        f"\n--- Page {page_num} ---\n{page_text}\n"
                    )
        return "".join(sections).strip()

    async def _parse_text(self, file_path: str) -> str:
        """Read a plain-text file as UTF-8, dropping undecodable bytes.

        Raises:
            Exception: Any failure opening or reading the file (logged first).
        """
        try:
            return await self._run_blocking(self._read_text, file_path)
        except Exception as e:
            logger.error("Error parsing text file: %s", e)
            raise

    def _read_text(self, file_path: str) -> str:
        """Blocking text reader; returns the stripped file content."""
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
            return file.read().strip()

    async def _parse_docx(self, file_path: str) -> str:
        """Extract paragraph and table text from a DOCX file.

        Raises:
            Exception: Any failure opening or reading the file (logged first).
        """
        try:
            return await self._run_blocking(self._read_docx, file_path)
        except Exception as e:
            logger.error("Error parsing DOCX file: %s", e)
            raise

    def _read_docx(self, file_path: str) -> str:
        """Blocking DOCX reader: paragraphs first, then table rows."""
        doc = DocxDocument(file_path)

        lines = [
            paragraph.text
            for paragraph in doc.paragraphs
            if paragraph.text.strip()
        ]

        for table in doc.tables:
            for row in table.rows:
                cells = [
                    cell.text.strip()
                    for cell in row.cells
                    if cell.text.strip()
                ]
                if cells:
                    lines.append(" | ".join(cells))

        return "\n".join(lines).strip()

    async def _parse_image(self, file_path: str) -> str:
        """Extract text from an image using OCR.

        If an ``ocr_service`` attribute is present and truthy it is tried
        first; pytesseract is used when the service is absent or returns
        nothing. This method is best-effort: any error is logged and an
        empty string is returned instead of raising.
        """
        try:
            ocr_service = getattr(self, 'ocr_service', None)
            if ocr_service:
                logger.info("Using OCR service for image: %s", file_path)
                text = await ocr_service.extract_text_from_image(file_path)
                if text:
                    return text

            logger.info("Using direct pytesseract for image: %s", file_path)
            return await self._run_blocking(self._ocr_image, file_path)
        except Exception as e:
            logger.error("Error performing OCR on image: %s", e)
            return ""

    def _ocr_image(self, file_path: str) -> str:
        """Blocking pytesseract OCR over one image file."""
        # The context manager closes the underlying file handle even if
        # OCR raises.
        with Image.open(file_path) as image:
            content = pytesseract.image_to_string(
                image,
                lang=self.config.OCR_LANGUAGE,
                config='--psm 6',
            )
        return content.strip()

    def _generate_document_id(self) -> str:
        """Generate a unique document ID (random UUID4 as a string)."""
        import uuid

        return str(uuid.uuid4())

    async def extract_metadata(self, file_path: str, content: str) -> Dict[str, Any]:
        """Collect content statistics and filesystem facts for a document.

        Args:
            file_path: Location of the file on disk (passed to ``os.stat``).
            content: Extracted text; ``None`` or ``""`` is treated as empty.

        Returns:
            A dict with ``content_length``, ``word_count``, ``line_count``,
            ``file_size``, ``created_time`` and ``modified_time``; when the
            content is non-empty also ``estimated_language`` and
            ``estimated_reading_time_minutes``. Returns ``{}`` if anything
            fails (the error is logged).
        """
        try:
            text = content or ""
            word_count = len(text.split())

            metadata = {
                "content_length": len(text),
                "word_count": word_count,
                "line_count": len(text.splitlines()),
            }

            file_stat = os.stat(file_path)
            metadata["file_size"] = file_stat.st_size
            # NOTE: st_ctime is the creation time on Windows but the last
            # metadata-change time on Unix.
            metadata["created_time"] = file_stat.st_ctime
            metadata["modified_time"] = file_stat.st_mtime

            if text:
                metadata["estimated_language"] = self._detect_language(text)
                # 200 words per minute, never less than one minute.
                metadata["estimated_reading_time_minutes"] = max(
                    1, word_count // 200
                )

            return metadata
        except Exception as e:
            logger.error("Error extracting metadata: %s", e)
            return {}

    def _detect_language(self, content: str) -> str:
        """Guess the language from the share of common English words.

        Returns ``"en"`` when more than 10% of the whitespace-separated
        tokens are English function words, otherwise ``"unknown"``.
        Surrounding punctuation is stripped so ``"was,"`` counts as
        ``"was"``.
        """
        import string

        if not content:
            return "unknown"

        words = content.lower().split()
        if not words:
            return "unknown"

        stopwords = self._ENGLISH_STOPWORDS
        punctuation = string.punctuation
        english_count = sum(
            1 for word in words if word.strip(punctuation) in stopwords
        )

        if english_count / len(words) > 0.1:
            return "en"
        return "unknown"