Spaces:
Running
Running
import io | |
import mimetypes | |
from typing import Tuple, Optional, BinaryIO | |
from PIL import Image, ImageOps | |
# Import PyMuPDF for PDF processing | |
try: | |
import fitz # PyMuPDF for PDF processing | |
FITZ_AVAILABLE = True | |
except ImportError: | |
try: | |
from PyMuPDF import fitz # Alternative import method | |
FITZ_AVAILABLE = True | |
except ImportError: | |
try: | |
import PyMuPDF as fitz # Another alternative import | |
FITZ_AVAILABLE = True | |
except ImportError: | |
fitz = None # PDF processing will be disabled | |
FITZ_AVAILABLE = False | |
import tempfile | |
import os | |
class ImagePreprocessor: | |
"""Service for preprocessing various image formats before storage""" | |
# Configuration options for performance tuning | |
PDF_ZOOM_FACTOR = 1.5 # Reduce from 2.0 for better performance | |
PDF_COMPRESS_LEVEL = 6 # PNG compression level (0-9, higher = smaller but slower) | |
PDF_QUALITY_MODE = 'balanced' # 'fast', 'balanced', or 'quality' | |
# PyMuPDF availability flag | |
FITZ_AVAILABLE = FITZ_AVAILABLE | |
SUPPORTED_IMAGE_MIME_TYPES = { | |
'image/png', | |
'image/jpeg', | |
'image/jpg', | |
'image/heic', | |
'image/heif', | |
'image/webp', | |
'image/gif', | |
'image/tiff', | |
'image/tif', | |
'application/pdf' | |
} | |
def detect_mime_type(file_content: bytes, filename: str) -> str: | |
"""Detect MIME type from file content and filename""" | |
# First try to detect from content | |
mime_type, _ = mimetypes.guess_type(filename) | |
# If no MIME type detected, try to infer from content | |
if not mime_type: | |
# Check for common file signatures | |
if file_content.startswith(b'\x89PNG\r\n\x1a\n'): | |
mime_type = 'image/png' | |
elif file_content.startswith(b'\xff\xd8\xff'): | |
mime_type = 'image/jpeg' | |
elif file_content.startswith(b'\x49\x49\x2a\x00') or file_content.startswith(b'\x4d\x4d\x00\x2a'): | |
mime_type = 'image/tiff' | |
elif file_content.startswith(b'%PDF'): | |
mime_type = 'application/pdf' | |
elif file_content.startswith(b'GIF87a') or file_content.startswith(b'GIF89a'): | |
mime_type = 'image/gif' | |
elif file_content.startswith(b'RIFF') and file_content[8:12] == b'WEBP': | |
mime_type = 'image/webp' | |
elif file_content.startswith(b'\x00\x00\x00\x20ftypheic') or file_content.startswith(b'\x00\x00\x00\x20ftypheix'): | |
mime_type = 'image/heic' | |
else: | |
mime_type = 'application/octet-stream' | |
return mime_type | |
def needs_preprocessing(mime_type: str) -> bool: | |
"""Check if the file needs preprocessing""" | |
return mime_type not in {'image/png', 'image/jpeg', 'image/jpg'} | |
def preprocess_image( | |
file_content: bytes, | |
filename: str, | |
target_format: str = 'PNG', | |
quality: int = 95 | |
) -> Tuple[bytes, str, str]: | |
""" | |
Preprocess image and return processed content, new filename, and MIME type | |
Args: | |
file_content: Raw file content | |
filename: Original filename | |
target_format: Target format ('PNG' or 'JPEG') | |
quality: JPEG quality (1-100, only used for JPEG) | |
Returns: | |
Tuple of (processed_content, new_filename, mime_type) | |
""" | |
mime_type = ImagePreprocessor.detect_mime_type(file_content, filename) | |
if not ImagePreprocessor.needs_preprocessing(mime_type): | |
# No preprocessing needed | |
return file_content, filename, mime_type | |
try: | |
if mime_type == 'application/pdf': | |
return ImagePreprocessor._process_pdf(file_content, filename, target_format, quality) | |
elif mime_type in {'image/tiff', 'image/tif'}: | |
return ImagePreprocessor._process_tiff(file_content, filename, target_format, quality) | |
elif mime_type in {'image/heic', 'image/heif'}: | |
return ImagePreprocessor._process_heic(file_content, filename, target_format, quality) | |
elif mime_type == 'image/webp': | |
return ImagePreprocessor._process_webp(file_content, filename, target_format, quality) | |
elif mime_type == 'image/gif': | |
return ImagePreprocessor._process_gif(file_content, filename, target_format, quality) | |
else: | |
# Unsupported format, try to open with PIL as fallback | |
return ImagePreprocessor._process_generic(file_content, filename, target_format, quality) | |
except Exception as e: | |
raise ValueError(f"Failed to preprocess {mime_type} file: {str(e)}") | |
def configure_pdf_processing(zoom_factor: float = 1.5, compress_level: int = 6, quality_mode: str = 'balanced'): | |
""" | |
Configure PDF processing performance settings | |
Args: | |
zoom_factor: PDF zoom factor (1.0 = original size, 2.0 = 2x size) | |
Lower values = faster processing, lower quality | |
Higher values = slower processing, higher quality | |
compress_level: PNG compression level (0-9) | |
Lower values = faster compression, larger files | |
Higher values = slower compression, smaller files | |
quality_mode: Processing mode ('fast', 'balanced', 'quality') | |
""" | |
if quality_mode == 'fast': | |
ImagePreprocessor.PDF_ZOOM_FACTOR = 1.0 | |
ImagePreprocessor.PDF_COMPRESS_LEVEL = 3 | |
elif quality_mode == 'quality': | |
ImagePreprocessor.PDF_ZOOM_FACTOR = 2.0 | |
ImagePreprocessor.PDF_COMPRESS_LEVEL = 9 | |
else: # balanced | |
ImagePreprocessor.PDF_ZOOM_FACTOR = zoom_factor | |
ImagePreprocessor.PDF_COMPRESS_LEVEL = compress_level | |
print(f"PDF processing configured: zoom={ImagePreprocessor.PDF_ZOOM_FACTOR}, " | |
f"compression={ImagePreprocessor.PDF_COMPRESS_LEVEL}, mode={quality_mode}") | |
def _process_pdf( | |
file_content: bytes, | |
filename: str, | |
target_format: str, | |
quality: int | |
) -> Tuple[bytes, str, str]: | |
"""Process PDF files by rasterizing the first page""" | |
if not ImagePreprocessor.FITZ_AVAILABLE: | |
raise ValueError("PDF processing is not available. PyMuPDF is not installed.") | |
try: | |
print(f"Starting PDF processing for {filename}...") | |
# Open PDF with PyMuPDF | |
pdf_document = fitz.open(stream=file_content, filetype="pdf") | |
if len(pdf_document) == 0: | |
raise ValueError("PDF has no pages") | |
print(f"PDF opened successfully, processing page 1 of {len(pdf_document)}...") | |
# Get first page | |
page = pdf_document[0] | |
# Use configurable zoom factor for performance tuning | |
zoom = ImagePreprocessor.PDF_ZOOM_FACTOR | |
mat = fitz.Matrix(zoom, zoom) | |
print(f"Rendering page at {zoom}x zoom...") | |
# Render page to image with optimized settings | |
pix = page.get_pixmap( | |
matrix=mat, | |
alpha=False, # No alpha channel needed | |
colorspace="rgb" # Force RGB colorspace | |
) | |
print(f"Page rendered, size: {pix.width}x{pix.height}") | |
# Convert to PIL Image - use more efficient method | |
img_data = pix.tobytes("png") | |
img = Image.open(io.BytesIO(img_data)) | |
# Convert to RGB if needed | |
if img.mode in ('RGBA', 'LA', 'P'): | |
img = img.convert('RGB') | |
print(f"Image converted to RGB, mode: {img.mode}") | |
# Save to bytes with optimization | |
output_buffer = io.BytesIO() | |
if target_format == 'PNG': | |
img.save(output_buffer, format='PNG', optimize=True, compress_level=ImagePreprocessor.PDF_COMPRESS_LEVEL) | |
new_mime_type = 'image/png' | |
new_extension = '.png' | |
else: | |
img.save(output_buffer, format='JPEG', quality=quality, optimize=True) | |
new_mime_type = 'image/jpeg' | |
new_extension = '.jpg' | |
# Clean up resources immediately | |
pdf_document.close() | |
del pix # Free memory | |
# Generate new filename | |
base_name = os.path.splitext(filename)[0] | |
new_filename = f"{base_name}{new_extension}" | |
print(f"PDF processing completed: {filename} -> {new_filename}") | |
return output_buffer.getvalue(), new_filename, new_mime_type | |
except Exception as e: | |
print(f"PDF processing failed: {str(e)}") | |
raise ValueError(f"Failed to process PDF: {str(e)}") | |
def _process_tiff( | |
file_content: bytes, | |
filename: str, | |
target_format: str, | |
quality: int | |
) -> Tuple[bytes, str, str]: | |
"""Process TIFF/GeoTIFF files by rendering RGB view""" | |
try: | |
img = Image.open(io.BytesIO(file_content)) | |
# Convert to RGB if needed | |
if img.mode in ('RGBA', 'LA', 'P', 'CMYK', 'LAB', 'HSV', 'I', 'F'): | |
img = img.convert('RGB') | |
# Save to bytes | |
output_buffer = io.BytesIO() | |
if target_format == 'PNG': | |
img.save(output_buffer, format='PNG', optimize=True) | |
new_mime_type = 'image/png' | |
new_extension = '.png' | |
else: | |
img.save(output_buffer, format='JPEG', quality=quality, optimize=True) | |
new_mime_type = 'image/jpeg' | |
new_extension = '.jpg' | |
# Generate new filename | |
base_name = os.path.splitext(filename)[0] | |
new_filename = f"{base_name}{new_extension}" | |
return output_buffer.getvalue(), new_filename, new_mime_type | |
except Exception as e: | |
raise ValueError(f"Failed to process TIFF: {str(e)}") | |
def _process_heic( | |
file_content: bytes, | |
filename: str, | |
target_format: str, | |
quality: int | |
) -> Tuple[bytes, str, str]: | |
"""Process HEIC/HEIF files""" | |
try: | |
img = Image.open(io.BytesIO(file_content)) | |
# Convert to RGB if needed | |
if img.mode in ('RGBA', 'LA', 'P'): | |
img = img.convert('RGB') | |
# Save to bytes | |
output_buffer = io.BytesIO() | |
if target_format == 'PNG': | |
img.save(output_buffer, format='PNG', optimize=True) | |
new_mime_type = 'image/png' | |
new_extension = '.png' | |
else: | |
img.save(output_buffer, format='JPEG', quality=quality, optimize=True) | |
new_mime_type = 'image/jpeg' | |
new_extension = '.jpg' | |
# Generate new filename | |
base_name = os.path.splitext(filename)[0] | |
new_filename = f"{base_name}{new_extension}" | |
return output_buffer.getvalue(), new_filename, new_mime_type | |
except Exception as e: | |
raise ValueError(f"Failed to process HEIC: {str(e)}") | |
def _process_webp( | |
file_content: bytes, | |
filename: str, | |
target_format: str, | |
quality: int | |
) -> Tuple[bytes, str, str]: | |
"""Process WebP files""" | |
try: | |
img = Image.open(io.BytesIO(file_content)) | |
# Convert to RGB if needed | |
if img.mode in ('RGBA', 'LA', 'P'): | |
img = img.convert('RGB') | |
# Save to bytes | |
output_buffer = io.BytesIO() | |
if target_format == 'PNG': | |
img.save(output_buffer, format='PNG', optimize=True) | |
new_mime_type = 'image/png' | |
new_extension = '.png' | |
else: | |
img.save(output_buffer, format='JPEG', quality=quality, optimize=True) | |
new_mime_type = 'image/jpeg' | |
new_extension = '.jpg' | |
# Generate new filename | |
base_name = os.path.splitext(filename)[0] | |
new_filename = f"{base_name}{new_extension}" | |
return output_buffer.getvalue(), new_filename, new_mime_type | |
except Exception as e: | |
raise ValueError(f"Failed to process WebP: {str(e)}") | |
def _process_gif( | |
file_content: bytes, | |
filename: str, | |
target_format: str, | |
quality: int | |
) -> Tuple[bytes, str, str]: | |
"""Process GIF files (static only)""" | |
try: | |
img = Image.open(io.BytesIO(file_content)) | |
# Check if GIF is animated | |
if hasattr(img, 'n_frames') and img.n_frames > 1: | |
# Take first frame for animated GIFs | |
img.seek(0) | |
# Convert to RGB if needed | |
if img.mode in ('RGBA', 'LA', 'P'): | |
img = img.convert('RGB') | |
# Save to bytes | |
output_buffer = io.BytesIO() | |
if target_format == 'PNG': | |
img.save(output_buffer, format='PNG', optimize=True) | |
new_mime_type = 'image/png' | |
new_extension = '.png' | |
else: | |
img.save(output_buffer, format='JPEG', quality=quality, optimize=True) | |
new_mime_type = 'image/jpeg' | |
new_extension = '.jpg' | |
# Generate new filename | |
base_name = os.path.splitext(filename)[0] | |
new_filename = f"{base_name}{new_extension}" | |
return output_buffer.getvalue(), new_filename, new_mime_type | |
except Exception as e: | |
raise ValueError(f"Failed to process GIF: {str(e)}") | |
def _process_generic( | |
file_content: bytes, | |
filename: str, | |
target_format: str, | |
quality: int | |
) -> Tuple[bytes, str, str]: | |
"""Generic processing for other formats""" | |
try: | |
img = Image.open(io.BytesIO(file_content)) | |
# Convert to RGB if needed | |
if img.mode in ('RGBA', 'LA', 'P', 'CMYK', 'LAB', 'HSV', 'I', 'F'): | |
img = img.convert('RGB') | |
# Save to bytes | |
output_buffer = io.BytesIO() | |
if target_format == 'PNG': | |
img.save(output_buffer, format='PNG', optimize=True) | |
new_mime_type = 'image/png' | |
new_extension = '.png' | |
else: | |
img.save(output_buffer, format='JPEG', quality=quality, optimize=True) | |
new_mime_type = 'image/jpeg' | |
new_extension = '.jpg' | |
# Generate new filename | |
base_name = os.path.splitext(filename)[0] | |
new_filename = f"{base_name}{new_extension}" | |
return output_buffer.getvalue(), new_filename, new_mime_type | |
except Exception as e: | |
raise ValueError(f"Failed to process generic format: {str(e)}") | |