Spaces:
Running
Running
File size: 15,837 Bytes
84aedaf 000109f 84aedaf 000109f 84aedaf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 |
import io
import mimetypes
from typing import Tuple, Optional, BinaryIO
from PIL import Image, ImageOps
# Import PyMuPDF for PDF processing
import importlib


def _load_pymupdf():
    """Return the PyMuPDF module under whichever import name works, else None.

    ``fitz`` is tried first so existing installations behave as before;
    ``pymupdf`` is the module name used by newer PyMuPDF releases.  The two
    ``PyMuPDF`` spellings mirror the legacy fallbacks this module already had
    (``from PyMuPDF import fitz`` and ``import PyMuPDF as fitz``).
    """
    for module_name in ("fitz", "pymupdf", "PyMuPDF.fitz", "PyMuPDF"):
        try:
            return importlib.import_module(module_name)
        except ImportError:
            # Not installed under this name -- try the next spelling.
            continue
    return None


# ``fitz`` is None when PyMuPDF is missing; PDF processing is then disabled.
fitz = _load_pymupdf()
FITZ_AVAILABLE = fitz is not None
import tempfile
import os
class ImagePreprocessor:
    """Service for preprocessing various image formats before storage.

    PNG and JPEG inputs are returned untouched.  PDF, TIFF, HEIC/HEIF, WebP,
    GIF and unrecognised inputs are converted to a single PNG or JPEG image.
    All methods are static; the ``PDF_*`` class attributes are shared,
    class-level settings changed through ``configure_pdf_processing``.
    """

    # Configuration options for performance tuning
    PDF_ZOOM_FACTOR = 1.5  # Render scale for PDF pages (reduced from 2.0 for speed)
    PDF_COMPRESS_LEVEL = 6  # PNG compression level (0-9, higher = smaller but slower)
    PDF_QUALITY_MODE = 'balanced'  # 'fast', 'balanced', or 'quality'

    # PyMuPDF availability flag (copied from the module-level import probe)
    FITZ_AVAILABLE = FITZ_AVAILABLE

    SUPPORTED_IMAGE_MIME_TYPES = {
        'image/png',
        'image/jpeg',
        'image/jpg',
        'image/heic',
        'image/heif',
        'image/webp',
        'image/gif',
        'image/tiff',
        'image/tif',
        'application/pdf'
    }

    # Fixed-prefix file signatures, checked in order.
    _MAGIC_PREFIXES = (
        (b'\x89PNG\r\n\x1a\n', 'image/png'),
        (b'\xff\xd8\xff', 'image/jpeg'),
        (b'\x49\x49\x2a\x00', 'image/tiff'),  # little-endian TIFF ("II*\0")
        (b'\x4d\x4d\x00\x2a', 'image/tiff'),  # big-endian TIFF ("MM\0*")
        (b'%PDF', 'application/pdf'),
        (b'GIF87a', 'image/gif'),
        (b'GIF89a', 'image/gif'),
    )

    # Major brands found at bytes 8-12 of an ISO-BMFF ``ftyp`` box.
    _HEIC_BRANDS = frozenset({
        b'heic', b'heix', b'hevc', b'hevx', b'heim', b'heis', b'hevm', b'hevs',
    })
    _HEIF_BRANDS = frozenset({b'mif1', b'msf1'})

    # Modes converted to RGB before encoding.
    _ALPHA_PALETTE_MODES = ('RGBA', 'LA', 'P')
    _NON_RGB_MODES = ('RGBA', 'LA', 'P', 'CMYK', 'LAB', 'HSV', 'I', 'F')

    # Modes each encoder can write directly; anything else is converted to RGB
    # as a last resort instead of failing inside ``Image.save``.
    _PNG_SAFE_MODES = ('1', 'L', 'LA', 'I', 'I;16', 'P', 'RGB', 'RGBA')
    _JPEG_SAFE_MODES = ('1', 'L', 'RGB', 'CMYK', 'YCbCr')

    @staticmethod
    def _sniff_mime_type(file_content: bytes) -> Optional[str]:
        """Return the MIME type implied by the leading bytes, or None if unknown."""
        for prefix, mime_type in ImagePreprocessor._MAGIC_PREFIXES:
            if file_content.startswith(prefix):
                return mime_type
        if file_content.startswith(b'RIFF') and file_content[8:12] == b'WEBP':
            return 'image/webp'
        # HEIF family: a 4-byte box size, then "ftyp", then the major brand.
        # The box size varies between encoders, so it is deliberately ignored.
        if file_content[4:8] == b'ftyp':
            brand = bytes(file_content[8:12])
            if brand in ImagePreprocessor._HEIC_BRANDS:
                return 'image/heic'
            if brand in ImagePreprocessor._HEIF_BRANDS:
                return 'image/heif'
        return None

    @staticmethod
    def detect_mime_type(file_content: bytes, filename: str) -> str:
        """Detect MIME type from file content and filename.

        The file signature is authoritative because the filename is supplied
        by the uploader and may not match the data.  The extension is only
        consulted when the signature is not recognised.

        Args:
            file_content: Raw file content
            filename: Original filename (may be empty or None)
        Returns:
            The detected MIME type, or 'application/octet-stream' when neither
            the content nor the filename identifies the format.
        """
        sniffed = ImagePreprocessor._sniff_mime_type(file_content or b'')
        if sniffed:
            return sniffed
        # guess_type() rejects None, so skip it when there is no filename.
        guessed = mimetypes.guess_type(filename)[0] if filename else None
        return guessed or 'application/octet-stream'

    @staticmethod
    def needs_preprocessing(mime_type: str) -> bool:
        """Check if the file needs preprocessing (anything except PNG/JPEG does)."""
        return mime_type not in {'image/png', 'image/jpeg', 'image/jpg'}

    @staticmethod
    def preprocess_image(
        file_content: bytes,
        filename: str,
        target_format: str = 'PNG',
        quality: int = 95
    ) -> Tuple[bytes, str, str]:
        """
        Preprocess image and return processed content, new filename, and MIME type

        Args:
            file_content: Raw file content
            filename: Original filename
            target_format: Target format ('PNG' or 'JPEG')
            quality: JPEG quality (1-100, only used for JPEG)
        Returns:
            Tuple of (processed_content, new_filename, mime_type).  PNG and
            JPEG inputs are returned unchanged.
        Raises:
            ValueError: If the format-specific conversion fails.
        """
        mime_type = ImagePreprocessor.detect_mime_type(file_content, filename)
        if not ImagePreprocessor.needs_preprocessing(mime_type):
            # No preprocessing needed
            return file_content, filename, mime_type

        handlers = {
            'application/pdf': ImagePreprocessor._process_pdf,
            'image/tiff': ImagePreprocessor._process_tiff,
            'image/tif': ImagePreprocessor._process_tiff,
            'image/heic': ImagePreprocessor._process_heic,
            'image/heif': ImagePreprocessor._process_heic,
            'image/webp': ImagePreprocessor._process_webp,
            'image/gif': ImagePreprocessor._process_gif,
        }
        # Unsupported format: try to open with PIL as fallback
        handler = handlers.get(mime_type, ImagePreprocessor._process_generic)
        try:
            return handler(file_content, filename, target_format, quality)
        except Exception as e:
            raise ValueError(f"Failed to preprocess {mime_type} file: {str(e)}") from e

    @staticmethod
    def configure_pdf_processing(zoom_factor: float = 1.5, compress_level: int = 6, quality_mode: str = 'balanced'):
        """
        Configure PDF processing performance settings

        Args:
            zoom_factor: PDF zoom factor (1.0 = original size, 2.0 = 2x size)
                Lower values = faster processing, lower quality
                Higher values = slower processing, higher quality
            compress_level: PNG compression level (0-9)
                Lower values = faster compression, larger files
                Higher values = slower compression, smaller files
            quality_mode: Processing mode ('fast', 'balanced', 'quality').
                'fast' and 'quality' are presets that override zoom_factor
                and compress_level; any other value uses the arguments as given.
        Raises:
            ValueError: If the resulting zoom factor is not positive or the
                compression level is outside 0-9.
        """
        if quality_mode == 'fast':
            zoom_factor, compress_level = 1.0, 3
        elif quality_mode == 'quality':
            zoom_factor, compress_level = 2.0, 9

        # Reject values that would otherwise only fail later, during rendering.
        if zoom_factor <= 0:
            raise ValueError(f"zoom_factor must be positive, got {zoom_factor}")
        if not 0 <= compress_level <= 9:
            raise ValueError(f"compress_level must be between 0 and 9, got {compress_level}")

        ImagePreprocessor.PDF_ZOOM_FACTOR = zoom_factor
        ImagePreprocessor.PDF_COMPRESS_LEVEL = compress_level
        ImagePreprocessor.PDF_QUALITY_MODE = quality_mode
        print(f"PDF processing configured: zoom={ImagePreprocessor.PDF_ZOOM_FACTOR}, "
              f"compression={ImagePreprocessor.PDF_COMPRESS_LEVEL}, mode={quality_mode}")

    @staticmethod
    def _encode_image(
        img: 'Image.Image',
        filename: str,
        target_format: str,
        quality: int,
        convert_modes: Tuple[str, ...],
        png_compress_level: Optional[int] = None
    ) -> Tuple[bytes, str, str]:
        """Encode an open PIL image as PNG or JPEG and derive the new filename.

        Args:
            img: Open PIL image
            filename: Original filename; its extension is replaced
            target_format: 'PNG' (case-insensitive) selects PNG, anything else JPEG
            quality: JPEG quality (only used for JPEG)
            convert_modes: Image modes that are converted to RGB first
            png_compress_level: Explicit zlib level for PNG output.  When None
                the PNG is written with ``optimize=True``.
        Returns:
            Tuple of (encoded_bytes, new_filename, mime_type)
        """
        want_png = str(target_format).upper() == 'PNG'

        if img.mode in convert_modes:
            img = img.convert('RGB')
        safe_modes = ImagePreprocessor._PNG_SAFE_MODES if want_png else ImagePreprocessor._JPEG_SAFE_MODES
        if img.mode not in safe_modes:
            img = img.convert('RGB')

        output_buffer = io.BytesIO()
        if want_png:
            if png_compress_level is None:
                img.save(output_buffer, format='PNG', optimize=True)
            else:
                # Pillow forces compress_level to 9 when optimize=True, so the
                # two options must not be combined if the level is to count.
                img.save(output_buffer, format='PNG', compress_level=png_compress_level)
            new_mime_type, new_extension = 'image/png', '.png'
        else:
            img.save(output_buffer, format='JPEG', quality=quality, optimize=True)
            new_mime_type, new_extension = 'image/jpeg', '.jpg'

        base_name = os.path.splitext(filename)[0]
        return output_buffer.getvalue(), f"{base_name}{new_extension}", new_mime_type

    @staticmethod
    def _convert_with_pil(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int,
        convert_modes: Tuple[str, ...],
        label: str,
        first_frame_only: bool = False
    ) -> Tuple[bytes, str, str]:
        """Open ``file_content`` with PIL and re-encode it via ``_encode_image``.

        ``label`` names the format in the error message.  ``first_frame_only``
        rewinds a multi-frame image to frame 0 before encoding.

        Raises:
            ValueError: If PIL cannot open or re-encode the data.
        """
        try:
            with Image.open(io.BytesIO(file_content)) as img:
                if first_frame_only and getattr(img, 'n_frames', 1) > 1:
                    img.seek(0)
                return ImagePreprocessor._encode_image(
                    img, filename, target_format, quality, convert_modes
                )
        except Exception as e:
            raise ValueError(f"Failed to process {label}: {str(e)}") from e

    @staticmethod
    def _process_pdf(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Process PDF files by rasterizing the first page.

        The page is rendered at ``PDF_ZOOM_FACTOR`` and, for PNG output,
        compressed at ``PDF_COMPRESS_LEVEL``.

        Raises:
            ValueError: If PyMuPDF is unavailable, the PDF has no pages, or
                rendering/encoding fails.
        """
        if not ImagePreprocessor.FITZ_AVAILABLE:
            raise ValueError("PDF processing is not available. PyMuPDF is not installed.")
        try:
            print(f"Starting PDF processing for {filename}...")
            pdf_document = fitz.open(stream=file_content, filetype="pdf")
            try:
                if len(pdf_document) == 0:
                    raise ValueError("PDF has no pages")
                print(f"PDF opened successfully, processing page 1 of {len(pdf_document)}...")
                page = pdf_document[0]

                # Use configurable zoom factor for performance tuning
                zoom = ImagePreprocessor.PDF_ZOOM_FACTOR
                print(f"Rendering page at {zoom}x zoom...")
                pix = page.get_pixmap(
                    matrix=fitz.Matrix(zoom, zoom),
                    alpha=False,  # No alpha channel needed
                    colorspace="rgb"  # Force RGB colorspace
                )
                print(f"Page rendered, size: {pix.width}x{pix.height}")

                # The pixmap is RGB without alpha, so its raw samples load
                # straight into PIL without an intermediate PNG round trip.
                img = Image.frombytes('RGB', (pix.width, pix.height), pix.samples)
                del pix  # Free memory
            finally:
                # Close the document even when rendering fails.
                pdf_document.close()
            print(f"Image converted to RGB, mode: {img.mode}")

            result = ImagePreprocessor._encode_image(
                img,
                filename,
                target_format,
                quality,
                ImagePreprocessor._ALPHA_PALETTE_MODES,
                png_compress_level=ImagePreprocessor.PDF_COMPRESS_LEVEL,
            )
            print(f"PDF processing completed: {filename} -> {result[1]}")
            return result
        except Exception as e:
            print(f"PDF processing failed: {str(e)}")
            raise ValueError(f"Failed to process PDF: {str(e)}") from e

    @staticmethod
    def _process_tiff(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Process TIFF/GeoTIFF files by rendering RGB view"""
        return ImagePreprocessor._convert_with_pil(
            file_content, filename, target_format, quality,
            ImagePreprocessor._NON_RGB_MODES, 'TIFF'
        )

    @staticmethod
    def _process_heic(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Process HEIC/HEIF files.

        NOTE(review): this relies on ``Image.open`` being able to decode HEIF,
        which presumably needs a Pillow HEIF plugin registered elsewhere --
        confirm.
        """
        return ImagePreprocessor._convert_with_pil(
            file_content, filename, target_format, quality,
            ImagePreprocessor._ALPHA_PALETTE_MODES, 'HEIC'
        )

    @staticmethod
    def _process_webp(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Process WebP files"""
        return ImagePreprocessor._convert_with_pil(
            file_content, filename, target_format, quality,
            ImagePreprocessor._ALPHA_PALETTE_MODES, 'WebP'
        )

    @staticmethod
    def _process_gif(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Process GIF files (static only); animated GIFs keep their first frame"""
        return ImagePreprocessor._convert_with_pil(
            file_content, filename, target_format, quality,
            ImagePreprocessor._ALPHA_PALETTE_MODES, 'GIF',
            first_frame_only=True
        )

    @staticmethod
    def _process_generic(
        file_content: bytes,
        filename: str,
        target_format: str,
        quality: int
    ) -> Tuple[bytes, str, str]:
        """Generic processing for other formats PIL may be able to open"""
        return ImagePreprocessor._convert_with_pil(
            file_content, filename, target_format, quality,
            ImagePreprocessor._NON_RGB_MODES, 'generic format'
        )
|