#!/usr/bin/env python3 """ PDFPlumber-based Parser Advanced PDF parsing using pdfplumber for better structure detection and cleaner text extraction. Author: Arthur Passuello """ import re import pdfplumber from pathlib import Path from typing import Dict, List, Optional, Tuple, Any class PDFPlumberParser: """Advanced PDF parser using pdfplumber for structure-aware extraction.""" def __init__(self, target_chunk_size: int = 1400, min_chunk_size: int = 800, max_chunk_size: int = 2000): """Initialize PDFPlumber parser.""" self.target_chunk_size = target_chunk_size self.min_chunk_size = min_chunk_size self.max_chunk_size = max_chunk_size # Trash content patterns self.trash_patterns = [ r'Creative Commons.*?License', r'International License.*?authors', r'RISC-V International', r'Visit.*?for further', r'editors to suggest.*?corrections', r'released under.*?license', r'\.{5,}', # Long dots (TOC artifacts) r'^\d+\s*$', # Page numbers alone ] def extract_with_structure(self, pdf_path: Path) -> List[Dict]: """Extract PDF content with structure awareness using pdfplumber.""" chunks = [] with pdfplumber.open(pdf_path) as pdf: current_section = None current_text = [] for page_num, page in enumerate(pdf.pages): # Extract text with formatting info page_content = self._extract_page_content(page, page_num + 1) for element in page_content: if element['type'] == 'header': # Save previous section if exists if current_text: chunk_text = '\n\n'.join(current_text) if self._is_valid_chunk(chunk_text): chunks.extend(self._create_chunks( chunk_text, current_section or "Document", page_num )) # Start new section current_section = element['text'] current_text = [] elif element['type'] == 'content': # Add to current section if self._is_valid_content(element['text']): current_text.append(element['text']) # Don't forget last section if current_text: chunk_text = '\n\n'.join(current_text) if self._is_valid_chunk(chunk_text): chunks.extend(self._create_chunks( chunk_text, current_section or "Document", len(pdf.pages) )) return chunks def _extract_page_content(self, page: Any, page_num: int) -> List[Dict]: """Extract structured content from a page.""" content = [] # Get all text with positioning chars = page.chars if not chars: return content # Group by lines lines = [] current_line = [] current_y = None for char in sorted(chars, key=lambda x: (x['top'], x['x0'])): if current_y is None or abs(char['top'] - current_y) < 2: current_line.append(char) current_y = char['top'] else: if current_line: lines.append(current_line) current_line = [char] current_y = char['top'] if current_line: lines.append(current_line) # Analyze each line for line in lines: line_text = ''.join(char['text'] for char in line).strip() if not line_text: continue # Detect headers by font size avg_font_size = sum(char.get('size', 12) for char in line) / len(line) is_bold = any(char.get('fontname', '').lower().count('bold') > 0 for char in line) # Classify content if avg_font_size > 14 or is_bold: # Likely a header if self._is_valid_header(line_text): content.append({ 'type': 'header', 'text': line_text, 'font_size': avg_font_size, 'page': page_num }) else: # Regular content content.append({ 'type': 'content', 'text': line_text, 'font_size': avg_font_size, 'page': page_num }) return content def _is_valid_header(self, text: str) -> bool: """Check if text is a valid header.""" # Skip if too short or too long if len(text) < 3 or len(text) > 200: return False # Skip if matches trash patterns for pattern in self.trash_patterns: if re.search(pattern, text, 
        # Valid if starts with number or capital letter
        if re.match(r'^(\d+\.?\d*\s+|[A-Z])', text):
            return True

        # Valid if contains keywords
        keywords = ['chapter', 'section', 'introduction', 'conclusion', 'appendix']
        return any(keyword in text.lower() for keyword in keywords)

    def _is_valid_content(self, text: str) -> bool:
        """Check if text is valid content (not trash)."""
        # Skip very short text
        if len(text.strip()) < 10:
            return False

        # Skip trash patterns
        for pattern in self.trash_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return False

        return True

    def _is_valid_chunk(self, text: str) -> bool:
        """Check if chunk text is valid."""
        # Must have minimum length
        if len(text.strip()) < self.min_chunk_size // 2:
            return False

        # Must have some alphabetic content
        alpha_chars = sum(1 for c in text if c.isalpha())
        if alpha_chars < len(text) * 0.5:
            return False

        return True

    def _create_chunks(self, text: str, title: str, page: int) -> List[Dict]:
        """Create chunks from text."""
        chunks = []

        # Clean text
        text = self._clean_text(text)

        if len(text) <= self.max_chunk_size:
            # Single chunk
            chunks.append({
                'text': text,
                'title': title,
                'page': page,
                'metadata': {
                    'parsing_method': 'pdfplumber',
                    'quality_score': self._calculate_quality_score(text)
                }
            })
        else:
            # Split into chunks
            text_chunks = self._split_text_into_chunks(text)
            for i, chunk_text in enumerate(text_chunks):
                chunks.append({
                    'text': chunk_text,
                    'title': f"{title} (Part {i+1})",
                    'page': page,
                    'metadata': {
                        'parsing_method': 'pdfplumber',
                        'part_number': i + 1,
                        'total_parts': len(text_chunks),
                        'quality_score': self._calculate_quality_score(chunk_text)
                    }
                })

        return chunks

    def _clean_text(self, text: str) -> str:
        """Clean text from artifacts."""
        # Remove volume headers (e.g., "Volume I: RISC-V Unprivileged ISA V20191213")
        text = re.sub(r'Volume\s+[IVX]+:\s*RISC-V[^V]*V\d{8}\s*', '', text,
                      flags=re.IGNORECASE)
        text = re.sub(r'^\d+\s+Volume\s+[IVX]+:.*?$', '', text, flags=re.MULTILINE)

        # Remove document version artifacts
        text = re.sub(r'Document Version \d{8}\s*', '', text, flags=re.IGNORECASE)

        # Remove repeated ISA headers
        text = re.sub(r'RISC-V.*?ISA.*?V\d{8}\s*', '', text, flags=re.IGNORECASE)
        text = re.sub(r'The RISC-V Instruction Set Manual\s*', '', text,
                      flags=re.IGNORECASE)

        # Remove figure/table references that are standalone
        text = re.sub(r'^(Figure|Table)\s+\d+\.\d+:.*?$', '', text, flags=re.MULTILINE)

        # Remove email addresses (often in contributor lists)
        text = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '', text)

        # Remove URLs
        text = re.sub(r'https?://[^\s]+', '', text)

        # Remove page numbers at start/end of lines
        text = re.sub(r'^\d{1,3}\s+', '', text, flags=re.MULTILINE)
        text = re.sub(r'\s+\d{1,3}$', '', text, flags=re.MULTILINE)

        # Remove excessive dots (TOC artifacts)
        text = re.sub(r'\.{3,}', '', text)

        # Remove standalone numbers (often page numbers or figure numbers)
        text = re.sub(r'^\s*\d+\s*$', '', text, flags=re.MULTILINE)

        # Normalize whitespace: collapse excessive blank lines first, then
        # spaces/tabs, so paragraph breaks survive the space normalization
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'[ \t]+', ' ', text)

        # Remove common boilerplate phrases
        text = re.sub(r'Contains Nonbinding Recommendations\s*', '', text,
                      flags=re.IGNORECASE)
        text = re.sub(r'Guidance for Industry and FDA Staff\s*', '', text,
                      flags=re.IGNORECASE)

        return text.strip()

    def _split_text_into_chunks(self, text: str) -> List[str]:
        """Split text into chunks at sentence boundaries."""
        sentences = re.split(r'(?<=[.!?])\s+', text)
        chunks = []
        current_chunk = []
        current_size = 0

        for sentence in sentences:
            sentence_size = len(sentence)

            if current_size + sentence_size > self.target_chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_size = sentence_size
            else:
                current_chunk.append(sentence)
                current_size += sentence_size + 1

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

    def _calculate_quality_score(self, text: str) -> float:
        """Calculate quality score for chunk."""
        score = 1.0

        # Penalize very short or very long chunks
        if len(text) < self.min_chunk_size:
            score *= 0.8
        elif len(text) > self.max_chunk_size:
            score *= 0.9

        # Reward complete sentences
        if text.strip().endswith(('.', '!', '?')):
            score *= 1.1

        # Reward technical content
        technical_terms = ['risc', 'instruction', 'register', 'memory', 'processor']
        term_count = sum(1 for term in technical_terms if term in text.lower())
        score *= (1 + term_count * 0.05)

        return min(score, 1.0)

    def extract_with_page_coverage(self, pdf_path: Path,
                                   pymupdf_pages: List[Dict]) -> List[Dict]:
        """
        Extract content ensuring ALL pages are covered using PyMuPDF page data.

        Args:
            pdf_path: Path to PDF file
            pymupdf_pages: Page data from PyMuPDF with page numbers and text

        Returns:
            List of chunks covering ALL document pages
        """
        chunks = []
        chunk_id = 0

        print(f"📄 Processing {len(pymupdf_pages)} pages with PDFPlumber quality extraction...")

        with pdfplumber.open(str(pdf_path)) as pdf:
            for pymupdf_page in pymupdf_pages:
                page_num = pymupdf_page['page_number']  # 1-indexed from PyMuPDF
                page_idx = page_num - 1                 # Convert to 0-indexed for PDFPlumber

                if page_idx < len(pdf.pages):
                    # Extract with PDFPlumber quality from this specific page
                    pdfplumber_page = pdf.pages[page_idx]
                    page_text = pdfplumber_page.extract_text()

                    if page_text and page_text.strip():
                        # Clean and chunk the page text
                        cleaned_text = self._clean_text(page_text)

                        if len(cleaned_text) >= 100:  # Minimum meaningful content
                            # Create chunks from this page
                            page_chunks = self._create_page_chunks(
                                cleaned_text, page_num, chunk_id
                            )
                            chunks.extend(page_chunks)
                            chunk_id += len(page_chunks)

                            if len(chunks) % 50 == 0:  # Progress indicator
                                print(f" Processed {page_num} pages, created {len(chunks)} chunks")

        print(f"✅ Full coverage: {len(chunks)} chunks from {len(pymupdf_pages)} pages")
        return chunks

    def _create_page_chunks(self, page_text: str, page_num: int,
                            start_chunk_id: int) -> List[Dict]:
        """Create properly sized chunks from a single page's content."""
        # Clean and validate page text first
        cleaned_text = self._ensure_complete_sentences(page_text)

        if not cleaned_text or len(cleaned_text) < 50:
            # Skip pages with insufficient content
            return []

        if len(cleaned_text) <= self.max_chunk_size:
            # Single chunk for small pages
            return [{
                'text': cleaned_text,
                'title': f"Page {page_num}",
                'page': page_num,
                'metadata': {
                    'parsing_method': 'pdfplumber_page_coverage',
                    'quality_score': self._calculate_quality_score(cleaned_text),
                    'full_page_coverage': True
                }
            }]
        else:
            # Split large pages into chunks with sentence boundaries
            text_chunks = self._split_text_into_chunks(cleaned_text)
            page_chunks = []

            for i, chunk_text in enumerate(text_chunks):
                # Ensure each chunk is complete
                complete_chunk = self._ensure_complete_sentences(chunk_text)

                if complete_chunk and len(complete_chunk) >= 100:
                    page_chunks.append({
                        'text': complete_chunk,
                        'title': f"Page {page_num} (Part {i+1})",
                        'page': page_num,
                        'metadata': {
                            'parsing_method': 'pdfplumber_page_coverage',
                            'part_number': i + 1,
                            'total_parts': len(text_chunks),
                            'quality_score': self._calculate_quality_score(complete_chunk),
                            'full_page_coverage': True
                        }
                    })

            return page_chunks
    def _ensure_complete_sentences(self, text: str) -> str:
        """Ensure text contains only complete sentences."""
        text = text.strip()
        if not text:
            return ""

        # Find last complete sentence
        last_sentence_end = -1
        for i, char in enumerate(reversed(text)):
            if char in '.!?:':
                last_sentence_end = len(text) - i
                break

        if last_sentence_end > 0:
            # Return text up to last complete sentence
            complete_text = text[:last_sentence_end].strip()

            # Ensure it starts properly (capital letter or common starters)
            if complete_text and (complete_text[0].isupper() or
                                  complete_text.startswith(('The ', 'A ', 'An ', 'This ', 'RISC'))):
                return complete_text

        # If no complete sentences found, return empty
        return ""

    def parse_document(self, pdf_path: Path,
                       pdf_data: Optional[Dict[str, Any]] = None) -> List[Dict]:
        """
        Parse document using PDFPlumber (required by HybridParser).

        Args:
            pdf_path: Path to PDF file
            pdf_data: PyMuPDF page data to ensure full page coverage

        Returns:
            List of chunks with structure preservation across ALL pages
        """
        if pdf_data and 'pages' in pdf_data:
            # Use PyMuPDF page data to ensure full coverage
            return self.extract_with_page_coverage(pdf_path, pdf_data['pages'])
        else:
            # Fallback to structure-based extraction
            return self.extract_with_structure(pdf_path)


def parse_pdf_with_pdfplumber(pdf_path: Path, **kwargs) -> List[Dict]:
    """Main entry point for PDFPlumber parsing."""
    parser = PDFPlumberParser(**kwargs)
    chunks = parser.extract_with_structure(pdf_path)
    print(f"PDFPlumber extracted {len(chunks)} chunks")
    return chunks
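
# Usage sketch (illustrative, not part of the module's API): shows how the
# convenience entry point and the parse_document() full-coverage path might be
# invoked. The sample PDF path and the use of PyMuPDF (fitz) to build pdf_data
# are assumptions for the example; adapt them to your own pipeline.
if __name__ == "__main__":
    sample_pdf = Path("riscv-spec.pdf")  # hypothetical input file

    # Structure-based extraction via the module-level entry point
    structure_chunks = parse_pdf_with_pdfplumber(sample_pdf, target_chunk_size=1400)

    # Full page coverage: build the page data that parse_document() expects.
    # Sketched here with PyMuPDF; any source providing 1-indexed 'page_number'
    # entries under a 'pages' key would work the same way.
    try:
        import fitz  # PyMuPDF

        with fitz.open(str(sample_pdf)) as doc:
            pdf_data = {
                'pages': [
                    {'page_number': i + 1, 'text': page.get_text()}
                    for i, page in enumerate(doc)
                ]
            }
        coverage_chunks = PDFPlumberParser().parse_document(sample_pdf, pdf_data)
    except ImportError:
        # PyMuPDF not installed; parse_document() falls back to structure extraction
        coverage_chunks = PDFPlumberParser().parse_document(sample_pdf)

    print(f"Structure: {len(structure_chunks)} chunks, coverage: {len(coverage_chunks)} chunks")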