""" Enhanced Logging Framework for Gaia System This module provides a comprehensive logging framework for the Gaia system, with features including: - Detailed logging at all stages of the workflow - Timing information for performance analysis - Tool selection decisions and reasoning - API request and response logging - Execution path tracing - Error condition logging with stack traces The framework is designed to be minimally invasive to normal operation while providing detailed insights for debugging and performance analysis. """ import datetime import inspect import json import logging import os import sys import time import traceback import uuid from contextlib import contextmanager from dataclasses import dataclass, field from typing import Dict, Any, Optional, List, Union, Callable # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger("gaia") # Global trace ID for the current execution _TRACE_ID = None # Directory for log files LOG_DIR = os.environ.get("GAIA_LOG_DIR", "logs") # Ensure log directory exists os.makedirs(LOG_DIR, exist_ok=True) @dataclass class LogEntry: """Class representing a structured log entry.""" timestamp: datetime.datetime level: str message: str trace_id: str category: str details: Optional[Dict[str, Any]] = None source_file: Optional[str] = None source_line: Optional[int] = None source_function: Optional[str] = None def to_dict(self) -> Dict[str, Any]: """Convert the log entry to a dictionary.""" return { "timestamp": self.timestamp.isoformat(), "level": self.level, "message": self.message, "trace_id": self.trace_id, "category": self.category, "details": self.details, "source_file": self.source_file, "source_line": self.source_line, "source_function": self.source_function } def to_json(self) -> str: """Convert the log entry to a JSON string.""" return json.dumps(self.to_dict(), default=str) def get_trace_id() -> str: """Get the current trace ID.""" global _TRACE_ID if _TRACE_ID is None: _TRACE_ID = generate_trace_id() return _TRACE_ID def set_trace_id(trace_id: str): """Set the current trace ID.""" global _TRACE_ID _TRACE_ID = trace_id def generate_trace_id() -> str: """Generate a new trace ID.""" return str(uuid.uuid4()) def get_caller_info() -> Dict[str, Any]: """Get information about the caller of a function.""" frame = inspect.currentframe() try: # Go back 3 frames to get the caller of the logging function frame = frame.f_back.f_back.f_back filename = frame.f_code.co_filename lineno = frame.f_lineno function = frame.f_code.co_name return { "file": os.path.basename(filename), "line": lineno, "function": function } except (AttributeError, ValueError): return {"file": "unknown", "line": 0, "function": "unknown"} finally: del frame # Avoid reference cycles def log_entry(level: str, message: str, category: str, details: Optional[Dict[str, Any]] = None): """ Log a structured entry to the log file. Args: level: Log level (INFO, WARNING, ERROR, etc.) message: Log message category: Category of the log entry (agent, tool, api, etc.) details: Additional details to include in the log entry """ trace_id = get_trace_id() timestamp = datetime.datetime.now() # Get caller information caller_info = get_caller_info() # Create log entry entry = LogEntry( timestamp=timestamp, level=level, message=message, trace_id=trace_id, category=category, details=details, source_file=caller_info["file"], source_line=caller_info["line"], source_function=caller_info["function"] ) # Log to console logger.log( getattr(logging, level), f"[{trace_id[:8]}] [{category}] {message}" ) # Write to log file log_file = os.path.join(LOG_DIR, f"gaia_{trace_id}.log") with open(log_file, "a", encoding="utf-8") as f: f.write(entry.to_json() + "\n") # Write to combined log file combined_log_file = os.path.join(LOG_DIR, "gaia_combined.log") with open(combined_log_file, "a", encoding="utf-8") as f: f.write(entry.to_json() + "\n") return entry def log_info(message: str, category: str = "general", details: Optional[Dict[str, Any]] = None): """Log an info message.""" return log_entry("INFO", message, category, details) def log_warning(message: str, category: str = "general", details: Optional[Dict[str, Any]] = None): """Log a warning message.""" return log_entry("WARNING", message, category, details) def log_error(error: Union[Exception, str], category: str = "error", context: Optional[Dict[str, Any]] = None, critical: bool = False): """ Log an error message with stack trace. Args: error: Exception object or error message category: Category of the error context: Additional context for the error critical: Whether this is a critical error """ if isinstance(error, Exception): error_type = type(error).__name__ error_message = str(error) stack_trace = traceback.format_exc() else: error_type = "Error" error_message = error stack_trace = "".join(traceback.format_stack()[:-1]) details = { "error_type": error_type, "error_message": error_message, "stack_trace": stack_trace } if context: details["context"] = context level = "CRITICAL" if critical else "ERROR" message = f"ERROR: {error_type} - {error_message}" return log_entry(level, message, category, details) def log_api_request(api_name: str, endpoint: str, method: str, params: Optional[Dict[str, Any]] = None): """ Log an API request. Args: api_name: Name of the API (e.g., "OpenAI", "Serper") endpoint: API endpoint method: HTTP method (GET, POST, etc.) params: Request parameters """ message = f"API REQUEST: {api_name} - {method} {endpoint}" details = { "api_name": api_name, "endpoint": endpoint, "method": method, "params": params } return log_entry("INFO", message, "api", details) def log_api_response(api_name: str, endpoint: str, status_code: int, response: Any, duration: float): """ Log an API response. Args: api_name: Name of the API endpoint: API endpoint status_code: HTTP status code response: Response data duration: Request duration in seconds """ message = f"API RESPONSE: {api_name} - {status_code} ({duration:.2f}s)" # Sanitize response for logging if isinstance(response, dict): # Create a copy to avoid modifying the original response_copy = response.copy() # Truncate large text fields for key, value in response_copy.items(): if isinstance(value, str) and len(value) > 500: response_copy[key] = value[:500] + "... [truncated]" sanitized_response = response_copy elif isinstance(response, str): if len(response) > 500: sanitized_response = response[:500] + "... [truncated]" else: sanitized_response = response else: # Try to convert to string try: response_str = str(response) if len(response_str) > 500: sanitized_response = response_str[:500] + "... [truncated]" else: sanitized_response = response_str except: sanitized_response = "[Response could not be converted to string]" details = { "api_name": api_name, "endpoint": endpoint, "status_code": status_code, "response": sanitized_response, "duration": duration } return log_entry("INFO", message, "api", details) def log_tool_selection(tool_name: str, reason: str, inputs: Optional[Dict[str, Any]] = None): """ Log a tool selection decision. Args: tool_name: Name of the selected tool reason: Reason for selecting this tool inputs: Tool inputs """ message = f"TOOL SELECTION: {tool_name}" details = { "tool_name": tool_name, "reason": reason } if inputs: # Sanitize inputs for logging sanitized_inputs = {} for key, value in inputs.items(): if isinstance(value, str) and len(value) > 200: sanitized_inputs[key] = value[:200] + "... [truncated]" else: sanitized_inputs[key] = value details["inputs"] = sanitized_inputs return log_entry("INFO", message, "tool", details) def log_tool_execution(tool_name: str, success: bool, result: Any = None, error: Optional[str] = None, duration: Optional[float] = None): """ Log a tool execution result. Args: tool_name: Name of the tool success: Whether the execution was successful result: Tool execution result (if successful) error: Error message (if failed) duration: Execution duration in seconds """ status = "SUCCESS" if success else "FAILURE" message = f"TOOL EXECUTION: {tool_name} - {status}" details = { "tool_name": tool_name, "success": success } if duration is not None: details["duration"] = duration if success and result is not None: # Sanitize result for logging if isinstance(result, dict): # Create a copy to avoid modifying the original result_copy = result.copy() # Truncate large text fields for key, value in result_copy.items(): if isinstance(value, str) and len(value) > 200: result_copy[key] = value[:200] + "... [truncated]" details["result"] = result_copy elif isinstance(result, str): if len(result) > 200: details["result"] = result[:200] + "... [truncated]" else: details["result"] = result else: # Try to convert to string try: result_str = str(result) if len(result_str) > 200: details["result"] = result_str[:200] + "... [truncated]" else: details["result"] = result_str except: details["result"] = "[Result could not be converted to string]" if not success and error: details["error"] = error return log_entry("INFO", message, "tool", details) def log_workflow_step(step_name: str, description: str, inputs: Optional[Dict[str, Any]] = None, outputs: Optional[Dict[str, Any]] = None): """ Log a workflow step. Args: step_name: Name of the workflow step description: Description of the step inputs: Step inputs outputs: Step outputs """ message = f"WORKFLOW STEP: {step_name} - {description}" details = {} if inputs: # Sanitize inputs for logging sanitized_inputs = {} for key, value in inputs.items(): if isinstance(value, str) and len(value) > 200: sanitized_inputs[key] = value[:200] + "... [truncated]" else: sanitized_inputs[key] = value details["inputs"] = sanitized_inputs if outputs: # Sanitize outputs for logging sanitized_outputs = {} for key, value in outputs.items(): if isinstance(value, str) and len(value) > 200: sanitized_outputs[key] = value[:200] + "... [truncated]" else: sanitized_outputs[key] = value details["outputs"] = sanitized_outputs return log_entry("INFO", message, "workflow", details) def log_memory_operation(operation: str, key: str, value_type: str, success: bool, error: Optional[str] = None): """ Log a memory operation. Args: operation: Type of operation (store, get, update, delete) key: Memory key value_type: Type of value being stored/retrieved success: Whether the operation was successful error: Error message (if failed) """ status = "SUCCESS" if success else "FAILURE" message = f"MEMORY OPERATION: {operation.upper()} {key} - {status}" details = { "operation": operation, "key": key, "value_type": value_type, "success": success } if not success and error: details["error"] = error return log_entry("INFO", message, "memory", details) @contextmanager def TimingContext(name: str, category: str = "timing"): """ Context manager for timing operations. Args: name: Name of the operation being timed category: Category of the operation Example: with TimingContext("process_question", "agent"): result = agent.process_question(question) """ start_time = time.time() try: yield finally: end_time = time.time() duration = end_time - start_time message = f"TIMING_DATA: {name} - {duration:.4f}s" details = { "name": name, "category": category, "duration": duration, "start_time": datetime.datetime.fromtimestamp(start_time).isoformat(), "end_time": datetime.datetime.fromtimestamp(end_time).isoformat() } log_entry("INFO", message, "performance", details) def initialize_logging(verbose: bool = False, log_dir: Optional[str] = None): """ Initialize the logging framework. Args: verbose: Whether to enable verbose logging log_dir: Directory for log files """ global LOG_DIR if log_dir: LOG_DIR = log_dir os.makedirs(LOG_DIR, exist_ok=True) # Set logging level if verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) # Log initialization log_info("Logging framework initialized", "system", { "verbose": verbose, "log_dir": LOG_DIR }) return True