Spaces:
Sleeping
Sleeping
"""
Secure Code Execution Tool for GAIA Agent

Provides safe Python code execution with mathematical computation capabilities.

Features:
- Secure sandboxed execution environment
- Mathematical libraries (numpy, scipy, sympy, pandas)
- Timeout and resource management
- Result validation and formatting
- Security restrictions and input sanitization
"""
| import os | |
| import sys | |
| import ast | |
| import subprocess | |
| import tempfile | |
| import time | |
| import signal | |
| import logging | |
| import traceback | |
| import re | |
| from typing import Dict, Any, Optional, Union, List | |
| from pathlib import Path | |
| import json | |
| # Mathematical and scientific computing libraries | |
| try: | |
| import numpy as np | |
| NUMPY_AVAILABLE = True | |
| except ImportError: | |
| NUMPY_AVAILABLE = False | |
| try: | |
| import pandas as pd | |
| PANDAS_AVAILABLE = True | |
| except ImportError: | |
| PANDAS_AVAILABLE = False | |
| try: | |
| import scipy | |
| SCIPY_AVAILABLE = True | |
| except ImportError: | |
| SCIPY_AVAILABLE = False | |
| try: | |
| import sympy as sp | |
| SYMPY_AVAILABLE = True | |
| except ImportError: | |
| SYMPY_AVAILABLE = False | |
| try: | |
| import matplotlib | |
| matplotlib.use('Agg') # Non-interactive backend | |
| import matplotlib.pyplot as plt | |
| MATPLOTLIB_AVAILABLE = True | |
| except ImportError: | |
| MATPLOTLIB_AVAILABLE = False | |
| logger = logging.getLogger(__name__) | |
class SecurityError(Exception):
    """Error signalling that submitted code was rejected by the security checks."""
class ExecutionTimeoutError(Exception):
    """Error signalling that code execution ran past the configured timeout."""
class CodeSecurityValidator:
    """Screen Python source with an AST walk before it is executed.

    The source is parsed and every node is checked against three deny-lists
    (imports, builtin names/calls, dunder attributes). Any hit raises
    ``SecurityError``.

    NOTE(review): this is a deny-list filter over syntax, not a sandbox; it
    only rejects the constructs enumerated below.
    """

    # Module names that may not be imported. Matching is done on the full
    # dotted name and on its top-level package (so ``os.path`` is rejected
    # because ``os`` is listed).
    BLOCKED_IMPORTS = {
        'os', 'sys', 'subprocess', 'shutil', 'glob', 'pickle', 'marshal',
        'importlib', '__import__', 'eval', 'exec', 'compile', 'open',
        'file', 'input', 'raw_input', 'reload', 'vars', 'locals', 'globals',
        'dir', 'hasattr', 'getattr', 'setattr', 'delattr', 'callable',
        'socket', 'urllib', 'requests', 'http', 'ftplib', 'smtplib',
        'telnetlib', 'poplib', 'imaplib', 'nntplib', 'ssl', 'hashlib',
        'hmac', 'secrets', 'random', 'tempfile', 'threading', 'multiprocessing'
    }

    # Builtin names that may be neither called nor read.
    BLOCKED_FUNCTIONS = {
        'eval', 'exec', 'compile', '__import__', 'open', 'file', 'input',
        'raw_input', 'reload', 'vars', 'locals', 'globals', 'dir',
        'hasattr', 'getattr', 'setattr', 'delattr', 'callable'
    }

    # Attribute names that may not appear in any ``obj.attr`` access.
    BLOCKED_ATTRIBUTES = {
        '__class__', '__bases__', '__subclasses__', '__mro__', '__globals__',
        '__code__', '__func__', '__self__', '__module__', '__dict__',
        '__getattribute__', '__setattr__', '__delattr__', '__reduce__',
        '__reduce_ex__', '__getstate__', '__setstate__'
    }

    def validate_code(self, code: str) -> bool:
        """Validate Python code for security risks.

        Args:
            code: Python code string to validate

        Returns:
            True if no check rejected the code.

        Raises:
            SecurityError: if the code does not parse, uses a blocked
                construct, or validation itself fails unexpectedly.
        """
        try:
            tree = ast.parse(code)
            for node in ast.walk(tree):
                self._check_node(node)
        except SyntaxError as e:
            raise SecurityError(f"Syntax error in code: {e}") from e
        except SecurityError:
            # Already carries a precise message; do not re-wrap it.
            raise
        except Exception as e:
            raise SecurityError(f"Code validation failed: {e}") from e
        return True

    def _check_node(self, node: ast.AST) -> None:
        """Dispatch one AST node to the check matching its type."""
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            self._check_import(node)
        elif isinstance(node, ast.Call):
            self._check_function_call(node)
        elif isinstance(node, ast.Attribute):
            self._check_attribute_access(node)
        elif isinstance(node, ast.Name):
            self._check_name_access(node)

    def _is_blocked_module(self, module_name: str) -> bool:
        """Return True if the module or its top-level package is deny-listed."""
        top_level = module_name.split('.', 1)[0]
        return module_name in self.BLOCKED_IMPORTS or top_level in self.BLOCKED_IMPORTS

    def _check_import(self, node: Union[ast.Import, ast.ImportFrom]) -> None:
        """Reject ``import``/``from ... import`` of deny-listed modules."""
        if isinstance(node, ast.Import):
            for alias in node.names:
                if self._is_blocked_module(alias.name):
                    raise SecurityError(f"Blocked import: {alias.name}")
        elif isinstance(node, ast.ImportFrom):
            # node.module is None for ``from . import x``; nothing to match then.
            if node.module and self._is_blocked_module(node.module):
                raise SecurityError(f"Blocked import from: {node.module}")

    def _check_function_call(self, node: ast.Call) -> None:
        """Reject direct calls to deny-listed builtin names."""
        if isinstance(node.func, ast.Name) and node.func.id in self.BLOCKED_FUNCTIONS:
            raise SecurityError(f"Blocked function call: {node.func.id}")

    def _check_attribute_access(self, node: ast.Attribute) -> None:
        """Reject access to deny-listed attribute names."""
        if node.attr in self.BLOCKED_ATTRIBUTES:
            raise SecurityError(f"Blocked attribute access: {node.attr}")

    def _check_name_access(self, node: ast.Name) -> None:
        """Reject reads of deny-listed names; plain assignment to them is allowed."""
        if node.id in self.BLOCKED_FUNCTIONS and not isinstance(node.ctx, ast.Store):
            raise SecurityError(f"Blocked name access: {node.id}")
