"""Document processing module for parsing and chunking various document formats."""

import hashlib
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import PyPDF2
from docx import Document as DocxDocument

from .error_handler import DocumentProcessingError, validate_file_upload

logger = logging.getLogger(__name__)


class DocumentChunk:
    """Represents a chunk of processed document content."""

    def __init__(
        self,
        content: str,
        metadata: Dict[str, Any],
        chunk_id: Optional[str] = None
    ):
        self.content = content.strip()
        self.metadata = metadata
        self.chunk_id = chunk_id or self._generate_chunk_id()

    def _generate_chunk_id(self) -> str:
        """Generate a unique chunk ID based on the content hash."""
        content_hash = hashlib.md5(self.content.encode()).hexdigest()[:8]
        source = self.metadata.get("source", "unknown")
        page = self.metadata.get("page", 0)
        return f"{Path(source).stem}_{page}_{content_hash}"

    def to_dict(self) -> Dict[str, Any]:
        """Convert the chunk to a dictionary representation."""
        return {
            "chunk_id": self.chunk_id,
            "content": self.content,
            "metadata": self.metadata
        }


class DocumentProcessor:
    """Main document processing class supporting multiple formats."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.processing_config = config.get("processing", {})
        self.chunk_size = self.processing_config.get("chunk_size", 512)
        self.chunk_overlap = self.processing_config.get("chunk_overlap", 50)
        self.min_chunk_size = self.processing_config.get("min_chunk_size", 100)
        self.max_chunks_per_doc = self.processing_config.get("max_chunks_per_doc", 1000)
        self.supported_formats = self.processing_config.get("supported_formats", ["pdf", "docx", "txt"])

    def process_document(
        self,
        file_path: str,
        filename: Optional[str] = None
    ) -> List[DocumentChunk]:
        """
        Process a document and return a list of chunks.

        Args:
            file_path: Path to the document file
            filename: Optional original filename

        Returns:
            List of DocumentChunk objects
        """
        # max_upload_size is configured in megabytes.
        max_size = self.config.get("app", {}).get("max_upload_size", 50) * 1024 * 1024
        allowed_extensions = [f".{fmt}" for fmt in self.supported_formats]
        validate_file_upload(file_path, max_size, allowed_extensions)

        file_path = Path(file_path)
        filename = filename or file_path.name

        try:
            text_content, metadata = self._extract_text(file_path, filename)

            if not text_content.strip():
                raise DocumentProcessingError("Document appears to be empty or contains no extractable text")

            chunks = self._create_chunks(text_content, metadata)

            if not chunks:
                raise DocumentProcessingError("Failed to create any valid chunks from document")

            if len(chunks) > self.max_chunks_per_doc:
                raise DocumentProcessingError(
                    f"Document too large. Generated {len(chunks)} chunks, "
                    f"maximum allowed is {self.max_chunks_per_doc}"
                )

            return chunks

        except DocumentProcessingError:
            raise
        except Exception as e:
            raise DocumentProcessingError(f"Failed to process document: {str(e)}") from e

    def _extract_text(self, file_path: Path, filename: str) -> Tuple[str, Dict[str, Any]]:
        """Extract text from document based on file type."""
        file_extension = file_path.suffix.lower()

        metadata = {
            "source": str(file_path),
            "filename": filename,
            "file_type": file_extension,
            "file_size": file_path.stat().st_size
        }

        if file_extension == ".pdf":
            text, pdf_metadata = self._extract_pdf_text(file_path)
            metadata.update(pdf_metadata)
        elif file_extension == ".docx":
            text, docx_metadata = self._extract_docx_text(file_path)
            metadata.update(docx_metadata)
        elif file_extension == ".txt":
            text, txt_metadata = self._extract_txt_text(file_path)
            metadata.update(txt_metadata)
        else:
            raise DocumentProcessingError(f"Unsupported file format: {file_extension}")

        return text, metadata

    def _extract_pdf_text(self, file_path: Path) -> Tuple[str, Dict[str, Any]]:
        """Extract text from PDF file."""
        try:
            with open(file_path, "rb") as file:
                pdf_reader = PyPDF2.PdfReader(file)

                if len(pdf_reader.pages) == 0:
                    raise DocumentProcessingError("PDF file contains no pages")

                text_parts = []
                for page_num, page in enumerate(pdf_reader.pages):
                    try:
                        page_text = page.extract_text()
                        if page_text.strip():
                            text_parts.append(f"\n\n--- Page {page_num + 1} ---\n\n{page_text}")
                    except Exception as e:
                        # Skip pages that fail to extract instead of aborting the whole document.
                        logger.warning("Could not extract text from page %d: %s", page_num + 1, e)

                if not text_parts:
                    raise DocumentProcessingError("Could not extract any text from PDF")

                pdf_metadata = {
                    "page_count": len(pdf_reader.pages),
                    "pdf_metadata": {}
                }

                if pdf_reader.metadata:
                    pdf_metadata["pdf_metadata"] = {
                        "title": pdf_reader.metadata.get("/Title", ""),
                        "author": pdf_reader.metadata.get("/Author", ""),
                        "subject": pdf_reader.metadata.get("/Subject", ""),
                        "creator": pdf_reader.metadata.get("/Creator", "")
                    }

                return "\n".join(text_parts), pdf_metadata

        except DocumentProcessingError:
            raise
        except Exception as e:
            raise DocumentProcessingError(f"Failed to read PDF file: {str(e)}") from e

    def _extract_docx_text(self, file_path: Path) -> Tuple[str, Dict[str, Any]]:
        """Extract text from DOCX file."""
        try:
            doc = DocxDocument(file_path)

            paragraphs = []
            for paragraph in doc.paragraphs:
                text = paragraph.text.strip()
                if text:
                    paragraphs.append(text)

            table_texts = []
            for table in doc.tables:
                table_data = []
                for row in table.rows:
                    row_data = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                    if row_data:
                        table_data.append(" | ".join(row_data))
                if table_data:
                    table_texts.append("Table:\n" + "\n".join(table_data))

            all_text = "\n\n".join(paragraphs + table_texts)

            if not all_text.strip():
                raise DocumentProcessingError("DOCX file contains no extractable text")

            docx_metadata = {
                "paragraph_count": len(paragraphs),
                "table_count": len(table_texts)
            }

            if hasattr(doc, "core_properties"):
                props = doc.core_properties
                docx_metadata["docx_metadata"] = {
                    "title": props.title or "",
                    "author": props.author or "",
                    "subject": props.subject or "",
                    "created": str(props.created) if props.created else ""
                }

            return all_text, docx_metadata

        except DocumentProcessingError:
            raise
        except Exception as e:
            raise DocumentProcessingError(f"Failed to read DOCX file: {str(e)}") from e

    def _extract_txt_text(self, file_path: Path) -> Tuple[str, Dict[str, Any]]:
        """Extract text from TXT file."""
        try:
            # Try a few common encodings before giving up.
            encodings = ["utf-8", "utf-8-sig", "latin1", "cp1252"]

            text = None
            encoding_used = None

            for encoding in encodings:
                try:
                    with open(file_path, "r", encoding=encoding) as file:
                        text = file.read()
                    encoding_used = encoding
                    break
                except UnicodeDecodeError:
                    continue

            if text is None:
                raise DocumentProcessingError("Could not decode text file with any supported encoding")

            if not text.strip():
                raise DocumentProcessingError("Text file is empty")

            lines = text.split("\n")
            txt_metadata = {
                "encoding": encoding_used,
                "line_count": len(lines),
                "char_count": len(text)
            }

            return text, txt_metadata

        except DocumentProcessingError:
            raise
        except Exception as e:
            raise DocumentProcessingError(f"Failed to read text file: {str(e)}") from e

    def _create_chunks(self, text: str, base_metadata: Dict[str, Any]) -> List[DocumentChunk]:
        """Create overlapping chunks from text."""
        text = self._clean_text(text)

        sentences = self._split_into_sentences(text)

        if not sentences:
            return []

        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            sentence_length = len(sentence)

            # Close the current chunk once adding this sentence would exceed the target size.
            if current_length + sentence_length > self.chunk_size and current_chunk:
                chunk_text = " ".join(current_chunk)
                if len(chunk_text) >= self.min_chunk_size:
                    chunk_metadata = {
                        **base_metadata,
                        "chunk_index": len(chunks),
                        "char_count": len(chunk_text),
                        "sentence_count": len(current_chunk)
                    }
                    chunks.append(DocumentChunk(chunk_text, chunk_metadata))

                # Carry trailing sentences into the next chunk to preserve context across boundaries.
                if self.chunk_overlap > 0:
                    overlap_sentences = self._get_overlap_sentences(current_chunk)
                    current_chunk = overlap_sentences
                    current_length = sum(len(s) for s in overlap_sentences)
                else:
                    current_chunk = []
                    current_length = 0

            current_chunk.append(sentence)
            current_length += sentence_length

        # Flush the final partial chunk.
        if current_chunk:
            chunk_text = " ".join(current_chunk)
            if len(chunk_text) >= self.min_chunk_size:
                chunk_metadata = {
                    **base_metadata,
                    "chunk_index": len(chunks),
                    "char_count": len(chunk_text),
                    "sentence_count": len(current_chunk)
                }
                chunks.append(DocumentChunk(chunk_text, chunk_metadata))

        return chunks

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Strip the page markers inserted during PDF extraction before collapsing
        # whitespace; the newline-anchored pattern would never match afterwards.
        text = re.sub(r'\n--- Page \d+ ---\n', '\n', text)

        # Collapse runs of whitespace (including newlines) into single spaces.
        text = re.sub(r'\s+', ' ', text)

        # Re-insert spaces that PDF extraction commonly drops between words and sentences.
        text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)
        text = re.sub(r'([.!?])([A-Z])', r'\1 \2', text)

        return text.strip()

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences using simple heuristics."""
        # Split after sentence-ending punctuation, keeping the punctuation attached
        # so chunks retain readable sentence boundaries.
        sentences = re.split(r'(?<=[.!?])\s+', text)

        # Drop fragments too short to be meaningful sentences.
        cleaned_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if len(sentence) >= 10:
                cleaned_sentences.append(sentence)

        return cleaned_sentences

    def _get_overlap_sentences(self, sentences: List[str]) -> List[str]:
        """Get sentences for overlap based on character count."""
        overlap_sentences = []
        overlap_length = 0

        for sentence in reversed(sentences):
            if overlap_length + len(sentence) <= self.chunk_overlap:
                overlap_sentences.insert(0, sentence)
                overlap_length += len(sentence)
            else:
                break

        return overlap_sentences

    def get_document_stats(self, chunks: List[DocumentChunk]) -> Dict[str, Any]:
        """Get statistics about processed document."""
        if not chunks:
            return {"chunk_count": 0, "total_chars": 0, "avg_chunk_size": 0}

        total_chars = sum(len(chunk.content) for chunk in chunks)

        return {
            "chunk_count": len(chunks),
            "total_chars": total_chars,
            "avg_chunk_size": total_chars / len(chunks),
            "min_chunk_size": min(len(chunk.content) for chunk in chunks),
            "max_chunk_size": max(len(chunk.content) for chunk in chunks),
            "source_file": chunks[0].metadata.get("filename", "unknown"),
            "file_type": chunks[0].metadata.get("file_type", "unknown")
        }
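

# Minimal usage sketch (illustrative only, not part of the module's API): the config
# keys mirror the defaults read in DocumentProcessor.__init__, and "sample.pdf" is a
# hypothetical input path assumed for this example. Run with `python -m` so the
# relative import of error_handler above resolves.
if __name__ == "__main__":
    example_config = {
        "app": {"max_upload_size": 50},  # megabytes
        "processing": {
            "chunk_size": 512,
            "chunk_overlap": 50,
            "min_chunk_size": 100,
            "max_chunks_per_doc": 1000,
            "supported_formats": ["pdf", "docx", "txt"],
        },
    }
    processor = DocumentProcessor(example_config)
    chunks = processor.process_document("sample.pdf")  # hypothetical file
    for chunk in chunks[:3]:
        print(chunk.chunk_id, chunk.metadata["chunk_index"], len(chunk.content))
    print(processor.get_document_stats(chunks))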