Spaces:
Running
Running
""" | |
Comprehensive error handling and logging system for the RAG application. | |
""" | |
import logging | |
import logging.handlers | |
import sys | |
import traceback | |
from pathlib import Path | |
from typing import Any, Dict, Optional, Type, Union | |
from functools import wraps | |
import json | |
class RAGError(Exception):
    """Root exception for every error raised by the RAG system.

    Instances expose three attributes:
      * ``message``    -- the text passed to the constructor (also ``str(exc)``).
      * ``error_code`` -- the supplied code, or the concrete class name when
        no truthy code is given.
      * ``details``    -- the supplied mapping, or a fresh empty dict when no
        truthy mapping is given.
    """

    def __init__(self, message: str, error_code: str = None, details: Dict[str, Any] = None):
        super().__init__(message)
        self.message = message
        # A missing/empty code falls back to the name of the concrete class.
        self.error_code = error_code if error_code else type(self).__name__
        # A missing/empty mapping is replaced by a new dict per instance.
        self.details = details if details else {}
class DocumentProcessingError(RAGError):
    """Signals that processing a document failed."""
class EmbeddingError(RAGError):
    """Signals that generating embeddings failed."""
class SearchError(RAGError):
    """Signals that a search operation failed."""
class ConfigurationError(RAGError):
    """Signals that the configuration is invalid."""
class ResourceError(RAGError):
    """Signals that system resources are insufficient."""
class ErrorHandler:
    """Centralized error handling and logging system.

    The handler configures the ``"rag_system"`` logger from the ``"logging"``
    section of ``config`` and offers helpers to log exceptions, translate
    them into user-facing messages, and wrap callables so that any exception
    they raise is logged and re-raised as ``RAGError``.

    Recognised keys under ``config["logging"]``:
      * ``level``        -- level name (case-insensitive) or numeric level;
        default ``"INFO"``.
      * ``format``       -- ``logging.Formatter`` format string.
      * ``file``         -- path of a rotating log file (optional).
      * ``max_size``     -- rotation threshold, e.g. ``"10MB"``; default ``"10MB"``.
      * ``backup_count`` -- number of rotated files to keep; default ``5``.
    """

    _DEFAULT_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    # Longest suffixes first so that "KB"/"MB"/"GB" win over the bare "B".
    _SIZE_SUFFIXES = (
        ("KB", 1024),
        ("MB", 1024 * 1024),
        ("GB", 1024 * 1024 * 1024),
        ("B", 1),
    )

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = self._setup_logging()

    def _setup_logging(self) -> logging.Logger:
        """Configure and return the ``"rag_system"`` logger.

        Handlers already attached to the logger are removed *and closed*
        before the new ones are installed, so building several handlers in
        one process does not leak open log files.

        Raises:
            AttributeError: If ``level`` is a string that does not name an
                attribute of the ``logging`` module.
        """
        log_cfg = self.config.get("logging") or {}

        logger = logging.getLogger("rag_system")

        level = log_cfg.get("level", "INFO")
        if isinstance(level, str):
            # Accept "debug" as well as "DEBUG".
            level = getattr(logging, level.strip().upper())
        logger.setLevel(level)

        # Detach and close existing handlers; merely dropping the list would
        # keep any file handler's stream open.
        for stale in list(logger.handlers):
            logger.removeHandler(stale)
            stale.close()

        formatter = logging.Formatter(log_cfg.get("format", self._DEFAULT_FORMAT))

        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        # File handler (if specified)
        log_file = log_cfg.get("file")
        if log_file:
            Path(log_file).parent.mkdir(parents=True, exist_ok=True)
            file_handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=self._parse_size(log_cfg.get("max_size", "10MB")),
                backupCount=log_cfg.get("backup_count", 5),
            )
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

        return logger

    def _parse_size(self, size_str: Union[str, int, float]) -> int:
        """Parse a size such as ``'10MB'`` into a number of bytes.

        Accepts plain numbers (returned as ``int``), digit strings, and
        strings with a case-insensitive ``B``/``KB``/``MB``/``GB`` suffix.
        Whitespace around the number is ignored and fractional values such
        as ``'1.5MB'`` are truncated to whole bytes.

        Raises:
            ValueError: If the numeric part cannot be parsed.
        """
        if isinstance(size_str, (int, float)) and not isinstance(size_str, bool):
            return int(size_str)

        text = str(size_str).strip().upper()
        number, factor = text, 1
        for suffix, multiplier in self._SIZE_SUFFIXES:
            if text.endswith(suffix):
                number = text[: -len(suffix)].strip()
                factor = multiplier
                break

        try:
            return int(number) * factor
        except ValueError:
            # Not an integer literal; allow fractional sizes like "1.5".
            return int(float(number) * factor)

    def log_error(self, error: Exception, context: Optional[Dict[str, Any]] = None) -> str:
        """Log ``error`` with ``context`` and return a user-friendly message.

        The log record carries ``error_type``, ``error_id``, ``context`` and
        ``traceback`` as extra attributes. The traceback is rendered from the
        exception object itself, so it is correct even when this method is
        called outside of an ``except`` block.
        """
        error_id = f"ERR_{id(error)}"
        context = context or {}

        formatted_traceback = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )

        # Log detailed error for debugging
        self.logger.error(
            f"Error {error_id}: {str(error)}",
            extra={
                "error_type": type(error).__name__,
                "error_id": error_id,
                "context": context,
                "traceback": formatted_traceback,
            },
        )

        # Return user-friendly message
        return self._get_user_friendly_message(error)

    def _get_user_friendly_message(self, error: Exception) -> str:
        """Convert a technical error into a message suitable for end users.

        ``DocumentProcessingError`` and ``ConfigurationError`` expose their
        own ``message``; every other recognised type maps to a fixed text,
        and anything else yields a generic fallback.
        """
        if isinstance(error, DocumentProcessingError):
            return f"Document processing failed: {error.message}"
        if isinstance(error, EmbeddingError):
            return "Failed to generate document embeddings. Please try again."
        if isinstance(error, SearchError):
            return "Search operation failed. Please try again with a different query."
        if isinstance(error, ConfigurationError):
            return f"Configuration error: {error.message}"
        if isinstance(error, ResourceError):
            return "System resources are insufficient. Please try with smaller documents."
        if isinstance(error, FileNotFoundError):
            return "The requested file could not be found."
        if isinstance(error, PermissionError):
            return "Permission denied accessing the file."
        if isinstance(error, MemoryError):
            return "Not enough memory to process the request. Please try with smaller documents."
        return "An unexpected error occurred. Please try again."

    def handle_exception(self, func):
        """Decorator that logs exceptions from ``func`` and re-raises them.

        Any ``Exception`` raised by ``func`` is passed to :meth:`log_error`
        together with the function name and truncated ``args``/``kwargs``,
        then re-raised as ``RAGError`` carrying the user-friendly message and
        chained to the original exception.
        """

        @wraps(func)  # keep the wrapped function's __name__/__doc__
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                error_message = self.log_error(e, {
                    "function": func.__name__,
                    "args": str(args)[:200],  # Limit size
                    "kwargs": str(kwargs)[:200],
                })
                raise RAGError(error_message) from e

        return wrapper
