"""
Code Analysis Tool for GAIA Agent
Python code parsing, analysis, and execution flow prediction.
Features:
- Python code parsing and AST analysis
- Dependency detection and import analysis
- Execution flow analysis and variable tracking
- Output prediction and result estimation
- Code optimization suggestions
- Error detection and debugging assistance
"""
import ast
import builtins
import logging
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
class CodeStructureAnalyzer:
"""Analyze Python code structure and components."""
def __init__(self):
"""Initialize the code structure analyzer."""
        # Use the builtins module: __builtins__ may be a dict when this module is imported
        self.builtin_functions = set(dir(builtins))
self.standard_modules = {
'math', 'os', 'sys', 'json', 'csv', 'datetime', 'time',
'random', 'collections', 'itertools', 'functools', 'operator',
'string', 're', 'urllib', 'http', 'pathlib', 'typing',
'decimal', 'fractions', 'statistics', 'cmath'
}
def analyze_code_structure(self, code: str) -> Dict[str, Any]:
"""
Analyze the structure of Python code.
Args:
code: Python code to analyze
Returns:
Dictionary with code structure information
"""
try:
tree = ast.parse(code)
analysis = {
'imports': self._extract_imports(tree),
'functions': self._extract_functions(tree),
'classes': self._extract_classes(tree),
'variables': self._extract_variables(tree),
'constants': self._extract_constants(tree),
'control_flow': self._analyze_control_flow(tree),
'complexity': self._calculate_complexity(tree),
'dependencies': self._analyze_dependencies(tree),
'potential_outputs': self._predict_outputs(tree),
'syntax_valid': True
}
return analysis
except SyntaxError as e:
return {
'syntax_valid': False,
'syntax_error': str(e),
'line_number': e.lineno,
'error_text': e.text
}
except Exception as e:
logger.error(f"Code analysis failed: {e}")
return {
'syntax_valid': False,
'analysis_error': str(e)
}
def _extract_imports(self, tree: ast.AST) -> List[Dict[str, Any]]:
"""Extract import statements from AST."""
imports = []
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports.append({
'type': 'import',
'module': alias.name,
'alias': alias.asname,
'is_standard': alias.name.split('.')[0] in self.standard_modules
})
elif isinstance(node, ast.ImportFrom):
module = node.module or ''
for alias in node.names:
imports.append({
'type': 'from_import',
'module': module,
'name': alias.name,
'alias': alias.asname,
'is_standard': module.split('.')[0] in self.standard_modules
})
return imports
def _extract_functions(self, tree: ast.AST) -> List[Dict[str, Any]]:
"""Extract function definitions from AST."""
functions = []
for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                functions.append({
                    'name': node.name,
                    'args': [arg.arg for arg in node.args.args],
                    'defaults': len(node.args.defaults),
                    'returns': ast.unparse(node.returns) if node.returns else None,
                    'docstring': ast.get_docstring(node),
                    'line_number': node.lineno,
                    'is_async': isinstance(node, ast.AsyncFunctionDef)
                })
