# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import functools
import io
import json
import logging
import os
import re
import time
import traceback
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import torch
from PIL import Image
from pydantic import BaseModel, Field

# Import from logging_utils, falling back to no-op stubs when the module
# is unavailable (e.g., when this file is run outside the package).
try:
    from .logging_utils import get_system_info, cleanup_memory, LOG_LEVEL_MAP, EMOJI_MAP
except ImportError:
    try:
        from logging_utils import get_system_info, cleanup_memory, LOG_LEVEL_MAP, EMOJI_MAP
    except ImportError:
        # Fallback implementations so the rest of the module still imports.
        def get_system_info() -> Dict[str, Any]:
            return {}

        def cleanup_memory() -> None:
            pass

        LOG_LEVEL_MAP: Dict[str, int] = {}
        EMOJI_MAP: Dict[str, str] = {}

# ----------------------------------------------------------------------
# CUSTOM EXCEPTIONS
# ----------------------------------------------------------------------
class ModelNotLoadedException(Exception):
    """Raised when a pipeline step runs before its models are loaded."""
    pass


class PipelineExecutionError(Exception):
    """Raised when a pipeline step fails; optionally records the step name."""

    def __init__(self, message: str, step: Optional[str] = None):
        self.message = message
        self.step = step
        super().__init__(self.message)


class ConfigurationError(Exception):
    pass


class ModelInferenceError(Exception):
    pass


class ImageProcessingError(Exception):
    pass
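
# Illustrative only: how a pipeline step might surface a failure as a
# PipelineExecutionError while preserving the original cause. The step
# name "detect" is a hypothetical example.
def _example_raise() -> None:
    try:
        raise ValueError("bad input")
    except ValueError as e:
        raise PipelineExecutionError(str(e), step="detect") from e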

# ----------------------------------------------------------------------
# REQUEST/RESPONSE MODELS
# ----------------------------------------------------------------------
class DetectRequest(BaseModel):
    data: List[Any]
    options: Optional[Dict[str, Any]] = Field(default_factory=dict)


class ProcessingError(BaseModel):
    type: str
    message: str
    step: str
    traceback: Optional[str] = None


class ProcessingWarning(BaseModel):
    type: str
    message: str
    step: str


class ProcessedImage(BaseModel):
    url: str
    status: str
    base64_image: Optional[str] = None
    color: Optional[str] = None
    image_type: Optional[str] = None
    artifacts: Optional[str] = None
    processing_time: Optional[float] = None
    detections: Optional[Dict[str, int]] = None


class ProcessingResponse(BaseModel):
    processed_images: List[ProcessedImage]
    status: Optional[str] = None
    warnings: Optional[List[ProcessingWarning]] = None
    total_processing_time: Optional[float] = None
    system_info: Optional[Dict[str, Any]] = None
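
# Illustrative only: how a handler might assemble a ProcessingResponse.
# All field values below are hypothetical, not produced by the pipeline.
def _example_response() -> ProcessingResponse:
    image = ProcessedImage(
        url="https://example.com/shoe.jpg",
        status="success",
        color="blue",
        processing_time=1.42,
        detections={"logo": 2},
    )
    return ProcessingResponse(
        processed_images=[image],
        status="completed",
        total_processing_time=1.5,
    )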

# ----------------------------------------------------------------------
# DETECTION RESULT MODELS
# ----------------------------------------------------------------------
class BoundingBox(BaseModel):
    x1: int
    y1: int
    x2: int
    y2: int

    def to_list(self) -> List[int]:
        return [self.x1, self.y1, self.x2, self.y2]

    def area(self) -> int:
        return (self.x2 - self.x1) * (self.y2 - self.y1)

    def center(self) -> Tuple[float, float]:
        return ((self.x1 + self.x2) / 2, (self.y1 + self.y2) / 2)


class Detection(BaseModel):
    box: BoundingBox
    label: str
    score: float
    model: str
    keyword: Optional[str] = None
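
# Illustrative only: the geometry helpers on BoundingBox in action.
# Coordinates, label, and model name are hypothetical.
def _example_detection() -> None:
    box = BoundingBox(x1=10, y1=20, x2=110, y2=220)
    assert box.to_list() == [10, 20, 110, 220]
    assert box.area() == 20_000           # (110 - 10) * (220 - 20)
    assert box.center() == (60.0, 120.0)  # midpoint of each axis
    _ = Detection(box=box, label="logo", score=0.92, model="grounding-dino")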

# ----------------------------------------------------------------------
# PROCESSING CONTEXT CLASS
# ----------------------------------------------------------------------
class ProcessingContext:
    """Mutable per-image state passed between pipeline steps."""

    def __init__(self, url: str, product_type: str, keywords: List[str]):
        self.url = url
        self.product_type = product_type
        self.keywords = keywords
        self.skip_run = False
        self.skip_processing = False
        self.filename: Optional[str] = None
        self.final_base64: Optional[str] = None
        self.pil_img: Dict[str, Any] = {}
        self.define_result: Dict[str, Any] = {}
        self.detection_result: Dict[str, Any] = {}
        self.grounding_dino_result: Dict[str, Any] = {}
        self.box_colors: List[Tuple[int, int, int, int]] = []
        self.adjusted_blue_box: Optional[Tuple[int, int, int, int]] = None
        self.final_color: str = "none"
        self.final_image_type: str = "none"
        self.pad_info = {"left": 0, "right": 0, "top": 0, "bottom": 0}
        # Performance tracking
        self.timing: Dict[str, float] = {}
        self.memory_usage: Dict[str, Dict[str, float]] = {}

    def add_timing(self, step: str, duration: float) -> None:
        self.timing[step] = duration

    def add_memory_usage(self, step: str) -> None:
        self.memory_usage[step] = get_system_info()
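
# Illustrative only: creating a context and recording per-step metrics,
# as a pipeline step would. The URL, product type, and keywords are
# hypothetical.
def _example_context() -> None:
    ctx = ProcessingContext(
        url="https://example.com/shoe.jpg",
        product_type="footwear",
        keywords=["logo", "label"],
    )
    start = time.time()
    # ... a pipeline step would do its work here ...
    ctx.add_timing("detect", time.time() - start)
    ctx.add_memory_usage("detect")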

# ----------------------------------------------------------------------
# PIPELINE DECORATOR
# ----------------------------------------------------------------------
def create_pipeline_step(ensure_models_loaded_func: Callable) -> Callable:
    """Build a decorator that wraps a pipeline step with model loading,
    timing, structured logging, memory cleanup, and error handling."""

    def pipeline_step(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(contexts: List[ProcessingContext], batch_logs: Optional[List[Dict]] = None) -> Any:
            if batch_logs is None:
                batch_logs = []
            # Only load models if not already loaded; ensure_models_loaded_func
            # is expected to check this internally.
            ensure_models_loaded_func()
            start_time = time.time()
            try:
                # Memory cleanup before processing large batches
                if len(contexts) > 10:
                    cleanup_memory()
                result = func(contexts, batch_logs)
                processing_time = round(time.time() - start_time, 3)
                processed_count = sum(
                    not context.skip_run and not context.skip_processing
                    for context in contexts
                )
                log_data = {
                    "function_name": func.__name__,
                    "spent_time_seconds": processing_time,
                    "processed_image_count": processed_count,
                    "batch_log": batch_logs,
                    "system_info": get_system_info(),
                }
                print(custom_dumps(log_data), flush=True)
                # Memory cleanup after processing large batches
                if processed_count > 10:
                    cleanup_memory()
                return result
            except Exception as e:
                error_trace = traceback.format_exc()
                processing_time = round(time.time() - start_time, 3)
                logging.error(f"Error in {func.__name__}: {e}")
                error_log = {
                    "function_name": func.__name__,
                    "spent_time_seconds": processing_time,
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "traceback": error_trace,
                    "system_info": get_system_info(),
                }
                print(custom_dumps(error_log), flush=True)
                # Mark every context so downstream steps skip this batch
                for context in contexts:
                    context.skip_run = True
                batch_logs.append({
                    "function": func.__name__,
                    "status": "error",
                    "error": str(e),
                    "error_type": type(e).__name__,
                })
                # Cleanup on error, unless CUDA itself failed to initialize
                if "CUDA must not be initialized" not in str(e):
                    try:
                        cleanup_memory()
                    except Exception:
                        pass  # Ignore cleanup errors
                raise
        return wrapper
    return pipeline_step
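
# Illustrative only: wiring a step through the decorator factory.
# _ensure_models and _detect_step are hypothetical stand-ins; a real
# caller would pass its own model loader and step function.
def _ensure_models() -> None:
    pass  # a real implementation would lazily load models here


_example_pipeline_step = create_pipeline_step(_ensure_models)


@_example_pipeline_step
def _detect_step(contexts: List[ProcessingContext], batch_logs: List[Dict]) -> None:
    for ctx in contexts:
        if not ctx.skip_run and not ctx.skip_processing:
            ctx.detection_result = {"boxes": []}  # placeholder work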

# ----------------------------------------------------------------------
# IMAGE UTILITIES
# ----------------------------------------------------------------------
def validate_image(image: Union[Image.Image, np.ndarray]) -> bool:
    """Return True if the image has non-zero width and height."""
    if isinstance(image, Image.Image):
        return image.size[0] > 0 and image.size[1] > 0
    elif isinstance(image, np.ndarray):
        return image.shape[0] > 0 and image.shape[1] > 0
    return False


def resize_image_aspect_ratio(image: Image.Image, max_size: int = 1920) -> Image.Image:
    """Downscale so the longer side is at most max_size, preserving aspect ratio."""
    width, height = image.size
    if width > max_size or height > max_size:
        if width > height:
            new_width = max_size
            new_height = int(height * (max_size / width))
        else:
            new_height = max_size
            new_width = int(width * (max_size / height))
        return image.resize((new_width, new_height), Image.Resampling.LANCZOS)
    return image
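
# Illustrative only: validating and downscaling a synthetic image before
# inference. The 4000x2000 dimensions are hypothetical.
def _example_resize() -> None:
    img = Image.new("RGB", (4000, 2000))  # synthetic test image
    if validate_image(img):
        img = resize_image_aspect_ratio(img, max_size=1920)
    assert img.size == (1920, 960)  # 4000x2000 scaled by 1920/4000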

# ----------------------------------------------------------------------
# JSON FORMATTING UTILITIES
# ----------------------------------------------------------------------
def custom_dumps(data: Any) -> str:
    """json.dumps with handlers for Pydantic models, NumPy arrays/scalars,
    and torch tensors; 4-element numeric lists (bounding boxes) are
    collapsed onto single lines."""

    def default_handler(obj):
        if isinstance(obj, (BoundingBox, Detection)):
            return obj.model_dump()
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif torch.is_tensor(obj):
            # detach() so tensors that require grad can be converted
            return obj.detach().cpu().numpy().tolist()
        elif isinstance(obj, (np.integer, np.floating)):
            return obj.item()
        elif hasattr(obj, '__dict__'):
            return obj.__dict__
        else:
            return str(obj)

    text = json.dumps(data, indent=2, default=default_handler)
    # Format bounding boxes on single lines
    box_pattern = re.compile(
        r'\[\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?)(?:,\s*)?\s*\]',
        re.MULTILINE
    )
    text = box_pattern.sub(r'[\1, \2, \3, \4]', text)
    return text
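
# Illustrative only: serializing a mixed payload with custom_dumps.
# The payload contents are hypothetical.
def _example_dumps() -> None:
    payload = {
        "box": BoundingBox(x1=1, y1=2, x2=3, y2=4),
        "scores": np.array([0.9, 0.8]),
        "count": np.int64(2),
    }
    print(custom_dumps(payload))
    # The box serializes via model_dump(); any 4-number list in the
    # output (e.g., box.to_list()) is collapsed onto a single line.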

# ----------------------------------------------------------------------
# ERROR RECOVERY UTILITIES
# ----------------------------------------------------------------------
def safe_model_inference(model_func: Callable, *args, **kwargs) -> Any:
    """Call model_func with retries, cleaning GPU memory after OOM errors."""
    max_retries = kwargs.pop('max_retries', 3)
    retry_delay = kwargs.pop('retry_delay', 1.0)
    for attempt in range(max_retries):
        try:
            return model_func(*args, **kwargs)
        except (torch.cuda.OutOfMemoryError, RuntimeError) as e:
            # Retry only genuine OOM failures; other RuntimeErrors fail fast
            if "out of memory" in str(e).lower():
                logging.warning(f"GPU OOM on attempt {attempt + 1}, cleaning memory...")
                cleanup_memory()
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                    continue
            raise ModelInferenceError(f"Model inference failed: {str(e)}")
        except Exception as e:
            if attempt < max_retries - 1:
                logging.warning(f"Model inference attempt {attempt + 1} failed: {str(e)}")
                time.sleep(retry_delay)
                continue
            raise ModelInferenceError(f"Model inference failed after {max_retries} attempts: {str(e)}")
    raise ModelInferenceError("Model inference failed: max retries exceeded")