|
""" |
|
Enhanced Logging Framework for Gaia System |
|
|
|
This module provides a comprehensive logging framework for the Gaia system, |
|
with features including: |
|
|
|
- Detailed logging at all stages of the workflow |
|
- Timing information for performance analysis |
|
- Tool selection decisions and reasoning |
|
- API request and response logging |
|
- Execution path tracing |
|
- Error condition logging with stack traces |
|
|
|
The framework is designed to be minimally invasive to normal operation |
|
while providing detailed insights for debugging and performance analysis. |
|
""" |
|
|
|
import datetime |
|
import inspect |
|
import json |
|
import logging |
|
import os |
|
import sys |
|
import time |
|
import traceback |
|
import uuid |
|
from contextlib import contextmanager |
|
from dataclasses import dataclass, field |
|
from typing import Dict, Any, Optional, List, Union, Callable |
|
|
|
|
|
# Send every record to stdout using a timestamped, level-tagged layout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
)

# Logger shared by all helpers in this module.
logger = logging.getLogger("gaia")

# Trace ID of the current run; stays None until one is set or requested.
_TRACE_ID = None

# Directory that receives the JSON-lines log files (GAIA_LOG_DIR overrides
# the "logs" default).
LOG_DIR = os.environ.get("GAIA_LOG_DIR", "logs")

# Create the directory up front so later appends do not fail on a missing path.
os.makedirs(LOG_DIR, exist_ok=True)
|
|
|
@dataclass
class LogEntry:
    """Structured record describing one log event.

    The class only holds data; ``to_dict`` and ``to_json`` produce the
    serialisable representations.
    """

    # Moment the entry was created.
    timestamp: datetime.datetime
    # Level name such as "INFO" or "ERROR".
    level: str
    # Human-readable message.
    message: str
    # Identifier tying the entry to a trace.
    trace_id: str
    # Free-form category label (e.g. "api", "tool").
    category: str
    # Optional extra payload attached to the entry.
    details: Optional[Dict[str, Any]] = None
    # Optional origin of the log call.
    source_file: Optional[str] = None
    source_line: Optional[int] = None
    source_function: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the entry as a dict, with the timestamp in ISO format."""
        ordered_fields = (
            "timestamp",
            "level",
            "message",
            "trace_id",
            "category",
            "details",
            "source_file",
            "source_line",
            "source_function",
        )
        record: Dict[str, Any] = {
            field_name: getattr(self, field_name) for field_name in ordered_fields
        }
        # Re-assigning an existing key keeps its position in the dict.
        record["timestamp"] = self.timestamp.isoformat()
        return record

    def to_json(self) -> str:
        """Return the entry as a JSON string.

        Values that ``json`` cannot encode natively are passed through
        ``str`` (``default=str``).
        """
        payload = self.to_dict()
        return json.dumps(payload, default=str)
|
|
|
def get_trace_id() -> str:
    """Return the active trace ID, generating and storing one if none is set."""
    global _TRACE_ID
    if _TRACE_ID is not None:
        return _TRACE_ID
    _TRACE_ID = generate_trace_id()
    return _TRACE_ID
|
|
|
def set_trace_id(trace_id: str):
    """Replace the module-wide trace ID with ``trace_id``.

    Args:
        trace_id: Identifier that later log entries will be tagged with
    """
    global _TRACE_ID
    _TRACE_ID = trace_id
|
|
|
def generate_trace_id() -> str:
    """Return a fresh random (version 4) UUID rendered as a string."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)
|
|
|
def get_caller_info() -> Dict[str, Any]:
    """Describe the code location that triggered the current log call.

    The stack is walked outwards from this function until the first frame
    that belongs neither to this module nor to ``contextlib``. A fixed
    stack depth is only correct for one particular call chain; skipping
    internal frames gives the real call site whether the entry came through
    a convenience wrapper, a direct ``log_entry`` call, or a
    ``@contextmanager`` helper.

    Returns:
        A dict with ``file`` (base name only), ``line`` and ``function``.
        If no external frame can be found, the placeholder
        ``{"file": "unknown", "line": 0, "function": "unknown"}`` is returned.
    """
    # Frames defined in these files are logging plumbing, never the caller.
    internal_files = {
        get_caller_info.__code__.co_filename,
        contextmanager.__code__.co_filename,
    }

    # currentframe() may return None on interpreters without frame support.
    frame = inspect.currentframe()
    try:
        while frame is not None and frame.f_code.co_filename in internal_files:
            frame = frame.f_back

        if frame is None:
            return {"file": "unknown", "line": 0, "function": "unknown"}

        code = frame.f_code
        return {
            "file": os.path.basename(code.co_filename),
            "line": frame.f_lineno,
            "function": code.co_name,
        }
    finally:
        # Drop the frame reference promptly to avoid reference cycles.
        del frame