return functions
def _extract_classes(self, tree: ast.AST) -> List[Dict[str, Any]]:
"""Extract class definitions from AST."""
classes = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
methods = []
for item in node.body:
if isinstance(item, ast.FunctionDef):
methods.append({
'name': item.name,
'args': [arg.arg for arg in item.args.args],
'is_property': any(
isinstance(d, ast.Name) and d.id == 'property'
for d in item.decorator_list
)
})
classes.append({
'name': node.name,
'bases': [ast.unparse(base) for base in node.bases],
'methods': methods,
'docstring': ast.get_docstring(node),
'line_number': node.lineno
})
return classes
def _extract_variables(self, tree: ast.AST) -> List[Dict[str, Any]]:
"""Extract variable assignments from AST."""
variables = []
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name):
variables.append({
'name': target.id,
'type': 'assignment',
'value': ast.unparse(node.value),
'line_number': node.lineno
})
elif isinstance(node, ast.AnnAssign) and node.target:
if isinstance(node.target, ast.Name):
variables.append({
'name': node.target.id,
'type': 'annotated_assignment',
'annotation': ast.unparse(node.annotation),
'value': ast.unparse(node.value) if node.value else None,
'line_number': node.lineno
})
return variables
def _extract_constants(self, tree: ast.AST) -> List[Dict[str, Any]]:
"""Extract constant values from AST."""
constants = []
for node in ast.walk(tree):
if isinstance(node, ast.Constant):
constants.append({
'value': node.value,
'type': type(node.value).__name__,
'line_number': node.lineno
})
return constants
def _analyze_control_flow(self, tree: ast.AST) -> Dict[str, Any]:
"""Analyze control flow structures."""
control_flow = {
'if_statements': 0,
'for_loops': 0,
'while_loops': 0,
'try_except': 0,
'with_statements': 0,
'comprehensions': 0,
'max_nesting_depth': 0
}
def calculate_depth(node, current_depth=0):
max_depth = current_depth
for child in ast.iter_child_nodes(node):
if isinstance(child, (ast.If, ast.For, ast.While, ast.Try, ast.With)):
child_depth = calculate_depth(child, current_depth + 1)
max_depth = max(max_depth, child_depth)
else:
child_depth = calculate_depth(child, current_depth)
max_depth = max(max_depth, child_depth)
return max_depth
for node in ast.walk(tree):
if isinstance(node, ast.If):
control_flow['if_statements'] += 1
elif isinstance(node, ast.For):
control_flow['for_loops'] += 1
elif isinstance(node, ast.While):
control_flow['while_loops'] += 1
elif isinstance(node, ast.Try):
control_flow['try_except'] += 1
elif isinstance(node, ast.With):
control_flow['with_statements'] += 1
elif isinstance(node, (ast.ListComp, ast.DictComp, ast.SetComp, ast.GeneratorExp)):
control_flow['comprehensions'] += 1
control_flow['max_nesting_depth'] = calculate_depth(tree)
return control_flow
def _calculate_complexity(self, tree: ast.AST) -> Dict[str, int]:
"""Calculate code complexity metrics."""
complexity = {
'cyclomatic_complexity': 1, # Base complexity
'lines_of_code': len(ast.unparse(tree).split('\n')),
'number_of_nodes': len(list(ast.walk(tree)))
}
# Calculate cyclomatic complexity
for node in ast.walk(tree):
if isinstance(node, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
complexity['cyclomatic_complexity'] += 1
elif isinstance(node, ast.BoolOp):
complexity['cyclomatic_complexity'] += len(node.values) - 1
return complexity
def _analyze_dependencies(self, tree: ast.AST) -> Dict[str, Any]:
"""Analyze code dependencies."""
dependencies = {
'external_modules': set(),
'standard_modules': set(),
'builtin_functions': set(),
'undefined_names': set()
}
# Track defined names
defined_names = set()
# Extract imports
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
module_name = alias.name.split('.')[0]
if module_name in self.standard_modules:
dependencies['standard_modules'].add(alias.name)
else:
dependencies['external_modules'].add(alias.name)
defined_names.add(alias.asname or alias.name)
elif isinstance(node, ast.ImportFrom):
module = node.module or ''
module_name = module.split('.')[0]
if module_name in self.standard_modules:
dependencies['standard_modules'].add(module)
else:
dependencies['external_modules'].add(module)
for alias in node.names:
defined_names.add(alias.asname or alias.name)
# Track function and class definitions
elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
defined_names.add(node.name)
# Track variable assignments
elif isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name):
defined_names.add(target.id)
# Find undefined names
for node in ast.walk(tree):
if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
if (node.id not in defined_names and
node.id not in self.builtin_functions and
not node.id.startswith('_')):
dependencies['undefined_names'].add(node.id)
elif node.id in self.builtin_functions:
dependencies['builtin_functions'].add(node.id)
# Convert sets to lists for JSON serialization
for key in dependencies:
dependencies[key] = list(dependencies[key])
return dependencies
def _predict_outputs(self, tree: ast.AST) -> List[Dict[str, Any]]:
"""Predict potential outputs from code."""
outputs = []
for node in ast.walk(tree):
# Look for print statements
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name) and node.func.id == 'print':
outputs.append({
'type': 'print',
'line_number': node.lineno,
'args': [ast.unparse(arg) for arg in node.args]
})
# Look for return statements
elif isinstance(node, ast.Return):
outputs.append({
'type': 'return',
'line_number': node.lineno,
'value': ast.unparse(node.value) if node.value else None
})
# Look for expressions that might produce output
elif isinstance(node, ast.Expr):
# Check if it's a standalone expression that would be printed in REPL
if not isinstance(node.value, ast.Call):
outputs.append({
'type': 'expression',
'line_number': node.lineno,
'expression': ast.unparse(node.value)
})
return outputs
class ExecutionFlowAnalyzer:
"""Analyze execution flow and predict behavior."""
def __init__(self):
"""Initialize execution flow analyzer."""
pass
def analyze_execution_flow(self, code: str) -> Dict[str, Any]:
"""
Analyze the execution flow of Python code.
Args:
code: Python code to analyze
Returns:
Execution flow analysis
"""
try:
tree = ast.parse(code)
analysis = {
'execution_order': self._determine_execution_order(tree),
'variable_lifecycle': self._track_variable_lifecycle(tree),
'function_calls': self._extract_function_calls(tree),
'potential_errors': self._detect_potential_errors(tree),
'performance_notes': self._analyze_performance(tree),
'final_result_prediction': self._predict_final_result(tree, code)
}
return analysis
except Exception as e:
logger.error(f"Execution flow analysis failed: {e}")
return {'error': str(e)}
def _determine_execution_order(self, tree: ast.AST) -> List[Dict[str, Any]]:
"""Determine the order of code execution."""
execution_order = []
for i, node in enumerate(tree.body):
if isinstance(node, ast.FunctionDef):
execution_order.append({
'step': i + 1,
'type': 'function_definition',
'name': node.name,
'line': node.lineno
})
elif isinstance(node, ast.ClassDef):
execution_order.append({
'step': i + 1,
'type': 'class_definition',
'name': node.name,
'line': node.lineno
})
elif isinstance(node, ast.Import):
modules = [alias.name for alias in node.names]
execution_order.append({
'step': i + 1,
'type': 'import',
'modules': modules,
'line': node.lineno
})
elif isinstance(node, ast.ImportFrom):
execution_order.append({
'step': i + 1,
'type': 'from_import',
'module': node.module,
'names': [alias.name for alias in node.names],
'line': node.lineno
})
elif isinstance(node, ast.Assign):
execution_order.append({
'step': i + 1,
'type': 'assignment',
'targets': [ast.unparse(target) for target in node.targets],
'value': ast.unparse(node.value),
'line': node.lineno
})
elif isinstance(node, ast.Expr):
execution_order.append({
'step': i + 1,
'type': 'expression',
'expression': ast.unparse(node.value),
'line': node.lineno
})
else:
execution_order.append({
'step': i + 1,
'type': type(node).__name__.lower(),
'line': node.lineno
})
return execution_order
def _track_variable_lifecycle(self, tree: ast.AST) -> Dict[str, Dict[str, Any]]:
"""Track variable definitions, modifications, and usage."""
variables = {}
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name):
var_name = target.id
if var_name not in variables:
variables[var_name] = {
'first_assignment': node.lineno,
'assignments': [],
'usages': []
}
variables[var_name]['assignments'].append({
'line': node.lineno,
'value': ast.unparse(node.value)
})
elif isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
var_name = node.id
if var_name in variables:
variables[var_name]['usages'].append(node.lineno)
return variables
def _extract_function_calls(self, tree: ast.AST) -> List[Dict[str, Any]]:
"""Extract all function calls in execution order."""
function_calls = []
for node in ast.walk(tree):
if isinstance(node, ast.Call):
call_info = {
'line': node.lineno,
'args': [ast.unparse(arg) for arg in node.args],
'kwargs': {kw.arg: ast.unparse(kw.value) for kw in node.keywords}
}
if isinstance(node.func, ast.Name):
call_info['function'] = node.func.id
call_info['type'] = 'simple_call'
elif isinstance(node.func, ast.Attribute):
call_info['function'] = ast.unparse(node.func)
call_info['type'] = 'method_call'
else:
call_info['function'] = ast.unparse(node.func)
call_info['type'] = 'complex_call'
function_calls.append(call_info)
return function_calls
def _detect_potential_errors(self, tree: ast.AST) -> List[Dict[str, Any]]:
"""Detect potential runtime errors."""
potential_errors = []
for node in ast.walk(tree):
# Division by zero
if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Div):
if isinstance(node.right, ast.Constant) and node.right.value == 0:
potential_errors.append({
'type': 'division_by_zero',
'line': node.lineno,
'message': 'Division by zero detected'
})
# Undefined variable usage (basic check)
elif isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
# This is a simplified check - would need more sophisticated analysis
pass
            # Constant integer index (basic pattern) - flag as a possible IndexError;
            # string keys are skipped since they indicate dict access, not indexing
            elif isinstance(node, ast.Subscript):
                if isinstance(node.slice, ast.Constant) and isinstance(node.slice.value, int):
                    potential_errors.append({
                        'type': 'potential_index_error',
                        'line': node.lineno,
                        'message': 'Potential index out of bounds'
                    })
