|
""" |
|
Logging Integration for Gaia System |
|
|
|
This module provides functions to integrate the enhanced logging framework |
|
with the existing Gaia system components, including: |
|
|
|
- Agent integration |
|
- Tool registry integration |
|
- API client integration |
|
- Memory integration |
|
- Graph workflow integration |
|
|
|
It provides decorators and wrapper functions to add logging to existing components |
|
with minimal changes to the original code. |
|
""" |
|
|
|
import functools |
|
import inspect |
|
import json |
|
import time |
|
import traceback |
|
from typing import Dict, Any, Optional, Callable, List, Union, Type |
|
|
|
from utils.logging_framework import ( |
|
log_timing, |
|
log_error, |
|
log_api_request, |
|
log_api_response, |
|
log_tool_selection, |
|
log_tool_execution, |
|
log_workflow_step, |
|
log_memory_operation, |
|
TimingContext, |
|
get_trace_id, |
|
set_trace_id, |
|
generate_trace_id, |
|
initialize_logging |
|
) |
|
|
|
def integrate_agent_logging(agent_class: Type) -> None:
    """
    Integrate logging with the GAIAAgent class.

    This function monkey-patches ``__init__`` and ``process_question`` on
    ``agent_class``. The patched ``__init__`` generates a trace ID, makes it
    the current trace ID, times the original initializer and stores the ID on
    the instance as ``_trace_id``. The patched ``process_question`` restores
    that trace ID, logs the incoming question and the produced answer as
    workflow steps, times the call, and logs any exception as critical before
    re-raising it.

    Patching is idempotent: a method that already carries this function's
    wrapper is left alone, so calling this twice does not duplicate log output.

    Args:
        agent_class: The GAIAAgent class to patch. It must define
            ``process_question``.
    """
    # Attribute set on our wrappers so a second call can recognize them.
    marker = "_gaia_logging_patched"

    original_init = agent_class.__init__
    original_process_question = agent_class.process_question

    if not getattr(original_init, marker, False):

        @functools.wraps(original_init)
        def init_with_logging(self, *args, **kwargs):
            # Arguments are forwarded untouched so the wrapped __init__ keeps
            # its own signature and defaults (no forced ``config=None``).
            trace_id = generate_trace_id()
            set_trace_id(trace_id)

            with TimingContext("agent initialization", "agent"):
                original_init(self, *args, **kwargs)

            # Stored only after a successful init, for later calls to reuse.
            self._trace_id = trace_id

        setattr(init_with_logging, marker, True)
        agent_class.__init__ = init_with_logging

    if not getattr(original_process_question, marker, False):

        @functools.wraps(original_process_question)
        def process_question_with_logging(self, question, *args, **kwargs):
            # Instances that did not go through the patched __init__ have no
            # trace ID yet; give them one on first use.
            trace_id = getattr(self, "_trace_id", None)
            if trace_id is None:
                trace_id = generate_trace_id()
                self._trace_id = trace_id
            set_trace_id(trace_id)

            log_workflow_step("process_question", f"Processing question: {question}",
                              inputs={"question": question})

            with TimingContext("process_question", "agent"):
                try:
                    result = original_process_question(self, question, *args, **kwargs)
                except Exception as e:
                    log_error(e, context={"question": question}, critical=True)
                    raise
                else:
                    log_workflow_step("generate_answer", "Generated answer",
                                      outputs={"answer": result})
                    return result

        setattr(process_question_with_logging, marker, True)
        agent_class.process_question = process_question_with_logging