|
|
|
def log_entry(level: str, message: str, category: str, details: Optional[Dict[str, Any]] = None):
    """
    Log a structured entry to the console logger and to the log files.

    The entry is emitted through the module logger and appended as one JSON
    line to both the per-trace file (``gaia_<trace_id>.log``) and the
    combined file (``gaia_combined.log``) inside ``LOG_DIR``.

    Args:
        level: Log level name (INFO, WARNING, ERROR, etc.); matched
            case-insensitively, unknown names are emitted at INFO
        message: Log message
        category: Category of the log entry (agent, tool, api, etc.)
        details: Additional details to include in the log entry

    Returns:
        The ``LogEntry`` that was written.
    """
    trace_id = get_trace_id()
    timestamp = datetime.datetime.now()

    # Must be called directly from this function: the helper inspects the stack.
    caller_info = get_caller_info()

    entry = LogEntry(
        timestamp=timestamp,
        level=level,
        message=message,
        trace_id=trace_id,
        category=category,
        details=details,
        source_file=caller_info["file"],
        source_line=caller_info["line"],
        source_function=caller_info["function"],
    )

    # Resolve the level name defensively: a lower-case name such as "info"
    # would otherwise resolve to the function logging.info, and an unknown
    # name would raise AttributeError.
    level_no = getattr(logging, str(level).upper(), None)
    if not isinstance(level_no, int):
        level_no = logging.INFO

    logger.log(level_no, f"[{trace_id[:8]}] [{category}] {message}")

    # Serialise once; the identical line goes to both files.
    line = entry.to_json() + "\n"
    targets = (
        os.path.join(LOG_DIR, f"gaia_{trace_id}.log"),
        os.path.join(LOG_DIR, "gaia_combined.log"),
    )
    for path in targets:
        try:
            with open(path, "a", encoding="utf-8") as f:
                f.write(line)
        except OSError as exc:
            # Logging must not take the application down; report and move on.
            logger.warning("Could not write log entry to %s: %s", path, exc)

    return entry
|
|
|
def log_info(message: str, category: str = "general", details: Optional[Dict[str, Any]] = None):
    """Record ``message`` at INFO level and return the resulting entry."""
    return log_entry(level="INFO", message=message, category=category, details=details)
|
|
|
def log_warning(message: str, category: str = "general", details: Optional[Dict[str, Any]] = None):
    """Record ``message`` at WARNING level and return the resulting entry."""
    return log_entry(level="WARNING", message=message, category=category, details=details)
|
|
|
def log_error(error: Union[Exception, str], category: str = "error",
              context: Optional[Dict[str, Any]] = None, critical: bool = False):
    """
    Log an error message with stack trace.

    For exception objects the trace is rendered from the exception's own
    ``__traceback__``, so it is correct even when this function is called
    outside the ``except`` block that caught it. For plain messages the
    current call stack (without this function's frame) is recorded instead.

    Args:
        error: Exception object or error message
        category: Category of the error
        context: Additional context for the error
        critical: Whether this is a critical error

    Returns:
        The log entry produced by ``log_entry``.
    """
    if isinstance(error, BaseException):
        error_type = type(error).__name__
        error_message = str(error)
        # format_exc() only reflects the exception *currently being handled*;
        # formatting from the object itself always describes `error`.
        stack_trace = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
    else:
        error_type = "Error"
        error_message = error
        # Drop the last frame, which is this function.
        stack_trace = "".join(traceback.format_stack()[:-1])

    details = {
        "error_type": error_type,
        "error_message": error_message,
        "stack_trace": stack_trace,
    }

    if context:
        details["context"] = context

    level = "CRITICAL" if critical else "ERROR"
    message = f"ERROR: {error_type} - {error_message}"

    return log_entry(level, message, category, details)
