# pdf_processing.py
import fitz  # PyMuPDF
import pymupdf4llm
import os
import tempfile
import traceback
from typing import Tuple, Optional, List, Dict, Any
from collections import Counter


def convert_rect_to_dict(rect: fitz.Rect) -> Optional[Dict[str, float]]:
    """Convert a fitz.Rect into a plain dictionary.

    Args:
        rect: The rectangle to convert.

    Returns:
        A dict with the keys "x0", "y0", "x1", "y1", "width" and "height",
        or None (after printing a warning) when `rect` is not a fitz.Rect
        or evaluates as falsy.
    """
    if not isinstance(rect, fitz.Rect) or not rect:
        print(f"Warning: Invalid rect object received: {rect}")
        return None

    return {
        "x0": rect.x0,
        "y0": rect.y0,
        "x1": rect.x1,
        "y1": rect.y1,
        "width": rect.width,
        "height": rect.height,
    }


def try_map_issues_to_page_rects(
    issues_to_map_for_context: List[Dict[str, Any]],
    pdf_rects: List[fitz.Rect],
    page_number_for_mapping: int,  # 1-based page number
) -> int:
    """Helper function for mapping LT issues to PDF rectangles.

    Issues and rectangles are paired strictly by position: the i-th issue is
    offered the i-th rectangle, and pairing stops at the shorter of the two
    lists. An issue that is already mapped is skipped, and the rectangle at
    the same position is not handed to any other issue.

    Each newly mapped issue dict is updated in place with the keys
    'pdf_coordinates_list', 'is_mapped_to_pdf' and 'mapped_page_number'.

    Args:
        issues_to_map_for_context: Issue dicts; each must carry the
            'is_mapped_to_pdf' flag (and 'context_text' for the warning).
        pdf_rects: Candidate rectangles, in the same order as the issues.
        page_number_for_mapping: Page number recorded on mapped issues.

    Returns:
        The number of issues newly mapped by this call.
    """
    mapped_count = 0
    for issue, pdf_rect in zip(issues_to_map_for_context, pdf_rects):
        if issue['is_mapped_to_pdf']:
            continue

        coord_dict = convert_rect_to_dict(pdf_rect)
        if not coord_dict:
            print(f" Warning: Could not convert rect for context '{issue['context_text'][:30]}...' on page {page_number_for_mapping}")
            continue

        issue['pdf_coordinates_list'] = [coord_dict]  # Stored as a list of dicts
        issue['is_mapped_to_pdf'] = True
        issue['mapped_page_number'] = page_number_for_mapping
        mapped_count += 1

    return mapped_count


def _collect_spans_by_page(doc: fitz.Document) -> Tuple[List[List[Dict[str, Any]]], Counter]:
    """Gather every text span of `doc`, grouped by page, plus per-font character counts.

    Returns:
        A tuple `(spans_by_page, font_char_counts)`. `spans_by_page[i]` lists
        the span details found on page index i; `font_char_counts` maps
        `(font_name, rounded_font_size)` to the number of characters set in
        that combination.
    """
    spans_by_page: List[List[Dict[str, Any]]] = []
    font_char_counts: Counter = Counter()

    for page_num in range(doc.page_count):
        page_spans: List[Dict[str, Any]] = []
        text_dict = doc[page_num].get_text("dict")
        for block in text_dict.get("blocks", []):
            if block.get("type") != 0:  # Process only text blocks (type 0)
                continue
            for line in block.get("lines", []):
                for span in line.get("spans", []):
                    font_name = span["font"]
                    font_size_rounded = int(round(span["size"]))
                    text = span["text"]
                    page_spans.append({
                        "text": text,
                        "font_name": font_name,
                        "font_size_rounded": font_size_rounded,
                        "original_font_size": span["size"],
                        "bbox": span["bbox"],
                        "page_num": page_num,
                    })
                    font_char_counts[(font_name, font_size_rounded)] += len(text)
        spans_by_page.append(page_spans)

    return spans_by_page, font_char_counts


def _close_document(doc: Optional[fitz.Document]) -> None:
    """Close `doc` if it exists and is still open.

    The check is against None (not truthiness) so that a document with zero
    pages is still closed, and `is_closed` guards against closing twice.
    """
    if doc is not None and not doc.is_closed:
        doc.close()


