#!/usr/bin/env python3
"""
PDFPlumber-based Parser

Advanced PDF parsing using pdfplumber for better structure detection
and cleaner text extraction.

Author: Arthur Passuello
"""

import re
import pdfplumber
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any


class PDFPlumberParser:
    """Advanced PDF parser using pdfplumber for structure-aware extraction."""

    def __init__(self, target_chunk_size: int = 1400, min_chunk_size: int = 800,
                 max_chunk_size: int = 2000):
        """Initialize PDFPlumber parser."""
        self.target_chunk_size = target_chunk_size
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size

        # Trash content patterns
        self.trash_patterns = [
            r'Creative Commons.*?License',
            r'International License.*?authors',
            r'RISC-V International',
            r'Visit.*?for further',
            r'editors to suggest.*?corrections',
            r'released under.*?license',
            r'\.{5,}',      # Long dots (TOC artifacts)
            r'^\d+\s*$',    # Page numbers alone
        ]
    def extract_with_structure(self, pdf_path: Path) -> List[Dict]:
        """Extract PDF content with structure awareness using pdfplumber."""
        chunks = []

        with pdfplumber.open(pdf_path) as pdf:
            current_section = None
            current_text = []

            for page_num, page in enumerate(pdf.pages):
                # Extract text with formatting info
                page_content = self._extract_page_content(page, page_num + 1)

                for element in page_content:
                    if element['type'] == 'header':
                        # Save previous section if exists
                        if current_text:
                            chunk_text = '\n\n'.join(current_text)
                            if self._is_valid_chunk(chunk_text):
                                chunks.extend(self._create_chunks(
                                    chunk_text,
                                    current_section or "Document",
                                    page_num + 1  # 1-indexed, consistent with page content
                                ))
                        # Start new section
                        current_section = element['text']
                        current_text = []
                    elif element['type'] == 'content':
                        # Add to current section
                        if self._is_valid_content(element['text']):
                            current_text.append(element['text'])

            # Don't forget last section
            if current_text:
                chunk_text = '\n\n'.join(current_text)
                if self._is_valid_chunk(chunk_text):
                    chunks.extend(self._create_chunks(
                        chunk_text,
                        current_section or "Document",
                        len(pdf.pages)
                    ))

        return chunks
    def _extract_page_content(self, page: Any, page_num: int) -> List[Dict]:
        """Extract structured content from a page."""
        content = []

        # Get all text with positioning
        chars = page.chars
        if not chars:
            return content

        # Group by lines
        lines = []
        current_line = []
        current_y = None
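        # Characters are sorted top-to-bottom, then left-to-right; characters
        # whose 'top' coordinates differ by less than 2 points are treated as
        # part of the same visual line.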
        for char in sorted(chars, key=lambda x: (x['top'], x['x0'])):
            if current_y is None or abs(char['top'] - current_y) < 2:
                current_line.append(char)
                current_y = char['top']
            else:
                if current_line:
                    lines.append(current_line)
                current_line = [char]
                current_y = char['top']

        if current_line:
            lines.append(current_line)

        # Analyze each line
        for line in lines:
            line_text = ''.join(char['text'] for char in line).strip()
            if not line_text:
                continue

            # Detect headers by font size
            avg_font_size = sum(char.get('size', 12) for char in line) / len(line)
            is_bold = any(char.get('fontname', '').lower().count('bold') > 0 for char in line)

            # Classify content
            if avg_font_size > 14 or is_bold:
                # Likely a header
                if self._is_valid_header(line_text):
                    content.append({
                        'type': 'header',
                        'text': line_text,
                        'font_size': avg_font_size,
                        'page': page_num
                    })
            else:
                # Regular content
                content.append({
                    'type': 'content',
                    'text': line_text,
                    'font_size': avg_font_size,
                    'page': page_num
                })

        return content

    def _is_valid_header(self, text: str) -> bool:
        """Check if text is a valid header."""
        # Skip if too short or too long
        if len(text) < 3 or len(text) > 200:
            return False

        # Skip if matches trash patterns
        for pattern in self.trash_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return False

        # Valid if starts with number or capital letter
        if re.match(r'^(\d+\.?\d*\s+|[A-Z])', text):
            return True

        # Valid if contains keywords
        keywords = ['chapter', 'section', 'introduction', 'conclusion', 'appendix']
        return any(keyword in text.lower() for keyword in keywords)

    def _is_valid_content(self, text: str) -> bool:
        """Check if text is valid content (not trash)."""
        # Skip very short text
        if len(text.strip()) < 10:
            return False

        # Skip trash patterns
        for pattern in self.trash_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return False

        return True

    def _is_valid_chunk(self, text: str) -> bool:
        """Check if chunk text is valid."""
        # Must have minimum length
        if len(text.strip()) < self.min_chunk_size // 2:
            return False

        # Must have some alphabetic content
        alpha_chars = sum(1 for c in text if c.isalpha())
        if alpha_chars < len(text) * 0.5:
            return False

        return True

    def _create_chunks(self, text: str, title: str, page: int) -> List[Dict]:
        """Create chunks from text."""
        chunks = []

        # Clean text
        text = self._clean_text(text)

        if len(text) <= self.max_chunk_size:
            # Single chunk
            chunks.append({
                'text': text,
                'title': title,
                'page': page,
                'metadata': {
                    'parsing_method': 'pdfplumber',
                    'quality_score': self._calculate_quality_score(text)
                }
            })
        else:
            # Split into chunks
            text_chunks = self._split_text_into_chunks(text)
            for i, chunk_text in enumerate(text_chunks):
                chunks.append({
                    'text': chunk_text,
                    'title': f"{title} (Part {i+1})",
                    'page': page,
                    'metadata': {
                        'parsing_method': 'pdfplumber',
                        'part_number': i + 1,
                        'total_parts': len(text_chunks),
                        'quality_score': self._calculate_quality_score(chunk_text)
                    }
                })

        return chunks
    def _clean_text(self, text: str) -> str:
        """Clean text from artifacts."""
        # Remove volume headers (e.g., "Volume I: RISC-V Unprivileged ISA V20191213")
        text = re.sub(r'Volume\s+[IVX]+:\s*RISC-V[^V]*V\d{8}\s*', '', text, flags=re.IGNORECASE)
        text = re.sub(r'^\d+\s+Volume\s+[IVX]+:.*?$', '', text, flags=re.MULTILINE)

        # Remove document version artifacts
        text = re.sub(r'Document Version \d{8}\s*', '', text, flags=re.IGNORECASE)

        # Remove repeated ISA headers
        text = re.sub(r'RISC-V.*?ISA.*?V\d{8}\s*', '', text, flags=re.IGNORECASE)
        text = re.sub(r'The RISC-V Instruction Set Manual\s*', '', text, flags=re.IGNORECASE)

        # Remove standalone figure/table references
        text = re.sub(r'^(Figure|Table)\s+\d+\.\d+:.*?$', '', text, flags=re.MULTILINE)

        # Remove email addresses (often in contributor lists)
        text = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '', text)

        # Remove URLs
        text = re.sub(r'https?://[^\s]+', '', text)

        # Remove page numbers at start/end of lines
        text = re.sub(r'^\d{1,3}\s+', '', text, flags=re.MULTILINE)
        text = re.sub(r'\s+\d{1,3}$', '', text, flags=re.MULTILINE)

        # Remove excessive dots (TOC artifacts)
        text = re.sub(r'\.{3,}', '', text)

        # Remove standalone numbers (often page numbers or figure numbers)
        text = re.sub(r'^\s*\d+\s*$', '', text, flags=re.MULTILINE)

        # Collapse runs of spaces/tabs and excess blank lines (newlines are kept
        # here so paragraph breaks survive the normalization)
        text = re.sub(r'[ \t]{3,}', ' ', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'[ \t]+', ' ', text)  # Normalize spaces and tabs

        # Remove common boilerplate phrases
        text = re.sub(r'Contains Nonbinding Recommendations\s*', '', text, flags=re.IGNORECASE)
        text = re.sub(r'Guidance for Industry and FDA Staff\s*', '', text, flags=re.IGNORECASE)

        return text.strip()
    def _split_text_into_chunks(self, text: str) -> List[str]:
        """Split text into chunks at sentence boundaries."""
        sentences = re.split(r'(?<=[.!?])\s+', text)

        chunks = []
        current_chunk = []
        current_size = 0
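        # Greedily pack sentences: start a new chunk once adding the next
        # sentence would push the current chunk past target_chunk_size.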
        for sentence in sentences:
            sentence_size = len(sentence)

            if current_size + sentence_size > self.target_chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_size = sentence_size
            else:
                current_chunk.append(sentence)
                current_size += sentence_size + 1

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

    def _calculate_quality_score(self, text: str) -> float:
        """Calculate quality score for chunk."""
        score = 1.0

        # Penalize very short or very long
        if len(text) < self.min_chunk_size:
            score *= 0.8
        elif len(text) > self.max_chunk_size:
            score *= 0.9

        # Reward complete sentences
        if text.strip().endswith(('.', '!', '?')):
            score *= 1.1

        # Reward technical content
        technical_terms = ['risc', 'instruction', 'register', 'memory', 'processor']
        term_count = sum(1 for term in technical_terms if term in text.lower())
        score *= (1 + term_count * 0.05)
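        # The score is capped at 1.0 below, so the bonuses above can only
        # offset earlier penalties rather than exceed a perfect score.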
        return min(score, 1.0)

    def extract_with_page_coverage(self, pdf_path: Path, pymupdf_pages: List[Dict]) -> List[Dict]:
        """
        Extract content ensuring ALL pages are covered using PyMuPDF page data.

        Args:
            pdf_path: Path to PDF file
            pymupdf_pages: Page data from PyMuPDF with page numbers and text

        Returns:
            List of chunks covering ALL document pages
        """
        chunks = []
        chunk_id = 0

        print(f"📄 Processing {len(pymupdf_pages)} pages with PDFPlumber quality extraction...")

        with pdfplumber.open(str(pdf_path)) as pdf:
            for pymupdf_page in pymupdf_pages:
                page_num = pymupdf_page['page_number']  # 1-indexed from PyMuPDF
                page_idx = page_num - 1  # Convert to 0-indexed for PDFPlumber

                if page_idx < len(pdf.pages):
                    # Extract with PDFPlumber quality from this specific page
                    pdfplumber_page = pdf.pages[page_idx]
                    page_text = pdfplumber_page.extract_text()

                    if page_text and page_text.strip():
                        # Clean and chunk the page text
                        cleaned_text = self._clean_text(page_text)

                        if len(cleaned_text) >= 100:  # Minimum meaningful content
                            # Create chunks from this page
                            page_chunks = self._create_page_chunks(
                                cleaned_text, page_num, chunk_id
                            )
                            chunks.extend(page_chunks)
                            chunk_id += len(page_chunks)

                if len(chunks) % 50 == 0:  # Progress indicator
                    print(f"   Processed {page_num} pages, created {len(chunks)} chunks")

        print(f"✅ Full coverage: {len(chunks)} chunks from {len(pymupdf_pages)} pages")
        return chunks
    def _create_page_chunks(self, page_text: str, page_num: int, start_chunk_id: int) -> List[Dict]:
        """Create properly sized chunks from a single page's content."""
        # Clean and validate page text first
        cleaned_text = self._ensure_complete_sentences(page_text)

        if not cleaned_text or len(cleaned_text) < 50:
            # Skip pages with insufficient content
            return []

        if len(cleaned_text) <= self.max_chunk_size:
            # Single chunk for small pages
            return [{
                'text': cleaned_text,
                'title': f"Page {page_num}",
                'page': page_num,
                'metadata': {
                    'parsing_method': 'pdfplumber_page_coverage',
                    'quality_score': self._calculate_quality_score(cleaned_text),
                    'full_page_coverage': True
                }
            }]
        else:
            # Split large pages into chunks with sentence boundaries
            text_chunks = self._split_text_into_chunks(cleaned_text)
            page_chunks = []

            for i, chunk_text in enumerate(text_chunks):
                # Ensure each chunk is complete
                complete_chunk = self._ensure_complete_sentences(chunk_text)

                if complete_chunk and len(complete_chunk) >= 100:
                    page_chunks.append({
                        'text': complete_chunk,
                        'title': f"Page {page_num} (Part {i+1})",
                        'page': page_num,
                        'metadata': {
                            'parsing_method': 'pdfplumber_page_coverage',
                            'part_number': i + 1,
                            'total_parts': len(text_chunks),
                            'quality_score': self._calculate_quality_score(complete_chunk),
                            'full_page_coverage': True
                        }
                    })

            return page_chunks

    def _ensure_complete_sentences(self, text: str) -> str:
        """Ensure text contains only complete sentences."""
        text = text.strip()
        if not text:
            return ""

        # Find last complete sentence
        last_sentence_end = -1
        for i, char in enumerate(reversed(text)):
            if char in '.!?:':
                last_sentence_end = len(text) - i
                break

        if last_sentence_end > 0:
            # Return text up to last complete sentence
            complete_text = text[:last_sentence_end].strip()

            # Ensure it starts properly (capital letter or common starters)
            if complete_text and (complete_text[0].isupper() or
                                  complete_text.startswith(('The ', 'A ', 'An ', 'This ', 'RISC'))):
                return complete_text

        # If no complete sentences found, return empty
        return ""

    def parse_document(self, pdf_path: Path, pdf_data: Dict[str, Any] = None) -> List[Dict]:
        """
        Parse document using PDFPlumber (required by HybridParser).

        Args:
            pdf_path: Path to PDF file
            pdf_data: PyMuPDF page data to ensure full page coverage

        Returns:
            List of chunks with structure preservation across ALL pages
        """
        if pdf_data and 'pages' in pdf_data:
            # Use PyMuPDF page data to ensure full coverage
            return self.extract_with_page_coverage(pdf_path, pdf_data['pages'])
        else:
            # Fallback to structure-based extraction
            return self.extract_with_structure(pdf_path)


def parse_pdf_with_pdfplumber(pdf_path: Path, **kwargs) -> List[Dict]:
    """Main entry point for PDFPlumber parsing."""
    parser = PDFPlumberParser(**kwargs)
    chunks = parser.extract_with_structure(pdf_path)
    print(f"PDFPlumber extracted {len(chunks)} chunks")
    return chunks
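

# Minimal usage sketch (illustrative): parse a PDF given on the command line
# via the structure-based entry point. For full page coverage, parse_document()
# would instead be called with PyMuPDF-style page data, i.e. a dict whose
# 'pages' list carries 1-indexed 'page_number' keys, as read by
# extract_with_page_coverage().
if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        sample_chunks = parse_pdf_with_pdfplumber(Path(sys.argv[1]))
        for chunk in sample_chunks[:3]:
            preview = chunk['text'][:80].replace('\n', ' ')
            print(f"{chunk['title']}: {preview}...")
    else:
        print(f"Usage: {sys.argv[0]} <path/to/document.pdf>")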