|
|
|
def integrate_tool_registry_logging(registry_class: Type) -> None:
    """
    Integrate logging with the ToolRegistry class.

    This function monkey-patches ``execute_tool`` on ``registry_class``. The
    patched method logs the tool selection with its keyword inputs, times the
    original call, then logs the execution outcome (result on success, error
    text on failure) together with the elapsed time. Exceptions raised by the
    tool are logged and re-raised unchanged.

    Patching is idempotent: if ``execute_tool`` already carries this
    function's wrapper, nothing is changed.

    Args:
        registry_class: The ToolRegistry class to patch. It must define
            ``execute_tool``.
    """
    # Attribute set on our wrapper so a second call can recognize it.
    marker = "_gaia_logging_patched"

    original_execute_tool = registry_class.execute_tool
    if getattr(original_execute_tool, marker, False):
        return

    @functools.wraps(original_execute_tool)
    def execute_tool_with_logging(self, name, **kwargs):
        log_tool_selection(name, f"Selected tool: {name}", inputs=kwargs)

        # perf_counter is monotonic, so the duration cannot be skewed by
        # system clock adjustments the way time.time() differences can.
        start_time = time.perf_counter()
        try:
            with TimingContext(f"tool_execution_{name}", "tool"):
                result = original_execute_tool(self, name, **kwargs)
        except Exception as e:
            duration = time.perf_counter() - start_time
            log_tool_execution(name, False, error=str(e), duration=duration)
            log_error(e, context={"tool_name": name, "inputs": kwargs})
            raise

        # Success is logged outside the try block so that a failure inside the
        # logging call is not misreported as a failure of the tool itself.
        duration = time.perf_counter() - start_time
        log_tool_execution(name, True, result=result, duration=duration)
        return result

    setattr(execute_tool_with_logging, marker, True)
    registry_class.execute_tool = execute_tool_with_logging
|
|
|
def patch_api_client(client_class: Type, api_name: str) -> None:
    """
    Patch an API client class to add logging.

    Every public function attribute of ``client_class`` (names not starting
    with ``_``, as reported by ``inspect.getmembers(..., inspect.isfunction)``)
    is replaced by a wrapper that logs the request, times the original call,
    and logs the response. Exceptions are logged and re-raised unchanged.

    Each wrapper is built by a factory that binds the method name and the
    original function at creation time. Defining the wrapper directly in the
    loop would make all wrappers share the loop's last ``original_method``
    (late-binding closures), so every patched method would call the same
    function.

    Static methods are re-installed as static methods. Methods that already
    carry this function's wrapper are skipped, so patching is idempotent.

    Args:
        client_class: The API client class to patch
        api_name: The name of the API (e.g., "OpenAI", "Serper")
    """
    # Attribute set on our wrappers so a second call can recognize them.
    marker = "_gaia_logging_patched"

    def make_logged_method(method_name, original_method):
        """Return a logging wrapper bound to this name and this original."""

        @functools.wraps(original_method)
        def api_method_with_logging(*args, **kwargs):
            # ``args`` includes ``self`` for instance methods; it is forwarded
            # as-is so the same wrapper also works for static methods.
            # The endpoint falls back to the method name and the HTTP verb to
            # POST when the caller does not pass them as keyword arguments.
            endpoint = kwargs.get('endpoint', method_name)
            http_method = kwargs.get('method', 'POST')

            log_api_request(api_name, endpoint, http_method, params=kwargs)

            # Monotonic clock: immune to system clock adjustments.
            start_time = time.perf_counter()
            try:
                with TimingContext(f"{api_name}_{method_name}", "api"):
                    response = original_method(*args, **kwargs)
            except Exception as e:
                log_error(e, context={"api_name": api_name, "endpoint": endpoint, "params": kwargs})
                raise

            duration = time.perf_counter() - start_time

            # Use the response's own status code when it exposes one, either
            # as an attribute or as a dict entry; otherwise report 200.
            status_code = 200
            if hasattr(response, 'status_code'):
                status_code = response.status_code
            elif isinstance(response, dict) and 'status_code' in response:
                status_code = response['status_code']

            log_api_response(api_name, endpoint, status_code, response, duration)
            return response

        setattr(api_method_with_logging, marker, True)
        return api_method_with_logging

    for name, method in inspect.getmembers(client_class, inspect.isfunction):
        if name.startswith('_') or getattr(method, marker, False):
            continue

        logged = make_logged_method(name, method)

        # A staticmethod shows up here as a plain function; re-wrap it so it
        # does not start receiving the instance as an extra first argument.
        if isinstance(inspect.getattr_static(client_class, name), staticmethod):
            logged = staticmethod(logged)

        setattr(client_class, name, logged)
