""" | |
Document processing module for parsing and chunking various document formats. | |
""" | |
import re | |
from pathlib import Path | |
from typing import Dict, List, Optional, Tuple, Any | |
import hashlib | |
import mimetypes | |
# Document parsing imports | |
import PyPDF2 | |
from docx import Document as DocxDocument | |
from io import BytesIO | |
from .error_handler import DocumentProcessingError, validate_file_upload | |
class DocumentChunk:
    """Represents a chunk of processed document content."""

    def __init__(
        self,
        content: str,
        metadata: Dict[str, Any],
        chunk_id: Optional[str] = None
    ):
        self.content = content.strip()
        self.metadata = metadata
        self.chunk_id = chunk_id or self._generate_chunk_id()

    def _generate_chunk_id(self) -> str:
        """Generate a unique chunk ID based on the content hash."""
        content_hash = hashlib.md5(self.content.encode()).hexdigest()[:8]
        source = self.metadata.get("source", "unknown")
        page = self.metadata.get("page", 0)
        return f"{Path(source).stem}_{page}_{content_hash}"

    def to_dict(self) -> Dict[str, Any]:
        """Convert the chunk to a dictionary representation."""
        return {
            "chunk_id": self.chunk_id,
            "content": self.content,
            "metadata": self.metadata
        }

class DocumentProcessor:
    """Main document processing class supporting multiple formats."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.processing_config = config.get("processing", {})
        self.chunk_size = self.processing_config.get("chunk_size", 512)
        self.chunk_overlap = self.processing_config.get("chunk_overlap", 50)
        self.min_chunk_size = self.processing_config.get("min_chunk_size", 100)
        self.max_chunks_per_doc = self.processing_config.get("max_chunks_per_doc", 1000)
        self.supported_formats = self.processing_config.get("supported_formats", ["pdf", "docx", "txt"])

    def process_document(
        self,
        file_path: str,
        filename: Optional[str] = None
    ) -> List[DocumentChunk]:
        """
        Process a document and return a list of chunks.

        Args:
            file_path: Path to the document file
            filename: Optional original filename

        Returns:
            List of DocumentChunk objects
        """
        # Validate file size and extension before doing any work
        max_size = self.config.get("app", {}).get("max_upload_size", 50) * 1024 * 1024
        allowed_extensions = [f".{fmt}" for fmt in self.supported_formats]
        validate_file_upload(file_path, max_size, allowed_extensions)

        file_path = Path(file_path)
        filename = filename or file_path.name

        # Detect the file type and extract text
        try:
            text_content, metadata = self._extract_text(file_path, filename)

            if not text_content.strip():
                raise DocumentProcessingError("Document appears to be empty or contains no extractable text")

            # Create chunks
            chunks = self._create_chunks(text_content, metadata)

            if not chunks:
                raise DocumentProcessingError("Failed to create any valid chunks from document")

            if len(chunks) > self.max_chunks_per_doc:
                raise DocumentProcessingError(
                    f"Document too large. Generated {len(chunks)} chunks, "
                    f"maximum allowed is {self.max_chunks_per_doc}"
                )

            return chunks

        except DocumentProcessingError:
            raise
        except Exception as e:
            raise DocumentProcessingError(f"Failed to process document: {str(e)}") from e

    def _extract_text(self, file_path: Path, filename: str) -> Tuple[str, Dict[str, Any]]:
        """Extract text from a document based on its file type."""
        file_extension = file_path.suffix.lower()

        # Base metadata shared by all formats
        metadata = {
            "source": str(file_path),
            "filename": filename,
            "file_type": file_extension,
            "file_size": file_path.stat().st_size
        }

        if file_extension == ".pdf":
            text, pdf_metadata = self._extract_pdf_text(file_path)
            metadata.update(pdf_metadata)
        elif file_extension == ".docx":
            text, docx_metadata = self._extract_docx_text(file_path)
            metadata.update(docx_metadata)
        elif file_extension == ".txt":
            text, txt_metadata = self._extract_txt_text(file_path)
            metadata.update(txt_metadata)
        else:
            raise DocumentProcessingError(f"Unsupported file format: {file_extension}")

        return text, metadata

    def _extract_pdf_text(self, file_path: Path) -> Tuple[str, Dict[str, Any]]:
        """Extract text from a PDF file."""
        try:
            with open(file_path, "rb") as file:
                pdf_reader = PyPDF2.PdfReader(file)

                if len(pdf_reader.pages) == 0:
                    raise DocumentProcessingError("PDF file contains no pages")

                text_parts = []
                for page_num, page in enumerate(pdf_reader.pages):
                    try:
                        page_text = page.extract_text()
                        if page_text.strip():
                            text_parts.append(f"\n\n--- Page {page_num + 1} ---\n\n{page_text}")
                    except Exception as e:
                        # Warn but continue with the remaining pages
                        print(f"Warning: Could not extract text from page {page_num + 1}: {e}")

                if not text_parts:
                    raise DocumentProcessingError("Could not extract any text from PDF")

                # Extract document-level metadata
                pdf_metadata = {
                    "page_count": len(pdf_reader.pages),
                    "pdf_metadata": {}
                }

                if pdf_reader.metadata:
                    pdf_metadata["pdf_metadata"] = {
                        "title": pdf_reader.metadata.get("/Title", ""),
                        "author": pdf_reader.metadata.get("/Author", ""),
                        "subject": pdf_reader.metadata.get("/Subject", ""),
                        "creator": pdf_reader.metadata.get("/Creator", "")
                    }

                return "\n".join(text_parts), pdf_metadata

        except DocumentProcessingError:
            raise
        except Exception as e:
            raise DocumentProcessingError(f"Failed to read PDF file: {str(e)}") from e

    def _extract_docx_text(self, file_path: Path) -> Tuple[str, Dict[str, Any]]:
        """Extract text from a DOCX file."""
        try:
            doc = DocxDocument(file_path)

            # Extract paragraphs
            paragraphs = []
            for paragraph in doc.paragraphs:
                text = paragraph.text.strip()
                if text:
                    paragraphs.append(text)

            # Extract tables, one pipe-separated line per row
            table_texts = []
            for table in doc.tables:
                table_data = []
                for row in table.rows:
                    row_data = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                    if row_data:
                        table_data.append(" | ".join(row_data))
                if table_data:
                    table_texts.append("Table:\n" + "\n".join(table_data))

            all_text = "\n\n".join(paragraphs + table_texts)

            if not all_text.strip():
                raise DocumentProcessingError("DOCX file contains no extractable text")

            # Metadata
            docx_metadata = {
                "paragraph_count": len(paragraphs),
                "table_count": len(table_texts)
            }

            # Core document properties
            if hasattr(doc, "core_properties"):
                props = doc.core_properties
                docx_metadata["docx_metadata"] = {
                    "title": props.title or "",
                    "author": props.author or "",
                    "subject": props.subject or "",
                    "created": str(props.created) if props.created else ""
                }

            return all_text, docx_metadata

        except DocumentProcessingError:
            raise
        except Exception as e:
            raise DocumentProcessingError(f"Failed to read DOCX file: {str(e)}") from e

    def _extract_txt_text(self, file_path: Path) -> Tuple[str, Dict[str, Any]]:
        """Extract text from a plain-text file."""
        try:
            # Try a series of common encodings
            encodings = ["utf-8", "utf-8-sig", "latin1", "cp1252"]
            text = None
            encoding_used = None

            for encoding in encodings:
                try:
                    with open(file_path, "r", encoding=encoding) as file:
                        text = file.read()
                        encoding_used = encoding
                    break
                except UnicodeDecodeError:
                    continue

            if text is None:
                raise DocumentProcessingError("Could not decode text file with any supported encoding")

            if not text.strip():
                raise DocumentProcessingError("Text file is empty")

            # Basic text statistics
            lines = text.split("\n")
            txt_metadata = {
                "encoding": encoding_used,
                "line_count": len(lines),
                "char_count": len(text)
            }

            return text, txt_metadata

        except DocumentProcessingError:
            raise
        except Exception as e:
            raise DocumentProcessingError(f"Failed to read text file: {str(e)}") from e

    def _create_chunks(self, text: str, base_metadata: Dict[str, Any]) -> List[DocumentChunk]:
        """Create overlapping chunks from text."""
        # Clean and normalize text
        text = self._clean_text(text)

        # Split into sentences for better chunk boundaries
        sentences = self._split_into_sentences(text)
        if not sentences:
            return []

        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            sentence_length = len(sentence)

            # If adding this sentence would exceed the chunk size, close the current chunk
            if current_length + sentence_length > self.chunk_size and current_chunk:
                chunk_text = " ".join(current_chunk)
                if len(chunk_text) >= self.min_chunk_size:
                    chunk_metadata = {
                        **base_metadata,
                        "chunk_index": len(chunks),
                        "char_count": len(chunk_text),
                        "sentence_count": len(current_chunk)
                    }
                    chunks.append(DocumentChunk(chunk_text, chunk_metadata))

                # Start the next chunk, carrying over overlap from the current one
                if self.chunk_overlap > 0:
                    overlap_sentences = self._get_overlap_sentences(current_chunk)
                    current_chunk = overlap_sentences
                    current_length = sum(len(s) for s in overlap_sentences)
                else:
                    current_chunk = []
                    current_length = 0

            # Add the current sentence
            current_chunk.append(sentence)
            current_length += sentence_length

        # Create the final chunk from whatever remains
        if current_chunk:
            chunk_text = " ".join(current_chunk)
            if len(chunk_text) >= self.min_chunk_size:
                chunk_metadata = {
                    **base_metadata,
                    "chunk_index": len(chunks),
                    "char_count": len(chunk_text),
                    "sentence_count": len(current_chunk)
                }
                chunks.append(DocumentChunk(chunk_text, chunk_metadata))

        return chunks

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Remove page markers (from PDF extraction) before whitespace is collapsed,
        # otherwise the newline-anchored pattern can never match
        text = re.sub(r'\n+--- Page \d+ ---\n+', '\n', text)
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        # Fix common OCR errors and formatting issues
        text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)  # Add a space between run-together camelCase words
        text = re.sub(r'([.!?])([A-Z])', r'\1 \2', text)  # Add a space after sentence punctuation
        return text.strip()

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences using simple heuristics."""
        # Simple regex-based sentence splitting - can be enhanced with NLTK if needed
        sentences = re.split(r'[.!?]+', text)

        # Drop fragments that are too short to be meaningful sentences
        cleaned_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if len(sentence) >= 10:  # Minimum sentence length
                cleaned_sentences.append(sentence)

        return cleaned_sentences
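
    # Possible NLTK-based alternative to the regex split above, kept as a comment
    # so the module has no hard NLTK dependency. This is an illustrative sketch,
    # not part of the current implementation; it assumes `nltk` is installed and
    # the "punkt" tokenizer data has been fetched via nltk.download("punkt").
    #
    #   from nltk.tokenize import sent_tokenize
    #   sentences = [s.strip() for s in sent_tokenize(text) if len(s.strip()) >= 10]
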
    def _get_overlap_sentences(self, sentences: List[str]) -> List[str]:
        """Get trailing sentences for overlap, bounded by the overlap character budget."""
        overlap_sentences = []
        overlap_length = 0

        # Take sentences from the end until the overlap size is reached
        for sentence in reversed(sentences):
            if overlap_length + len(sentence) <= self.chunk_overlap:
                overlap_sentences.insert(0, sentence)
                overlap_length += len(sentence)
            else:
                break

        return overlap_sentences

    def get_document_stats(self, chunks: List[DocumentChunk]) -> Dict[str, Any]:
        """Get statistics about a processed document."""
        if not chunks:
            return {"chunk_count": 0, "total_chars": 0, "avg_chunk_size": 0}

        total_chars = sum(len(chunk.content) for chunk in chunks)

        return {
            "chunk_count": len(chunks),
            "total_chars": total_chars,
            "avg_chunk_size": total_chars / len(chunks),
            "min_chunk_size": min(len(chunk.content) for chunk in chunks),
            "max_chunk_size": max(len(chunk.content) for chunk in chunks),
            "source_file": chunks[0].metadata.get("filename", "unknown"),
            "file_type": chunks[0].metadata.get("file_type", "unknown")
        }
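

if __name__ == "__main__":
    # Minimal usage sketch, run as a module inside the package because of the
    # relative import above (e.g. `python -m <package>.document_processor <file>`).
    # The config values mirror the defaults read in __init__ and process_document;
    # they are illustrative assumptions, not a required schema.
    import sys

    demo_config = {
        "app": {"max_upload_size": 50},  # MB
        "processing": {
            "chunk_size": 512,
            "chunk_overlap": 50,
            "min_chunk_size": 100,
            "max_chunks_per_doc": 1000,
            "supported_formats": ["pdf", "docx", "txt"],
        },
    }

    processor = DocumentProcessor(demo_config)
    demo_chunks = processor.process_document(sys.argv[1])
    print(processor.get_document_stats(demo_chunks))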