# File size: 7,782 Bytes (9145e48)
import logging
import tempfile
import os
from pathlib import Path
from typing import Optional, Dict, Any
import asyncio
# Document processing libraries
import PyPDF2
from docx import Document as DocxDocument
from PIL import Image
import pytesseract
from .models import Document, DocumentType
import config
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class DocumentParser:
    """Turn files on disk into ``Document`` objects.

    Supported inputs are chosen by file extension: PDF (PyPDF2), plain text,
    DOCX (python-docx) and raster images (OCR via pytesseract, or via an
    optional ``ocr_service`` attribute when one has been attached to the
    instance).
    """

    # Extensions that ``parse_document`` routes to the OCR path.
    _IMAGE_EXTENSIONS = frozenset({'.png', '.jpg', '.jpeg', '.bmp', '.tiff'})

    # Common English function words used by ``_detect_language``. Kept as a
    # class-level frozenset so it is built once and membership tests are O(1).
    _ENGLISH_WORDS = frozenset({
        "the", "and", "or", "but", "in", "on", "at", "to", "for", "of",
        "with", "by", "from", "as", "is", "was", "are", "were", "be", "been",
        "have", "has", "had", "do", "does", "did", "will", "would", "could",
        "should", "may", "might", "can", "this", "that", "these", "those",
    })

    def __init__(self):
        self.config = config.config

    async def parse_document(self, file_path: str, filename: str) -> "Document":
        """Parse a document and extract its content.

        Args:
            file_path: Location of the file to read.
            filename: Name whose extension selects the parser; it is also
                stored on the resulting ``Document``.

        Returns:
            A ``Document`` carrying the extracted text, the detected type,
            the file size and basic metadata (extension, content length,
            word count).

        Raises:
            ValueError: If the extension is not one of the supported types.
            Exception: Any error raised by the underlying parser is logged
                and re-raised unchanged.
        """
        try:
            file_ext = Path(filename).suffix.lower()
            file_size = os.path.getsize(file_path)

            # Determine document type and parse accordingly
            if file_ext == '.pdf':
                content = await self._parse_pdf(file_path)
                doc_type = DocumentType.PDF
            elif file_ext == '.txt':
                content = await self._parse_text(file_path)
                doc_type = DocumentType.TEXT
            elif file_ext == '.docx':
                content = await self._parse_docx(file_path)
                doc_type = DocumentType.DOCX
            elif file_ext in self._IMAGE_EXTENSIONS:
                content = await self._parse_image(file_path)
                doc_type = DocumentType.IMAGE
            else:
                raise ValueError(f"Unsupported file type: {file_ext}")

            # Create document object
            document = Document(
                id=self._generate_document_id(),
                filename=filename,
                content=content,
                doc_type=doc_type,
                file_size=file_size,
                metadata={
                    "file_extension": file_ext,
                    "content_length": len(content),
                    "word_count": len(content.split()) if content else 0,
                },
            )
            logger.info(f"Successfully parsed document: {filename}")
            return document
        except Exception as e:
            logger.error(f"Error parsing document {filename}: {str(e)}")
            raise

    async def _parse_pdf(self, file_path: str) -> str:
        """Extract text from a PDF file.

        Each page that yields non-blank text is emitted under a
        ``--- Page N ---`` header (1-based). Pages whose extraction raises
        are logged at warning level and skipped.

        Raises:
            Exception: Errors opening or reading the PDF are logged and
                re-raised.
        """
        try:
            sections = []
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page_num, page in enumerate(pdf_reader.pages, start=1):
                    try:
                        # Guard against a None result so a text-less page is
                        # skipped quietly instead of tripping the handler
                        # below (presumably possible with some PyPDF2
                        # versions -- TODO confirm).
                        page_text = page.extract_text() or ""
                    except Exception as e:
                        logger.warning(f"Error extracting text from page {page_num}: {str(e)}")
                        continue
                    if page_text.strip():
                        sections.append(f"\n--- Page {page_num} ---\n{page_text}\n")
            return "".join(sections).strip()
        except Exception as e:
            logger.error(f"Error parsing PDF: {str(e)}")
            raise

    async def _parse_text(self, file_path: str) -> str:
        """Read a plain text file and return its stripped content.

        The file is decoded as UTF-8; undecodable bytes are dropped. The
        ``utf-8-sig`` codec is used so that a leading byte-order mark is
        removed rather than ending up as ``\\ufeff`` in the content.

        Raises:
            Exception: Errors opening or reading the file are logged and
                re-raised.
        """
        try:
            with open(file_path, 'r', encoding='utf-8-sig', errors='ignore') as file:
                content = file.read()
            return content.strip()
        except Exception as e:
            logger.error(f"Error parsing text file: {str(e)}")
            raise

    async def _parse_docx(self, file_path: str) -> str:
        """Extract text from a DOCX file.

        Non-blank paragraphs come first, one per line, followed by table
        rows. Each row is rendered as its non-blank cell texts joined with
        ``" | "``.

        Raises:
            Exception: Errors opening or reading the document are logged and
                re-raised.
        """
        try:
            doc = DocxDocument(file_path)
            lines = [
                paragraph.text
                for paragraph in doc.paragraphs
                if paragraph.text.strip()
            ]

            # Extract text from tables
            for table in doc.tables:
                for row in table.rows:
                    row_text = [
                        cell.text.strip()
                        for cell in row.cells
                        if cell.text.strip()
                    ]
                    if row_text:
                        lines.append(" | ".join(row_text))

            return "\n".join(lines).strip()
        except Exception as e:
            logger.error(f"Error parsing DOCX file: {str(e)}")
            raise

    async def _parse_image(self, file_path: str) -> str:
        """Extract text from an image using OCR.

        If an ``ocr_service`` attribute has been attached to this instance
        and it returns non-empty text, that text is returned as-is.
        Otherwise pytesseract is invoked directly with the language from
        ``self.config.OCR_LANGUAGE``.

        Returns:
            The recognised text, or an empty string if OCR fails for any
            reason (failures are logged, not raised).
        """
        try:
            # First try with OCR service if available
            ocr_service = getattr(self, 'ocr_service', None)
            if ocr_service:
                logger.info(f"Using OCR service for image: {file_path}")
                text = await ocr_service.extract_text_from_image(file_path)
                if text:
                    return text

            # Fallback to direct pytesseract
            logger.info(f"Using direct pytesseract for image: {file_path}")
            # Context manager ensures the image's file handle is released.
            with Image.open(file_path) as image:
                content = pytesseract.image_to_string(
                    image,
                    lang=self.config.OCR_LANGUAGE,
                    config='--psm 6',  # Assume a single uniform block of text
                )
            return content.strip()
        except Exception as e:
            logger.error(f"Error performing OCR on image: {str(e)}")
            # Return empty string if OCR fails
            return ""

    def _generate_document_id(self) -> str:
        """Generate a unique document ID (a random UUID4 string)."""
        import uuid
        return str(uuid.uuid4())

    async def extract_metadata(self, file_path: str, content: str) -> Dict[str, Any]:
        """Extract additional metadata from the document.

        Args:
            file_path: File whose ``os.stat`` information is recorded.
            content: Already-extracted text used for the statistics.

        Returns:
            A dict with ``content_length``, ``word_count``, ``line_count``,
            ``file_size``, ``created_time`` and ``modified_time``; when
            ``content`` is non-empty it also has ``estimated_language`` and
            ``estimated_reading_time_minutes``. An empty dict is returned if
            anything fails (the error is logged).
        """
        try:
            metadata = {}

            # Basic statistics
            metadata["content_length"] = len(content)
            metadata["word_count"] = len(content.split()) if content else 0
            metadata["line_count"] = len(content.splitlines()) if content else 0

            # File information
            file_stat = os.stat(file_path)
            metadata["file_size"] = file_stat.st_size
            # NOTE: st_ctime is the creation time on Windows but the last
            # metadata-change time on most Unix systems.
            metadata["created_time"] = file_stat.st_ctime
            metadata["modified_time"] = file_stat.st_mtime

            # Content analysis
            if content:
                # Language detection (simple heuristic)
                metadata["estimated_language"] = self._detect_language(content)
                # Reading time estimation (average 200 words per minute),
                # never reported as less than one minute.
                metadata["estimated_reading_time_minutes"] = max(1, metadata["word_count"] // 200)

            return metadata
        except Exception as e:
            logger.error(f"Error extracting metadata: {str(e)}")
            return {}

    def _detect_language(self, content: str) -> str:
        """Simple language detection based on common English words.

        This is a very basic implementation; in production, you might want
        to use a proper language detection library.

        Returns:
            ``"en"`` when more than 10% of the whitespace-separated,
            lower-cased tokens are in the English word list, otherwise
            ``"unknown"`` (including for empty or whitespace-only input).
        """
        if not content:
            return "unknown"

        words = content.lower().split()
        if not words:
            return "unknown"

        english_count = sum(1 for word in words if word in self._ENGLISH_WORDS)
        if english_count / len(words) > 0.1:
            return "en"
        return "unknown"