#!/usr/bin/env python3
"""
PDFPlumber-based Parser
Advanced PDF parsing using pdfplumber for better structure detection
and cleaner text extraction.
Author: Arthur Passuello
"""
import re
import pdfplumber
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any


class PDFPlumberParser:
    """Advanced PDF parser using pdfplumber for structure-aware extraction."""

    def __init__(self, target_chunk_size: int = 1400, min_chunk_size: int = 800,
                 max_chunk_size: int = 2000):
        """Initialize PDFPlumber parser."""
        self.target_chunk_size = target_chunk_size
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size

        # Trash content patterns
        self.trash_patterns = [
            r'Creative Commons.*?License',
            r'International License.*?authors',
            r'RISC-V International',
            r'Visit.*?for further',
            r'editors to suggest.*?corrections',
            r'released under.*?license',
            r'\.{5,}',    # Long dots (TOC artifacts)
            r'^\d+\s*$',  # Page numbers alone
        ]
    def extract_with_structure(self, pdf_path: Path) -> List[Dict]:
        """Extract PDF content with structure awareness using pdfplumber."""
        chunks = []

        with pdfplumber.open(pdf_path) as pdf:
            current_section = None
            current_text = []

            for page_num, page in enumerate(pdf.pages):
                # Extract text with formatting info
                page_content = self._extract_page_content(page, page_num + 1)

                for element in page_content:
                    if element['type'] == 'header':
                        # Save previous section if exists
                        if current_text:
                            chunk_text = '\n\n'.join(current_text)
                            if self._is_valid_chunk(chunk_text):
                                chunks.extend(self._create_chunks(
                                    chunk_text,
                                    current_section or "Document",
                                    page_num
                                ))
                        # Start new section
                        current_section = element['text']
                        current_text = []
                    elif element['type'] == 'content':
                        # Add to current section
                        if self._is_valid_content(element['text']):
                            current_text.append(element['text'])

            # Don't forget last section
            if current_text:
                chunk_text = '\n\n'.join(current_text)
                if self._is_valid_chunk(chunk_text):
                    chunks.extend(self._create_chunks(
                        chunk_text,
                        current_section or "Document",
                        len(pdf.pages)
                    ))

        return chunks
    def _extract_page_content(self, page: Any, page_num: int) -> List[Dict]:
        """Extract structured content from a page."""
        content = []

        # Get all text with positioning
        chars = page.chars
        if not chars:
            return content

        # Group by lines
        lines = []
        current_line = []
        current_y = None

        for char in sorted(chars, key=lambda x: (x['top'], x['x0'])):
            if current_y is None or abs(char['top'] - current_y) < 2:
                current_line.append(char)
                current_y = char['top']
            else:
                if current_line:
                    lines.append(current_line)
                current_line = [char]
                current_y = char['top']
        if current_line:
            lines.append(current_line)

        # Analyze each line
        for line in lines:
            line_text = ''.join(char['text'] for char in line).strip()
            if not line_text:
                continue

            # Detect headers by font size
            avg_font_size = sum(char.get('size', 12) for char in line) / len(line)
            is_bold = any(char.get('fontname', '').lower().count('bold') > 0 for char in line)

            # Classify content
            if avg_font_size > 14 or is_bold:
                # Likely a header
                if self._is_valid_header(line_text):
                    content.append({
                        'type': 'header',
                        'text': line_text,
                        'font_size': avg_font_size,
                        'page': page_num
                    })
            else:
                # Regular content
                content.append({
                    'type': 'content',
                    'text': line_text,
                    'font_size': avg_font_size,
                    'page': page_num
                })

        return content
    def _is_valid_header(self, text: str) -> bool:
        """Check if text is a valid header."""
        # Skip if too short or too long
        if len(text) < 3 or len(text) > 200:
            return False

        # Skip if matches trash patterns
        for pattern in self.trash_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return False

        # Valid if starts with number or capital letter
        if re.match(r'^(\d+\.?\d*\s+|[A-Z])', text):
            return True

        # Valid if contains keywords
        keywords = ['chapter', 'section', 'introduction', 'conclusion', 'appendix']
        return any(keyword in text.lower() for keyword in keywords)
    def _is_valid_content(self, text: str) -> bool:
        """Check if text is valid content (not trash)."""
        # Skip very short text
        if len(text.strip()) < 10:
            return False

        # Skip trash patterns
        for pattern in self.trash_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return False

        return True
    def _is_valid_chunk(self, text: str) -> bool:
        """Check if chunk text is valid."""
        # Must have minimum length
        if len(text.strip()) < self.min_chunk_size // 2:
            return False

        # At least half of the characters must be alphabetic
        alpha_chars = sum(1 for c in text if c.isalpha())
        if alpha_chars < len(text) * 0.5:
            return False

        return True
    def _create_chunks(self, text: str, title: str, page: int) -> List[Dict]:
        """Create chunks from text."""
        chunks = []

        # Clean text
        text = self._clean_text(text)

        if len(text) <= self.max_chunk_size:
            # Single chunk
            chunks.append({
                'text': text,
                'title': title,
                'page': page,
                'metadata': {
                    'parsing_method': 'pdfplumber',
                    'quality_score': self._calculate_quality_score(text)
                }
            })
        else:
            # Split into chunks
            text_chunks = self._split_text_into_chunks(text)
            for i, chunk_text in enumerate(text_chunks):
                chunks.append({
                    'text': chunk_text,
                    'title': f"{title} (Part {i+1})",
                    'page': page,
                    'metadata': {
                        'parsing_method': 'pdfplumber',
                        'part_number': i + 1,
                        'total_parts': len(text_chunks),
                        'quality_score': self._calculate_quality_score(chunk_text)
                    }
                })

        return chunks
    def _clean_text(self, text: str) -> str:
        """Clean text from artifacts."""
        # Remove volume headers (e.g., "Volume I: RISC-V Unprivileged ISA V20191213")
        text = re.sub(r'Volume\s+[IVX]+:\s*RISC-V[^V]*V\d{8}\s*', '', text, flags=re.IGNORECASE)
        text = re.sub(r'^\d+\s+Volume\s+[IVX]+:.*?$', '', text, flags=re.MULTILINE)

        # Remove document version artifacts
        text = re.sub(r'Document Version \d{8}\s*', '', text, flags=re.IGNORECASE)

        # Remove repeated ISA headers
        text = re.sub(r'RISC-V.*?ISA.*?V\d{8}\s*', '', text, flags=re.IGNORECASE)
        text = re.sub(r'The RISC-V Instruction Set Manual\s*', '', text, flags=re.IGNORECASE)

        # Remove figure/table references that are standalone
        text = re.sub(r'^(Figure|Table)\s+\d+\.\d+:.*?$', '', text, flags=re.MULTILINE)

        # Remove email addresses (often in contributor lists)
        text = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '', text)

        # Remove URLs
        text = re.sub(r'https?://[^\s]+', '', text)

        # Remove page numbers at start/end of lines
        text = re.sub(r'^\d{1,3}\s+', '', text, flags=re.MULTILINE)
        text = re.sub(r'\s+\d{1,3}$', '', text, flags=re.MULTILINE)

        # Remove excessive dots (TOC artifacts)
        text = re.sub(r'\.{3,}', '', text)

        # Remove standalone numbers (often page numbers or figure numbers)
        text = re.sub(r'^\s*\d+\s*$', '', text, flags=re.MULTILINE)

        # Collapse excess newlines first, then spaces/tabs, so that
        # paragraph breaks ('\n\n') survive whitespace normalization
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'[ \t]+', ' ', text)

        # Remove common boilerplate phrases
        text = re.sub(r'Contains Nonbinding Recommendations\s*', '', text, flags=re.IGNORECASE)
        text = re.sub(r'Guidance for Industry and FDA Staff\s*', '', text, flags=re.IGNORECASE)

        return text.strip()
    def _split_text_into_chunks(self, text: str) -> List[str]:
        """Split text into chunks at sentence boundaries."""
        sentences = re.split(r'(?<=[.!?])\s+', text)
        chunks = []
        current_chunk = []
        current_size = 0

        for sentence in sentences:
            sentence_size = len(sentence)
            if current_size + sentence_size > self.target_chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_size = sentence_size
            else:
                current_chunk.append(sentence)
                current_size += sentence_size + 1

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks
    def _calculate_quality_score(self, text: str) -> float:
        """Calculate quality score for chunk."""
        score = 1.0

        # Penalize very short or very long
        if len(text) < self.min_chunk_size:
            score *= 0.8
        elif len(text) > self.max_chunk_size:
            score *= 0.9

        # Reward complete sentences
        if text.strip().endswith(('.', '!', '?')):
            score *= 1.1

        # Reward technical content
        technical_terms = ['risc', 'instruction', 'register', 'memory', 'processor']
        term_count = sum(1 for term in technical_terms if term in text.lower())
        score *= (1 + term_count * 0.05)

        return min(score, 1.0)
    def extract_with_page_coverage(self, pdf_path: Path, pymupdf_pages: List[Dict]) -> List[Dict]:
        """
        Extract content ensuring ALL pages are covered using PyMuPDF page data.

        Args:
            pdf_path: Path to PDF file
            pymupdf_pages: Page data from PyMuPDF; each entry must carry a
                1-indexed 'page_number' key

        Returns:
            List of chunks covering ALL document pages
        """
        chunks = []
        chunk_id = 0

        print(f"📄 Processing {len(pymupdf_pages)} pages with PDFPlumber quality extraction...")

        with pdfplumber.open(str(pdf_path)) as pdf:
            for pymupdf_page in pymupdf_pages:
                page_num = pymupdf_page['page_number']  # 1-indexed from PyMuPDF
                page_idx = page_num - 1  # Convert to 0-indexed for PDFPlumber

                if page_idx < len(pdf.pages):
                    # Extract with PDFPlumber quality from this specific page
                    pdfplumber_page = pdf.pages[page_idx]
                    page_text = pdfplumber_page.extract_text()

                    if page_text and page_text.strip():
                        # Clean and chunk the page text
                        cleaned_text = self._clean_text(page_text)
                        if len(cleaned_text) >= 100:  # Minimum meaningful content
                            # Create chunks from this page
                            page_chunks = self._create_page_chunks(
                                cleaned_text, page_num, chunk_id
                            )
                            chunks.extend(page_chunks)
                            chunk_id += len(page_chunks)

                if chunks and len(chunks) % 50 == 0:  # Progress indicator
                    print(f" Processed {page_num} pages, created {len(chunks)} chunks")

        print(f"✅ Full coverage: {len(chunks)} chunks from {len(pymupdf_pages)} pages")
        return chunks
    def _create_page_chunks(self, page_text: str, page_num: int, start_chunk_id: int) -> List[Dict]:
        """Create properly sized chunks from a single page's content."""
        # Clean and validate page text first
        cleaned_text = self._ensure_complete_sentences(page_text)

        if not cleaned_text or len(cleaned_text) < 50:
            # Skip pages with insufficient content
            return []

        if len(cleaned_text) <= self.max_chunk_size:
            # Single chunk for small pages
            return [{
                'text': cleaned_text,
                'title': f"Page {page_num}",
                'page': page_num,
                'metadata': {
                    'parsing_method': 'pdfplumber_page_coverage',
                    'quality_score': self._calculate_quality_score(cleaned_text),
                    'full_page_coverage': True
                }
            }]
        else:
            # Split large pages into chunks with sentence boundaries
            text_chunks = self._split_text_into_chunks(cleaned_text)
            page_chunks = []

            for i, chunk_text in enumerate(text_chunks):
                # Ensure each chunk is complete
                complete_chunk = self._ensure_complete_sentences(chunk_text)
                if complete_chunk and len(complete_chunk) >= 100:
                    page_chunks.append({
                        'text': complete_chunk,
                        'title': f"Page {page_num} (Part {i+1})",
                        'page': page_num,
                        'metadata': {
                            'parsing_method': 'pdfplumber_page_coverage',
                            'part_number': i + 1,
                            'total_parts': len(text_chunks),
                            'quality_score': self._calculate_quality_score(complete_chunk),
                            'full_page_coverage': True
                        }
                    })

            return page_chunks
    def _ensure_complete_sentences(self, text: str) -> str:
        """Ensure text contains only complete sentences."""
        text = text.strip()
        if not text:
            return ""

        # Find last complete sentence
        last_sentence_end = -1
        for i, char in enumerate(reversed(text)):
            if char in '.!?:':
                last_sentence_end = len(text) - i
                break

        if last_sentence_end > 0:
            # Return text up to last complete sentence
            complete_text = text[:last_sentence_end].strip()
            # Ensure it starts properly (capital letter or common starters)
            if complete_text and (complete_text[0].isupper() or
                                  complete_text.startswith(('The ', 'A ', 'An ', 'This ', 'RISC'))):
                return complete_text

        # If no complete sentences found, return empty
        return ""
    def parse_document(self, pdf_path: Path, pdf_data: Optional[Dict[str, Any]] = None) -> List[Dict]:
        """
        Parse document using PDFPlumber (required by HybridParser).

        Args:
            pdf_path: Path to PDF file
            pdf_data: PyMuPDF page data to ensure full page coverage;
                expected to hold a 'pages' list of per-page dicts

        Returns:
            List of chunks with structure preservation across ALL pages
        """
        if pdf_data and 'pages' in pdf_data:
            # Use PyMuPDF page data to ensure full coverage
            return self.extract_with_page_coverage(pdf_path, pdf_data['pages'])
        else:
            # Fallback to structure-based extraction
            return self.extract_with_structure(pdf_path)


def parse_pdf_with_pdfplumber(pdf_path: Path, **kwargs) -> List[Dict]:
    """Main entry point for PDFPlumber parsing."""
    parser = PDFPlumberParser(**kwargs)
    chunks = parser.extract_with_structure(pdf_path)
    print(f"PDFPlumber extracted {len(chunks)} chunks")
    return chunks
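

# Minimal usage sketch (not part of the original module): it exercises both
# entry points above. The PDF filename and the PyMuPDF page list built with
# `fitz` are illustrative assumptions, not fixtures shipped with this code;
# extract_with_page_coverage() only requires the 'page_number' key per page.
if __name__ == "__main__":
    sample_pdf = Path("riscv-spec.pdf")  # hypothetical input file

    # Structure-based extraction via the module-level helper
    structured_chunks = parse_pdf_with_pdfplumber(sample_pdf, target_chunk_size=1400)

    # Full page coverage via parse_document(), feeding it PyMuPDF page data
    try:
        import fitz  # PyMuPDF; optional dependency in this sketch

        with fitz.open(str(sample_pdf)) as doc:
            pdf_data = {
                'pages': [{'page_number': i + 1, 'text': page.get_text("text")}
                          for i, page in enumerate(doc)]
            }
        parser = PDFPlumberParser()
        coverage_chunks = parser.parse_document(sample_pdf, pdf_data)
        print(f"Coverage parse produced {len(coverage_chunks)} chunks")
    except ImportError:
        print("PyMuPDF (fitz) not installed; skipping page-coverage demo")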