|
""" |
|
Comprehensive error handling and logging system for the RAG application. |
|
""" |
|
|
|
import logging |
|
import logging.handlers |
|
import sys |
|
import traceback |
|
from pathlib import Path |
|
from typing import Any, Dict, Optional, Type, Union |
|
from functools import wraps |
|
import json |
|
|
|
|
|
class RAGError(Exception):
    """Root of the RAG exception hierarchy.

    Attributes:
        message: The human-readable message, also passed to ``Exception``.
        error_code: The supplied code, or the concrete class name when the
            supplied code is falsy.
        details: The supplied details mapping, or a fresh empty dict when
            the supplied value is falsy.
    """

    def __init__(self, message: str, error_code: str = None, details: Dict[str, Any] = None):
        super().__init__(message)
        self.message = message
        # A missing/empty code falls back to the name of the raising class,
        # so subclasses get a distinct code without any extra work.
        self.error_code = error_code if error_code else type(self).__name__
        self.details = details if details else {}
|
|
|
|
|
class DocumentProcessingError(RAGError):
    """Signals that processing a document failed."""
|
|
|
|
|
class EmbeddingError(RAGError):
    """Signals that generating embeddings failed."""
|
|
|
|
|
class SearchError(RAGError):
    """Signals that a search operation failed."""
|
|
|
|
|
class ConfigurationError(RAGError):
    """Signals that the supplied configuration is invalid."""
|
|
|
|
|
class ResourceError(RAGError):
    """Signals that system resources are insufficient."""