def validate_file_upload(file_path: str, max_size: int, allowed_extensions: list) -> None:
    """Validate an uploaded file's presence, size and extension.

    Args:
        file_path: Location of the uploaded file.
        max_size: Largest accepted size in bytes (a file of exactly this
            size is accepted).
        allowed_extensions: Accepted extensions. Entries are compared
            case-insensitively and the leading dot is optional, so
            ``".pdf"``, ``"pdf"`` and ``".PDF"`` are equivalent.

    Raises:
        DocumentProcessingError: If the path is empty, does not point to a
            regular file, exceeds ``max_size``, or has an extension that is
            not in ``allowed_extensions``.
    """
    if not file_path:
        raise DocumentProcessingError("File not found or invalid path")

    path = Path(file_path)
    try:
        # is_file() also rejects directories, which exists() would accept.
        is_regular_file = path.is_file()
        file_size = path.stat().st_size if is_regular_file else 0
    except OSError as exc:
        # The file vanished or became unreadable between the two calls.
        raise DocumentProcessingError("File not found or invalid path") from exc
    if not is_regular_file:
        raise DocumentProcessingError("File not found or invalid path")

    # Check file size
    if file_size > max_size:
        size_mb = max_size / (1024 * 1024)
        raise DocumentProcessingError(f"File size exceeds maximum allowed size of {size_mb:.1f}MB")

    # Check file extension against a normalised set (lowercase, dotted).
    normalized_extensions = set()
    for extension in allowed_extensions:
        extension = str(extension).strip().lower()
        if extension and not extension.startswith("."):
            extension = "." + extension
        normalized_extensions.add(extension)

    if path.suffix.lower() not in normalized_extensions:
        raise DocumentProcessingError(
            f"File type not supported. Allowed types: {', '.join(map(str, allowed_extensions))}"
        )