|
|
|
def log_api_request(api_name: str, endpoint: str, method: str, params: Optional[Dict[str, Any]] = None):
    """
    Record an outgoing API request as an INFO entry in the "api" category.

    Args:
        api_name: Name of the API (e.g., "OpenAI", "Serper")
        endpoint: API endpoint
        method: HTTP method (GET, POST, etc.)
        params: Request parameters
    """
    request_details = dict(
        api_name=api_name,
        endpoint=endpoint,
        method=method,
        params=params,
    )
    return log_entry(
        "INFO",
        f"API REQUEST: {api_name} - {method} {endpoint}",
        "api",
        request_details,
    )
|
|
|
def log_api_response(api_name: str, endpoint: str, status_code: int,
                     response: Any, duration: float):
    """
    Log an API response.

    Long text is cut to 500 characters before logging: top-level string
    values of a dict response, a string response itself, or the ``str()``
    form of any other object.

    Args:
        api_name: Name of the API
        endpoint: API endpoint
        status_code: HTTP status code
        response: Response data
        duration: Request duration in seconds

    Returns:
        The log entry produced by ``log_entry``.
    """
    max_len = 500

    def clip(text: str) -> str:
        # Shorten overly long text and mark that it was cut.
        return text[:max_len] + "... [truncated]" if len(text) > max_len else text

    message = f"API RESPONSE: {api_name} - {status_code} ({duration:.2f}s)"

    if isinstance(response, dict):
        # Work on a shallow copy so the caller's dict is never modified;
        # iterate the original so the copy is not mutated while being walked.
        sanitized_response = response.copy()
        for key, value in response.items():
            if isinstance(value, str):
                sanitized_response[key] = clip(value)
    elif isinstance(response, str):
        sanitized_response = clip(response)
    else:
        try:
            sanitized_response = clip(str(response))
        except Exception:
            # A broken __str__ must not break logging; KeyboardInterrupt and
            # SystemExit are deliberately not caught here.
            sanitized_response = "[Response could not be converted to string]"

    details = {
        "api_name": api_name,
        "endpoint": endpoint,
        "status_code": status_code,
        "response": sanitized_response,
        "duration": duration,
    }

    return log_entry("INFO", message, "api", details)
|
|
|
def log_tool_selection(tool_name: str, reason: str, inputs: Optional[Dict[str, Any]] = None):
    """
    Record which tool was chosen and why, as an INFO entry in the "tool" category.

    String input values longer than 200 characters are shortened before
    being logged.

    Args:
        tool_name: Name of the selected tool
        reason: Reason for selecting this tool
        inputs: Tool inputs
    """
    selection_details: Dict[str, Any] = {"tool_name": tool_name, "reason": reason}

    if inputs:
        selection_details["inputs"] = {
            key: (
                value[:200] + "... [truncated]"
                if isinstance(value, str) and len(value) > 200
                else value
            )
            for key, value in inputs.items()
        }

    return log_entry("INFO", f"TOOL SELECTION: {tool_name}", "tool", selection_details)
|
|
|
def log_tool_execution(tool_name: str, success: bool, result: Any = None,
                       error: Optional[str] = None, duration: Optional[float] = None):
    """
    Log a tool execution result.

    The entry is written at INFO level in the "tool" category for both
    successful and failed executions. Long text in the result is cut to 200
    characters: top-level string values of a dict, a string itself, or the
    ``str()`` form of any other object.

    Args:
        tool_name: Name of the tool
        success: Whether the execution was successful
        result: Tool execution result (if successful)
        error: Error message (if failed)
        duration: Execution duration in seconds

    Returns:
        The log entry produced by ``log_entry``.
    """
    max_len = 200

    def clip(text: str) -> str:
        # Shorten overly long text and mark that it was cut.
        return text[:max_len] + "... [truncated]" if len(text) > max_len else text

    status = "SUCCESS" if success else "FAILURE"
    message = f"TOOL EXECUTION: {tool_name} - {status}"

    details = {
        "tool_name": tool_name,
        "success": success,
    }

    if duration is not None:
        details["duration"] = duration

    if success and result is not None:
        if isinstance(result, dict):
            # Shallow copy keeps the caller's dict untouched; iterate the
            # original so the copy is not mutated while being walked.
            result_copy = result.copy()
            for key, value in result.items():
                if isinstance(value, str):
                    result_copy[key] = clip(value)
            details["result"] = result_copy
        elif isinstance(result, str):
            details["result"] = clip(result)
        else:
            try:
                details["result"] = clip(str(result))
            except Exception:
                # A broken __str__ must not break logging; KeyboardInterrupt
                # and SystemExit are deliberately not caught here.
                details["result"] = "[Result could not be converted to string]"

    if not success and error:
        details["error"] = error

    return log_entry("INFO", message, "tool", details)