return potential_errors
def _analyze_performance(self, tree: ast.AST) -> List[str]:
"""Analyze potential performance issues."""
performance_notes = []
for node in ast.walk(tree):
# Nested loops
if isinstance(node, ast.For):
for child in ast.walk(node):
if isinstance(child, ast.For) and child != node:
performance_notes.append(
f"Nested loops detected at line {node.lineno} - consider optimization"
)
break
# List comprehensions vs loops
elif isinstance(node, ast.ListComp):
performance_notes.append(
f"List comprehension at line {node.lineno} - good for performance"
)
return performance_notes
def _predict_final_result(self, tree: ast.AST, code: str) -> Dict[str, Any]:
"""Predict the final result of code execution."""
prediction = {
'has_return_statement': False,
'has_print_statements': False,
'last_expression': None,
'predicted_output_type': 'none'
}
# Check for return statements
for node in ast.walk(tree):
if isinstance(node, ast.Return):
prediction['has_return_statement'] = True
if node.value:
prediction['return_value'] = ast.unparse(node.value)
elif isinstance(node, ast.Call):
if isinstance(node.func, ast.Name) and node.func.id == 'print':
prediction['has_print_statements'] = True
# Check last statement
if tree.body:
last_stmt = tree.body[-1]
if isinstance(last_stmt, ast.Expr):
prediction['last_expression'] = ast.unparse(last_stmt.value)
prediction['predicted_output_type'] = 'expression_result'
elif isinstance(last_stmt, ast.Return):
prediction['predicted_output_type'] = 'return_value'
if prediction['has_print_statements']:
prediction['predicted_output_type'] = 'printed_output'
return prediction
class CodeAnalyzerTool:
"""AGNO-compatible code analysis tool."""
def __init__(self):
"""Initialize the code analyzer tool."""
self.structure_analyzer = CodeStructureAnalyzer()
self.flow_analyzer = ExecutionFlowAnalyzer()
self.available = True
logger.info("CodeAnalyzerTool initialized")
def analyze_python_code(self, code: str) -> str:
"""
Analyze Python code structure and execution flow.
Args:
code: Python code to analyze
Returns:
Formatted analysis report
"""
try:
# Analyze code structure
structure = self.structure_analyzer.analyze_code_structure(code)
            if not structure.get('syntax_valid', False):
                error = structure.get('syntax_error') or structure.get('analysis_error', 'Unknown error')
                return f"Syntax Error: {error}"