|
|
|
|
|
class ErrorHandler:
    """Centralized error handling and logging system.

    On construction the handler (re)configures the ``"rag_system"`` logger
    from the ``"logging"`` section of ``config`` and exposes it as
    ``self.logger``.  Recognised keys of that section are ``level``,
    ``format``, ``file``, ``max_size`` and ``backup_count``.
    """

    # Suffix -> multiplier.  Two-letter suffixes come first so that "KB"
    # is matched before the bare "B" suffix.
    _SIZE_UNITS = (
        ("KB", 1024),
        ("MB", 1024 ** 2),
        ("GB", 1024 ** 3),
        ("B", 1),
    )

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = self._setup_logging()

    @staticmethod
    def _resolve_level(level: Union[str, int]) -> int:
        """Translate a configured level (name in any case, or int) to an int.

        Raises:
            ConfigurationError: If ``level`` is not a known logging level.
        """
        if isinstance(level, int) and not isinstance(level, bool):
            return level
        # Upper-casing matters: getattr(logging, "info") would return the
        # module-level *function* logging.info rather than the level number.
        candidate = getattr(logging, str(level).strip().upper(), None)
        if isinstance(candidate, int) and not isinstance(candidate, bool):
            return candidate
        raise ConfigurationError(f"Invalid logging level: {level!r}")

    def _setup_logging(self) -> logging.Logger:
        """Set up logging configuration.

        Replaces every handler on the ``"rag_system"`` logger with a stdout
        handler and, when ``logging.file`` is configured, a rotating file
        handler sharing the same formatter.

        Returns:
            The configured ``"rag_system"`` logger.

        Raises:
            ConfigurationError: If the configured level is not recognised.
            ValueError: If ``logging.max_size`` cannot be parsed.
        """
        # ``or {}`` also covers a section that is present but None
        # (e.g. an empty mapping in a YAML file).
        log_config = self.config.get("logging") or {}

        logger = logging.getLogger("rag_system")
        logger.setLevel(self._resolve_level(log_config.get("level", "INFO")))

        # Detach *and close* previous handlers; merely dropping them from the
        # list would leave an earlier log file open.
        for stale_handler in list(logger.handlers):
            logger.removeHandler(stale_handler)
            stale_handler.close()

        formatter = logging.Formatter(
            log_config.get(
                "format",
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            )
        )

        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        log_file = log_config.get("file")
        if log_file:
            # Make sure the target directory exists before opening the file.
            Path(log_file).parent.mkdir(parents=True, exist_ok=True)

            file_handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=self._parse_size(log_config.get("max_size", "10MB")),
                backupCount=log_config.get("backup_count", 5),
            )
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

        return logger

    def _parse_size(self, size_str: Union[str, int]) -> int:
        """Parse size string (e.g., '10MB') to bytes.

        Accepts a plain integer (already bytes) or a string made of a number
        followed by an optional, case-insensitive ``B``/``KB``/``MB``/``GB``
        suffix.  Surrounding whitespace and whitespace between number and
        suffix are ignored; fractional numbers are truncated to whole bytes.

        Raises:
            ValueError: If the value cannot be parsed or is negative.
        """
        if isinstance(size_str, int) and not isinstance(size_str, bool):
            size = size_str
        else:
            text = str(size_str).strip().upper()
            multiplier = 1
            for suffix, factor in self._SIZE_UNITS:
                if text.endswith(suffix):
                    text = text[: -len(suffix)].strip()
                    multiplier = factor
                    break
            try:
                try:
                    # Integer path first so large values stay exact.
                    size = int(text) * multiplier
                except ValueError:
                    size = int(float(text) * multiplier)
            except (ValueError, OverflowError):
                raise ValueError(f"Invalid size value: {size_str!r}") from None

        if size < 0:
            raise ValueError(f"Size must be non-negative: {size_str!r}")
        return size

    def log_error(self, error: Exception, context: Optional[Dict[str, Any]] = None) -> str:
        """Log an error with context and return user-friendly message.

        Args:
            error: The exception to record.
            context: Optional extra information stored on the log record.

        Returns:
            The message produced by ``_get_user_friendly_message``.
        """
        error_id = f"ERR_{id(error)}"
        context = context or {}

        # Format from the exception object itself: traceback.format_exc()
        # only works while an exception is being handled and would otherwise
        # record "NoneType: None".
        formatted_traceback = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )

        self.logger.error(
            f"Error {error_id}: {str(error)}",
            extra={
                "error_type": type(error).__name__,
                "error_id": error_id,
                "context": context,
                "traceback": formatted_traceback,
            },
        )

        return self._get_user_friendly_message(error)

    def _get_user_friendly_message(self, error: Exception) -> str:
        """Convert technical error to user-friendly message.

        Only document-processing and configuration errors expose their own
        ``message``; every other type maps to a fixed sentence.
        """
        if isinstance(error, DocumentProcessingError):
            return f"Document processing failed: {error.message}"
        if isinstance(error, EmbeddingError):
            return "Failed to generate document embeddings. Please try again."
        if isinstance(error, SearchError):
            return "Search operation failed. Please try again with a different query."
        if isinstance(error, ConfigurationError):
            return f"Configuration error: {error.message}"
        if isinstance(error, ResourceError):
            return "System resources are insufficient. Please try with smaller documents."
        if isinstance(error, FileNotFoundError):
            return "The requested file could not be found."
        if isinstance(error, PermissionError):
            return "Permission denied accessing the file."
        if isinstance(error, MemoryError):
            return "Not enough memory to process the request. Please try with smaller documents."
        return "An unexpected error occurred. Please try again."

    def handle_exception(self, func):
        """Decorator for handling exceptions in functions.

        Any ``Exception`` raised by ``func`` is logged through ``log_error``
        (with the function name and truncated argument reprs as context) and
        re-raised as a ``RAGError`` carrying the user-friendly message, chained
        to the original exception.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                error_message = self.log_error(e, {
                    "function": func.__name__,
                    # Truncated so huge arguments do not flood the log.
                    "args": str(args)[:200],
                    "kwargs": str(kwargs)[:200],
                })
                raise RAGError(error_message) from e
        return wrapper
|
|
|
|
|
def validate_file_upload(file_path: str, max_size: int, allowed_extensions: list) -> None:
    """Validate uploaded file.

    Args:
        file_path: Path of the uploaded file.
        max_size: Largest accepted file size in bytes (inclusive).
        allowed_extensions: Accepted extensions.  Matching is
            case-insensitive and a missing leading dot is tolerated, so
            ``"pdf"``, ``".pdf"`` and ``".PDF"`` are equivalent.

    Raises:
        DocumentProcessingError: If the path is empty or is not an existing
            regular file, if the file is larger than ``max_size``, or if its
            extension is not allowed.
    """
    # is_file() rather than exists(): a directory is not a valid upload.
    if not file_path or not Path(file_path).is_file():
        raise DocumentProcessingError("File not found or invalid path")

    upload = Path(file_path)

    if upload.stat().st_size > max_size:
        size_mb = max_size / (1024 * 1024)
        raise DocumentProcessingError(f"File size exceeds maximum allowed size of {size_mb:.1f}MB")

    # Normalise the allow-list the same way the file suffix is normalised;
    # otherwise entries such as ".PDF" or "pdf" could never match.
    normalized = set()
    for extension in allowed_extensions:
        cleaned = str(extension).strip().lower()
        if cleaned and not cleaned.startswith("."):
            cleaned = f".{cleaned}"
        normalized.add(cleaned)

    if upload.suffix.lower() not in normalized:
        raise DocumentProcessingError(
            f"File type not supported. Allowed types: {', '.join(allowed_extensions)}"
        )
|
|
|
|
|
def validate_config(config: Dict[str, Any]) -> None:
    """Validate configuration dictionary.

    Checks that the ``app``, ``models``, ``processing`` and ``search``
    sections exist, that ``models`` contains ``embedding``, that
    ``processing.chunk_size`` is a positive number and that
    ``processing.chunk_overlap`` is a non-negative number.

    Sections or values that are present but empty (``None``) or of a
    non-numeric type are reported as configuration errors rather than
    surfacing as ``TypeError``/``AttributeError``.

    Raises:
        ConfigurationError: On the first failed check.
    """

    def _is_number(value: Any) -> bool:
        # bool is an int subclass but is never a meaningful size.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    config = config or {}

    for section in ("app", "models", "processing", "search"):
        if section not in config:
            raise ConfigurationError(f"Missing required configuration section: {section}")

    # A section that is declared but left empty may load as None.
    models = config["models"] or {}
    if "embedding" not in models:
        raise ConfigurationError("Missing embedding model configuration")

    processing = config["processing"] or {}

    chunk_size = processing.get("chunk_size", 0)
    if not _is_number(chunk_size) or chunk_size <= 0:
        raise ConfigurationError("chunk_size must be positive")

    chunk_overlap = processing.get("chunk_overlap", -1)
    if not _is_number(chunk_overlap) or chunk_overlap < 0:
        raise ConfigurationError("chunk_overlap must be non-negative")
|
|
|
|
|
class ContextualLogger:
    """Wrapper that attaches a fixed context to every record it logs.

    Each logging method forwards ``message`` to the wrapped logger and passes
    the stored context, overlaid with the call's keyword arguments, as
    ``extra``.
    """

    def __init__(self, logger: logging.Logger, context: Dict[str, Any] = None):
        self.logger = logger
        self.context = context if context else {}

    def _extra(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Return the stored context overlaid with per-call ``fields``."""
        merged = dict(self.context)
        merged.update(fields)
        return merged

    # The wrapped logger is called directly from each method (not through a
    # shared helper) so the caller frame recorded by logging is unchanged.

    def info(self, message: str, **kwargs):
        self.logger.info(message, extra=self._extra(kwargs))

    def warning(self, message: str, **kwargs):
        self.logger.warning(message, extra=self._extra(kwargs))

    def error(self, message: str, **kwargs):
        self.logger.error(message, extra=self._extra(kwargs))

    def debug(self, message: str, **kwargs):
        self.logger.debug(message, extra=self._extra(kwargs))
|
|
|
|
|
def create_error_response(error: Exception, request_id: str = None) -> Dict[str, Any]:
    """Build the standard failure payload for ``error``.

    The ``error`` entry holds the exception's class name, its ``str()`` form
    and the given ``request_id``; ``success`` is False and ``data`` is None.
    """
    error_info = {
        "type": type(error).__name__,
        "message": str(error),
        "request_id": request_id,
    }
    response = {"success": False, "error": error_info, "data": None}
    return response
|
|
|
|
|
def create_success_response(data: Any, request_id: str = None) -> Dict[str, Any]:
    """Build the standard success payload wrapping ``data``.

    ``success`` is True, ``error`` is None, and ``request_id`` is placed at
    the top level of the returned dict.
    """
    response = {"success": True, "error": None}
    response["data"] = data
    response["request_id"] = request_id
    return response