class SecureCodeExecutor:
    """Validate user code, then run it in a child Python process with a timeout.

    The code is first screened by ``CodeSecurityValidator``. It is then written
    to a temporary script (prefixed with imports of the available math
    libraries) and run with ``sys.executable``. When result capture is on, a
    generated harness evaluates the trailing expression and reports it on a
    final ``RESULT:`` line of stdout.

    NOTE(review): ``memory_limit_mb`` is stored and reported but nothing in
    this class applies it to the child process.
    """

    # Prefix of the last stdout line that carries the captured result.
    _RESULT_PREFIX = 'RESULT:'

    # (availability key, import lines) for the optional third-party libraries.
    _OPTIONAL_IMPORTS = (
        ('numpy', ("import numpy as np",)),
        ('pandas', ("import pandas as pd",)),
        ('scipy', (
            "import scipy",
            "from scipy import stats, optimize, integrate, linalg",
        )),
        ('sympy', (
            "import sympy as sp",
            "from sympy import symbols, solve, diff, integrate as sp_integrate, simplify, expand, factor",
        )),
        ('matplotlib', (
            "import matplotlib",
            "matplotlib.use('Agg')",
            "import matplotlib.pyplot as plt",
        )),
    )

    # Standard-library modules always made available to user code.
    _STDLIB_IMPORTS = (
        "import math",
        "import cmath",
        "import decimal",
        "import fractions",
        "import statistics",
        "import itertools",
        "import functools",
        "import operator",
        "import json",
    )

    # Harness text placed before the user-code literal in the generated script.
    _HARNESS_HEAD = '''
# --- generated result-capture harness ---
# Snapshot the namespace now so user code sees only the imports above and
# not the harness helpers (sys, ast, ...) imported below.
_user_globals = dict(globals())

import ast as _ast
import sys as _sys
from io import StringIO as _StringIO

# Buffer stdout so the RESULT line is always emitted after all user output.
_old_stdout = _sys.stdout
_sys.stdout = _captured = _StringIO()
try:
    _user_code = '''

    # Harness text placed after the user-code literal in the generated script.
    _HARNESS_TAIL = '''
    _tree = _ast.parse(_user_code)
    _last_expr = None
    if _tree.body and isinstance(_tree.body[-1], _ast.Expr):
        # Detach the trailing expression so it is evaluated exactly once.
        _last_expr = _ast.Expression(_tree.body.pop().value)
    exec(compile(_tree, '<string>', 'exec'), _user_globals)
    _result = None
    if _last_expr is not None:
        _result = eval(compile(_last_expr, '<string>', 'eval'), _user_globals)
    if _result is None or isinstance(_result, (int, float, str, list, dict, bool)):
        try:
            _text = json.dumps(_result)
        except (TypeError, ValueError):
            _text = str(_result)
    else:
        _text = str(_result)
    # JSON-encode once more so the RESULT line can never span several lines.
    print("RESULT:" + json.dumps(_text))
finally:
    _sys.stdout = _old_stdout
    _output = _captured.getvalue()
    if _output:
        print(_output, end='')
'''

    def __init__(self, timeout: int = 30, memory_limit_mb: int = 512):
        """Initialize secure code executor.

        Args:
            timeout: Maximum execution time in seconds
            memory_limit_mb: Maximum memory usage in MB (recorded only; see
                the class note)
        """
        self.timeout = timeout
        self.memory_limit_mb = memory_limit_mb
        self.validator = CodeSecurityValidator()
        # Which optional libraries imported successfully in this process.
        self.available_libraries = {
            'numpy': NUMPY_AVAILABLE,
            'pandas': PANDAS_AVAILABLE,
            'scipy': SCIPY_AVAILABLE,
            'sympy': SYMPY_AVAILABLE,
            'matplotlib': MATPLOTLIB_AVAILABLE
        }
        logger.info(
            "SecureCodeExecutor initialized with %ss timeout, %sMB limit",
            timeout, memory_limit_mb,
        )
        logger.info(
            "Available libraries: %s",
            [lib for lib, avail in self.available_libraries.items() if avail],
        )

    def execute_code(self, code: str, return_output: bool = True) -> Dict[str, Any]:
        """Execute Python code securely and return results.

        Args:
            code: Python code to execute
            return_output: Whether to capture the trailing expression's value

        Returns:
            Dictionary with keys ``success``, ``result``, ``output``,
            ``error``, ``execution_time`` and ``libraries_used``. Failures are
            reported through ``success``/``error`` rather than raised.
        """
        start_time = time.time()
        try:
            self.validator.validate_code(code)
            execution_result = self._execute_in_subprocess(code, return_output)
        except SecurityError as e:
            return self._failure(f"Security violation: {e}", time.time() - start_time)
        except ExecutionTimeoutError as e:
            # The child was stopped at the limit, so report the limit itself.
            return self._failure(f"Execution timeout: {e}", self.timeout)
        except Exception as e:
            return self._failure(f"Execution error: {e}", time.time() - start_time)
        return {
            'success': True,
            'result': execution_result.get('result'),
            'output': execution_result.get('output', ''),
            'error': None,
            'execution_time': time.time() - start_time,
            'libraries_used': self._detect_libraries_used(code)
        }

    @staticmethod
    def _failure(message: str, elapsed: float) -> Dict[str, Any]:
        """Build the result dictionary for an unsuccessful execution."""
        return {
            'success': False,
            'result': None,
            'output': '',
            'error': message,
            'execution_time': elapsed,
            'libraries_used': []
        }

    def _execute_in_subprocess(self, code: str, return_output: bool) -> Dict[str, Any]:
        """Run the prepared script in a child interpreter and parse its stdout.

        Returns:
            ``{'result': ..., 'output': ...}``. ``result`` is None when no
            RESULT line was produced.

        Raises:
            ExecutionTimeoutError: the child exceeded ``self.timeout`` seconds.
            RuntimeError: the child exited with a non-zero status.
        """
        script = self._prepare_safe_code(code, return_output)
        handle = tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', delete=False, encoding='utf-8'
        )
        script_path = handle.name
        try:
            with handle:
                handle.write(script)
            completed = subprocess.run(
                [sys.executable, script_path],
                capture_output=True,
                text=True,
                timeout=self.timeout,
                cwd=tempfile.gettempdir()  # Run in temp directory
            )
        except subprocess.TimeoutExpired:
            raise ExecutionTimeoutError(
                f"Code execution exceeded {self.timeout} seconds"
            ) from None
        finally:
            # Best-effort cleanup; a leftover temp file must not mask the outcome.
            try:
                os.unlink(script_path)
            except OSError:
                pass

        if completed.returncode != 0:
            raise RuntimeError(f"Code execution failed: {completed.stderr}")

        stdout_lines = completed.stdout.strip().split('\n')
        if return_output and stdout_lines[-1].startswith(self._RESULT_PREFIX):
            payload = stdout_lines[-1][len(self._RESULT_PREFIX):]
            return {
                'result': self._decode_result(payload),
                'output': '\n'.join(stdout_lines[:-1])
            }
        return {
            'result': None,
            'output': completed.stdout
        }

    @staticmethod
    def _decode_result(payload: str) -> Any:
        """Decode the RESULT payload written by the capture harness.

        The harness sends a JSON string whose content is either JSON (for
        JSON-friendly values) or ``str(value)``. The outer layer is decoded
        first; the inner text is parsed as JSON when possible and otherwise
        returned as a plain string.
        """
        try:
            text = json.loads(payload)
        except ValueError:
            return payload
        if not isinstance(text, str):
            return text
        try:
            return json.loads(text)
        except ValueError:
            return text

    def _prepare_safe_code(self, code: str, capture_result: bool) -> str:
        """Build the script text: library imports, then the user code.

        With ``capture_result`` the user code is embedded as an escaped string
        literal inside the capture harness; otherwise it is appended verbatim.
        """
        import_lines: List[str] = []
        for library, lines in self._OPTIONAL_IMPORTS:
            if self.available_libraries.get(library):
                import_lines.extend(lines)
        import_lines.extend(self._STDLIB_IMPORTS)

        preamble = '\n'.join(import_lines) + '\n\n'
        if not capture_result:
            return preamble + code
        # ascii() yields a valid single-line literal for any source text, so
        # quotes, backslashes and newlines in user code cannot break the script.
        return preamble + self._HARNESS_HEAD + ascii(code) + self._HARNESS_TAIL

    def _detect_libraries_used(self, code: str) -> List[str]:
        """List libraries the code appears to use, by simple substring matching."""
        markers = (
            ('numpy', ('numpy', 'np.')),
            ('pandas', ('pandas', 'pd.')),
            ('scipy', ('scipy',)),
            ('sympy', ('sympy', 'sp.')),
            ('matplotlib', ('matplotlib', 'plt.')),
            ('math', ('math.',)),
        )
        return [
            library
            for library, needles in markers
            if any(needle in code for needle in needles)
        ]
