import fitz # PyMuPDF
import camelot # For table extraction
import pandas as pd
from bs4 import BeautifulSoup
import re
from pathlib import Path
import traceback
# Path configuration (assumed to be resolved relative to run_pipeline.py).
# As a module, these should be passed as arguments or imported from a shared config:
# BASE_DIR = Path("/content/")
# PDF_DIRECTORY = BASE_DIR / "docs"
# IMAGE_SAVE_SUBDIR = "extracted_graphs"
# TABLE_SAVE_SUBDIR = "extracted_tables"
# STATIC_DIR = BASE_DIR / "static"
# IMAGE_SAVE_DIR = STATIC_DIR / IMAGE_SAVE_SUBDIR
# TABLE_SAVE_DIR = STATIC_DIR / TABLE_SAVE_SUBDIR
# The size thresholds below could likewise be passed as arguments or set in a config.
IMAGE_MIN_WIDTH = 100 # Ignore very small images (likely logos/icons)
IMAGE_MIN_HEIGHT = 100
def clean_text(text):
"""Normalize whitespace and clean text while preserving paragraph breaks"""
if not text:
return ""
# Replace tabs with spaces, but preserve paragraph breaks
text = text.replace('\t', ' ')
# Normalize multiple spaces to single spaces
text = re.sub(r' +', ' ', text)
# Preserve paragraph breaks but normalize them
text = re.sub(r'\n{3,}', '\n\n', text)
return text.strip()
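# A quick illustration of clean_text on a hypothetical input string:
#   clean_text("Col1\tCol2   end\n\n\n\nNext paragraph")
#   -> "Col1 Col2 end\n\nNext paragraph"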
def extract_page_data_pymupdf(pdf_path, image_save_dir, table_save_dir, image_save_subdir, table_save_subdir):
"""Extract text, tables and save images from each page using PyMuPDF and Camelot."""
page_data_list = []
try:
doc = fitz.open(pdf_path)
metadata = doc.metadata or {}
pdf_data = {
            'pdf_title': metadata.get('title') or pdf_path.name,  # metadata fields are often empty strings, so fall back with `or`
            'pdf_subject': metadata.get('subject') or 'Statistiques',
            'pdf_keywords': metadata.get('keywords') or ''
}
for page_num in range(len(doc)):
page = doc.load_page(page_num)
page_index = page_num + 1 # 1-based index
print(f" Extraction des données de la page {page_index}...")
# Extract tables first
table_data = extract_tables_and_images_from_page(pdf_path, page, page_index, table_save_dir, image_save_dir, image_save_subdir, table_save_subdir)
# Track table regions to avoid double-processing text
table_regions = []
for item in table_data:
if 'rect' in item and item['rect'] and len(item['rect']) == 4:
table_regions.append(fitz.Rect(item['rect']))
else:
print(f" Warning: Invalid rect for table on page {page_index}")
# Extract text excluding table regions
page_text = ""
if table_regions:
# Get text blocks
blocks = page.get_text("blocks")
for block in blocks:
block_rect = fitz.Rect(block[:4])
is_in_table = False
for table_rect in table_regions:
if block_rect.intersects(table_rect):
is_in_table = True
break
if not is_in_table:
page_text += block[4] + "\n" # Add text content
else:
# If no tables, get all text
page_text = page.get_text("text")
page_text = clean_text(page_text)
# Extract and save images (excluding those identified as tables)
image_data = extract_images_from_page(pdf_path, page, page_index, image_save_dir, image_save_subdir, excluded_rects=table_regions)
page_data_list.append({
'pdf_file': pdf_path.name,
'page_number': page_index,
'text': page_text,
'images': image_data, # Includes non-table images
'tables': [item for item in table_data if item['content_type'] == 'table'], # Only table data here
'pdf_title': pdf_data.get('pdf_title'),
'pdf_subject': pdf_data.get('pdf_subject'),
'pdf_keywords': pdf_data.get('pdf_keywords')
})
doc.close()
except Exception as e:
print(f"Erreur lors du traitement du PDF {pdf_path.name} avec PyMuPDF : {str(e)}")
traceback.print_exc() # Print traceback for debugging
return page_data_list
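# Shape of one record in the list returned by extract_page_data_pymupdf
# (values illustrative):
#   {'pdf_file': 'report.pdf', 'page_number': 3, 'text': '...',
#    'images': [...], 'tables': [...], 'pdf_title': '...',
#    'pdf_subject': '...', 'pdf_keywords': '...'}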
def extract_tables_and_images_from_page(pdf_path, page, page_num, table_save_dir, image_save_dir, image_save_subdir, table_save_subdir):
"""Extract tables using Camelot and capture images of table areas."""
table_and_image_data = []
try:
tables = camelot.read_pdf(
str(pdf_path),
pages=str(page_num),
flavor='lattice',
)
if len(tables) == 0:
tables = camelot.read_pdf(
str(pdf_path),
pages=str(page_num),
flavor='stream'
)
for i, table in enumerate(tables):
if table.accuracy < 70:
print(f" Skipping low accuracy table ({table.accuracy:.2f}%) on page {page_num}")
continue
            # Camelot reports table coordinates through the (private but
            # conventional) `_bbox` attribute as (x1, y1, x2, y2) in PDF
            # space, with the origin at the bottom-left; PyMuPDF uses a
            # top-left origin, so flip the y-axis before building the rect.
            table_bbox = getattr(table, '_bbox', None)
            if not table_bbox or len(table_bbox) != 4:
                print(f"    Warning: Invalid bounding box for table {i} on page {page_num}. Skipping image capture.")
                table_rect = None
            else:
                x1, y1, x2, y2 = table_bbox
                page_height = page.rect.height
                table_rect = fitz.Rect(x1, page_height - y2, x2, page_height - y1)
safe_pdf_name = "".join(c if c.isalnum() else "_" for c in pdf_path.stem)
table_html_filename = f"{safe_pdf_name}_p{page_num}_table{i}.html"
table_html_save_path = table_save_dir / table_html_filename
relative_html_url_path = f"/static/{table_save_subdir}/{table_html_filename}"
table_image_filename = f"{safe_pdf_name}_p{page_num}_table{i}.png"
table_image_save_path = image_save_dir / table_image_filename
relative_image_url_path = f"/static/{image_save_subdir}/{table_image_filename}"
            df = table.df
            soup = BeautifulSoup(df.to_html(index=False), 'html.parser')
            table_tag = soup.find('table')
            if table_tag:
                # Put the caption inside the <table> element so that the
                # `.table caption` CSS rule below actually applies to it.
                caption_tag = soup.new_tag('caption')
                caption_tag.string = f"Table extracted from {pdf_path.name}, page {page_num}"
                table_tag.insert(0, caption_tag)
                table_tag['class'] = 'table table-bordered table-striped'
                table_tag['style'] = 'width:100%; border-collapse:collapse;'
style_tag = soup.new_tag('style')
style_tag.string = """
.table { border-collapse: collapse; width: 100%; margin-bottom: 1rem;}
.table caption { caption-side: top; padding: 0.5rem; text-align: left; font-weight: bold; }
.table th, .table td { border: 1px solid #ddd; padding: 8px; text-align: left; }
.table th { background-color: #f2f2f2; font-weight: bold; }
.table-striped tbody tr:nth-of-type(odd) { background-color: rgba(0,0,0,.05); }
.table-responsive { overflow-x: auto; margin-bottom: 1rem; }
"""
soup.insert(0, style_tag)
div = soup.new_tag('div')
div['class'] = 'table-responsive'
table_tag.wrap(div)
with open(table_html_save_path, 'w', encoding='utf-8') as f:
f.write(str(soup))
else:
print(f" Warning: Could not find table tag in HTML for table on page {page_num}. Skipping HTML save.")
continue
table_image_bytes = None
if table_rect:
try:
pix = page.get_pixmap(clip=table_rect)
                    table_image_bytes = pix.tobytes("png")  # Pixmap.tobytes takes the format positionally (its keyword is `output`, not `format`)
with open(table_image_save_path, "wb") as img_file:
img_file.write(table_image_bytes)
except Exception as img_capture_e:
print(f" Erreur lors de la capture d'image du tableau {i} page {page_num} : {img_capture_e}")
traceback.print_exc()
table_image_bytes = None
table_and_image_data.append({
'content_type': 'table',
'table_html_url': relative_html_url_path,
'table_text_representation': df.to_string(index=False),
'rect': [table_rect.x0, table_rect.y0, table_rect.x1, table_rect.y1] if table_rect else None,
'accuracy': table.accuracy,
'image_bytes': table_image_bytes,
'image_url': relative_image_url_path if table_image_bytes else None
})
return table_and_image_data
except Exception as e:
print(f" Erreur lors de l'extraction des tableaux de la page {page_num} : {str(e)}")
traceback.print_exc()
return []
def extract_images_from_page(pdf_path, page, page_num, image_save_dir, image_save_subdir, excluded_rects=None):
    """Extract and save images from a page, excluding specified regions (like tables)."""
    excluded_rects = excluded_rects or []  # avoid a mutable default argument
    image_data = []
image_list = page.get_images(full=True)
for img_index, img_info in enumerate(image_list):
xref = img_info[0]
try:
base_image = page.parent.extract_image(xref)
image_bytes = base_image["image"]
image_ext = base_image["ext"]
width = base_image["width"]
height = base_image["height"]
if width < IMAGE_MIN_WIDTH or height < IMAGE_MIN_HEIGHT:
continue
img_rect = None
img_rects = page.get_image_rects(xref)
if img_rects:
img_rect = img_rects[0]
if img_rect is None:
print(f" Warning: Could not find rectangle for image {img_index} on page {page_num}. Skipping.")
continue
is_excluded = False
for excluded_rect in excluded_rects:
if img_rect.intersects(excluded_rect):
is_excluded = True
break
if is_excluded:
print(f" Image {img_index} on page {page_num} is within an excluded region (e.g., table). Skipping.")
continue
safe_pdf_name = "".join(c if c.isalnum() else "_" for c in pdf_path.stem)
image_filename = f"{safe_pdf_name}_p{page_num}_img{img_index}.{image_ext}"
image_save_path = image_save_dir / image_filename
relative_url_path = f"/static/{image_save_subdir}/{image_filename}"
with open(image_save_path, "wb") as img_file:
img_file.write(image_bytes)
image_data.append({
'content_type': 'image',
'image_url': relative_url_path,
'rect': [img_rect.x0, img_rect.y0, img_rect.x1, img_rect.y1],
'image_bytes': image_bytes
})
except Exception as img_save_e:
print(f" Erreur lors du traitement de l'image {img_index} de la page {page_num} : {img_save_e}")
traceback.print_exc()
return image_data
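
# Minimal standalone driver, sketched with assumed paths that mirror the
# commented-out configuration above (run_pipeline.py is the actual entry point):
if __name__ == "__main__":
    base_dir = Path(".")
    static_dir = base_dir / "static"
    image_subdir, table_subdir = "extracted_graphs", "extracted_tables"
    image_dir, table_dir = static_dir / image_subdir, static_dir / table_subdir
    image_dir.mkdir(parents=True, exist_ok=True)
    table_dir.mkdir(parents=True, exist_ok=True)
    for pdf_file in sorted((base_dir / "docs").glob("*.pdf")):
        pages = extract_page_data_pymupdf(pdf_file, image_dir, table_dir, image_subdir, table_subdir)
        print(f"{pdf_file.name}: {len(pages)} page(s) extracted")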