""" Document processing module for parsing and chunking various document formats. """ import re from pathlib import Path from typing import Dict, List, Optional, Tuple, Any import hashlib import mimetypes # Document parsing imports import PyPDF2 from docx import Document as DocxDocument from io import BytesIO from .error_handler import DocumentProcessingError, validate_file_upload class DocumentChunk: """Represents a chunk of processed document content.""" def __init__( self, content: str, metadata: Dict[str, Any], chunk_id: str = None ): self.content = content.strip() self.metadata = metadata self.chunk_id = chunk_id or self._generate_chunk_id() def _generate_chunk_id(self) -> str: """Generate unique chunk ID based on content hash.""" content_hash = hashlib.md5(self.content.encode()).hexdigest()[:8] source = self.metadata.get("source", "unknown") page = self.metadata.get("page", 0) return f"{Path(source).stem}_{page}_{content_hash}" def to_dict(self) -> Dict[str, Any]: """Convert chunk to dictionary representation.""" return { "chunk_id": self.chunk_id, "content": self.content, "metadata": self.metadata } class DocumentProcessor: """Main document processing class supporting multiple formats.""" def __init__(self, config: Dict[str, Any]): self.config = config self.processing_config = config.get("processing", {}) self.chunk_size = self.processing_config.get("chunk_size", 512) self.chunk_overlap = self.processing_config.get("chunk_overlap", 50) self.min_chunk_size = self.processing_config.get("min_chunk_size", 100) self.max_chunks_per_doc = self.processing_config.get("max_chunks_per_doc", 1000) self.supported_formats = self.processing_config.get("supported_formats", ["pdf", "docx", "txt"]) def process_document( self, file_path: str, filename: Optional[str] = None ) -> List[DocumentChunk]: """ Process a document and return list of chunks. Args: file_path: Path to the document file filename: Optional original filename Returns: List of DocumentChunk objects """ # Validate file max_size = self.config.get("app", {}).get("max_upload_size", 50) * 1024 * 1024 allowed_extensions = [f".{fmt}" for fmt in self.supported_formats] validate_file_upload(file_path, max_size, allowed_extensions) file_path = Path(file_path) filename = filename or file_path.name # Detect file type and extract text try: text_content, metadata = self._extract_text(file_path, filename) if not text_content.strip(): raise DocumentProcessingError("Document appears to be empty or contains no extractable text") # Create chunks chunks = self._create_chunks(text_content, metadata) if not chunks: raise DocumentProcessingError("Failed to create any valid chunks from document") if len(chunks) > self.max_chunks_per_doc: raise DocumentProcessingError( f"Document too large. 
Generated {len(chunks)} chunks, " f"maximum allowed is {self.max_chunks_per_doc}" ) return chunks except Exception as e: if isinstance(e, DocumentProcessingError): raise else: raise DocumentProcessingError(f"Failed to process document: {str(e)}") from e def _extract_text(self, file_path: Path, filename: str) -> Tuple[str, Dict[str, Any]]: """Extract text from document based on file type.""" file_extension = file_path.suffix.lower() # Base metadata metadata = { "source": str(file_path), "filename": filename, "file_type": file_extension, "file_size": file_path.stat().st_size } if file_extension == ".pdf": text, pdf_metadata = self._extract_pdf_text(file_path) metadata.update(pdf_metadata) elif file_extension == ".docx": text, docx_metadata = self._extract_docx_text(file_path) metadata.update(docx_metadata) elif file_extension == ".txt": text, txt_metadata = self._extract_txt_text(file_path) metadata.update(txt_metadata) else: raise DocumentProcessingError(f"Unsupported file format: {file_extension}") return text, metadata def _extract_pdf_text(self, file_path: Path) -> Tuple[str, Dict[str, Any]]: """Extract text from PDF file.""" try: with open(file_path, "rb") as file: pdf_reader = PyPDF2.PdfReader(file) if len(pdf_reader.pages) == 0: raise DocumentProcessingError("PDF file contains no pages") text_parts = [] for page_num, page in enumerate(pdf_reader.pages): try: page_text = page.extract_text() if page_text.strip(): text_parts.append(f"\n\n--- Page {page_num + 1} ---\n\n{page_text}") except Exception as e: # Log warning but continue with other pages print(f"Warning: Could not extract text from page {page_num + 1}: {e}") if not text_parts: raise DocumentProcessingError("Could not extract any text from PDF") # Extract metadata pdf_metadata = { "page_count": len(pdf_reader.pages), "pdf_metadata": {} } if pdf_reader.metadata: pdf_metadata["pdf_metadata"] = { "title": pdf_reader.metadata.get("/Title", ""), "author": pdf_reader.metadata.get("/Author", ""), "subject": pdf_reader.metadata.get("/Subject", ""), "creator": pdf_reader.metadata.get("/Creator", "") } return "\n".join(text_parts), pdf_metadata except Exception as e: if isinstance(e, DocumentProcessingError): raise else: raise DocumentProcessingError(f"Failed to read PDF file: {str(e)}") from e def _extract_docx_text(self, file_path: Path) -> Tuple[str, Dict[str, Any]]: """Extract text from DOCX file.""" try: doc = DocxDocument(file_path) # Extract paragraphs paragraphs = [] for paragraph in doc.paragraphs: text = paragraph.text.strip() if text: paragraphs.append(text) # Extract tables table_texts = [] for table in doc.tables: table_data = [] for row in table.rows: row_data = [cell.text.strip() for cell in row.cells if cell.text.strip()] if row_data: table_data.append(" | ".join(row_data)) if table_data: table_texts.append("Table:\n" + "\n".join(table_data)) all_text = "\n\n".join(paragraphs + table_texts) if not all_text.strip(): raise DocumentProcessingError("DOCX file contains no extractable text") # Metadata docx_metadata = { "paragraph_count": len(paragraphs), "table_count": len(table_texts) } # Core properties if hasattr(doc, "core_properties"): props = doc.core_properties docx_metadata["docx_metadata"] = { "title": props.title or "", "author": props.author or "", "subject": props.subject or "", "created": str(props.created) if props.created else "" } return all_text, docx_metadata except Exception as e: if isinstance(e, DocumentProcessingError): raise else: raise DocumentProcessingError(f"Failed to read DOCX file: {str(e)}") from e 
    def _extract_txt_text(self, file_path: Path) -> Tuple[str, Dict[str, Any]]:
        """Extract text from TXT file."""
        try:
            # Try different encodings until one decodes cleanly
            encodings = ["utf-8", "utf-8-sig", "latin1", "cp1252"]
            text = None
            encoding_used = None

            for encoding in encodings:
                try:
                    with open(file_path, "r", encoding=encoding) as file:
                        text = file.read()
                    encoding_used = encoding
                    break
                except UnicodeDecodeError:
                    continue

            if text is None:
                raise DocumentProcessingError(
                    "Could not decode text file with any supported encoding"
                )

            if not text.strip():
                raise DocumentProcessingError("Text file is empty")

            # Basic text statistics
            lines = text.split("\n")
            txt_metadata = {
                "encoding": encoding_used,
                "line_count": len(lines),
                "char_count": len(text)
            }

            return text, txt_metadata

        except Exception as e:
            if isinstance(e, DocumentProcessingError):
                raise
            else:
                raise DocumentProcessingError(f"Failed to read text file: {str(e)}") from e

    def _create_chunks(self, text: str, base_metadata: Dict[str, Any]) -> List[DocumentChunk]:
        """Create overlapping chunks from text."""
        # Clean and normalize text
        text = self._clean_text(text)

        # Split into sentences for better chunk boundaries
        sentences = self._split_into_sentences(text)

        if not sentences:
            return []

        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            sentence_length = len(sentence)

            # If adding this sentence would exceed chunk size
            if current_length + sentence_length > self.chunk_size and current_chunk:
                # Create chunk from current sentences
                chunk_text = " ".join(current_chunk)
                if len(chunk_text) >= self.min_chunk_size:
                    chunk_metadata = {
                        **base_metadata,
                        "chunk_index": len(chunks),
                        "char_count": len(chunk_text),
                        "sentence_count": len(current_chunk)
                    }
                    chunks.append(DocumentChunk(chunk_text, chunk_metadata))

                # Start new chunk with overlap
                if self.chunk_overlap > 0:
                    overlap_sentences = self._get_overlap_sentences(current_chunk)
                    current_chunk = overlap_sentences
                    current_length = sum(len(s) for s in overlap_sentences)
                else:
                    current_chunk = []
                    current_length = 0

            # Add current sentence
            current_chunk.append(sentence)
            current_length += sentence_length

        # Create final chunk
        if current_chunk:
            chunk_text = " ".join(current_chunk)
            if len(chunk_text) >= self.min_chunk_size:
                chunk_metadata = {
                    **base_metadata,
                    "chunk_index": len(chunks),
                    "char_count": len(chunk_text),
                    "sentence_count": len(current_chunk)
                }
                chunks.append(DocumentChunk(chunk_text, chunk_metadata))

        return chunks

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Remove page markers (from PDF extraction) first; they rely on the
        # surrounding newlines, which the whitespace collapse below would destroy
        text = re.sub(r'\n--- Page \d+ ---\n', '\n', text)

        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)

        # Fix common OCR errors and formatting issues
        text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)  # Add space between camelCase
        text = re.sub(r'([.!?])([A-Z])', r'\1 \2', text)  # Add space after punctuation

        return text.strip()

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences using simple heuristics."""
        # Simple sentence splitting - can be enhanced with NLTK if needed
        sentences = re.split(r'[.!?]+', text)

        # Clean up sentences
        cleaned_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if len(sentence) >= 10:  # Minimum sentence length
                cleaned_sentences.append(sentence)

        return cleaned_sentences

    def _get_overlap_sentences(self, sentences: List[str]) -> List[str]:
        """Get sentences for overlap based on character count."""
        overlap_sentences = []
        overlap_length = 0

        # Take sentences from the end up to the overlap size
        for sentence in reversed(sentences):
            if overlap_length + len(sentence) <= self.chunk_overlap:
                overlap_sentences.insert(0, sentence)
                overlap_length += len(sentence)
            else:
                break

        return overlap_sentences

    def get_document_stats(self, chunks: List[DocumentChunk]) -> Dict[str, Any]:
        """Get statistics about processed document."""
        if not chunks:
            return {"chunk_count": 0, "total_chars": 0, "avg_chunk_size": 0}

        total_chars = sum(len(chunk.content) for chunk in chunks)

        return {
            "chunk_count": len(chunks),
            "total_chars": total_chars,
            "avg_chunk_size": total_chars / len(chunks),
            "min_chunk_size": min(len(chunk.content) for chunk in chunks),
            "max_chunk_size": max(len(chunk.content) for chunk in chunks),
            "source_file": chunks[0].metadata.get("filename", "unknown"),
            "file_type": chunks[0].metadata.get("file_type", "unknown")
        }
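# Illustrative usage sketch (assumptions: a config shaped like the defaults read in
# __init__ and process_document, and a hypothetical "sample.pdf" on disk). Because
# this module uses a relative import, it only runs as part of its package, e.g.
# `python -m <package>.document_processor`, or from calling code and tests.
if __name__ == "__main__":
    example_config = {
        "app": {"max_upload_size": 50},  # MB; converted to bytes in process_document
        "processing": {
            "chunk_size": 512,
            "chunk_overlap": 50,
            "min_chunk_size": 100,
            "max_chunks_per_doc": 1000,
            "supported_formats": ["pdf", "docx", "txt"],
        },
    }

    processor = DocumentProcessor(example_config)
    try:
        chunks = processor.process_document("sample.pdf")  # hypothetical input file
        print(processor.get_document_stats(chunks))
        for chunk in chunks[:3]:
            print(chunk.chunk_id, len(chunk.content))
    except DocumentProcessingError as exc:
        print(f"Processing failed: {exc}")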