Update pdf_processing.py
Browse files- pdf_processing.py +119 -18
pdf_processing.py
CHANGED
@@ -5,6 +5,7 @@ import os
|
|
5 |
import tempfile
|
6 |
import traceback
|
7 |
from typing import Tuple, Optional, List, Dict, Any
|
|
|
8 |
|
9 |
def convert_rect_to_dict(rect: fitz.Rect) -> Optional[Dict[str, float]]:
|
10 |
"""Converts a fitz.Rect object to a dictionary."""
|
@@ -46,37 +47,137 @@ def try_map_issues_to_page_rects(
|
|
46 |
print(f" Warning: Could not convert rect for context '{issue_to_update['context_text'][:30]}...' on page {page_number_for_mapping}")
|
47 |
return mapped_count
|
48 |
|
|
|
49 |
def extract_pdf_text(file_input: Any) -> str:
|
50 |
-
"""
|
51 |
-
|
|
|
|
|
|
|
|
|
|
|
52 |
actual_path_to_process = None
|
|
|
|
|
|
|
53 |
try:
|
|
|
54 |
if isinstance(file_input, str):
|
55 |
actual_path_to_process = file_input
|
56 |
elif hasattr(file_input, 'read') and callable(file_input.read):
|
57 |
-
|
58 |
-
|
59 |
-
|
60 |
-
|
61 |
-
|
62 |
-
actual_path_to_process = temp_file_path_for_pymupdf4llm
|
63 |
else:
|
64 |
raise ValueError("Input 'file_input' must be a file path (str) or a file-like object.")
|
65 |
|
66 |
-
|
67 |
-
|
68 |
-
|
69 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
70 |
|
71 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
72 |
|
73 |
-
print(f"Total
|
74 |
return markdown_text
|
75 |
-
|
76 |
except Exception as e:
|
77 |
-
print(f"Error
|
78 |
traceback.print_exc()
|
79 |
return ""
|
80 |
finally:
|
81 |
-
if
|
82 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
5 |
import tempfile
|
6 |
import traceback
|
7 |
from typing import Tuple, Optional, List, Dict, Any
|
8 |
+
from collections import Counter
|
9 |
|
10 |
def convert_rect_to_dict(rect: fitz.Rect) -> Optional[Dict[str, float]]:
|
11 |
"""Converts a fitz.Rect object to a dictionary."""
|
|
|
47 |
print(f" Warning: Could not convert rect for context '{issue_to_update['context_text'][:30]}...' on page {page_number_for_mapping}")
|
48 |
return mapped_count
|
49 |
|
50 |
+
# Extracts the majority-font text of a PDF and converts it to Markdown.
|
51 |
def extract_pdf_text(file_input: Any) -> str:
    """Extract the majority-font text of a PDF and return it as Markdown.

    The "majority font" is the (font name, rounded font size) combination
    that accounts for the most characters across all text spans of the
    document. Spans using that combination are re-drawn (in Helvetica, at
    their original size and position) onto a fresh in-memory PDF with the
    same page dimensions, and that filtered PDF is handed to
    ``pymupdf4llm.to_markdown``.

    Args:
        file_input: Either a path to a PDF file (``str``) or a file-like
            object exposing ``read()`` (and ``seek()``); a stream is first
            copied to a temporary ``.pdf`` file.

    Returns:
        The Markdown text of the filtered document. An empty string is
        returned when the PDF has no pages, contains no text spans, or when
        any error occurs (errors are printed together with a traceback and
        are never propagated to the caller).
    """
    input_temp_file_path = None  # Set only when file_input is a stream.
    original_doc = None
    new_doc = None  # The filtered document we build.

    try:
        # 1. Resolve the input to a path on disk.
        if isinstance(file_input, str):
            actual_path_to_process = file_input
        elif hasattr(file_input, 'read') and callable(file_input.read):
            with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file_obj:
                input_temp_file_path = temp_file_obj.name
                file_input.seek(0)  # Ensure reading from the beginning of the stream
                temp_file_obj.write(file_input.read())
            actual_path_to_process = input_temp_file_path
        else:
            raise ValueError("Input 'file_input' must be a file path (str) or a file-like object.")

        original_doc = fitz.open(actual_path_to_process)
        if not original_doc.page_count:
            print("PDF has no pages.")
            # Closing the document and deleting the temp file is left to
            # the ``finally`` block, which runs after the document is closed.
            return ""

        # 2. Collect span text and font data, grouped per page so the
        #    rebuild step below does not have to rescan every span per page.
        spans_by_page: List[List[Dict[str, Any]]] = []
        page_sizes: List[Any] = []
        font_char_counts: Counter = Counter()

        print(f"Original PDF ('{os.path.basename(actual_path_to_process)}') has {original_doc.page_count} pages. Analyzing fonts...")
        for page_num in range(original_doc.page_count):
            page = original_doc[page_num]
            page_sizes.append((page.rect.width, page.rect.height))
            page_spans: List[Dict[str, Any]] = []
            for block in page.get_text("dict").get("blocks", []):
                if block.get("type") != 0:  # Process only text blocks (type 0)
                    continue
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        text = span["text"]
                        font_key = (span["font"], int(round(span["size"])))
                        page_spans.append({
                            "text": text,
                            "font_key": font_key,
                            "original_font_size": span["size"],
                            "bbox": span["bbox"],
                            # Baseline start point; used as a fallback
                            # insertion position (may be absent).
                            "origin": span.get("origin"),
                        })
                        font_char_counts[font_key] += len(text)
            spans_by_page.append(page_spans)

        if not font_char_counts:
            print("No text with font information found in PDF.")
            return ""

        # 3. Determine the majority font.
        majority_key, char_count = font_char_counts.most_common(1)[0]
        majority_font_name, majority_font_size_rounded = majority_key
        print(f"Majority font combination: Name='{majority_font_name}', RoundedSize={majority_font_size_rounded}pt (with {char_count} characters).")

        # 4. Create a new PDF document with only the majority-font text.
        new_doc = fitz.Document()
        print("Constructing new PDF with text from majority font only...")

        for p_num, page_spans in enumerate(spans_by_page):
            page_width, page_height = page_sizes[p_num]
            new_pdf_page = new_doc.new_page(width=page_width, height=page_height)

            for span_data in page_spans:
                if span_data["font_key"] != majority_key:
                    continue

                text_to_insert = span_data["text"]
                original_bbox = fitz.Rect(span_data["bbox"])
                font_size_for_render = span_data["original_font_size"]

                insertion_result = new_pdf_page.insert_textbox(
                    original_bbox,
                    text_to_insert,
                    fontsize=font_size_for_render,
                    fontname="helv",  # Using Helvetica for simplicity
                    align=0,
                )
                if insertion_result >= 0:
                    continue

                # A negative result means the text did not fit the span's
                # bbox with the substitute font and nothing was written.
                # Fall back to placing the text at the span origin so the
                # majority-font text is not silently dropped.
                origin = span_data["origin"]
                if origin is not None:
                    new_pdf_page.insert_text(
                        fitz.Point(origin),
                        text_to_insert,
                        fontsize=font_size_for_render,
                        fontname="helv",
                    )
                else:
                    print(f"Warning: Textbox insertion for '{text_to_insert[:30].replace(chr(10), ' ')}...' in rect {original_bbox} on new page {p_num} might have issues (code: {insertion_result}).")

        print(f"New PDF constructed with {new_doc.page_count} pages.")

        # 5. Convert the in-memory filtered PDF document to Markdown.
        if new_doc.page_count > 0:
            print("Converting filtered PDF Document object to Markdown using pymupdf4llm...")
            markdown_text = pymupdf4llm.to_markdown(new_doc)
        else:
            print("The new PDF document (filtered) is empty. No markdown will be generated.")
            markdown_text = ""

        print(f"Total Markdown text length from filtered PDF: {len(markdown_text)} characters.")
        return markdown_text

    except Exception as e:
        # Boundary handler: callers rely on "" instead of an exception.
        print(f"Error in extract_pdf_text: {str(e)}")
        traceback.print_exc()
        return ""
    finally:
        # Compare against None rather than relying on truthiness: a document
        # object supporting len() may be falsy when it has zero pages, which
        # would leave it unclosed.
        if original_doc is not None:
            original_doc.close()
        if new_doc is not None:
            new_doc.close()

        # Remove the temp copy only after the document reading it is closed.
        if input_temp_file_path and os.path.exists(input_temp_file_path):
            try:
                os.remove(input_temp_file_path)
                print(f"Cleaned up temporary input file: {input_temp_file_path}")
            except Exception as e_clean:
                print(f"Error cleaning up temporary input file {input_temp_file_path}: {e_clean}")
|