|
|
|
def integrate_memory_logging(memory_class: Type) -> None:
    """
    Integrate logging with the memory class.

    This function monkey-patches public function attributes of
    ``memory_class`` whose names contain one of a fixed set of keywords. The
    keyword decides the logged operation type, checked in this order:

    - ``store``: "store", "save", "add", "insert"
    - ``get``: "get", "retrieve", "load", "fetch"
    - ``update``: "update", "modify"
    - ``delete``: "delete", "remove"

    Matching is by substring, and methods matching none of the keywords are
    left untouched. Each patched method times the original call and logs the
    operation outcome; exceptions are logged and re-raised unchanged.

    Each wrapper is built by a factory that binds the operation type and the
    original function at creation time. Defining the wrapper directly in the
    loop would make all wrappers share the loop's last ``original_method``
    (late-binding closures), so every patched method would call the same
    function.

    Static methods are re-installed as static methods. Methods that already
    carry this function's wrapper are skipped, so patching is idempotent.

    Args:
        memory_class: The memory class to patch
    """
    # Attribute set on our wrappers so a second call can recognize them.
    marker = "_gaia_logging_patched"

    # Order matters: the first group with a matching keyword wins.
    operation_keywords = (
        ('store', ('store', 'save', 'add', 'insert')),
        ('get', ('get', 'retrieve', 'load', 'fetch')),
        ('update', ('update', 'modify')),
        ('delete', ('delete', 'remove')),
    )

    def classify(method_name):
        """Return the operation type for a method name, or None if unmatched."""
        for operation, keywords in operation_keywords:
            if any(keyword in method_name for keyword in keywords):
                return operation
        return None

    def make_logged_method(op_type, original_method, is_static):
        """Return a logging wrapper bound to this operation and this original."""

        @functools.wraps(original_method)
        def memory_method_with_logging(*call_args, **kwargs):
            # For instance methods the first positional argument is ``self``;
            # drop it when looking for the key and value.
            args = call_args if is_static else call_args[1:]

            # The key is taken from the first argument, or the ``key`` keyword.
            key = None
            if args:
                key = str(args[0])
            elif 'key' in kwargs:
                key = kwargs['key']

            # Only write operations carry a value: second positional argument
            # or the ``value`` keyword.
            value_type = "unknown"
            if op_type in ('store', 'update'):
                value = args[1] if len(args) > 1 else kwargs.get('value')
                if value is not None:
                    value_type = type(value).__name__

            try:
                with TimingContext(f"memory_{op_type}", "memory"):
                    result = original_method(*call_args, **kwargs)
            except Exception as e:
                log_memory_operation(op_type, key, value_type, False, str(e))
                log_error(e, context={"operation": op_type, "key": key})
                raise

            # Logged outside the try block so a failure inside the logging
            # call is not misreported as a failed memory operation.
            log_memory_operation(op_type, key, value_type, True)
            return result

        setattr(memory_method_with_logging, marker, True)
        return memory_method_with_logging

    for name, method in inspect.getmembers(memory_class, inspect.isfunction):
        if name.startswith('_') or getattr(method, marker, False):
            continue

        operation = classify(name)
        if operation is None:
            continue

        # A staticmethod shows up here as a plain function; keep it static so
        # it does not start receiving the instance as an extra argument.
        is_static = isinstance(inspect.getattr_static(memory_class, name), staticmethod)
        logged = make_logged_method(operation, method, is_static)
        if is_static:
            logged = staticmethod(logged)

        setattr(memory_class, name, logged)