class CodeExecutionTool:
    """AGNO-compatible tool for secure Python code execution.

    Wraps a ``SecureCodeExecutor`` and formats its result dictionaries as
    text. The two convenience methods generate a small Python snippet and run
    it through ``execute_python_code``.
    """

    def __init__(self, timeout: int = 30, memory_limit_mb: int = 512):
        """Initialize the code execution tool."""
        self.executor = SecureCodeExecutor(timeout, memory_limit_mb)
        self.available = True
        logger.info("CodeExecutionTool initialized successfully")

    def execute_python_code(self, code: str) -> str:
        """Execute Python code and return the result.

        Args:
            code: Python code to execute

        Returns:
            Formatted result string: captured output, result, libraries used
            and execution time on success, or ``"Error: ..."`` on failure.
        """
        result = self.executor.execute_code(code, return_output=True)
        if not result['success']:
            return f"Error: {result['error']}"

        output_parts = []
        if result['output']:
            output_parts.append(f"Output:\n{result['output']}")
        if result['result'] is not None:
            output_parts.append(f"Result: {result['result']}")
        if result['libraries_used']:
            output_parts.append(f"Libraries used: {', '.join(result['libraries_used'])}")
        output_parts.append(f"Execution time: {result['execution_time']:.3f}s")
        return '\n'.join(output_parts)

    def run_mathematical_computation(self, expression: str) -> str:
        """Run a mathematical computation using available libraries.

        Args:
            expression: Mathematical expression or computation

        Returns:
            Computation result
        """
        # The expression is evaluated as code once; everywhere it is merely
        # displayed it is embedded via repr() so quotes or braces in it cannot
        # break the generated snippet.
        code = '\n'.join([
            "# Mathematical computation",
            f"result = {expression}",
            f"print('Computation: ' + {expression!r})",
            "print(f'Result: {result}')",
            "result",
        ])
        return self.execute_python_code(code)

    def analyze_numerical_data(self, data: str, operation: str = "basic_stats") -> str:
        """Analyze numerical data using pandas and numpy.

        Args:
            data: Data as string (comma-separated values or JSON)
            operation: Type of analysis to perform ('basic_stats' or
                'advanced_stats'; anything else yields 'Unknown operation')

        Returns:
            Analysis results
        """
        # Decide scipy availability here: the generated code must not call
        # globals(), which the security validator rejects.
        if self.executor.available_libraries.get('scipy'):
            skewness_expr = "float(stats.skew(data_array))"
            kurtosis_expr = "float(stats.kurtosis(data_array))"
        else:
            skewness_expr = kurtosis_expr = "'N/A'"

        # data/operation are embedded with repr() so arbitrary text stays a
        # plain string literal in the generated snippet.
        code = '\n'.join([
            "import json",
            "# Parse data",
            "try:",
            f"    data = json.loads({data!r})",
            "except ValueError:",
            f"    data = [float(x.strip()) for x in {data!r}.split(',') if x.strip()]",
            "# Convert to numpy array for analysis (scalars become 1-element arrays)",
            "data_array = np.atleast_1d(np.array(data))",
            f"operation = {operation!r}",
            "# Perform analysis",
            "if operation == 'basic_stats':",
            "    result = {",
            "        'mean': float(np.mean(data_array)),",
            "        'median': float(np.median(data_array)),",
            "        'std': float(np.std(data_array)),",
            "        'min': float(np.min(data_array)),",
            "        'max': float(np.max(data_array)),",
            "        'sum': float(np.sum(data_array)),",
            "        'count': len(data_array)",
            "    }",
            "elif operation == 'advanced_stats':",
            "    result = {",
            "        'mean': float(np.mean(data_array)),",
            "        'variance': float(np.var(data_array)),",
            f"        'skewness': {skewness_expr},",
            f"        'kurtosis': {kurtosis_expr},",
            "        'percentiles': {",
            "            '25th': float(np.percentile(data_array, 25)),",
            "            '50th': float(np.percentile(data_array, 50)),",
            "            '75th': float(np.percentile(data_array, 75))",
            "        }",
            "    }",
            "else:",
            "    result = 'Unknown operation'",
            "print('Data analysis (' + operation + '):')",
            "print(f'Data: {data}')",
            "print(f'Results: {result}')",
            "result",
        ])
        return self.execute_python_code(code)

    def get_status(self) -> Dict[str, Any]:
        """Get tool status and capabilities."""
        return {
            'available': self.available,
            'timeout': self.executor.timeout,
            'memory_limit_mb': self.executor.memory_limit_mb,
            'available_libraries': self.executor.available_libraries,
            # NOTE(review): 'Memory limits' is advertised here; confirm the
            # executor actually enforces memory_limit_mb.
            'security_features': [
                'AST-based code validation',
                'Subprocess isolation',
                'Import restrictions',
                'Function call blocking',
                'Attribute access control',
                'Timeout protection',
                'Memory limits'
            ]
        }