def validate_config(config: Dict[str, Any]) -> None:
    """Validate the application configuration dictionary.

    Checks that the ``app``, ``models``, ``processing`` and ``search``
    sections exist, that ``models`` contains an ``embedding`` entry, and that
    ``processing`` defines a positive ``chunk_size`` and a non-negative
    ``chunk_overlap`` (both keys are effectively required, because their
    fallback values fail the checks).

    Malformed input -- a section that is not a mapping, or a non-numeric
    chunk parameter -- is reported as ``ConfigurationError`` rather than
    leaking a ``TypeError``/``AttributeError``.

    Raises:
        ConfigurationError: If any of the checks above fails.
    """
    required_sections = ("app", "models", "processing", "search")
    for section in required_sections:
        if section not in config:
            raise ConfigurationError(f"Missing required configuration section: {section}")

    # Validate model configurations
    try:
        has_embedding = "embedding" in config["models"]
    except TypeError:
        # e.g. ``models: null`` -- the section exists but is not a container.
        has_embedding = False
    if not has_embedding:
        raise ConfigurationError("Missing embedding model configuration")

    # Validate processing parameters
    processing = config["processing"]
    if not callable(getattr(processing, "get", None)):
        raise ConfigurationError("Configuration section 'processing' must be a mapping")

    try:
        bad_chunk_size = processing.get("chunk_size", 0) <= 0
    except TypeError:
        # Non-numeric value (None, str, ...) cannot be a valid size.
        bad_chunk_size = True
    if bad_chunk_size:
        raise ConfigurationError("chunk_size must be positive")

    try:
        bad_chunk_overlap = processing.get("chunk_overlap", -1) < 0
    except TypeError:
        bad_chunk_overlap = True
    if bad_chunk_overlap:
        raise ConfigurationError("chunk_overlap must be non-negative")
class ContextualLogger:
    """Logger wrapper that attaches contextual fields to every record.

    The ``context`` mapping given at construction and any extra keyword
    arguments given per call are merged (per-call values win) and passed to
    the underlying logger as ``extra``, so they become attributes of the
    emitted ``LogRecord``.

    The standard logging keywords ``exc_info``, ``stack_info`` and
    ``stacklevel`` are forwarded to the underlying logger unchanged instead
    of being placed in ``extra``; ``logging`` refuses ``extra`` keys that
    collide with ``LogRecord`` attributes such as ``exc_info``.
    """

    # Keywords interpreted by logging.Logger methods themselves.
    _LOGGING_KWARGS = ("exc_info", "stack_info", "stacklevel")

    def __init__(self, logger: logging.Logger, context: Optional[Dict[str, Any]] = None):
        self.logger = logger
        self.context = context or {}

    def _call_kwargs(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Split per-call fields into logging keywords and the ``extra`` dict."""
        call_kwargs = {
            key: fields.pop(key) for key in self._LOGGING_KWARGS if key in fields
        }
        call_kwargs["extra"] = {**self.context, **fields}
        return call_kwargs

    def debug(self, message: str, **kwargs):
        """Log ``message`` at DEBUG level with the merged context."""
        self.logger.debug(message, **self._call_kwargs(kwargs))

    def info(self, message: str, **kwargs):
        """Log ``message`` at INFO level with the merged context."""
        self.logger.info(message, **self._call_kwargs(kwargs))

    def warning(self, message: str, **kwargs):
        """Log ``message`` at WARNING level with the merged context."""
        self.logger.warning(message, **self._call_kwargs(kwargs))

    def error(self, message: str, **kwargs):
        """Log ``message`` at ERROR level with the merged context."""
        self.logger.error(message, **self._call_kwargs(kwargs))

    def critical(self, message: str, **kwargs):
        """Log ``message`` at CRITICAL level with the merged context."""
        self.logger.critical(message, **self._call_kwargs(kwargs))

    def exception(self, message: str, **kwargs):
        """Log ``message`` at ERROR level, including exception info by default."""
        kwargs.setdefault("exc_info", True)
        self.logger.error(message, **self._call_kwargs(kwargs))
def create_error_response(error: Exception, request_id: str = None) -> Dict[str, Any]:
    """Build the standard failure envelope for ``error``.

    The result has ``success`` set to ``False``, ``data`` set to ``None`` and
    an ``error`` entry holding the exception's class name, its ``str()``
    text and the given ``request_id``.
    """
    error_info = {}
    error_info["type"] = type(error).__name__
    error_info["message"] = str(error)
    error_info["request_id"] = request_id

    response = {"success": False}
    response["error"] = error_info
    response["data"] = None
    return response
def create_success_response(data: Any, request_id: str = None) -> Dict[str, Any]:
    """Build the standard success envelope around ``data``.

    The result has ``success`` set to ``True``, ``error`` set to ``None`` and
    carries ``data`` and ``request_id`` as top-level entries.
    """
    return dict(
        success=True,
        error=None,
        data=data,
        request_id=request_id,
    )