# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import functools
import io
import json
import re
import traceback
import time
import logging
import os
import psutil
import gc
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Optional, Tuple, Any, Callable, Union

from PIL import Image
from pydantic import BaseModel, Field
import torch
import numpy as np


# ----------------------------------------------------------------------
# SYSTEM MONITORING
# ----------------------------------------------------------------------
def get_system_info():
    """Return a flat dict of CPU, RAM and (when available) GPU memory usage."""
    cpu_percent = psutil.cpu_percent(interval=0.1)
    memory = psutil.virtual_memory()
    info = {
        "cpu_percent": round(cpu_percent, 2),
        "memory_percent": round(memory.percent, 2),
        "memory_total_gb": round(memory.total / (1024**3), 2),
        "memory_available_gb": round(memory.available / (1024**3), 2),
        "memory_used_gb": round(memory.used / (1024**3), 2),
    }
    # Only query CUDA directly when not running inside a Hugging Face Space
    # (SPACE_ID unset); in a Space the main process must not initialize CUDA.
    if torch.cuda.is_available() and os.getenv("SPACE_ID") is None:
        try:
            info["gpu_count"] = torch.cuda.device_count()
            for i in range(torch.cuda.device_count()):
                torch.cuda.set_device(i)
                gpu_allocated = torch.cuda.memory_allocated(i) / (1024**3)
                gpu_reserved = torch.cuda.memory_reserved(i) / (1024**3)
                gpu_total = torch.cuda.get_device_properties(i).total_memory / (1024**3)
                info[f"gpu_{i}_memory_allocated_gb"] = round(gpu_allocated, 2)
                info[f"gpu_{i}_memory_reserved_gb"] = round(gpu_reserved, 2)
                info[f"gpu_{i}_memory_total_gb"] = round(gpu_total, 2)
                info[f"gpu_{i}_memory_allocated_percent"] = round((gpu_allocated / gpu_total * 100) if gpu_total > 0 else 0, 2)
                info[f"gpu_{i}_memory_reserved_percent"] = round((gpu_reserved / gpu_total * 100) if gpu_total > 0 else 0, 2)
        except Exception as e:
            logging.warning(f"GPU memory monitoring failed: {e}")
            info["gpu_count"] = 0
    elif torch.cuda.is_available():
        info["gpu_available"] = True
        info["gpu_count"] = 0  # Will be set properly within GPU functions
    return info


def cleanup_memory():
    """Release Python garbage and, when safe, the CUDA caching-allocator blocks."""
    gc.collect()
    if torch.cuda.is_available():
        try:
            torch.cuda.empty_cache()
            # Only synchronize when CUDA devices are actually present; calling
            # synchronize without an initialized device raises at runtime.
            if hasattr(torch, '_C') and hasattr(torch._C, '_cuda_getDeviceCount') and torch._C._cuda_getDeviceCount() > 0:
                torch.cuda.synchronize()
        except RuntimeError as e:
            if "CUDA must not be initialized" not in str(e):
                raise


def force_gpu_memory_update():
    """Touch the GPU so cached memory statistics are refreshed before reading them."""
    if torch.cuda.is_available() and torch.cuda.device_count() > 0 and os.getenv("SPACE_ID") is None:
        try:
            temp_tensor = torch.randn(1, device='cuda')
            del temp_tensor
            torch.cuda.empty_cache()
        except Exception as e:
            logging.debug(f"GPU memory update failed: {e}")
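
# Illustrative sketch (not part of the original module): cleanup_memory() is
# meant to run after GPU-heavy work so cached allocator blocks are released
# between requests. `_run_with_cleanup` and `run_model` are hypothetical names
# used only for this example.
def _run_with_cleanup(run_model, *args, **kwargs):
    try:
        return run_model(*args, **kwargs)
    finally:
        cleanup_memory()  # reclaim memory even if the wrapped call raised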

# ----------------------------------------------------------------------
# LOGGING CONFIGURATION
# ----------------------------------------------------------------------
LOG_LEVEL_MAP = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "SUCCESS": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
    "PROCESSING": logging.INFO,
    "RETRY": logging.WARNING,
    "JSON": logging.INFO,
    "PERFORMANCE": logging.INFO,
    "MEMORY": logging.INFO,
}

EMOJI_MAP = {
    "DEBUG": "🔍",
    "INFO": "â„šī¸",
    "SUCCESS": "✅",
    "WARNING": "âš ī¸",
    "ERROR": "❌",
    "CRITICAL": "đŸ”Ĩ",
    "PROCESSING": "âš™ī¸",
    "RETRY": "🔄",
    "JSON": "📊",
    "PERFORMANCE": "⚡",
    "MEMORY": "💾",
}


def setup_logging():
    LOG_FORMAT = "%(asctime)s [%(levelname)s] %(module)s: %(message)s"
    LOG_LEVEL = logging.INFO
    if os.getenv("DEBUG", "").lower() == "true":
        LOG_LEVEL = logging.DEBUG

    # Expose the custom level names (e.g. logging.SUCCESS) as module attributes.
    for name, level in LOG_LEVEL_MAP.items():
        if not hasattr(logging, name):
            setattr(logging, name, level)

    class EmojiFormatter(logging.Formatter):
        def formatTime(self, record, datefmt=None):
            # Timestamps are taken in UTC and shifted by a fixed -2h offset.
            dt = datetime.fromtimestamp(record.created, tz=timezone.utc)
            dt = dt - timedelta(hours=2)
            if datefmt:
                return dt.strftime(datefmt)
            return dt.strftime('%Y-%m-%d %H:%M:%S,%f')[:-3]

        def format(self, record):
            # Skip prefixing when the message already starts with a known emoji.
            if not getattr(record, 'emoji_prefixed', False):
                for emoji in EMOJI_MAP.values():
                    if str(record.msg).startswith(emoji):
                        record.emoji_prefixed = True
                        break
            # Otherwise prefix the emoji of the first name matching the record's level.
            if not getattr(record, 'emoji_prefixed', False):
                for name, level in LOG_LEVEL_MAP.items():
                    if record.levelno == level:
                        record.msg = f"{EMOJI_MAP.get(name, '')} {record.msg}"
                        record.emoji_prefixed = True
                        break
            return super().format(record)

    formatter = EmojiFormatter(LOG_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)

    # Clear all existing handlers on the root logger to avoid duplicates
    logging.root.handlers = []

    # Configure root logger
    root = logging.getLogger()
    root.setLevel(LOG_LEVEL)
    root.addHandler(handler)

    # Configure specific loggers with their own handlers to avoid duplicates
    for logger_name in ["uvicorn", "uvicorn.access", "uvicorn.error", "uvicorn.asgi"]:
        logger = logging.getLogger(logger_name)
        logger.handlers = []      # Clear any existing handlers
        logger.propagate = False  # Prevent propagation to the root logger
        # Don't add a handler - let uvicorn use its default formatting

    # Prevent duplicate logging from HTTP client libraries
    for logger_name in ["_client", "httpx._client", "httpcore._sync.connection_pool", "httpcore._sync.http11"]:
        logger = logging.getLogger(logger_name)
        logger.handlers = []
        if logger_name == "_client":
            # Keep _client logs but route them through our handler
            logger.addHandler(handler)
            logger.setLevel(LOG_LEVEL)
        logger.propagate = False

    # Silence overly verbose loggers
    for logger_name in ["PIL", "PIL.Image", "transformers", "accelerate"]:
        logger = logging.getLogger(logger_name)
        logger.setLevel(logging.WARNING)

    logging.info("Application logging configured successfully")

    # Log system info at startup
    system_info = get_system_info()
    logging.info(f"System info: {system_info}")
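
# Illustrative usage (not part of the original module): setup_logging() only
# registers the custom names as level constants (e.g. logging.SUCCESS); it does
# not add logging.success()-style helpers, so custom-level records go through
# logging.log(). Messages that already start with an EMOJI_MAP value keep their
# emoji; everything else gets the emoji of the first name matching the level.
# `_log_success_example` is a hypothetical name used only for this example.
def _log_success_example():
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Pipeline finished")
    logging.log(LOG_LEVEL_MAP["RETRY"], "Retrying request")  # prefixed with the WARNING emoji, since WARNING maps to the same level first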

# ----------------------------------------------------------------------
# PERFORMANCE DECORATORS
# ----------------------------------------------------------------------
def measure_performance(func: Callable) -> Callable:
    """Log wall-clock duration plus RAM/GPU memory deltas for the wrapped call."""

    @functools.wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        force_gpu_memory_update()
        start_memory = get_system_info()
        try:
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            force_gpu_memory_update()
            end_memory = get_system_info()
            duration = end_time - start_time

            memory_used_delta_gb = end_memory["memory_used_gb"] - start_memory["memory_used_gb"]
            memory_percent_delta = end_memory["memory_percent"] - start_memory["memory_percent"]
            total_memory_gb = end_memory.get("memory_total_gb", 0)
            memory_delta = {
                "memory_used_delta_gb": round(memory_used_delta_gb, 2),
                "memory_percent_delta": round(memory_percent_delta, 2),
                "memory_used_percent": round(end_memory["memory_percent"], 2),
                "memory_total_gb": round(total_memory_gb, 2),
            }

            if torch.cuda.is_available() and torch.cuda.device_count() > 0 and not os.getenv("SPACE_ID"):
                for i in range(torch.cuda.device_count()):
                    allocated_key = f"gpu_{i}_memory_allocated_gb"
                    reserved_key = f"gpu_{i}_memory_reserved_gb"
                    total_key = f"gpu_{i}_memory_total_gb"
                    start_allocated = start_memory.get(allocated_key, 0)
                    end_allocated = end_memory.get(allocated_key, 0)
                    start_reserved = start_memory.get(reserved_key, 0)
                    end_reserved = end_memory.get(reserved_key, 0)
                    # Only report GPUs that held memory at some point during the call.
                    if (start_allocated > 0 or end_allocated > 0 or
                            start_reserved > 0 or end_reserved > 0):
                        allocated_delta = end_allocated - start_allocated
                        reserved_delta = end_reserved - start_reserved
                        gpu_total = end_memory.get(total_key, 0)
                        memory_delta[f"gpu_{i}_allocated_delta_gb"] = round(allocated_delta, 2)
                        memory_delta[f"gpu_{i}_reserved_delta_gb"] = round(reserved_delta, 2)
                        memory_delta[f"gpu_{i}_allocated_percent"] = round(end_memory.get(f"gpu_{i}_memory_allocated_percent", 0), 2)
                        memory_delta[f"gpu_{i}_reserved_percent"] = round(end_memory.get(f"gpu_{i}_memory_reserved_percent", 0), 2)
                        memory_delta[f"gpu_{i}_total_gb"] = round(gpu_total, 2)

            logging.log(
                LOG_LEVEL_MAP["PERFORMANCE"],
                f"{EMOJI_MAP['PERFORMANCE']} {func.__name__} completed in {duration:.3f}s | Memory delta: {memory_delta}"
            )
            return result
        except Exception as e:
            end_time = time.perf_counter()
            duration = end_time - start_time
            logging.error(f"{func.__name__} failed after {duration:.3f}s: {str(e)}")
            raise

    return wrapper
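
# Illustrative usage (not part of the original module): wire the pieces together.
# `_demo_workload` is a hypothetical function used only to show the decorator;
# any callable can be decorated the same way.
@measure_performance
def _demo_workload(n: int = 1_000_000) -> int:
    return sum(range(n))


if __name__ == "__main__":
    setup_logging()
    _demo_workload()
    logging.log(LOG_LEVEL_MAP["MEMORY"], f"{EMOJI_MAP['MEMORY']} Final snapshot: {get_system_info()}")
    cleanup_memory()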