samyak152002's picture
Update pdf_processing.py
eb20090 verified
raw
history blame
8.46 kB
# pdf_processing.py
import fitz # PyMuPDF
import pymupdf4llm
import os
import tempfile
import traceback
from typing import Tuple, Optional, List, Dict, Any
from collections import Counter
def convert_rect_to_dict(rect: fitz.Rect) -> Optional[Dict[str, float]]:
    """Return the coordinates and dimensions of a ``fitz.Rect`` as a plain dict.

    The result has the keys ``x0``, ``y0``, ``x1``, ``y1``, ``width`` and
    ``height``, each read from the attribute of the same name on ``rect``.

    A falsy value, or anything that is not a ``fitz.Rect`` instance, is
    reported with a printed warning and yields ``None``.
    """
    rect_is_usable = bool(rect) and isinstance(rect, fitz.Rect)
    if not rect_is_usable:
        print(f"Warning: Invalid rect object received: {rect}")
        return None

    # Key order matches the attribute order callers have always received.
    exported_attributes = ("x0", "y0", "x1", "y1", "width", "height")
    return {attr: getattr(rect, attr) for attr in exported_attributes}
def try_map_issues_to_page_rects(
    issues_to_map_for_context: List[Dict[str, Any]],
    pdf_rects: List[fitz.Rect],
    page_number_for_mapping: int  # 1-based page number
) -> int:
    """Pair LT issues with PDF rectangles by position and record the match.

    Issue ``i`` is paired with rectangle ``i``; pairing stops at the end of
    the shorter list. An issue whose ``'is_mapped_to_pdf'`` flag is already
    truthy is skipped (its rectangle is not handed to another issue).
    Otherwise the rectangle is converted with ``convert_rect_to_dict`` and,
    on success, the issue dict is updated in place with
    ``'pdf_coordinates_list'`` (a one-element list), ``'is_mapped_to_pdf'``
    set to ``True`` and ``'mapped_page_number'``.

    Returns the number of issues newly mapped by this call.
    """
    newly_mapped = 0

    # zip() ends with the shorter sequence, so surplus issues or rectangles
    # are simply ignored.
    for issue, rect in zip(issues_to_map_for_context, pdf_rects):
        if issue['is_mapped_to_pdf']:
            continue

        coordinates = convert_rect_to_dict(rect)
        if not coordinates:
            print(f" Warning: Could not convert rect for context '{issue['context_text'][:30]}...' on page {page_number_for_mapping}")
            continue

        issue.update(
            pdf_coordinates_list=[coordinates],  # stored as a list of dicts
            is_mapped_to_pdf=True,
            mapped_page_number=page_number_for_mapping,
        )
        newly_mapped += 1

    return newly_mapped
# Majority-font text extraction followed by Markdown conversion.
def _close_pdf_document(doc: Any) -> None:
    """Close ``doc`` unless it is ``None`` or already closed."""
    # Compare against None instead of using truthiness.
    # NOTE(review): per the PyMuPDF Document API, bool(doc) follows len(doc)
    # (the page count) and closing a closed document raises ValueError, so an
    # `if doc: doc.close()` check can both skip empty documents and fail on
    # closed ones.
    if doc is None or getattr(doc, "is_closed", False):
        return
    try:
        doc.close()
    except Exception as e_close:
        # Cleanup is best-effort; it must not mask the function's result.
        print(f"Error closing PDF document: {e_close}")


