"""
Log Analyzer for Gaia System

This module provides tools for analyzing log files generated by the Gaia system.
It can parse log files, identify patterns, correlate errors across components,
generate reports, and visualize execution flow.

Features:
- Parse and analyze log files
- Identify error patterns and frequencies
- Correlate errors across different components
- Generate diagnostic reports
- Visualize execution flow and timing
"""

import os
import re
import json
import logging
import datetime
from typing import Dict, List, Any, Optional, Tuple, Set
from collections import defaultdict, Counter
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("gaia_log_analyzer")

DEFAULT_LOG_DIR = "logs"
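
# Quick-start sketch (illustrative; assumes Gaia log files already exist under
# ./logs with the default names configured in LogAnalyzer.log_files below):
#
#     analyzer = LogAnalyzer(log_dir="logs")
#     report = analyzer.generate_diagnostic_report("gaia_diagnostic_report.json")
#     analyzer.visualize_error_distribution("error_distribution.png")
#     analyzer.visualize_performance_breakdown("performance_breakdown.png")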


class LogEntry:
    """Represents a parsed log entry."""

    def __init__(
        self,
        timestamp: datetime.datetime,
        logger_name: str,
        level: str,
        trace_id: str,
        message: str,
        details: Optional[Dict[str, Any]] = None
    ):
        self.timestamp = timestamp
        self.logger_name = logger_name
        self.level = level
        self.trace_id = trace_id
        self.message = message
        self.details = details or {}

    def __repr__(self):
        return f"LogEntry({self.timestamp}, {self.logger_name}, {self.level}, {self.trace_id}, {self.message[:30]}...)"

    @classmethod
    def from_line(cls, line: str) -> Optional['LogEntry']:
        """
        Parse a log line into a LogEntry object.

        Args:
            line: The log line to parse

        Returns:
            LogEntry object or None if parsing failed
        """
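        # Expected line shape (illustrative example, not taken from real logs):
        #   2025-01-01 12:00:00,123 - gaia.agent - ERROR - [trace-1234] - ERROR: ToolTimeout - call exceeded limit
        # A trailing ": {...}" JSON payload, when present, is parsed into `details`.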
        pattern = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - ([^-]+) - ([^-]+) - \[([^\]]+)\] - (.+)"
        match = re.match(pattern, line)

        if not match:
            return None

        timestamp_str, logger_name, level, trace_id, message = match.groups()

        try:
            timestamp = datetime.datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S,%f")
        except ValueError:
            timestamp = datetime.datetime.now()

        details = None
        if ": {" in message and message.endswith("}"):
            try:
                json_start = message.index(": {") + 2
                json_str = message[json_start:]
                details = json.loads(json_str)
                message = message[:json_start - 2]
            except (ValueError, json.JSONDecodeError):
                pass

        return cls(
            timestamp=timestamp,
            logger_name=logger_name.strip(),
            level=level.strip(),
            trace_id=trace_id.strip(),
            message=message.strip(),
            details=details
        )


class LogAnalyzer:
    """
    Analyzes log files to identify patterns and generate reports.
    """

    def __init__(self, log_dir: str = DEFAULT_LOG_DIR):
        """
        Initialize the log analyzer.

        Args:
            log_dir: Directory containing log files
        """
        self.log_dir = log_dir
        self.log_files = {
            "main": os.path.join(log_dir, "gaia_main.log"),
            "error": os.path.join(log_dir, "gaia_errors.log"),
            "performance": os.path.join(log_dir, "gaia_performance.log"),
            "api": os.path.join(log_dir, "gaia_api.log"),
            "tool": os.path.join(log_dir, "gaia_tools.log")
        }
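
        # Per-trace collections of parsed entries, one bucket per log category;
        # these are populated by _load_logs().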
        self.entries_by_trace: Dict[str, List[LogEntry]] = defaultdict(list)
        self.errors_by_trace: Dict[str, List[LogEntry]] = defaultdict(list)
        self.api_calls_by_trace: Dict[str, List[LogEntry]] = defaultdict(list)
        self.tool_usage_by_trace: Dict[str, List[LogEntry]] = defaultdict(list)
        self.performance_by_trace: Dict[str, List[LogEntry]] = defaultdict(list)

        # Aggregated statistics, filled in by _calculate_statistics().
        self.error_counts: Counter = Counter()
        self.api_error_counts: Counter = Counter()
        self.tool_error_counts: Counter = Counter()
        self.trace_durations: Dict[str, float] = {}
        self.trace_error_counts: Dict[str, int] = {}

        self._load_logs()

    def _load_logs(self):
        """Load and parse all log files."""
        logger.info(f"Loading logs from {self.log_dir}")

        for log_type, log_file in self.log_files.items():
            if not os.path.exists(log_file):
                logger.warning(f"Log file not found: {log_file}")
                continue

            logger.info(f"Parsing {log_type} log: {log_file}")
            self._parse_log_file(log_file, log_type)

        logger.info(f"Parsed logs for {len(self.entries_by_trace)} trace IDs")

        self._calculate_statistics()

    def _parse_log_file(self, log_file: str, log_type: str):
        """
        Parse a log file and extract entries.

        Args:
            log_file: Path to the log file
            log_type: Type of log file (main, error, etc.)
        """
        try:
            with open(log_file, 'r', encoding='utf-8') as f:
                for line in f:
                    entry = LogEntry.from_line(line.strip())
                    if entry:
                        # Every parsed entry is indexed by its trace ID.
                        self.entries_by_trace[entry.trace_id].append(entry)

                        # Route the entry to the category-specific collections.
                        if log_type == "error" or entry.level in ["ERROR", "CRITICAL"]:
                            self.errors_by_trace[entry.trace_id].append(entry)
                        elif log_type == "api":
                            self.api_calls_by_trace[entry.trace_id].append(entry)
                        elif log_type == "tool":
                            self.tool_usage_by_trace[entry.trace_id].append(entry)
                        elif log_type == "performance":
                            self.performance_by_trace[entry.trace_id].append(entry)
        except Exception as e:
            logger.error(f"Error parsing log file {log_file}: {str(e)}")

    def _calculate_statistics(self):
        """Calculate statistics from parsed logs."""
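        # Tally error types: prefer the structured ERROR_DETAILS payload,
        # fall back to parsing the message text.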
        for trace_id, errors in self.errors_by_trace.items():
            for error in errors:
                if "ERROR_DETAILS" in error.message:
                    if error.details and "error_type" in error.details:
                        self.error_counts[error.details["error_type"]] += 1
                else:
                    # Fall back to the plain "ERROR: <type> - <message>" format.
                    error_type_match = re.search(r"ERROR: ([^-]+) -", error.message)
                    if error_type_match:
                        self.error_counts[error_type_match.group(1).strip()] += 1
                    else:
                        self.error_counts["Unknown"] += 1

            self.trace_error_counts[trace_id] = len(errors)
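        # Count API errors by API name.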
        for trace_id, api_calls in self.api_calls_by_trace.items():
            for call in api_calls:
                if "API ERROR" in call.message:
                    api_name = "Unknown"
                    api_match = re.search(r"API ERROR: ([^-]+) -", call.message)
                    if api_match:
                        api_name = api_match.group(1).strip()
                    self.api_error_counts[api_name] += 1
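        # Count tool execution failures by tool name.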
        for trace_id, tool_usages in self.tool_usage_by_trace.items():
            for usage in tool_usages:
                if "TOOL EXECUTION FAILURE" in usage.message:
                    tool_name = "Unknown"
                    tool_match = re.search(r"TOOL EXECUTION FAILURE: ([^-]+) -", usage.message)
                    if tool_match:
                        tool_name = tool_match.group(1).strip()
                    self.tool_error_counts[tool_name] += 1
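        # Compute each trace's wall-clock duration from its first and last entry.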
        for trace_id, entries in self.entries_by_trace.items():
            if entries:
                sorted_entries = sorted(entries, key=lambda e: e.timestamp)
                start_time = sorted_entries[0].timestamp
                end_time = sorted_entries[-1].timestamp
                duration = (end_time - start_time).total_seconds()
                self.trace_durations[trace_id] = duration

    def get_error_summary(self) -> Dict[str, Any]:
        """
        Get a summary of errors.

        Returns:
            Dict containing error statistics
        """
        return {
            "total_errors": sum(self.error_counts.values()),
            "error_types": dict(self.error_counts.most_common()),
            "api_errors": dict(self.api_error_counts.most_common()),
            "tool_errors": dict(self.tool_error_counts.most_common()),
            "traces_with_errors": len(self.errors_by_trace),
            "avg_errors_per_trace": sum(self.trace_error_counts.values()) / len(self.trace_error_counts) if self.trace_error_counts else 0
        }

    def get_performance_summary(self) -> Dict[str, Any]:
        """
        Get a summary of performance metrics.

        Returns:
            Dict containing performance statistics
        """
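        # Collect TIMING_DATA payloads from the performance log.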
        timing_data = []

        for trace_id, entries in self.performance_by_trace.items():
            for entry in entries:
                if "TIMING_DATA" in entry.message and entry.details:
                    timing_data.append(entry.details)

        # Group durations by category.
        timing_by_category = defaultdict(list)
        for data in timing_data:
            if "category" in data and "duration" in data:
                timing_by_category[data["category"]].append(data["duration"])

        # Per-category aggregate statistics.
        category_stats = {}
        for category, durations in timing_by_category.items():
            if durations:
                category_stats[category] = {
                    "count": len(durations),
                    "avg_duration": sum(durations) / len(durations),
                    "min_duration": min(durations),
                    "max_duration": max(durations),
                    "total_duration": sum(durations)
                }

        return {
            "trace_count": len(self.entries_by_trace),
            "avg_trace_duration": sum(self.trace_durations.values()) / len(self.trace_durations) if self.trace_durations else 0,
            "min_trace_duration": min(self.trace_durations.values()) if self.trace_durations else 0,
            "max_trace_duration": max(self.trace_durations.values()) if self.trace_durations else 0,
            "category_stats": category_stats
        }

    def get_api_summary(self) -> Dict[str, Any]:
        """
        Get a summary of API usage.

        Returns:
            Dict containing API usage statistics
        """
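        # Collect API request/response detail payloads from the api log.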
        api_calls = []

        for trace_id, entries in self.api_calls_by_trace.items():
            for entry in entries:
                if "API REQUEST DETAILS" in entry.message and entry.details:
                    api_calls.append({
                        "trace_id": trace_id,
                        "type": "request",
                        "api_name": entry.details.get("api_name", "Unknown"),
                        "endpoint": entry.details.get("endpoint", "Unknown"),
                        "method": entry.details.get("method", "Unknown"),
                        "timestamp": entry.timestamp
                    })
                elif "API RESPONSE DETAILS" in entry.message and entry.details:
                    api_calls.append({
                        "trace_id": trace_id,
                        "type": "response",
                        "api_name": entry.details.get("api_name", "Unknown"),
                        "endpoint": entry.details.get("endpoint", "Unknown"),
                        "status_code": entry.details.get("status_code", 0),
                        "duration": entry.details.get("duration", 0),
                        "timestamp": entry.timestamp
                    })

        # Aggregate call counts, error counts (status >= 400), and durations per API.
        api_call_counts = Counter()
        api_error_counts = Counter()
        api_durations = defaultdict(list)

        for call in api_calls:
            if call["type"] == "request":
                api_call_counts[call["api_name"]] += 1
            elif call["type"] == "response":
                if call["status_code"] >= 400:
                    api_error_counts[call["api_name"]] += 1
                if "duration" in call:
                    api_durations[call["api_name"]].append(call["duration"])

        # Average response duration per API.
        api_avg_durations = {}
        for api_name, durations in api_durations.items():
            if durations:
                api_avg_durations[api_name] = sum(durations) / len(durations)

        return {
            "total_api_calls": sum(api_call_counts.values()),
            "api_call_counts": dict(api_call_counts.most_common()),
            "api_error_counts": dict(api_error_counts.most_common()),
            "api_avg_durations": api_avg_durations
        }

    def get_tool_summary(self) -> Dict[str, Any]:
        """
        Get a summary of tool usage.

        Returns:
            Dict containing tool usage statistics
        """
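        # Collect tool selection/execution detail payloads from the tool log.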
        tool_usages = []

        for trace_id, entries in self.tool_usage_by_trace.items():
            for entry in entries:
                if "TOOL SELECTION DETAILS" in entry.message and entry.details:
                    tool_usages.append({
                        "trace_id": trace_id,
                        "type": "selection",
                        "tool_name": entry.details.get("tool_name", "Unknown"),
                        "reason": entry.details.get("reason", "Unknown"),
                        "timestamp": entry.timestamp
                    })
                elif "TOOL EXECUTION DETAILS" in entry.message and entry.details:
                    tool_usages.append({
                        "trace_id": trace_id,
                        "type": "execution",
                        "tool_name": entry.details.get("tool_name", "Unknown"),
                        "success": entry.details.get("success", False),
                        "duration": entry.details.get("duration", 0),
                        "timestamp": entry.timestamp
                    })

        # Aggregate selections, executions, successes, failures and durations per tool.
        tool_selection_counts = Counter()
        tool_execution_counts = Counter()
        tool_success_counts = Counter()
        tool_failure_counts = Counter()
        tool_durations = defaultdict(list)

        for usage in tool_usages:
            if usage["type"] == "selection":
                tool_selection_counts[usage["tool_name"]] += 1
            elif usage["type"] == "execution":
                tool_execution_counts[usage["tool_name"]] += 1
                if usage["success"]:
                    tool_success_counts[usage["tool_name"]] += 1
                else:
                    tool_failure_counts[usage["tool_name"]] += 1
                if "duration" in usage:
                    tool_durations[usage["tool_name"]].append(usage["duration"])

        # Average execution duration and success rate per tool.
        tool_avg_durations = {}
        tool_success_rates = {}

        for tool_name, durations in tool_durations.items():
            if durations:
                tool_avg_durations[tool_name] = sum(durations) / len(durations)

        for tool_name, executions in tool_execution_counts.items():
            if executions > 0:
                successes = tool_success_counts.get(tool_name, 0)
                tool_success_rates[tool_name] = (successes / executions) * 100

        return {
            "total_tool_selections": sum(tool_selection_counts.values()),
            "total_tool_executions": sum(tool_execution_counts.values()),
            "tool_selection_counts": dict(tool_selection_counts.most_common()),
            "tool_execution_counts": dict(tool_execution_counts.most_common()),
            "tool_success_counts": dict(tool_success_counts.most_common()),
            "tool_failure_counts": dict(tool_failure_counts.most_common()),
            "tool_avg_durations": tool_avg_durations,
            "tool_success_rates": tool_success_rates
        }

    def get_trace_summary(self, trace_id: str) -> Dict[str, Any]:
        """
        Get a summary of a specific trace.

        Args:
            trace_id: The trace ID to summarize

        Returns:
            Dict containing trace summary
        """
        if trace_id not in self.entries_by_trace:
            return {"error": f"Trace ID {trace_id} not found"}

        entries = self.entries_by_trace[trace_id]
        errors = self.errors_by_trace.get(trace_id, [])
        api_calls = self.api_calls_by_trace.get(trace_id, [])
        tool_usages = self.tool_usage_by_trace.get(trace_id, [])
        performance = self.performance_by_trace.get(trace_id, [])

        sorted_entries = sorted(entries, key=lambda e: e.timestamp)

        # Extract workflow steps recorded for this trace.
        workflow_steps = []
        for entry in entries:
            if "WORKFLOW STEP" in entry.message:
                step_match = re.search(r"WORKFLOW STEP: ([^-]+) - (.+)", entry.message)
                if step_match:
                    step_name, description = step_match.groups()
                    workflow_steps.append({
                        "step_name": step_name.strip(),
                        "description": description.strip(),
                        "timestamp": entry.timestamp
                    })

        # Collect timing data recorded for this trace.
        timing_data = []
        for entry in performance:
            if "TIMING_DATA" in entry.message and entry.details:
                timing_data.append(entry.details)

        # Wall-clock duration between the first and last entry.
        duration = 0
        if sorted_entries:
            start_time = sorted_entries[0].timestamp
            end_time = sorted_entries[-1].timestamp
            duration = (end_time - start_time).total_seconds()

        return {
            "trace_id": trace_id,
            "entry_count": len(entries),
            "error_count": len(errors),
            "api_call_count": len(api_calls),
            "tool_usage_count": len(tool_usages),
            "duration": duration,
            "start_time": sorted_entries[0].timestamp if sorted_entries else None,
            "end_time": sorted_entries[-1].timestamp if sorted_entries else None,
            "workflow_steps": workflow_steps,
            "timing_data": timing_data,
            "errors": [{"message": e.message, "timestamp": e.timestamp} for e in errors]
        }

    def find_common_error_patterns(self) -> List[Dict[str, Any]]:
        """
        Find common error patterns across traces.

        Returns:
            List of error patterns with frequency and examples
        """
        # Collect (error_type, error_message) pairs for every error entry.
        error_messages = []

        for trace_id, errors in self.errors_by_trace.items():
            for error in errors:
                error_type = "Unknown"
                error_msg = error.message

                if "ERROR_DETAILS" in error.message and error.details:
                    if "error_type" in error.details:
                        error_type = error.details["error_type"]
                    if "error_message" in error.details:
                        error_msg = error.details["error_message"]
                else:
                    error_match = re.search(r"ERROR: ([^-]+) - (.+)", error.message)
                    if error_match:
                        error_type, error_msg = error_match.groups()

                error_messages.append({
                    "trace_id": trace_id,
                    "error_type": error_type.strip(),
                    "error_message": error_msg.strip(),
                    "timestamp": error.timestamp
                })
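        # Group errors by a normalized message signature: UUIDs, IPs, dates and
        # plain numbers are masked so that recurring errors cluster together.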
        error_patterns = defaultdict(list)

        for error in error_messages:
            simplified_msg = re.sub(r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b', '<UUID>', error["error_message"])
            simplified_msg = re.sub(r'\b\d+\.\d+\.\d+\.\d+\b', '<IP>', simplified_msg)
            simplified_msg = re.sub(r'\b\d{4}-\d{2}-\d{2}\b', '<DATE>', simplified_msg)
            simplified_msg = re.sub(r'\b\d+\b', '<NUM>', simplified_msg)

            key = f"{error['error_type']}:{simplified_msg}"
            error_patterns[key].append(error)

        # Keep only patterns that occur at least twice.
        patterns = []

        for key, errors in error_patterns.items():
            if len(errors) >= 2:
                patterns.append({
                    "error_type": errors[0]["error_type"],
                    "pattern": errors[0]["error_message"],
                    "count": len(errors),
                    "examples": [e["trace_id"] for e in errors[:5]],
                    "first_occurrence": min(e["timestamp"] for e in errors),
                    "last_occurrence": max(e["timestamp"] for e in errors)
                })

        # Most frequent patterns first.
        patterns.sort(key=lambda p: p["count"], reverse=True)

        return patterns

    def find_correlated_errors(self) -> List[Dict[str, Any]]:
        """
        Find errors that are correlated (occur together).

        Returns:
            List of correlated error pairs
        """
        # Collect the set of error types seen in each trace.
        errors_by_trace = {}

        for trace_id, errors in self.errors_by_trace.items():
            error_types = set()

            for error in errors:
                error_type = "Unknown"

                if "ERROR_DETAILS" in error.message and error.details:
                    if "error_type" in error.details:
                        error_type = error.details["error_type"]
                else:
                    error_match = re.search(r"ERROR: ([^-]+) -", error.message)
                    if error_match:
                        error_type = error_match.group(1).strip()

                error_types.add(error_type)

            errors_by_trace[trace_id] = error_types

        # Count how often each unordered pair of error types co-occurs in a trace.
        error_pairs = Counter()

        for trace_id, error_types in errors_by_trace.items():
            if len(error_types) >= 2:
                for error1 in error_types:
                    for error2 in error_types:
                        if error1 < error2:
                            error_pairs[(error1, error2)] += 1

        correlations = []

        for (error1, error2), count in error_pairs.most_common():
            correlations.append({
                "error_types": [error1, error2],
                "count": count,
                "examples": [
                    trace_id for trace_id, error_types in errors_by_trace.items()
                    if error1 in error_types and error2 in error_types
                ][:5]
            })

        return correlations

    def identify_bottlenecks(self) -> List[Dict[str, Any]]:
        """
        Identify performance bottlenecks.

        Returns:
            List of identified bottlenecks
        """
        timing_data = []

        for trace_id, entries in self.performance_by_trace.items():
            for entry in entries:
                if "TIMING_DATA" in entry.message and entry.details:
                    timing_data.append({
                        "trace_id": trace_id,
                        **entry.details
                    })

        # Group durations by "category[:function_or_name]".
        timing_by_key = defaultdict(list)

        for data in timing_data:
            if "category" in data and "duration" in data:
                key = data["category"]

                if "function" in data:
                    key += f":{data['function']}"
                elif "name" in data:
                    key += f":{data['name']}"

                timing_by_key[key].append(data["duration"])

        bottlenecks = []

        for key, durations in timing_by_key.items():
            if len(durations) >= 3:
                avg_duration = sum(durations) / len(durations)
                max_duration = max(durations)
                min_duration = min(durations)
                total_duration = sum(durations)
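                # Heuristic: flag as a bottleneck when the average duration exceeds
                # 1 second, or when the slowest run is more than 5x the fastest.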
                is_bottleneck = avg_duration > 1.0 or (max_duration / min_duration > 5.0 if min_duration > 0 else False)

                if is_bottleneck:
                    # split(":", 1) so names that themselves contain ":" stay intact.
                    category, name = key.split(":", 1) if ":" in key else (key, "")

                    bottlenecks.append({
                        "category": category,
                        "name": name,
                        "count": len(durations),
                        "avg_duration": avg_duration,
                        "min_duration": min_duration,
                        "max_duration": max_duration,
                        "total_duration": total_duration,
                        "variability": max_duration / min_duration if min_duration > 0 else float('inf')
                    })

        # Slowest on average first.
        bottlenecks.sort(key=lambda b: b["avg_duration"], reverse=True)

        return bottlenecks

    def generate_diagnostic_report(self, output_file: str = "gaia_diagnostic_report.json") -> Dict[str, Any]:
        """
        Generate a comprehensive diagnostic report.

        Args:
            output_file: File to save the report to

        Returns:
            Dict containing the diagnostic report
        """
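        # Assemble all summaries into a single report dictionary.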
        report = {
            "timestamp": datetime.datetime.now().isoformat(),
            "trace_count": len(self.entries_by_trace),
            "error_summary": self.get_error_summary(),
            "performance_summary": self.get_performance_summary(),
            "api_summary": self.get_api_summary(),
            "tool_summary": self.get_tool_summary(),
            "error_patterns": self.find_common_error_patterns(),
            "correlated_errors": self.find_correlated_errors(),
            "bottlenecks": self.identify_bottlenecks()
        }

        # default=str lets json.dump serialize datetime objects.
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, default=str)

        logger.info(f"Diagnostic report saved to {output_file}")

        return report

    def visualize_error_distribution(self, output_file: str = "error_distribution.png"):
        """
        Visualize the distribution of errors by type.

        Args:
            output_file: File to save the visualization to
        """
        if not self.error_counts:
            logger.warning("No errors found to visualize")
            return

        # Plot the ten most frequent error types as a bar chart.
        top_errors = self.error_counts.most_common(10)

        plt.figure(figsize=(12, 6))

        labels = [e[0] for e in top_errors]
        values = [e[1] for e in top_errors]

        plt.bar(labels, values, color='salmon')
        plt.xlabel('Error Type')
        plt.ylabel('Count')
        plt.title('Error Distribution by Type')
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()

        plt.savefig(output_file)
        logger.info(f"Error distribution visualization saved to {output_file}")

        plt.close()

    def visualize_performance_breakdown(self, output_file: str = "performance_breakdown.png"):
        """
        Visualize the performance breakdown by category.

        Args:
            output_file: File to save the visualization to
        """
        # Collect durations per category from the performance log.
        timing_by_category = defaultdict(list)

        for trace_id, entries in self.performance_by_trace.items():
            for entry in entries:
                if "TIMING_DATA" in entry.message and entry.details:
                    if "category" in entry.details and "duration" in entry.details:
                        timing_by_category[entry.details["category"]].append(entry.details["duration"])

        if not timing_by_category:
            logger.warning("No performance data found to visualize")
            return

        # Average duration per category.
        categories = []
        avg_durations = []

        for category, durations in timing_by_category.items():
            if durations:
                categories.append(category)
                avg_durations.append(sum(durations) / len(durations))

        # Sort categories by average duration, slowest first.
        sorted_indices = np.argsort(avg_durations)[::-1]
        categories = [categories[i] for i in sorted_indices]
        avg_durations = [avg_durations[i] for i in sorted_indices]

        plt.figure(figsize=(12, 6))

        plt.bar(categories, avg_durations, color='skyblue')
        plt.xlabel('Category')
        plt.ylabel('Average Duration (seconds)')
        plt.title('Performance Breakdown by Category')
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()

        plt.savefig(output_file)
        logger.info(f"Performance breakdown visualization saved to {output_file}")

        plt.close()


def main():
    """Main entry point for the log analyzer."""
    import argparse

    parser = argparse.ArgumentParser(description="Gaia Log Analyzer")
    parser.add_argument("--log-dir", type=str, default="logs", help="Directory containing log files")
    parser.add_argument("--report", type=str, default="gaia_diagnostic_report.json", help="Output file for diagnostic report")
    parser.add_argument("--trace-id", type=str, help="Specific trace ID to analyze")
    parser.add_argument("--visualize", action="store_true", help="Generate visualizations")
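    # Example invocation (the module filename is an assumption):
    #   python gaia_log_analyzer.py --log-dir logs --report report.json --visualize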
    args = parser.parse_args()

    # Parse all logs and build the diagnostic report.
    analyzer = LogAnalyzer(log_dir=args.log_dir)
    report = analyzer.generate_diagnostic_report(output_file=args.report)

    # Print a short summary to stdout.
    print(f"Analyzed {report['trace_count']} traces")
    print(f"Found {report['error_summary']['total_errors']} errors")
    print(f"Average trace duration: {report['performance_summary']['avg_trace_duration']:.2f} seconds")

    if args.visualize:
        analyzer.visualize_error_distribution()
        analyzer.visualize_performance_breakdown()

    # Optionally drill into a single trace.
    if args.trace_id:
        trace_summary = analyzer.get_trace_summary(args.trace_id)
        if "error" in trace_summary:
            print(f"Error: {trace_summary['error']}")
        else:
            print(f"\nTrace ID: {args.trace_id}")
            print(f"Duration: {trace_summary['duration']:.2f} seconds")
            print(f"Errors: {trace_summary['error_count']}")
            print(f"API calls: {trace_summary['api_call_count']}")
            print(f"Tool usages: {trace_summary['tool_usage_count']}")

            if trace_summary['workflow_steps']:
                print("\nWorkflow steps:")
                for step in trace_summary['workflow_steps']:
                    print(f"- {step['step_name']}: {step['description']}")

            if trace_summary['errors']:
                print("\nErrors:")
                for error in trace_summary['errors']:
                    print(f"- {error['message']}")


if __name__ == "__main__":
    main()