|
|
|
import fitz |
|
import pymupdf4llm |
|
import os |
|
import tempfile |
|
import traceback |
|
from typing import Tuple, Optional, List, Dict, Any |
|
from collections import Counter |
|
|
|
def convert_rect_to_dict(rect: fitz.Rect) -> Optional[Dict[str, float]]:
    """Return the coordinates and size of a ``fitz.Rect`` as a plain dict.

    The resulting dict has the keys ``x0``, ``y0``, ``x1``, ``y1``,
    ``width`` and ``height``, each read from the attribute of the same name.

    A falsy value, or any object that is not a ``fitz.Rect``, is reported
    with a warning on stdout and yields ``None``.
    """
    # Truthiness is evaluated before the type check, in that order.
    is_usable = bool(rect) and isinstance(rect, fitz.Rect)
    if not is_usable:
        print(f"Warning: Invalid rect object received: {rect}")
        return None

    attribute_names = ("x0", "y0", "x1", "y1", "width", "height")
    return {name: getattr(rect, name) for name in attribute_names}
|
|
|
def try_map_issues_to_page_rects(
    issues_to_map_for_context: List[Dict[str, Any]],
    pdf_rects: List[fitz.Rect],
    page_number_for_mapping: int
) -> int:
    """Pair issues with PDF rectangles by position and record each match.

    The issue at index ``i`` is paired with the rectangle at index ``i``;
    pairing stops at the end of the shorter list. An issue whose
    ``is_mapped_to_pdf`` flag is already set is skipped, and the rectangle
    at that same index is not offered to any other issue.

    Every issue mapped by this call is updated in place:
    ``pdf_coordinates_list`` becomes a one-element list holding the
    rectangle's coordinate dict, ``is_mapped_to_pdf`` is set to ``True`` and
    ``mapped_page_number`` is set to ``page_number_for_mapping``.

    Returns:
        The number of issues newly mapped by this call.
    """
    newly_mapped = 0

    for issue, rect in zip(issues_to_map_for_context, pdf_rects):
        if issue['is_mapped_to_pdf']:
            continue

        coords = convert_rect_to_dict(rect)
        if not coords:
            # Conversion failed: leave the issue unmapped and report it.
            print(f" Warning: Could not convert rect for context '{issue['context_text'][:30]}...' on page {page_number_for_mapping}")
            continue

        issue['pdf_coordinates_list'] = [coords]
        issue['is_mapped_to_pdf'] = True
        issue['mapped_page_number'] = page_number_for_mapping
        newly_mapped += 1

    return newly_mapped
|
|
|
|
|
def _collect_text_spans(doc: fitz.Document) -> Tuple[List[List[Dict[str, Any]]], Counter]:
    """Collect all text spans per page and count characters per font combination.

    Returns ``(spans_by_page, font_char_counts)`` where ``spans_by_page[i]``
    lists the spans of page ``i`` in extraction order and
    ``font_char_counts`` maps ``(font name, rounded size)`` to the number of
    characters rendered with that combination.
    """
    spans_by_page: List[List[Dict[str, Any]]] = []
    font_char_counts: Counter = Counter()

    for page_num in range(doc.page_count):
        page_spans: List[Dict[str, Any]] = []
        text_dict = doc[page_num].get_text("dict")
        for block in text_dict.get("blocks", []):
            # Only blocks of type 0 carry "lines"/"spans" (text blocks in
            # PyMuPDF's dict output); other block types are ignored.
            if block.get("type") != 0:
                continue
            for line in block.get("lines", []):
                for span in line.get("spans", []):
                    text = span["text"]
                    # Round the size so tiny size variations of the same
                    # font are counted as one combination.
                    font_key = (span["font"], int(round(span["size"])))
                    page_spans.append({
                        "text": text,
                        "font_key": font_key,
                        "original_font_size": span["size"],
                        "bbox": span["bbox"],
                    })
                    font_char_counts[font_key] += len(text)
        spans_by_page.append(page_spans)

    return spans_by_page, font_char_counts


