# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import functools
import io
import json
import re
import traceback
import time
import logging
import os
from typing import Dict, List, Optional, Tuple, Any, Callable, Union

from PIL import Image
from pydantic import BaseModel, Field
import torch
import numpy as np

# Import from logging_utils
try:
    from .logging_utils import get_system_info, cleanup_memory, LOG_LEVEL_MAP, EMOJI_MAP
except ImportError:
    # Try alternative import paths
    try:
        from logging_utils import get_system_info, cleanup_memory, LOG_LEVEL_MAP, EMOJI_MAP
    except ImportError:
        # Fallback implementations so the module stays importable without logging_utils
        def get_system_info():
            return {}

        def cleanup_memory():
            pass

        LOG_LEVEL_MAP = {}
        EMOJI_MAP = {}


# ----------------------------------------------------------------------
# CUSTOM EXCEPTIONS
# ----------------------------------------------------------------------
class ModelNotLoadedException(Exception):
    """Raised when a pipeline step runs before the required models are loaded."""


class PipelineExecutionError(Exception):
    """Raised when a pipeline step fails; optionally records the failing step."""

    def __init__(self, message: str, step: Optional[str] = None):
        self.message = message
        self.step = step
        super().__init__(self.message)


class ConfigurationError(Exception):
    """Raised for invalid or missing configuration."""


class ModelInferenceError(Exception):
    """Raised when model inference fails, including after retries."""


class ImageProcessingError(Exception):
    """Raised when an image cannot be validated or processed."""


# ----------------------------------------------------------------------
# REQUEST/RESPONSE MODELS
# ----------------------------------------------------------------------
class DetectRequest(BaseModel):
    data: List[Any]
    options: Optional[Dict[str, Any]] = Field(default_factory=dict)


class ProcessingError(BaseModel):
    type: str
    message: str
    step: str
    traceback: Optional[str] = None


class ProcessingWarning(BaseModel):
    type: str
    message: str
    step: str


class ProcessedImage(BaseModel):
    url: str
    status: str
    base64_image: Optional[str] = None
    color: Optional[str] = None
    image_type: Optional[str] = None
    artifacts: Optional[str] = None
    processing_time: Optional[float] = None
    detections: Optional[Dict[str, int]] = None


class ProcessingResponse(BaseModel):
    processed_images: List[ProcessedImage]
    status: Optional[str] = None
    warnings: Optional[List[ProcessingWarning]] = None
    total_processing_time: Optional[float] = None
    system_info: Optional[Dict[str, Any]] = None


# ----------------------------------------------------------------------
# DETECTION RESULT MODELS
# ----------------------------------------------------------------------
class BoundingBox(BaseModel):
    """Axis-aligned box in (x1, y1, x2, y2) pixel coordinates."""

    x1: int
    y1: int
    x2: int
    y2: int

    def to_list(self) -> List[int]:
        return [self.x1, self.y1, self.x2, self.y2]

    def area(self) -> int:
        return (self.x2 - self.x1) * (self.y2 - self.y1)

    def center(self) -> Tuple[float, float]:
        return ((self.x1 + self.x2) / 2, (self.y1 + self.y2) / 2)


class Detection(BaseModel):
    """A single model detection: box, label, confidence score, and source model."""

    box: BoundingBox
    label: str
    score: float
    model: str
    keyword: Optional[str] = None
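
# Usage sketch (illustrative only; not exercised anywhere in this module):
#
#   det = Detection(
#       box=BoundingBox(x1=10, y1=20, x2=110, y2=220),
#       label="logo",
#       score=0.87,
#       model="grounding-dino",
#   )
#   det.box.area()    # -> 20000
#   det.box.center()  # -> (60.0, 120.0)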

# ----------------------------------------------------------------------
# PROCESSING CONTEXT CLASS
# ----------------------------------------------------------------------
class ProcessingContext:
    """Per-image mutable state carried through the pipeline steps."""

    def __init__(self, url: str, product_type: str, keywords: List[str]):
        self.url = url
        self.product_type = product_type
        self.keywords = keywords
        self.skip_run = False
        self.skip_processing = False
        self.filename: Optional[str] = None
        self.final_base64: Optional[str] = None
        self.pil_img: Dict[str, Any] = {}
        self.define_result: Dict[str, Any] = {}
        self.detection_result: Dict[str, Any] = {}
        self.grounding_dino_result: Dict[str, Any] = {}
        self.box_colors: List[Tuple[int, int, int, int]] = []
        self.adjusted_blue_box: Optional[Tuple[int, int, int, int]] = None
        self.final_color: str = "none"
        self.final_image_type: str = "none"
        self.pad_info = {"left": 0, "right": 0, "top": 0, "bottom": 0}

        # Performance tracking
        self.timing: Dict[str, float] = {}
        self.memory_usage: Dict[str, Dict[str, float]] = {}

    def add_timing(self, step: str, duration: float):
        self.timing[step] = duration

    def add_memory_usage(self, step: str):
        self.memory_usage[step] = get_system_info()


# ----------------------------------------------------------------------
# PIPELINE DECORATOR
# ----------------------------------------------------------------------
def create_pipeline_step(ensure_models_loaded_func: Callable) -> Callable:
    """Build a decorator that wraps a pipeline step with model loading,
    timing, JSON logging, memory cleanup, and error handling."""

    def pipeline_step(func: Callable) -> Callable:
        @functools.wraps(func)  # preserve the wrapped step's name for logging/introspection
        def wrapper(contexts: List[ProcessingContext], batch_logs: Optional[List[Dict]] = None) -> Any:
            if batch_logs is None:
                batch_logs = []

            # Only load models if not already loaded.
            # ensure_models_loaded_func should internally check whether models are loaded.
            ensure_models_loaded_func()

            start_time = time.time()
            try:
                # Memory cleanup before processing large batches
                if len(contexts) > 10:
                    cleanup_memory()

                result = func(contexts, batch_logs)

                processing_time = round(time.time() - start_time, 3)
                processed_count = sum(
                    not context.skip_run and not context.skip_processing
                    for context in contexts
                )
                log_data = {
                    "function_name": func.__name__,
                    "spent_time_seconds": processing_time,
                    "processed_image_count": processed_count,
                    "batch_log": batch_logs,
                    "system_info": get_system_info(),
                }
                print(custom_dumps(log_data), flush=True)

                # Memory cleanup after processing large batches
                if processed_count > 10:
                    cleanup_memory()

                return result
            except Exception as e:
                error_trace = traceback.format_exc()
                processing_time = round(time.time() - start_time, 3)
                logging.error(f"Error in {func.__name__}: {str(e)}")
                error_log = {
                    "function_name": func.__name__,
                    "spent_time_seconds": processing_time,
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "traceback": error_trace,
                    "system_info": get_system_info(),
                }
                print(custom_dumps(error_log), flush=True)

                # Mark every context so downstream steps skip this batch
                for context in contexts:
                    context.skip_run = True

                batch_logs.append({
                    "function": func.__name__,
                    "status": "error",
                    "error": str(e),
                    "error_type": type(e).__name__,
                })

                # Cleanup on error - but skip if CUDA initialization error
                if "CUDA must not be initialized" not in str(e):
                    try:
                        cleanup_memory()
                    except Exception:
                        pass  # ignore cleanup errors

                raise

        return wrapper

    return pipeline_step
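
# Usage sketch (illustrative; `ensure_models_loaded` and `detect_objects` are
# hypothetical stand-ins for the caller's own hooks):
#
#   pipeline_step = create_pipeline_step(ensure_models_loaded)
#
#   @pipeline_step
#   def detect_objects(contexts, batch_logs):
#       for context in contexts:
#           if context.skip_run or context.skip_processing:
#               continue
#           ...  # run detection, populate context.detection_result
#       return contexts
#
#   detect_objects(contexts)  # timing, JSON logging, and cleanup handled by the wrapper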

# ----------------------------------------------------------------------
# IMAGE UTILITIES
# ----------------------------------------------------------------------
def validate_image(image: Union[Image.Image, np.ndarray]) -> bool:
    """Return True if the image has non-zero width and height."""
    if isinstance(image, Image.Image):
        return image.size[0] > 0 and image.size[1] > 0
    elif isinstance(image, np.ndarray):
        return image.shape[0] > 0 and image.shape[1] > 0
    return False


def resize_image_aspect_ratio(image: Image.Image, max_size: int = 1920) -> Image.Image:
    """Downscale so the longest side is at most max_size, preserving aspect ratio."""
    width, height = image.size
    if width > max_size or height > max_size:
        if width > height:
            new_width = max_size
            new_height = int(height * (max_size / width))
        else:
            new_height = max_size
            new_width = int(width * (max_size / height))
        return image.resize((new_width, new_height), Image.Resampling.LANCZOS)
    return image


# ----------------------------------------------------------------------
# JSON FORMATTING UTILITIES
# ----------------------------------------------------------------------
def custom_dumps(data: Any) -> str:
    """Serialize to indented JSON, converting pydantic models, numpy arrays,
    tensors, and numpy scalars; 4-number lists (boxes) stay on single lines."""

    def default_handler(obj):
        if isinstance(obj, (BoundingBox, Detection)):
            return obj.model_dump()
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif torch.is_tensor(obj):
            return obj.cpu().numpy().tolist()
        elif isinstance(obj, (np.integer, np.floating)):
            return obj.item()
        elif hasattr(obj, '__dict__'):
            return obj.__dict__
        else:
            return str(obj)

    text = json.dumps(data, indent=2, default=default_handler)

    # Collapse indent-wrapped 4-number lists (bounding boxes) onto single lines
    box_pattern = re.compile(
        r'\[\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?)(?:,\s*)?\s*\]',
        re.MULTILINE
    )
    text = box_pattern.sub(r'[\1, \2, \3, \4]', text)
    return text


# ----------------------------------------------------------------------
# ERROR RECOVERY UTILITIES
# ----------------------------------------------------------------------
def safe_model_inference(model_func: Callable, *args, **kwargs) -> Any:
    """Call model_func with retries: GPU OOM triggers memory cleanup before
    retrying; terminal failures are wrapped in ModelInferenceError."""
    max_retries = kwargs.pop('max_retries', 3)
    retry_delay = kwargs.pop('retry_delay', 1.0)

    for attempt in range(max_retries):
        try:
            return model_func(*args, **kwargs)
        except (torch.cuda.OutOfMemoryError, RuntimeError) as e:
            if "out of memory" in str(e).lower():
                logging.warning(f"GPU OOM on attempt {attempt + 1}, cleaning memory...")
                cleanup_memory()
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                    continue
            # Non-OOM RuntimeErrors fail fast; OOM falls through on the last attempt
            raise ModelInferenceError(f"Model inference failed: {str(e)}") from e
        except Exception as e:
            if attempt < max_retries - 1:
                logging.warning(f"Model inference attempt {attempt + 1} failed: {str(e)}")
                time.sleep(retry_delay)
                continue
            raise ModelInferenceError(f"Model inference failed after {max_retries} attempts: {str(e)}") from e

    # Reached only if max_retries <= 0
    raise ModelInferenceError("Model inference failed: max retries exceeded")
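
# Usage sketch (illustrative; `run_detector` and `image` are hypothetical).
# max_retries/retry_delay are consumed here and NOT forwarded to the callable:
#
#   detections = safe_model_inference(run_detector, image, max_retries=3, retry_delay=1.0)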