| # AGNO tool registration functions | |
def get_code_execution_tools():
    """Build the AGNO registration entries for the code execution tool.

    Creates one ``CodeExecutionTool`` and returns a list of dictionaries, each
    with the method's ``name``, the bound ``function`` and a ``description``.
    """
    tool = CodeExecutionTool()
    catalog = (
        ('execute_python_code',
         'Execute Python code securely with mathematical libraries'),
        ('run_mathematical_computation',
         'Perform mathematical computations using numpy, scipy, sympy'),
        ('analyze_numerical_data',
         'Analyze numerical data with statistical operations'),
    )
    # Every entry exposes a bound method of the same tool instance.
    return [
        {
            'name': method_name,
            'function': getattr(tool, method_name),
            'description': summary,
        }
        for method_name, summary in catalog
    ]
if __name__ == "__main__":
    # Manual smoke test: run one snippet through the tool, then dump its status.
    demo_tool = CodeExecutionTool()
    separator = "=" * 50

    # Snippet whose trailing expression is the value to be captured.
    test_code = """
import math
result = math.sqrt(2) * math.pi
print(f"Square root of 2 times pi: {result}")
result
"""

    print("Testing CodeExecutionTool:")
    print(separator)
    print(demo_tool.execute_python_code(test_code))
    print(separator)

    # Report the tool's configuration and advertised capabilities.
    print("Tool Status:")
    print(json.dumps(demo_tool.get_status(), indent=2))