Spaces:
Sleeping
Sleeping
| """ | |
| Code Analysis Tool for GAIA Agent | |
| Python code parsing, analysis, and execution flow prediction. | |
| Features: | |
| - Python code parsing and AST analysis | |
| - Dependency detection and import analysis | |
| - Execution flow analysis and variable tracking | |
| - Output prediction and result estimation | |
| - Code optimization suggestions | |
| - Error detection and debugging assistance | |
| """ | |
| import ast | |
| import logging | |
| import re | |
| import sys | |
| import inspect | |
| import importlib | |
| from typing import Dict, Any, List, Optional, Set, Tuple, Union | |
| from pathlib import Path | |
| import json | |
| logger = logging.getLogger(__name__) | |
class CodeStructureAnalyzer:
    """Analyze Python code structure and components.

    The analysis is purely static: source text is parsed with ``ast`` and
    inspected; the analyzed code is never executed.
    """

    def __init__(self):
        """Initialize the code structure analyzer."""
        # Import the module explicitly. The ``__builtins__`` global is a
        # CPython implementation detail and is a plain dict in imported
        # modules, so ``dir(__builtins__)`` would list dict methods there
        # instead of the builtin names.
        import builtins

        self.builtin_functions = set(dir(builtins))
        # Top-level module names that are reported as "standard" imports.
        self.standard_modules = {
            'math', 'os', 'sys', 'json', 'csv', 'datetime', 'time',
            'random', 'collections', 'itertools', 'functools', 'operator',
            'string', 're', 'urllib', 'http', 'pathlib', 'typing',
            'decimal', 'fractions', 'statistics', 'cmath'
        }

    def analyze_code_structure(self, code: str) -> Dict[str, Any]:
        """
        Analyze the structure of Python code.

        Args:
            code: Python code to analyze

        Returns:
            Dictionary with code structure information. On success it has
            ``'syntax_valid': True`` plus one entry per analysis. On a
            ``SyntaxError`` it has ``'syntax_valid': False`` with
            ``'syntax_error'``, ``'line_number'`` and ``'error_text'``. On any
            other exception it has ``'syntax_valid': False`` with
            ``'analysis_error'``.
        """
        try:
            tree = ast.parse(code)
            analysis = {
                'imports': self._extract_imports(tree),
                'functions': self._extract_functions(tree),
                'classes': self._extract_classes(tree),
                'variables': self._extract_variables(tree),
                'constants': self._extract_constants(tree),
                'control_flow': self._analyze_control_flow(tree),
                'complexity': self._calculate_complexity(tree),
                'dependencies': self._analyze_dependencies(tree),
                'potential_outputs': self._predict_outputs(tree),
                'syntax_valid': True
            }
            return analysis
        except SyntaxError as e:
            return {
                'syntax_valid': False,
                'syntax_error': str(e),
                'line_number': e.lineno,
                'error_text': e.text
            }
        except Exception as e:
            logger.error(f"Code analysis failed: {e}")
            return {
                'syntax_valid': False,
                'analysis_error': str(e)
            }

    def _extract_imports(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Extract import statements from AST.

        ``is_standard`` is decided by the first dotted component of the module
        name being listed in ``self.standard_modules``.
        """
        imports = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    imports.append({
                        'type': 'import',
                        'module': alias.name,
                        'alias': alias.asname,
                        'is_standard': alias.name.split('.')[0] in self.standard_modules
                    })
            elif isinstance(node, ast.ImportFrom):
                # ``from . import x`` has no module name.
                module = node.module or ''
                for alias in node.names:
                    imports.append({
                        'type': 'from_import',
                        'module': module,
                        'name': alias.name,
                        'alias': alias.asname,
                        'is_standard': module.split('.')[0] in self.standard_modules
                    })
        return imports

    def _extract_functions(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Extract function definitions (sync and async) from AST.

        ``args`` lists only the regular positional parameters
        (``node.args.args``); ``defaults`` is the number of positional
        defaults.
        """
        functions = []
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                functions.append({
                    'name': node.name,
                    'args': [arg.arg for arg in node.args.args],
                    'defaults': len(node.args.defaults),
                    'returns': ast.unparse(node.returns) if node.returns else None,
                    'docstring': ast.get_docstring(node),
                    'line_number': node.lineno,
                    'is_async': isinstance(node, ast.AsyncFunctionDef)
                })
        return functions

    def _extract_classes(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Extract class definitions from AST.

        Only methods defined directly in the class body are listed, including
        ``async def`` methods. ``is_property`` is true when a decorator is the
        bare name ``property``.
        """
        classes = []
        for node in ast.walk(tree):
            if not isinstance(node, ast.ClassDef):
                continue
            methods = [
                {
                    'name': item.name,
                    'args': [arg.arg for arg in item.args.args],
                    'is_property': any(
                        isinstance(d, ast.Name) and d.id == 'property'
                        for d in item.decorator_list
                    )
                }
                for item in node.body
                if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef))
            ]
            classes.append({
                'name': node.name,
                'bases': [ast.unparse(base) for base in node.bases],
                'methods': methods,
                'docstring': ast.get_docstring(node),
                'line_number': node.lineno
            })
        return classes

    def _extract_variables(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Extract variable assignments from AST.

        Only targets that are plain names are reported; attribute, subscript
        and tuple targets are skipped.
        """
        variables = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Assign):
                for target in node.targets:
                    if isinstance(target, ast.Name):
                        variables.append({
                            'name': target.id,
                            'type': 'assignment',
                            'value': ast.unparse(node.value),
                            'line_number': node.lineno
                        })
            elif isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name):
                variables.append({
                    'name': node.target.id,
                    'type': 'annotated_assignment',
                    'annotation': ast.unparse(node.annotation),
                    # A bare annotation (``x: int``) has no value.
                    'value': ast.unparse(node.value) if node.value else None,
                    'line_number': node.lineno
                })
        return variables

    def _extract_constants(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Extract every literal constant node from AST."""
        return [
            {
                'value': node.value,
                'type': type(node.value).__name__,
                'line_number': node.lineno
            }
            for node in ast.walk(tree)
            if isinstance(node, ast.Constant)
        ]

    def _analyze_control_flow(self, tree: ast.AST) -> Dict[str, Any]:
        """Count control flow structures and measure their maximum nesting."""
        control_flow = {
            'if_statements': 0,
            'for_loops': 0,
            'while_loops': 0,
            'try_except': 0,
            'with_statements': 0,
            'comprehensions': 0,
            'max_nesting_depth': 0
        }
        # Node type(s) -> counter key. The types are mutually exclusive.
        counters = (
            (ast.If, 'if_statements'),
            (ast.For, 'for_loops'),
            (ast.While, 'while_loops'),
            (ast.Try, 'try_except'),
            (ast.With, 'with_statements'),
            ((ast.ListComp, ast.DictComp, ast.SetComp, ast.GeneratorExp), 'comprehensions'),
        )
        nesting_types = (ast.If, ast.For, ast.While, ast.Try, ast.With)

        def calculate_depth(node, current_depth=0):
            # Depth grows by one each time we descend into a nesting construct.
            deepest = current_depth
            for child in ast.iter_child_nodes(node):
                child_level = current_depth + (1 if isinstance(child, nesting_types) else 0)
                deepest = max(deepest, calculate_depth(child, child_level))
            return deepest

        for node in ast.walk(tree):
            for node_types, key in counters:
                if isinstance(node, node_types):
                    control_flow[key] += 1
                    break
        control_flow['max_nesting_depth'] = calculate_depth(tree)
        return control_flow

    def _calculate_complexity(self, tree: ast.AST) -> Dict[str, int]:
        """Calculate code complexity metrics.

        ``lines_of_code`` counts the lines of the unparsed (normalized) tree,
        so comments and blank lines of the original text are not included.
        """
        complexity = {
            'cyclomatic_complexity': 1,  # Base complexity
            'lines_of_code': len(ast.unparse(tree).split('\n')),
            'number_of_nodes': len(list(ast.walk(tree)))
        }
        # Each branch point adds one path; ``a and b and c`` adds two.
        for node in ast.walk(tree):
            if isinstance(node, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
                complexity['cyclomatic_complexity'] += 1
            elif isinstance(node, ast.BoolOp):
                complexity['cyclomatic_complexity'] += len(node.values) - 1
        return complexity

    def _analyze_dependencies(self, tree: ast.AST) -> Dict[str, Any]:
        """Analyze code dependencies.

        Returns a dict of sorted lists: ``external_modules``,
        ``standard_modules``, ``builtin_functions`` and ``undefined_names``.

        The check is scope-insensitive: a name bound anywhere in the tree
        (import, def/class, parameter, assignment, loop or ``with`` target,
        comprehension variable, ``except ... as``) counts as defined
        everywhere. Names starting with ``_`` are never reported as undefined.
        """
        external_modules = set()
        standard_modules = set()
        used_builtins = set()
        undefined_names = set()
        defined_names = set()

        # Pass 1: collect imported modules and every name the code binds.
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    top_level = alias.name.split('.')[0]
                    if top_level in self.standard_modules:
                        standard_modules.add(alias.name)
                    else:
                        external_modules.add(alias.name)
                    # ``import a.b`` binds ``a``; ``import a.b as c`` binds ``c``.
                    defined_names.add(alias.asname or top_level)
            elif isinstance(node, ast.ImportFrom):
                module = node.module or ''
                # ``from . import x`` has no module name; nothing to classify.
                if module:
                    if module.split('.')[0] in self.standard_modules:
                        standard_modules.add(module)
                    else:
                        external_modules.add(module)
                for alias in node.names:
                    defined_names.add(alias.asname or alias.name)
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                defined_names.add(node.name)
            elif isinstance(node, ast.arg):
                # Function and lambda parameters.
                defined_names.add(node.arg)
            elif isinstance(node, ast.ExceptHandler):
                if node.name:
                    defined_names.add(node.name)
            elif isinstance(node, ast.Name) and isinstance(node.ctx, ast.Store):
                # Assignments, loop/with targets, comprehension variables, walrus.
                defined_names.add(node.id)

        # Pass 2: classify every name that is read.
        for node in ast.walk(tree):
            if not (isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load)):
                continue
            name = node.id
            if name in defined_names:
                continue
            if name in self.builtin_functions:
                used_builtins.add(name)
            elif not name.startswith('_'):
                undefined_names.add(name)

        # Sorted lists: JSON-serializable and stable between runs.
        return {
            'external_modules': sorted(external_modules),
            'standard_modules': sorted(standard_modules),
            'builtin_functions': sorted(used_builtins),
            'undefined_names': sorted(undefined_names)
        }

    def _predict_outputs(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Predict potential outputs from code.

        Reports ``print(...)`` calls, ``return`` statements and expression
        statements that are not calls.
        """
        outputs = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                # Only direct calls to the bare name ``print`` are recognized.
                if isinstance(node.func, ast.Name) and node.func.id == 'print':
                    outputs.append({
                        'type': 'print',
                        'line_number': node.lineno,
                        'args': [ast.unparse(arg) for arg in node.args]
                    })
            elif isinstance(node, ast.Return):
                outputs.append({
                    'type': 'return',
                    'line_number': node.lineno,
                    'value': ast.unparse(node.value) if node.value else None
                })
            elif isinstance(node, ast.Expr):
                # A standalone non-call expression would be echoed in a REPL.
                if not isinstance(node.value, ast.Call):
                    outputs.append({
                        'type': 'expression',
                        'line_number': node.lineno,
                        'expression': ast.unparse(node.value)
                    })
        return outputs
class ExecutionFlowAnalyzer:
    """Analyze execution flow and predict behavior.

    Everything is derived statically from the parsed AST; the analyzed code
    is never executed.
    """

    def __init__(self):
        """Initialize execution flow analyzer (no state is kept)."""

    def analyze_execution_flow(self, code: str) -> Dict[str, Any]:
        """
        Analyze the execution flow of Python code.

        Args:
            code: Python code to analyze

        Returns:
            Execution flow analysis, or ``{'error': message}`` when parsing or
            any analysis step raises.
        """
        try:
            tree = ast.parse(code)
            analysis = {
                'execution_order': self._determine_execution_order(tree),
                'variable_lifecycle': self._track_variable_lifecycle(tree),
                'function_calls': self._extract_function_calls(tree),
                'potential_errors': self._detect_potential_errors(tree),
                'performance_notes': self._analyze_performance(tree),
                'final_result_prediction': self._predict_final_result(tree, code)
            }
            return analysis
        except Exception as e:
            logger.error(f"Execution flow analysis failed: {e}")
            return {'error': str(e)}

    def _determine_execution_order(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Describe the top-level statements in the order they appear.

        Statement kinds without a dedicated branch are reported with the
        lower-cased AST class name as their ``type``.
        """
        execution_order = []
        for position, node in enumerate(tree.body, start=1):
            entry = {'step': position}
            if isinstance(node, ast.FunctionDef):
                entry.update(type='function_definition', name=node.name)
            elif isinstance(node, ast.ClassDef):
                entry.update(type='class_definition', name=node.name)
            elif isinstance(node, ast.Import):
                entry.update(type='import', modules=[alias.name for alias in node.names])
            elif isinstance(node, ast.ImportFrom):
                entry.update(
                    type='from_import',
                    module=node.module,
                    names=[alias.name for alias in node.names]
                )
            elif isinstance(node, ast.Assign):
                entry.update(
                    type='assignment',
                    targets=[ast.unparse(target) for target in node.targets],
                    value=ast.unparse(node.value)
                )
            elif isinstance(node, ast.Expr):
                entry.update(type='expression', expression=ast.unparse(node.value))
            else:
                entry['type'] = type(node).__name__.lower()
            entry['line'] = node.lineno
            execution_order.append(entry)
        return execution_order

    def _track_variable_lifecycle(self, tree: ast.AST) -> Dict[str, Dict[str, Any]]:
        """Track variable assignments and usages by line number.

        Only plain ``name = value`` assignments create an entry. Assignments
        and usages are collected in two separate passes so the result does not
        depend on the order in which ``ast.walk`` visits nodes; both lists are
        sorted by line and ``first_assignment`` is the smallest assignment
        line.
        """
        variables = {}

        # Pass 1: every assignment to a simple name.
        for node in ast.walk(tree):
            if not isinstance(node, ast.Assign):
                continue
            for target in node.targets:
                if isinstance(target, ast.Name):
                    record = variables.setdefault(target.id, {
                        'first_assignment': node.lineno,
                        'assignments': [],
                        'usages': []
                    })
                    record['assignments'].append({
                        'line': node.lineno,
                        'value': ast.unparse(node.value)
                    })
        for record in variables.values():
            record['assignments'].sort(key=lambda item: item['line'])
            record['first_assignment'] = record['assignments'][0]['line']

        # Pass 2: every read of a tracked name.
        for node in ast.walk(tree):
            if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
                if node.id in variables:
                    variables[node.id]['usages'].append(node.lineno)
        for record in variables.values():
            record['usages'].sort()
        return variables

    def _extract_function_calls(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Extract all function calls, ordered by source position.

        ``type`` is ``simple_call`` for a bare name, ``method_call`` for an
        attribute access and ``complex_call`` otherwise. A ``**mapping``
        argument has no keyword name, so it is keyed as ``'**<expression>'``.
        """
        located_calls = []
        for node in ast.walk(tree):
            if not isinstance(node, ast.Call):
                continue
            kwargs = {}
            for kw in node.keywords:
                value_src = ast.unparse(kw.value)
                key = kw.arg if kw.arg is not None else f"**{value_src}"
                kwargs[key] = value_src
            call_info = {
                'line': node.lineno,
                'args': [ast.unparse(arg) for arg in node.args],
                'kwargs': kwargs
            }
            if isinstance(node.func, ast.Name):
                call_info['function'] = node.func.id
                call_info['type'] = 'simple_call'
            elif isinstance(node.func, ast.Attribute):
                call_info['function'] = ast.unparse(node.func)
                call_info['type'] = 'method_call'
            else:
                call_info['function'] = ast.unparse(node.func)
                call_info['type'] = 'complex_call'
            located_calls.append((node.lineno, node.col_offset, call_info))
        # ast.walk does not visit nodes in source order; sort explicitly.
        located_calls.sort(key=lambda item: item[:2])
        return [info for _, _, info in located_calls]

    def _detect_potential_errors(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Detect potential runtime errors from simple syntactic patterns.

        Flags ``/``, ``//`` and ``%`` with a literal zero on the right, and
        subscripts with a literal integer index. String keys such as
        ``d['k']`` are not index accesses and are not flagged.
        """
        potential_errors = []
        zero_division_ops = (ast.Div, ast.FloorDiv, ast.Mod)
        for node in ast.walk(tree):
            if isinstance(node, ast.BinOp) and isinstance(node.op, zero_division_ops):
                if isinstance(node.right, ast.Constant) and node.right.value == 0:
                    potential_errors.append({
                        'type': 'division_by_zero',
                        'line': node.lineno,
                        'message': 'Division by zero detected'
                    })
            elif isinstance(node, ast.Subscript):
                index = node.slice
                if (isinstance(index, ast.Constant)
                        and isinstance(index.value, int)
                        and not isinstance(index.value, bool)):
                    potential_errors.append({
                        'type': 'potential_index_error',
                        'line': node.lineno,
                        'message': 'Potential index out of bounds'
                    })
        return potential_errors

    def _analyze_performance(self, tree: ast.AST) -> List[str]:
        """Produce notes about nested ``for`` loops and list comprehensions."""
        performance_notes = []
        for node in ast.walk(tree):
            if isinstance(node, ast.For):
                # ast.walk(node) yields node itself first, so exclude it.
                has_inner_loop = any(
                    isinstance(child, ast.For) and child is not node
                    for child in ast.walk(node)
                )
                if has_inner_loop:
                    performance_notes.append(
                        f"Nested loops detected at line {node.lineno} - consider optimization"
                    )
            elif isinstance(node, ast.ListComp):
                performance_notes.append(
                    f"List comprehension at line {node.lineno} - good for performance"
                )
        return performance_notes

    def _predict_final_result(self, tree: ast.AST, code: str) -> Dict[str, Any]:
        """Predict the kind of result the code produces.

        ``predicted_output_type`` is ``'printed_output'`` when any ``print``
        call exists; otherwise it follows the last top-level statement
        (``'expression_result'`` or ``'return_value'``) and defaults to
        ``'none'``. ``code`` is accepted but not used.
        """
        prediction = {
            'has_return_statement': False,
            'has_print_statements': False,
            'last_expression': None,
            'predicted_output_type': 'none'
        }
        for node in ast.walk(tree):
            if isinstance(node, ast.Return):
                prediction['has_return_statement'] = True
                if node.value:
                    # The last valued return visited wins.
                    prediction['return_value'] = ast.unparse(node.value)
            elif isinstance(node, ast.Call):
                if isinstance(node.func, ast.Name) and node.func.id == 'print':
                    prediction['has_print_statements'] = True

        if tree.body:
            last_stmt = tree.body[-1]
            if isinstance(last_stmt, ast.Expr):
                prediction['last_expression'] = ast.unparse(last_stmt.value)
                prediction['predicted_output_type'] = 'expression_result'
            elif isinstance(last_stmt, ast.Return):
                prediction['predicted_output_type'] = 'return_value'

        # Printed output takes precedence over the last-statement guess.
        if prediction['has_print_statements']:
            prediction['predicted_output_type'] = 'printed_output'
        return prediction
class CodeAnalyzerTool:
    """AGNO-compatible code analysis tool.

    Wraps a structure analyzer and a flow analyzer and renders their results
    as plain-text reports. Every public method returns a string and reports
    failures as a message instead of raising.
    """

    def __init__(self):
        """Initialize the code analyzer tool."""
        self.structure_analyzer = CodeStructureAnalyzer()
        self.flow_analyzer = ExecutionFlowAnalyzer()
        self.available = True
        logger.info("CodeAnalyzerTool initialized")

    @staticmethod
    def _failure_reason(structure: Dict[str, Any]) -> str:
        """Return the most specific failure message in a failed structure result."""
        return (structure.get('syntax_error')
                or structure.get('analysis_error')
                or 'Unknown syntax error')

    def analyze_python_code(self, code: str) -> str:
        """
        Analyze Python code structure and execution flow.

        Args:
            code: Python code to analyze

        Returns:
            Formatted analysis report, or a one-line error message when the
            code cannot be analyzed.
        """
        try:
            structure = self.structure_analyzer.analyze_code_structure(code)
            if not structure.get('syntax_valid', False):
                # Distinguish a non-syntax analysis failure from a real syntax error.
                if 'analysis_error' in structure:
                    return f"Analysis failed: {structure['analysis_error']}"
                return f"Syntax Error: {structure.get('syntax_error', 'Unknown syntax error')}"

            flow = self.flow_analyzer.analyze_execution_flow(code)

            parts = [
                "Code Analysis Report\n",
                "=" * 50 + "\n\n",
                "STRUCTURE ANALYSIS:\n",
                f"- Functions: {len(structure['functions'])}\n",
                f"- Classes: {len(structure['classes'])}\n",
                f"- Variables: {len(structure['variables'])}\n",
                f"- Imports: {len(structure['imports'])}\n",
                f"- Complexity: {structure['complexity']['cyclomatic_complexity']}\n\n",
            ]

            external = structure['dependencies']['external_modules']
            if external:
                parts.append(f"External Dependencies: {', '.join(external)}\n")

            # The flow result only has these keys when flow analysis succeeded.
            if 'execution_order' in flow:
                parts.append(f"\nEXECUTION STEPS: {len(flow['execution_order'])}\n")

            if 'final_result_prediction' in flow:
                pred = flow['final_result_prediction']
                parts.append(f"\nPREDICTED OUTPUT TYPE: {pred['predicted_output_type']}\n")
                if pred.get('last_expression'):
                    parts.append(f"Last Expression: {pred['last_expression']}\n")

            issues = flow.get('potential_errors')
            if issues:
                parts.append("\nPOTENTIAL ISSUES:\n")
                parts.extend(
                    f"- Line {error['line']}: {error['message']}\n" for error in issues
                )
            return "".join(parts)
        except Exception as e:
            return f"Analysis failed: {e}"

    def predict_code_output(self, code: str) -> str:
        """
        Predict the output of Python code without executing it.

        Args:
            code: Python code to analyze

        Returns:
            Predicted output description, or an error message when the code
            cannot be analyzed.
        """
        try:
            structure = self.structure_analyzer.analyze_code_structure(code)
            # Check validity first so flow analysis is not run on broken code.
            if not structure.get('syntax_valid', False):
                return f"Cannot predict output - syntax error: {self._failure_reason(structure)}"
            flow = self.flow_analyzer.analyze_execution_flow(code)

            parts = ["Output Prediction:\n", "-" * 30 + "\n"]
            outputs = structure['potential_outputs']
            if outputs:
                print_outputs = [out for out in outputs if out['type'] == 'print']
                if print_outputs:
                    parts.append(f"Print statements: {len(print_outputs)}\n")
                    for out in print_outputs[:3]:  # Show first 3
                        parts.append(
                            f" Line {out['line_number']}: print({', '.join(out['args'])})\n"
                        )
                returns = [out for out in outputs if out['type'] == 'return']
                if returns:
                    parts.append(f"Return statements: {len(returns)}\n")
                    for ret in returns[:3]:
                        parts.append(f" Line {ret['line_number']}: return {ret['value']}\n")
                expressions = [out for out in outputs if out['type'] == 'expression']
                if expressions:
                    parts.append(f"Final expression: {expressions[-1]['expression']}\n")

            if 'final_result_prediction' in flow:
                pred = flow['final_result_prediction']
                parts.append(f"\nFinal result type: {pred['predicted_output_type']}\n")
            return "".join(parts)
        except Exception as e:
            return f"Prediction failed: {e}"

    def detect_code_dependencies(self, code: str) -> str:
        """
        Detect dependencies and imports required by code.

        Args:
            code: Python code to analyze

        Returns:
            Dependencies report, or an error message when the code cannot be
            analyzed.
        """
        try:
            structure = self.structure_analyzer.analyze_code_structure(code)
            if not structure.get('syntax_valid', False):
                return (
                    "Cannot analyze dependencies - syntax error: "
                    f"{self._failure_reason(structure)}"
                )
            deps = structure['dependencies']
            parts = ["Dependencies Analysis:\n", "-" * 30 + "\n"]
            # (result key, report label), in report order.
            sections = (
                ('standard_modules', 'Standard library modules'),
                ('external_modules', 'External modules'),
                ('builtin_functions', 'Built-in functions used'),
                ('undefined_names', 'Undefined names (potential issues)'),
            )
            for key, label in sections:
                if deps[key]:
                    parts.append(f"{label}: {', '.join(deps[key])}\n")
            return "".join(parts)
        except Exception as e:
            return f"Dependency analysis failed: {e}"

    def suggest_code_optimizations(self, code: str) -> str:
        """
        Suggest optimizations for Python code.

        Args:
            code: Python code to analyze

        Returns:
            Optimization suggestions, a "looks good" note when there are none,
            or an error message when the code cannot be analyzed.
        """
        try:
            structure = self.structure_analyzer.analyze_code_structure(code)
            # A failed structure result lacks the metric keys read below.
            if not structure.get('syntax_valid', False):
                return (
                    "Cannot suggest optimizations - syntax error: "
                    f"{self._failure_reason(structure)}"
                )
            flow = self.flow_analyzer.analyze_execution_flow(code)

            header = "Code Optimization Suggestions:\n" + "-" * 40 + "\n"
            items = []

            complexity = structure['complexity']['cyclomatic_complexity']
            if complexity > 10:
                items.append(
                    f"- High complexity ({complexity}) - consider breaking into smaller functions\n"
                )

            control = structure['control_flow']
            if control['max_nesting_depth'] > 3:
                items.append(
                    f"- Deep nesting ({control['max_nesting_depth']} levels) - consider refactoring\n"
                )

            items.extend(f"- {note}\n" for note in flow.get('performance_notes', []))

            deps = structure['dependencies']
            if len(deps['external_modules']) > 5:
                items.append(
                    "- Many external dependencies - consider reducing for better portability\n"
                )

            if not items:
                items.append("No specific optimizations suggested - code looks good!\n")
            return header + "".join(items)
        except Exception as e:
            return f"Optimization analysis failed: {e}"
def get_code_analyzer_tools():
    """Build the tool descriptors used for AGNO registration.

    Returns:
        A list of dicts, one per public analysis method, each with the keys
        ``'name'``, ``'function'`` (the method bound to one shared
        ``CodeAnalyzerTool`` instance) and ``'description'``.
    """
    analyzer = CodeAnalyzerTool()
    # (method name, human-readable description), in registration order.
    catalog = (
        ('analyze_python_code',
         'Analyze Python code structure, complexity, and execution flow'),
        ('predict_code_output',
         'Predict the output of Python code without executing it'),
        ('detect_code_dependencies',
         'Detect dependencies and imports required by Python code'),
        ('suggest_code_optimizations',
         'Suggest optimizations and improvements for Python code'),
    )
    return [
        {
            'name': method_name,
            'function': getattr(analyzer, method_name),
            'description': blurb
        }
        for method_name, blurb in catalog
    ]
if __name__ == "__main__":
    # Manual smoke test: analyze a small sample program and print both reports.
    tool = CodeAnalyzerTool()
    # The sample must itself be valid Python, so the function body is indented.
    test_code = """
import math
import numpy as np
def calculate_result(x, y):
    result = math.sqrt(x**2 + y**2)
    return result * math.pi
data = [1, 2, 3, 4, 5]
mean_value = np.mean(data)
final_result = calculate_result(mean_value, 2.5)
print(f"Final result: {final_result}")
final_result
"""
    print("Testing CodeAnalyzerTool:")
    print("=" * 50)
    analysis = tool.analyze_python_code(test_code)
    print(analysis)
    print("\n" + "=" * 50)
    prediction = tool.predict_code_output(test_code)
    print(prediction)