""" PDF Processor Component Processes PDF files to extract text and metadata """ import os import re import warnings from typing import List, Dict, Optional, Any from datetime import datetime from pathlib import Path # PDF processing libraries import pypdf try: import pdfplumber import fitz # PyMuPDF PDF_ENHANCED = True except ImportError: PDF_ENHANCED = False warnings.filterwarnings('ignore') class PDFProcessor: """ Processes PDF files to extract text, metadata, and structure Supports multiple PDF processing libraries for better compatibility """ def __init__(self, config=None): # Import Config only when needed to avoid dependency issues if config is None: try: from .config import Config self.config = Config() except ImportError: # Fallback to None if Config cannot be imported self.config = None else: self.config = config self.supported_formats = ['.pdf'] # Check available libraries self.libraries = { 'pypdf': True, 'pdfplumber': PDF_ENHANCED, 'PyMuPDF': PDF_ENHANCED } print(f"PDF Processor initialized with libraries: {[k for k, v in self.libraries.items() if v]}") def extract_text_from_file(self, file_path: str, method: str = 'auto') -> Dict[str, Any]: """ Extract text from PDF file Args: file_path: Path to PDF file method: Extraction method ('auto', 'pypdf', 'pdfplumber', 'pymupdf') Returns: Dictionary with extracted text and metadata """ if not os.path.exists(file_path): return {'error': f"File not found: {file_path}"} if not file_path.lower().endswith('.pdf'): return {'error': f"Not a PDF file: {file_path}"} try: print(f"Processing PDF: {os.path.basename(file_path)}") # Try different methods based on preference if method == 'auto': # Try methods in order of preference methods = ['pdfplumber', 'pymupdf', 'pypdf'] for m in methods: if self.libraries.get(m.replace('pymupdf', 'PyMuPDF').replace('pdfplumber', 'pdfplumber').replace('pypdf', 'pypdf')): result = self._extract_with_method(file_path, m) if result and not result.get('error'): return result # If all methods fail, return error return {'error': 'All extraction methods failed'} else: return self._extract_with_method(file_path, method) except Exception as e: return {'error': f"Error processing PDF: {str(e)}"} def _extract_with_method(self, file_path: str, method: str) -> Dict[str, Any]: """ Extract text using a specific method Args: file_path: Path to PDF file method: Extraction method Returns: Dictionary with extracted text and metadata """ try: if method == 'pdfplumber' and self.libraries['pdfplumber']: return self._extract_with_pdfplumber(file_path) elif method == 'pymupdf' and self.libraries['PyMuPDF']: return self._extract_with_pymupdf(file_path) elif method == 'pypdf' and self.libraries['pypdf']: return self._extract_with_pypdf(file_path) else: return {'error': f"Method {method} not available"} except Exception as e: return {'error': f"Error with method {method}: {str(e)}"} def _extract_with_pdfplumber(self, file_path: str) -> Dict[str, Any]: """Extract text using pdfplumber (best for tables and layout)""" import pdfplumber text_content = [] metadata = { 'method': 'pdfplumber', 'pages': 0, 'tables': 0, 'images': 0 } with pdfplumber.open(file_path) as pdf: metadata['pages'] = len(pdf.pages) for page_num, page in enumerate(pdf.pages): # Extract text page_text = page.extract_text() if page_text: text_content.append(f"--- Page {page_num + 1} ---\n{page_text}") # Count tables tables = page.extract_tables() if tables: metadata['tables'] += len(tables) # Add table content for table in tables: table_text = self._format_table(table) text_content.append(f"--- Table on Page {page_num + 1} ---\n{table_text}") # Count images if hasattr(page, 'images'): metadata['images'] += len(page.images) full_text = '\n\n'.join(text_content) return { 'text': full_text, 'metadata': metadata, 'word_count': len(full_text.split()), 'char_count': len(full_text), 'extracted_at': datetime.now().isoformat(), 'file_path': file_path } def _extract_with_pymupdf(self, file_path: str) -> Dict[str, Any]: """Extract text using PyMuPDF (fast and accurate)""" import fitz doc = fitz.open(file_path) text_content = [] metadata = { 'method': 'pymupdf', 'pages': len(doc), 'images': 0, 'links': 0 } for page_num in range(len(doc)): page = doc[page_num] # Extract text page_text = page.get_text() if page_text.strip(): text_content.append(f"--- Page {page_num + 1} ---\n{page_text}") # Count images images = page.get_images() metadata['images'] += len(images) # Count links links = page.get_links() metadata['links'] += len(links) doc.close() full_text = '\n\n'.join(text_content) return { 'text': full_text, 'metadata': metadata, 'word_count': len(full_text.split()), 'char_count': len(full_text), 'extracted_at': datetime.now().isoformat(), 'file_path': file_path } def _extract_with_pypdf(self, file_path: str) -> Dict[str, Any]: """Extract text using pypdf (basic but reliable)""" text_content = [] metadata = { 'method': 'pypdf', 'pages': 0 } with open(file_path, 'rb') as file: pdf_reader = pypdf.PdfReader(file) metadata['pages'] = len(pdf_reader.pages) for page_num, page in enumerate(pdf_reader.pages): page_text = page.extract_text() if page_text.strip(): text_content.append(f"--- Page {page_num + 1} ---\n{page_text}") full_text = '\n\n'.join(text_content) return { 'text': full_text, 'metadata': metadata, 'word_count': len(full_text.split()), 'char_count': len(full_text), 'extracted_at': datetime.now().isoformat(), 'file_path': file_path } def _format_table(self, table: List[List[str]]) -> str: """Format a table for text output""" if not table: return "" formatted_rows = [] for row in table: if row: # Skip empty rows formatted_row = ' | '.join(str(cell) if cell else '' for cell in row) formatted_rows.append(formatted_row) return '\n'.join(formatted_rows) def extract_text_from_bytes(self, pdf_bytes: bytes, filename: str = "uploaded.pdf") -> Dict[str, Any]: """ Extract text from PDF bytes (for uploaded files) Args: pdf_bytes: PDF file bytes filename: Original filename Returns: Dictionary with extracted text and metadata """ try: # Save bytes to temporary file import tempfile with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: tmp_file.write(pdf_bytes) tmp_path = tmp_file.name # Extract text result = self.extract_text_from_file(tmp_path) # Clean up os.unlink(tmp_path) # Update metadata if 'metadata' in result: result['metadata']['original_filename'] = filename result['metadata']['file_size'] = len(pdf_bytes) return result except Exception as e: return {'error': f"Error processing PDF bytes: {str(e)}"} def validate_pdf(self, file_path: str) -> Dict[str, Any]: """ Validate PDF file Args: file_path: Path to PDF file Returns: Validation result """ try: if not os.path.exists(file_path): return {'valid': False, 'error': 'File not found'} if not file_path.lower().endswith('.pdf'): return {'valid': False, 'error': 'Not a PDF file'} # Try to open with pypdf with open(file_path, 'rb') as file: pdf_reader = pypdf.PdfReader(file) page_count = len(pdf_reader.pages) # Check if encrypted is_encrypted = pdf_reader.is_encrypted # Get file size file_size = os.path.getsize(file_path) return { 'valid': True, 'pages': page_count, 'encrypted': is_encrypted, 'file_size': file_size, 'file_path': file_path } except Exception as e: return {'valid': False, 'error': str(e)} def get_pdf_metadata(self, file_path: str) -> Dict[str, Any]: """ Extract metadata from PDF Args: file_path: Path to PDF file Returns: PDF metadata """ try: metadata = {} # Try pypdf first try: with open(file_path, 'rb') as file: pdf_reader = pypdf.PdfReader(file) if pdf_reader.metadata: metadata.update({ 'title': pdf_reader.metadata.get('/Title', ''), 'author': pdf_reader.metadata.get('/Author', ''), 'subject': pdf_reader.metadata.get('/Subject', ''), 'creator': pdf_reader.metadata.get('/Creator', ''), 'producer': pdf_reader.metadata.get('/Producer', ''), 'creation_date': pdf_reader.metadata.get('/CreationDate', ''), 'modification_date': pdf_reader.metadata.get('/ModDate', '') }) except Exception: pass # Try PyMuPDF for additional metadata if self.libraries['PyMuPDF']: try: import fitz doc = fitz.open(file_path) doc_metadata = doc.metadata doc.close() if doc_metadata: metadata.update({ 'format': doc_metadata.get('format', ''), 'encryption': doc_metadata.get('encryption', ''), 'keywords': doc_metadata.get('keywords', '') }) except Exception: pass # Add file system metadata stat = os.stat(file_path) metadata.update({ 'file_size': stat.st_size, 'created': datetime.fromtimestamp(stat.st_ctime).isoformat(), 'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(), 'accessed': datetime.fromtimestamp(stat.st_atime).isoformat() }) return metadata except Exception as e: return {'error': f"Error extracting metadata: {str(e)}"} def split_pdf_text(self, text: str, chunk_size: int = None, chunk_overlap: int = None) -> List[str]: """ Split PDF text into chunks for processing Args: text: Extracted text chunk_size: Size of each chunk chunk_overlap: Overlap between chunks Returns: List of text chunks """ # Use provided values or defaults if config is None if chunk_size is None: chunk_size = self.config.CHUNK_SIZE if self.config else 1000 if chunk_overlap is None: chunk_overlap = self.config.CHUNK_OVERLAP if self.config else 200 if len(text) <= chunk_size: return [text] chunks = [] start = 0 while start < len(text): end = start + chunk_size # Try to break at sentence boundary if end < len(text): # Look for sentence ending sentence_end = text.rfind('.', start, end) if sentence_end > start: end = sentence_end + 1 else: # Look for paragraph break para_end = text.rfind('\n\n', start, end) if para_end > start: end = para_end + 2 else: # Look for any line break line_end = text.rfind('\n', start, end) if line_end > start: end = line_end + 1 chunk = text[start:end].strip() if chunk: chunks.append(chunk) start = end - chunk_overlap return chunks def clean_text(self, text: str) -> str: """ Clean extracted text Args: text: Raw extracted text Returns: Cleaned text """ if not text: return "" # Remove extra whitespace text = re.sub(r'\s+', ' ', text) # Remove page headers/footers (basic) text = re.sub(r'Page \d+', '', text) # Remove email addresses (optional) text = re.sub(r'\S+@\S+', '', text) # Remove URLs (optional) text = re.sub(r'https?://\S+', '', text) # Fix common OCR errors text = text.replace('fi', 'fi') text = text.replace('fl', 'fl') text = text.replace('ff', 'ff') text = text.replace('ffi', 'ffi') text = text.replace('ffl', 'ffl') return text.strip() def get_processing_stats(self) -> Dict[str, Any]: """ Get PDF processing statistics Returns: Processing statistics """ return { 'available_libraries': self.libraries, 'supported_formats': self.supported_formats, 'enhanced_features': PDF_ENHANCED, 'config': { 'chunk_size': self.config.CHUNK_SIZE if self.config else 1000, 'chunk_overlap': self.config.CHUNK_OVERLAP if self.config else 200 } }