# Analyze execution flow
flow = self.flow_analyzer.analyze_execution_flow(code)
# Format report
report = "Code Analysis Report\n"
report += "=" * 50 + "\n\n"
# Structure analysis
report += "STRUCTURE ANALYSIS:\n"
report += f"- Functions: {len(structure['functions'])}\n"
report += f"- Classes: {len(structure['classes'])}\n"
report += f"- Variables: {len(structure['variables'])}\n"
report += f"- Imports: {len(structure['imports'])}\n"
report += f"- Complexity: {structure['complexity']['cyclomatic_complexity']}\n\n"
# Dependencies
if structure['dependencies']['external_modules']:
report += f"External Dependencies: {', '.join(structure['dependencies']['external_modules'])}\n"
# Execution flow
if 'execution_order' in flow:
report += f"\nEXECUTION STEPS: {len(flow['execution_order'])}\n"
# Predicted output
if 'final_result_prediction' in flow:
pred = flow['final_result_prediction']
report += f"\nPREDICTED OUTPUT TYPE: {pred['predicted_output_type']}\n"
if pred.get('last_expression'):
report += f"Last Expression: {pred['last_expression']}\n"
# Potential issues
if 'potential_errors' in flow and flow['potential_errors']:
report += "\nPOTENTIAL ISSUES:\n"
for error in flow['potential_errors']:
report += f"- Line {error['line']}: {error['message']}\n"
return report
except Exception as e:
return f"Analysis failed: {e}"
def predict_code_output(self, code: str) -> str:
"""
Predict the output of Python code without executing it.
Args:
code: Python code to analyze
Returns:
Predicted output description
"""
try:
structure = self.structure_analyzer.analyze_code_structure(code)
flow = self.flow_analyzer.analyze_execution_flow(code)
if not structure.get('syntax_valid', False):
return f"Cannot predict output - syntax error: {structure.get('syntax_error')}"
prediction = "Output Prediction:\n"
prediction += "-" * 30 + "\n"
# Check for print statements
if structure['potential_outputs']:
print_outputs = [out for out in structure['potential_outputs'] if out['type'] == 'print']
if print_outputs:
prediction += f"Print statements: {len(print_outputs)}\n"
for out in print_outputs[:3]: # Show first 3
prediction += f" Line {out['line_number']}: print({', '.join(out['args'])})\n"
# Check for return statements
returns = [out for out in structure['potential_outputs'] if out['type'] == 'return']
if returns:
prediction += f"Return statements: {len(returns)}\n"
for ret in returns[:3]:
prediction += f" Line {ret['line_number']}: return {ret['value']}\n"
# Check for expressions
expressions = [out for out in structure['potential_outputs'] if out['type'] == 'expression']
if expressions:
prediction += f"Final expression: {expressions[-1]['expression']}\n"
# Final result prediction
if 'final_result_prediction' in flow:
pred = flow['final_result_prediction']
prediction += f"\nFinal result type: {pred['predicted_output_type']}\n"
return prediction
except Exception as e:
return f"Prediction failed: {e}"
def detect_code_dependencies(self, code: str) -> str:
"""
Detect dependencies and imports required by code.
Args:
code: Python code to analyze
Returns:
Dependencies report
"""
try:
structure = self.structure_analyzer.analyze_code_structure(code)
if not structure.get('syntax_valid', False):
return f"Cannot analyze dependencies - syntax error: {structure.get('syntax_error')}"
deps = structure['dependencies']
report = "Dependencies Analysis:\n"
report += "-" * 30 + "\n"
if deps['standard_modules']:
report += f"Standard library modules: {', '.join(deps['standard_modules'])}\n"
if deps['external_modules']:
report += f"External modules: {', '.join(deps['external_modules'])}\n"
if deps['builtin_functions']:
report += f"Built-in functions used: {', '.join(deps['builtin_functions'])}\n"
if deps['undefined_names']:
report += f"Undefined names (potential issues): {', '.join(deps['undefined_names'])}\n"
return report
except Exception as e:
return f"Dependency analysis failed: {e}"
def suggest_code_optimizations(self, code: str) -> str:
"""
Suggest optimizations for Python code.
Args:
code: Python code to analyze
Returns:
Optimization suggestions
"""
try:
structure = self.structure_analyzer.analyze_code_structure(code)
flow = self.flow_analyzer.analyze_execution_flow(code)
suggestions = "Code Optimization Suggestions:\n"
suggestions += "-" * 40 + "\n"
# Complexity suggestions
complexity = structure['complexity']['cyclomatic_complexity']
if complexity > 10:
suggestions += f"- High complexity ({complexity}) - consider breaking into smaller functions\n"
# Control flow suggestions
control = structure['control_flow']
if control['max_nesting_depth'] > 3:
suggestions += f"- Deep nesting ({control['max_nesting_depth']} levels) - consider refactoring\n"
# Performance notes from flow analysis
if 'performance_notes' in flow:
for note in flow['performance_notes']:
suggestions += f"- {note}\n"
# Import suggestions
deps = structure['dependencies']
if len(deps['external_modules']) > 5:
suggestions += "- Many external dependencies - consider reducing for better portability\n"
            # If nothing was added beyond the header, say so explicitly
            if suggestions == header:
                suggestions += "No specific optimizations suggested - code looks good!\n"
            return suggestions