|
|
|
def integrate_graph_logging(graph_module) -> None:
    """
    Integrate logging with the LangGraph workflow.

    This function monkey-patches the public functions of ``graph_module``.
    Each patched function logs a workflow step before and after the original
    call, times it, and logs any exception before re-raising it unchanged.

    Only plain functions whose ``__module__`` is the graph module (or one of
    its submodules) are wrapped. Classes and callables imported from elsewhere
    are left alone: replacing a class with a function wrapper would break
    ``isinstance`` checks and subclassing.

    Each wrapper is built by a factory that binds the function name and the
    original function at creation time. Defining the wrapper directly in the
    loop would make all wrappers share the loop's last ``original_func``
    (late-binding closures), so every patched function would call the same
    one.

    Functions that already carry this function's wrapper are skipped, so
    patching is idempotent.

    Note: only the module attribute is rebound. References to the original
    function taken before patching keep pointing at the unwrapped function.

    Args:
        graph_module: The graph module to patch
    """
    # Attribute set on our wrappers so a second call can recognize them.
    marker = "_gaia_logging_patched"
    # Longest result text written to the log before it is cut with "...".
    max_logged_chars = 200

    module_name = getattr(graph_module, "__name__", None)

    def belongs_to_module(func):
        """Tell whether func was defined in graph_module or a submodule of it."""
        if module_name is None:
            # Not a regular module object; ownership cannot be checked.
            return True
        owner = getattr(func, "__module__", None) or ""
        return owner == module_name or owner.startswith(module_name + ".")

    def make_logged_func(func_name, original_func):
        """Return a logging wrapper bound to this name and this original."""

        @functools.wraps(original_func)
        def graph_func_with_logging(*args, **kwargs):
            # Keyword arguments are logged as-is; positional-only calls are
            # logged as the string form of the argument tuple.
            inputs = kwargs if kwargs else {"args": str(args)}
            log_workflow_step(func_name, f"Executing graph step: {func_name}",
                              inputs=inputs)

            with TimingContext(f"graph_{func_name}", "graph"):
                try:
                    result = original_func(*args, **kwargs)
                except Exception as e:
                    log_error(e, context={"step": func_name, "inputs": inputs})
                    raise

                # Truncate the string form of any result type, not only
                # results that are already strings.
                result_text = str(result)
                if len(result_text) > max_logged_chars:
                    result_text = result_text[:max_logged_chars] + "..."

                log_workflow_step(func_name, f"Completed graph step: {func_name}",
                                  outputs={"result": result_text})
                return result

        setattr(graph_func_with_logging, marker, True)
        return graph_func_with_logging

    for name in dir(graph_module):
        if name.startswith('_'):
            continue

        attr = getattr(graph_module, name)
        if not inspect.isfunction(attr):
            continue
        if getattr(attr, marker, False) or not belongs_to_module(attr):
            continue

        setattr(graph_module, name, make_logged_func(name, attr))
|
|
|
def setup_logging_integration():
    """
    Set up logging integration for all Gaia components.

    Initializes logging in verbose mode, then patches the agent, the tool
    registry and the graph module. API clients and the Supabase memory class
    are patched on a best-effort basis: an ImportError raised while importing
    or patching one of them is ignored and the next one is tried.

    The imports of the three core components happen before the guarded
    section, so a failure to import them propagates to the caller.

    Returns:
        True when the integration completed, False when any other exception
        was raised during patching (the error and its traceback are printed).
    """
    initialize_logging(verbose=True)

    # Core components: imported outside the try block on purpose (see above).
    from agent.agent import GAIAAgent
    from agent.tool_registry import ToolRegistry
    from agent import graph

    def _patch_openai():
        from langchain.chat_models import ChatOpenAI
        patch_api_client(ChatOpenAI, "OpenAI")

    def _patch_serper():
        from tools.web_tools import SerperSearchTool
        patch_api_client(SerperSearchTool, "Serper")

    def _patch_perplexity():
        from tools.perplexity_tool import PerplexityTool
        patch_api_client(PerplexityTool, "Perplexity")

    def _patch_supabase_memory():
        from memory.supabase_memory import SupabaseMemory
        integrate_memory_logging(SupabaseMemory)

    # Optional integrations, attempted in this order.
    optional_steps = (
        _patch_openai,
        _patch_serper,
        _patch_perplexity,
        _patch_supabase_memory,
    )

    try:
        integrate_agent_logging(GAIAAgent)
        integrate_tool_registry_logging(ToolRegistry)
        integrate_graph_logging(graph)

        for optional_step in optional_steps:
            try:
                optional_step()
            except ImportError:
                # Component not installed/available: skip it silently.
                pass
    except Exception as e:
        print(f"Error setting up logging integration: {str(e)}")
        traceback.print_exc()
        return False

    return True
|
|
|
if __name__ == "__main__":
    # setup_logging_integration() reports failure through its return value
    # (it prints the error itself), so only announce success when it returned
    # True and exit with a non-zero status otherwise.
    if setup_logging_integration():
        print("Logging integration set up successfully")
    else:
        raise SystemExit(1)