|
|
|
def log_workflow_step(step_name: str, description: str,
                      inputs: Optional[Dict[str, Any]] = None,
                      outputs: Optional[Dict[str, Any]] = None):
    """
    Record one workflow step as an INFO entry in the "workflow" category.

    String values longer than 200 characters in ``inputs`` and ``outputs``
    are shortened before being logged; empty or missing mappings are left
    out of the entry's details.

    Args:
        step_name: Name of the workflow step
        description: Description of the step
        inputs: Step inputs
        outputs: Step outputs
    """
    def shorten(mapping: Dict[str, Any]) -> Dict[str, Any]:
        # Build a new dict with over-long string values cut to 200 characters.
        trimmed = {}
        for name, item in mapping.items():
            too_long = isinstance(item, str) and len(item) > 200
            trimmed[name] = item[:200] + "... [truncated]" if too_long else item
        return trimmed

    step_details: Dict[str, Any] = {}
    if inputs:
        step_details["inputs"] = shorten(inputs)
    if outputs:
        step_details["outputs"] = shorten(outputs)

    return log_entry(
        "INFO",
        f"WORKFLOW STEP: {step_name} - {description}",
        "workflow",
        step_details,
    )
|
|
|
def log_memory_operation(operation: str, key: str, value_type: str,
                         success: bool, error: Optional[str] = None):
    """
    Record a memory operation as an INFO entry in the "memory" category.

    Args:
        operation: Type of operation (store, get, update, delete)
        key: Memory key
        value_type: Type of value being stored/retrieved
        success: Whether the operation was successful
        error: Error message (if failed)
    """
    outcome = "SUCCESS" if success else "FAILURE"

    operation_details: Dict[str, Any] = dict(
        operation=operation,
        key=key,
        value_type=value_type,
        success=success,
    )
    # The error text is only attached to failed operations.
    if not success and error:
        operation_details["error"] = error

    return log_entry(
        "INFO",
        f"MEMORY OPERATION: {operation.upper()} {key} - {outcome}",
        "memory",
        operation_details,
    )
|
|
|
@contextmanager
def TimingContext(name: str, category: str = "timing"):
    """
    Context manager for timing operations.

    The elapsed time is measured with ``time.perf_counter()`` so it cannot
    be distorted by system clock adjustments; the wall clock is used only
    for the human-readable start and end timestamps. The timing entry is
    written even when the wrapped block raises.

    Args:
        name: Name of the operation being timed
        category: Category of the operation (stored in the entry's details;
            the entry itself is logged in the "performance" category)

    Example:
        with TimingContext("process_question", "agent"):
            result = agent.process_question(question)
    """
    start_time = time.time()           # wall clock, for the timestamp only
    started = time.perf_counter()      # high-resolution clock, for the interval
    try:
        yield
    finally:
        duration = time.perf_counter() - started
        end_time = time.time()

        message = f"TIMING_DATA: {name} - {duration:.4f}s"
        details = {
            "name": name,
            "category": category,
            "duration": duration,
            "start_time": datetime.datetime.fromtimestamp(start_time).isoformat(),
            "end_time": datetime.datetime.fromtimestamp(end_time).isoformat(),
        }

        log_entry("INFO", message, "performance", details)
|
|
|
def initialize_logging(verbose: bool = False, log_dir: Optional[str] = None):
    """
    Configure the logging framework and record that it has been set up.

    Args:
        verbose: Whether to enable verbose logging (DEBUG instead of INFO)
        log_dir: Directory for log files; when given it replaces the
            module-wide ``LOG_DIR`` and is created if missing

    Returns:
        Always ``True``.
    """
    global LOG_DIR

    if log_dir:
        LOG_DIR = log_dir
        os.makedirs(LOG_DIR, exist_ok=True)

    logger.setLevel(logging.DEBUG if verbose else logging.INFO)

    startup_details = {"verbose": verbose, "log_dir": LOG_DIR}
    log_info("Logging framework initialized", "system", startup_details)

    return True