def _write_majority_spans(
    original_doc: fitz.Document,
    new_doc: fitz.Document,
    spans_by_page: List[List[Dict[str, Any]]],
    majority_key: Tuple[str, int],
) -> None:
    """Fill *new_doc* with one page per source page holding only majority-font spans.

    Each new page gets the dimensions of the corresponding page of
    *original_doc*; every span whose font combination equals *majority_key*
    is re-inserted into its original bounding box.
    """
    for p_num in range(original_doc.page_count):
        source_rect = original_doc[p_num].rect
        new_pdf_page = new_doc.new_page(width=source_rect.width,
                                        height=source_rect.height)

        for span_data in spans_by_page[p_num]:
            if span_data["font_key"] != majority_key:
                continue

            text_to_insert = span_data["text"]
            original_bbox = fitz.Rect(span_data["bbox"])
            insertion_result = new_pdf_page.insert_textbox(
                original_bbox,
                text_to_insert,
                fontsize=span_data["original_font_size"],
                fontname="helv",
                align=0
            )
            # A negative return value is reported but not treated as fatal.
            if insertion_result < 0:
                print(f"Warning: Textbox insertion for '{text_to_insert[:30].replace(chr(10), ' ')}...' in rect {original_bbox} on new page {p_num} might have issues (code: {insertion_result}).")


def extract_pdf_text(file_input: Any) -> str:
    """Extract the majority-font text of a PDF and return it as Markdown.

    The "majority font" is the combination of font name and (rounded) font
    size that accounts for the most characters in the document. Only spans
    in that combination are copied into a new in-memory PDF, which is then
    converted to Markdown with PyMuPDF4LLM.

    Args:
        file_input: Either a file path (``str``) or a file-like object with
            ``read``/``seek``. A file-like object is copied to a temporary
            ``.pdf`` file, which is removed again before returning.

    Returns:
        The Markdown text, or ``""`` when the PDF has no pages, contains no
        text spans, or any error occurs (errors are printed with a
        traceback, not raised).
    """
    input_temp_file_path: Optional[str] = None
    original_doc = None
    new_doc = None

    try:
        if isinstance(file_input, str):
            actual_path_to_process = file_input
        elif hasattr(file_input, 'read') and callable(file_input.read):
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file_obj:
                # Record the name first so the finally block can delete the
                # file even if copying the stream fails.
                input_temp_file_path = temp_file_obj.name
                file_input.seek(0)
                temp_file_obj.write(file_input.read())
            actual_path_to_process = input_temp_file_path
        else:
            raise ValueError("Input 'file_input' must be a file path (str) or a file-like object.")

        original_doc = fitz.open(actual_path_to_process)
        if not original_doc.page_count:
            print("PDF has no pages.")
            return ""  # documents and temp file are released in finally

        print(f"Original PDF ('{os.path.basename(actual_path_to_process)}') has {original_doc.page_count} pages. Analyzing fonts...")
        spans_by_page, font_char_counts = _collect_text_spans(original_doc)

        if not font_char_counts:
            print("No text with font information found in PDF.")
            return ""  # documents and temp file are released in finally

        majority_key, char_count = font_char_counts.most_common(1)[0]
        majority_font_name, majority_font_size_rounded = majority_key
        print(f"Majority font combination: Name='{majority_font_name}', RoundedSize={majority_font_size_rounded}pt (with {char_count} characters).")

        new_doc = fitz.Document()
        print("Constructing new PDF with text from majority font only...")
        _write_majority_spans(original_doc, new_doc, spans_by_page, majority_key)
        print(f"New PDF constructed with {new_doc.page_count} pages.")

        if new_doc.page_count > 0:
            print("Converting filtered PDF Document object to Markdown using pymupdf4llm...")
            markdown_text = pymupdf4llm.to_markdown(new_doc)
        else:
            print("The new PDF document (filtered) is empty. No markdown will be generated.")
            markdown_text = ""

        print(f"Total Markdown text length from filtered PDF: {len(markdown_text)} characters.")
        return markdown_text

    except Exception as e:
        # Top-level boundary: callers receive "" instead of an exception.
        print(f"Error in extract_pdf_text: {str(e)}")
        traceback.print_exc()
        return ""
    finally:
        # Single cleanup path. Compare against None instead of using
        # truthiness: PyMuPDF documents report their page count through
        # len(), so `if doc:` is False for a zero-page document (it would
        # never be closed) and may raise on an already-closed one.
        if original_doc is not None:
            original_doc.close()
        if new_doc is not None:
            new_doc.close()

        # Remove the temp copy only after the document using it is closed.
        if input_temp_file_path and os.path.exists(input_temp_file_path):
            try:
                os.remove(input_temp_file_path)
                print(f"Cleaned up temporary input file: {input_temp_file_path}")
            except OSError as e_clean:
                print(f"Error cleaning up temporary input file {input_temp_file_path}: {e_clean}")