Promptaid-VIsion / py_backend /app /services /image_preprocessor.py
SCGR's picture
image preprocess
000109f
import io
import mimetypes
from typing import Tuple, Optional, BinaryIO
from PIL import Image, ImageOps
# Import PyMuPDF for PDF processing
try:
import fitz # PyMuPDF for PDF processing
FITZ_AVAILABLE = True
except ImportError:
try:
from PyMuPDF import fitz # Alternative import method
FITZ_AVAILABLE = True
except ImportError:
try:
import PyMuPDF as fitz # Another alternative import
FITZ_AVAILABLE = True
except ImportError:
fitz = None # PDF processing will be disabled
FITZ_AVAILABLE = False
import tempfile
import os
class ImagePreprocessor:
"""Service for preprocessing various image formats before storage"""
# Configuration options for performance tuning
PDF_ZOOM_FACTOR = 1.5 # Reduce from 2.0 for better performance
PDF_COMPRESS_LEVEL = 6 # PNG compression level (0-9, higher = smaller but slower)
PDF_QUALITY_MODE = 'balanced' # 'fast', 'balanced', or 'quality'
# PyMuPDF availability flag
FITZ_AVAILABLE = FITZ_AVAILABLE
SUPPORTED_IMAGE_MIME_TYPES = {
'image/png',
'image/jpeg',
'image/jpg',
'image/heic',
'image/heif',
'image/webp',
'image/gif',
'image/tiff',
'image/tif',
'application/pdf'
}
@staticmethod
def detect_mime_type(file_content: bytes, filename: str) -> str:
"""Detect MIME type from file content and filename"""
# First try to detect from content
mime_type, _ = mimetypes.guess_type(filename)
# If no MIME type detected, try to infer from content
if not mime_type:
# Check for common file signatures
if file_content.startswith(b'\x89PNG\r\n\x1a\n'):
mime_type = 'image/png'
elif file_content.startswith(b'\xff\xd8\xff'):
mime_type = 'image/jpeg'
elif file_content.startswith(b'\x49\x49\x2a\x00') or file_content.startswith(b'\x4d\x4d\x00\x2a'):
mime_type = 'image/tiff'
elif file_content.startswith(b'%PDF'):
mime_type = 'application/pdf'
elif file_content.startswith(b'GIF87a') or file_content.startswith(b'GIF89a'):
mime_type = 'image/gif'
elif file_content.startswith(b'RIFF') and file_content[8:12] == b'WEBP':
mime_type = 'image/webp'
elif file_content.startswith(b'\x00\x00\x00\x20ftypheic') or file_content.startswith(b'\x00\x00\x00\x20ftypheix'):
mime_type = 'image/heic'
else:
mime_type = 'application/octet-stream'
return mime_type
@staticmethod
def needs_preprocessing(mime_type: str) -> bool:
"""Check if the file needs preprocessing"""
return mime_type not in {'image/png', 'image/jpeg', 'image/jpg'}
@staticmethod
def preprocess_image(
file_content: bytes,
filename: str,
target_format: str = 'PNG',
quality: int = 95
) -> Tuple[bytes, str, str]:
"""
Preprocess image and return processed content, new filename, and MIME type
Args:
file_content: Raw file content
filename: Original filename
target_format: Target format ('PNG' or 'JPEG')
quality: JPEG quality (1-100, only used for JPEG)
Returns:
Tuple of (processed_content, new_filename, mime_type)
"""
mime_type = ImagePreprocessor.detect_mime_type(file_content, filename)
if not ImagePreprocessor.needs_preprocessing(mime_type):
# No preprocessing needed
return file_content, filename, mime_type
try:
if mime_type == 'application/pdf':
return ImagePreprocessor._process_pdf(file_content, filename, target_format, quality)
elif mime_type in {'image/tiff', 'image/tif'}:
return ImagePreprocessor._process_tiff(file_content, filename, target_format, quality)
elif mime_type in {'image/heic', 'image/heif'}:
return ImagePreprocessor._process_heic(file_content, filename, target_format, quality)
elif mime_type == 'image/webp':
return ImagePreprocessor._process_webp(file_content, filename, target_format, quality)
elif mime_type == 'image/gif':
return ImagePreprocessor._process_gif(file_content, filename, target_format, quality)
else:
# Unsupported format, try to open with PIL as fallback
return ImagePreprocessor._process_generic(file_content, filename, target_format, quality)
except Exception as e:
raise ValueError(f"Failed to preprocess {mime_type} file: {str(e)}")
@staticmethod
def configure_pdf_processing(zoom_factor: float = 1.5, compress_level: int = 6, quality_mode: str = 'balanced'):
"""
Configure PDF processing performance settings
Args:
zoom_factor: PDF zoom factor (1.0 = original size, 2.0 = 2x size)
Lower values = faster processing, lower quality
Higher values = slower processing, higher quality
compress_level: PNG compression level (0-9)
Lower values = faster compression, larger files
Higher values = slower compression, smaller files
quality_mode: Processing mode ('fast', 'balanced', 'quality')
"""
if quality_mode == 'fast':
ImagePreprocessor.PDF_ZOOM_FACTOR = 1.0
ImagePreprocessor.PDF_COMPRESS_LEVEL = 3
elif quality_mode == 'quality':
ImagePreprocessor.PDF_ZOOM_FACTOR = 2.0
ImagePreprocessor.PDF_COMPRESS_LEVEL = 9
else: # balanced
ImagePreprocessor.PDF_ZOOM_FACTOR = zoom_factor
ImagePreprocessor.PDF_COMPRESS_LEVEL = compress_level
print(f"PDF processing configured: zoom={ImagePreprocessor.PDF_ZOOM_FACTOR}, "
f"compression={ImagePreprocessor.PDF_COMPRESS_LEVEL}, mode={quality_mode}")
@staticmethod
def _process_pdf(
file_content: bytes,
filename: str,
target_format: str,
quality: int
) -> Tuple[bytes, str, str]:
"""Process PDF files by rasterizing the first page"""
if not ImagePreprocessor.FITZ_AVAILABLE:
raise ValueError("PDF processing is not available. PyMuPDF is not installed.")
try:
print(f"Starting PDF processing for {filename}...")
# Open PDF with PyMuPDF
pdf_document = fitz.open(stream=file_content, filetype="pdf")
if len(pdf_document) == 0:
raise ValueError("PDF has no pages")
print(f"PDF opened successfully, processing page 1 of {len(pdf_document)}...")
# Get first page
page = pdf_document[0]
# Use configurable zoom factor for performance tuning
zoom = ImagePreprocessor.PDF_ZOOM_FACTOR
mat = fitz.Matrix(zoom, zoom)
print(f"Rendering page at {zoom}x zoom...")
# Render page to image with optimized settings
pix = page.get_pixmap(
matrix=mat,
alpha=False, # No alpha channel needed
colorspace="rgb" # Force RGB colorspace
)
print(f"Page rendered, size: {pix.width}x{pix.height}")
# Convert to PIL Image - use more efficient method
img_data = pix.tobytes("png")
img = Image.open(io.BytesIO(img_data))
# Convert to RGB if needed
if img.mode in ('RGBA', 'LA', 'P'):
img = img.convert('RGB')
print(f"Image converted to RGB, mode: {img.mode}")
# Save to bytes with optimization
output_buffer = io.BytesIO()
if target_format == 'PNG':
img.save(output_buffer, format='PNG', optimize=True, compress_level=ImagePreprocessor.PDF_COMPRESS_LEVEL)
new_mime_type = 'image/png'
new_extension = '.png'
else:
img.save(output_buffer, format='JPEG', quality=quality, optimize=True)
new_mime_type = 'image/jpeg'
new_extension = '.jpg'
# Clean up resources immediately
pdf_document.close()
del pix # Free memory
# Generate new filename
base_name = os.path.splitext(filename)[0]
new_filename = f"{base_name}{new_extension}"
print(f"PDF processing completed: {filename} -> {new_filename}")
return output_buffer.getvalue(), new_filename, new_mime_type
except Exception as e:
print(f"PDF processing failed: {str(e)}")
raise ValueError(f"Failed to process PDF: {str(e)}")
@staticmethod
def _process_tiff(
file_content: bytes,
filename: str,
target_format: str,
quality: int
) -> Tuple[bytes, str, str]:
"""Process TIFF/GeoTIFF files by rendering RGB view"""
try:
img = Image.open(io.BytesIO(file_content))
# Convert to RGB if needed
if img.mode in ('RGBA', 'LA', 'P', 'CMYK', 'LAB', 'HSV', 'I', 'F'):
img = img.convert('RGB')
# Save to bytes
output_buffer = io.BytesIO()
if target_format == 'PNG':
img.save(output_buffer, format='PNG', optimize=True)
new_mime_type = 'image/png'
new_extension = '.png'
else:
img.save(output_buffer, format='JPEG', quality=quality, optimize=True)
new_mime_type = 'image/jpeg'
new_extension = '.jpg'
# Generate new filename
base_name = os.path.splitext(filename)[0]
new_filename = f"{base_name}{new_extension}"
return output_buffer.getvalue(), new_filename, new_mime_type
except Exception as e:
raise ValueError(f"Failed to process TIFF: {str(e)}")
@staticmethod
def _process_heic(
file_content: bytes,
filename: str,
target_format: str,
quality: int
) -> Tuple[bytes, str, str]:
"""Process HEIC/HEIF files"""
try:
img = Image.open(io.BytesIO(file_content))
# Convert to RGB if needed
if img.mode in ('RGBA', 'LA', 'P'):
img = img.convert('RGB')
# Save to bytes
output_buffer = io.BytesIO()
if target_format == 'PNG':
img.save(output_buffer, format='PNG', optimize=True)
new_mime_type = 'image/png'
new_extension = '.png'
else:
img.save(output_buffer, format='JPEG', quality=quality, optimize=True)
new_mime_type = 'image/jpeg'
new_extension = '.jpg'
# Generate new filename
base_name = os.path.splitext(filename)[0]
new_filename = f"{base_name}{new_extension}"
return output_buffer.getvalue(), new_filename, new_mime_type
except Exception as e:
raise ValueError(f"Failed to process HEIC: {str(e)}")
@staticmethod
def _process_webp(
file_content: bytes,
filename: str,
target_format: str,
quality: int
) -> Tuple[bytes, str, str]:
"""Process WebP files"""
try:
img = Image.open(io.BytesIO(file_content))
# Convert to RGB if needed
if img.mode in ('RGBA', 'LA', 'P'):
img = img.convert('RGB')
# Save to bytes
output_buffer = io.BytesIO()
if target_format == 'PNG':
img.save(output_buffer, format='PNG', optimize=True)
new_mime_type = 'image/png'
new_extension = '.png'
else:
img.save(output_buffer, format='JPEG', quality=quality, optimize=True)
new_mime_type = 'image/jpeg'
new_extension = '.jpg'
# Generate new filename
base_name = os.path.splitext(filename)[0]
new_filename = f"{base_name}{new_extension}"
return output_buffer.getvalue(), new_filename, new_mime_type
except Exception as e:
raise ValueError(f"Failed to process WebP: {str(e)}")
@staticmethod
def _process_gif(
file_content: bytes,
filename: str,
target_format: str,
quality: int
) -> Tuple[bytes, str, str]:
"""Process GIF files (static only)"""
try:
img = Image.open(io.BytesIO(file_content))
# Check if GIF is animated
if hasattr(img, 'n_frames') and img.n_frames > 1:
# Take first frame for animated GIFs
img.seek(0)
# Convert to RGB if needed
if img.mode in ('RGBA', 'LA', 'P'):
img = img.convert('RGB')
# Save to bytes
output_buffer = io.BytesIO()
if target_format == 'PNG':
img.save(output_buffer, format='PNG', optimize=True)
new_mime_type = 'image/png'
new_extension = '.png'
else:
img.save(output_buffer, format='JPEG', quality=quality, optimize=True)
new_mime_type = 'image/jpeg'
new_extension = '.jpg'
# Generate new filename
base_name = os.path.splitext(filename)[0]
new_filename = f"{base_name}{new_extension}"
return output_buffer.getvalue(), new_filename, new_mime_type
except Exception as e:
raise ValueError(f"Failed to process GIF: {str(e)}")
@staticmethod
def _process_generic(
file_content: bytes,
filename: str,
target_format: str,
quality: int
) -> Tuple[bytes, str, str]:
"""Generic processing for other formats"""
try:
img = Image.open(io.BytesIO(file_content))
# Convert to RGB if needed
if img.mode in ('RGBA', 'LA', 'P', 'CMYK', 'LAB', 'HSV', 'I', 'F'):
img = img.convert('RGB')
# Save to bytes
output_buffer = io.BytesIO()
if target_format == 'PNG':
img.save(output_buffer, format='PNG', optimize=True)
new_mime_type = 'image/png'
new_extension = '.png'
else:
img.save(output_buffer, format='JPEG', quality=quality, optimize=True)
new_mime_type = 'image/jpeg'
new_extension = '.jpg'
# Generate new filename
base_name = os.path.splitext(filename)[0]
new_filename = f"{base_name}{new_extension}"
return output_buffer.getvalue(), new_filename, new_mime_type
except Exception as e:
raise ValueError(f"Failed to process generic format: {str(e)}")