def extract_pdf_text(file_input: Any) -> str:
    """Extract the majority-font text of a PDF and return it as Markdown.

    The "majority font" is the combination of font name and font size
    (rounded to the nearest integer) that accounts for the most characters
    across all text spans of the document. Spans in that font are re-inserted
    at their original bounding boxes into a new in-memory PDF with the same
    page sizes, and that PDF is converted with ``pymupdf4llm.to_markdown``.

    Args:
        file_input: Either a file path (``str``) or a file-like object that
            provides ``read()`` and ``seek()``. A stream is copied to a
            temporary ``.pdf`` file, which is removed before returning.

    Returns:
        The Markdown text, or ``""`` when the PDF has no pages, contains no
        text spans, the input type is unsupported, or any error occurs.
        Errors are printed with a traceback rather than raised.
    """
    input_temp_file_path = None  # Set only when file_input is a stream
    original_doc = None
    new_doc = None  # The filtered document built from majority-font spans

    try:
        # 1. Resolve the input to a path on disk.
        if isinstance(file_input, str):
            actual_path_to_process = file_input
        elif hasattr(file_input, 'read') and callable(file_input.read):
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file_obj:
                # Record the name first so `finally` can remove the file even
                # if copying the stream fails part-way.
                input_temp_file_path = temp_file_obj.name
                file_input.seek(0)  # Ensure reading from the beginning of the stream
                temp_file_obj.write(file_input.read())
            actual_path_to_process = input_temp_file_path
        else:
            raise ValueError("Input 'file_input' must be a file path (str) or a file-like object.")

        original_doc = fitz.open(actual_path_to_process)
        page_count = original_doc.page_count
        if not page_count:
            print("PDF has no pages.")
            return ""  # `finally` closes the document and removes the temp file

        # 2. Collect span text and font data, bucketed per page so the
        #    rebuild step does not rescan every span for every page.
        spans_by_page: List[List[Dict[str, Any]]] = [[] for _ in range(page_count)]
        font_char_counts: Counter = Counter()
        print(f"Original PDF ('{os.path.basename(actual_path_to_process)}') has {page_count} pages. Analyzing fonts...")

        for page_num in range(page_count):
            text_dict = original_doc[page_num].get_text("dict")
            for block in text_dict.get("blocks", []):
                if block.get("type") != 0:  # Only text blocks (type 0) carry spans
                    continue
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        font_key = (span["font"], int(round(span["size"])))
                        text = span["text"]
                        spans_by_page[page_num].append({
                            "text": text,
                            "font_key": font_key,
                            "original_font_size": span["size"],
                            "bbox": span["bbox"],
                        })
                        font_char_counts[font_key] += len(text)

        if not font_char_counts:
            print("No text with font information found in PDF.")
            return ""  # Cleanup happens exactly once, in `finally`

        # 3. Determine the majority font.
        majority_key, char_count = font_char_counts.most_common(1)[0]
        majority_font_name, majority_font_size_rounded = majority_key
        print(f"Majority font combination: Name='{majority_font_name}', RoundedSize={majority_font_size_rounded}pt (with {char_count} characters).")

        # 4. Build a new PDF that contains only the majority-font text.
        new_doc = fitz.Document()
        print("Constructing new PDF with text from majority font only...")

        for p_num, page_spans in enumerate(spans_by_page):
            source_rect = original_doc[p_num].rect
            new_pdf_page = new_doc.new_page(width=source_rect.width,
                                            height=source_rect.height)
            for span_data in page_spans:
                if span_data["font_key"] != majority_key:
                    continue
                text_to_insert = span_data["text"]
                original_bbox = fitz.Rect(span_data["bbox"])
                insertion_result = new_pdf_page.insert_textbox(
                    original_bbox,
                    text_to_insert,
                    fontsize=span_data["original_font_size"],
                    fontname="helv",  # Using Helvetica for simplicity
                    align=0
                )
                # A negative result is treated as a possibly failed insertion.
                # NOTE(review): presumably the text did not fit the span's
                # bounding box at this size - confirm against PyMuPDF docs.
                if insertion_result < 0:
                    preview = text_to_insert[:30].replace("\n", " ")
                    print(f"Warning: Textbox insertion for '{preview}...' in rect {original_bbox} on new page {p_num} might have issues (code: {insertion_result}).")

        print(f"New PDF constructed with {new_doc.page_count} pages.")

        # 5. Convert the in-memory filtered PDF to Markdown.
        if new_doc.page_count > 0:
            print("Converting filtered PDF Document object to Markdown using pymupdf4llm...")
            markdown_text = pymupdf4llm.to_markdown(new_doc)
        else:
            print("The new PDF document (filtered) is empty. No markdown will be generated.")
            markdown_text = ""

        print(f"Total Markdown text length from filtered PDF: {len(markdown_text)} characters.")
        return markdown_text

    except Exception as e:
        # Top-level boundary: callers rely on "" instead of an exception.
        print(f"Error in extract_pdf_text: {str(e)}")
        traceback.print_exc()
        return ""

    finally:
        # Single cleanup point for every exit path above.
        _close_pdf_document(original_doc)
        _close_pdf_document(new_doc)
        if input_temp_file_path and os.path.exists(input_temp_file_path):
            try:
                os.remove(input_temp_file_path)
                print(f"Cleaned up temporary input file: {input_temp_file_path}")
            except Exception as e_clean:
                print(f"Error cleaning up temporary input file {input_temp_file_path}: {e_clean}")