except Exception as e:
return f"Optimization analysis failed: {e}"
def get_code_analyzer_tools():
"""Get code analyzer tools for AGNO registration."""
tool = CodeAnalyzerTool()
return [
{
'name': 'analyze_python_code',
'function': tool.analyze_python_code,
'description': 'Analyze Python code structure, complexity, and execution flow'
},
{
'name': 'predict_code_output',
'function': tool.predict_code_output,
'description': 'Predict the output of Python code without executing it'
},
{
'name': 'detect_code_dependencies',
'function': tool.detect_code_dependencies,
'description': 'Detect dependencies and imports required by Python code'
},
{
'name': 'suggest_code_optimizations',
'function': tool.suggest_code_optimizations,
'description': 'Suggest optimizations and improvements for Python code'
}
]
if __name__ == "__main__":
# Test the code analyzer
tool = CodeAnalyzerTool()
test_code = """
import math
import numpy as np
def calculate_result(x, y):
result = math.sqrt(x**2 + y**2)
return result * math.pi
data = [1, 2, 3, 4, 5]
mean_value = np.mean(data)
final_result = calculate_result(mean_value, 2.5)
print(f"Final result: {final_result}")
final_result
"""
print("Testing CodeAnalyzerTool:")
print("=" * 50)
analysis = tool.analyze_python_code(test_code)
print(analysis)
print("\n" + "=" * 50)
prediction = tool.predict_code_output(test_code)
print(prediction)