def extract_pdf_text(file_input: Any) -> str:
    """
    Extracts text from a PDF, filters it to include only the majority font,
    and then converts this filtered text to Markdown using PyMuPDF4LLM.

    The "majority font" is defined by the combination of font name and
    (rounded) font size that accounts for the most characters in the document.

    Args:
        file_input: A file path (str or os.PathLike) or a seekable file-like
            object exposing `read()`. A stream is copied to a temporary
            ".pdf" file that is removed before the function returns.

    Returns:
        The Markdown text of the filtered document. An empty string is
        returned when the PDF has no pages, contains no text spans, or any
        error occurs (the error and traceback are printed, not raised).
    """
    input_temp_file_path = None  # Set only when file_input is a stream
    original_doc = None
    new_doc = None  # The new document we will build

    try:
        # 1. Handle input to get actual_path_to_process
        if isinstance(file_input, (str, os.PathLike)):
            actual_path_to_process = os.fspath(file_input)
        elif hasattr(file_input, 'read') and callable(file_input.read):
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file_obj:
                input_temp_file_path = temp_file_obj.name
                file_input.seek(0)  # Ensure reading from the beginning of the stream
                temp_file_obj.write(file_input.read())
            actual_path_to_process = input_temp_file_path
        else:
            raise ValueError("Input 'file_input' must be a file path (str) or a file-like object.")

        original_doc = fitz.open(actual_path_to_process)
        if not original_doc.page_count:
            print("PDF has no pages.")
            return ""  # The finally clause closes the document and removes the temp file

        # 2. Collect font data & text from all spans
        print(f"Original PDF ('{os.path.basename(actual_path_to_process)}') has {original_doc.page_count} pages. Analyzing fonts...")
        spans_by_page, font_char_counts = _collect_spans_by_page(original_doc)

        if not font_char_counts:
            print("No text with font information found in PDF.")
            return ""  # Cleanup happens once, in the finally clause

        # 3. Determine majority font
        majority_key, char_count = font_char_counts.most_common(1)[0]
        majority_font_name, majority_font_size_rounded = majority_key
        print(f"Majority font combination: Name='{majority_font_name}', RoundedSize={majority_font_size_rounded}pt (with {char_count} characters).")

        # 4. Create a new PDF document with only the majority font text
        new_doc = fitz.Document()
        print("Constructing new PDF with text from majority font only...")

        for p_num, page_spans in enumerate(spans_by_page):
            source_rect = original_doc[p_num].rect
            new_pdf_page = new_doc.new_page(width=source_rect.width, height=source_rect.height)

            for span_data in page_spans:
                span_key = (span_data["font_name"], span_data["font_size_rounded"])
                if span_key != majority_key:
                    continue

                text_to_insert = span_data["text"]
                original_bbox = fitz.Rect(span_data["bbox"])
                # NOTE(review): the text is re-rendered in Helvetica inside the
                # original span's bbox; a negative result is reported below.
                insertion_result = new_pdf_page.insert_textbox(
                    original_bbox,
                    text_to_insert,
                    fontsize=span_data["original_font_size"],
                    fontname="helv",  # Using Helvetica for simplicity
                    align=0
                )
                if insertion_result < 0:
                    print(f"Warning: Textbox insertion for '{text_to_insert[:30].replace(chr(10), ' ')}...' in rect {original_bbox} on new page {p_num} might have issues (code: {insertion_result}).")

        print(f"New PDF constructed with {new_doc.page_count} pages.")

        # 5. Convert the in-memory filtered PDF document to Markdown
        if new_doc.page_count > 0:
            print(f"Converting filtered PDF Document object to Markdown using pymupdf4llm...")
            markdown_text = pymupdf4llm.to_markdown(new_doc)
        else:
            print("The new PDF document (filtered) is empty. No markdown will be generated.")
            markdown_text = ""

        print(f"Total Markdown text length from filtered PDF: {len(markdown_text)} characters.")
        return markdown_text

    except Exception as e:
        print(f"Error in extract_pdf_text: {str(e)}")
        traceback.print_exc()
        return ""
    finally:
        # Close documents before deleting the temp file they may be backed by.
        _close_document(original_doc)
        _close_document(new_doc)
        if input_temp_file_path and os.path.exists(input_temp_file_path):
            try:
                os.remove(input_temp_file_path)
                print(f"Cleaned up temporary input file: {input_temp_file_path}")
            except Exception as e_clean:
                print(f"Error cleaning up temporary input file {input_temp_file_path}: {e_clean}")