Spaces:
Running
Running
""" | |
Secure Code Execution Tool for GAIA Agent | |
Provides safe Python code execution with mathematical computation capabilities. | |
Features: | |
- Secure sandboxed execution environment | |
- Mathematical libraries (numpy, scipy, sympy, pandas) | |
- Timeout and resource management | |
- Result validation and formatting | |
- Security restrictions and input sanitization | |
""" | |
import os | |
import sys | |
import ast | |
import subprocess | |
import tempfile | |
import time | |
import signal | |
import logging | |
import traceback | |
import re | |
from typing import Dict, Any, Optional, Union, List | |
from pathlib import Path | |
import json | |
# Mathematical and scientific computing libraries | |
try: | |
import numpy as np | |
NUMPY_AVAILABLE = True | |
except ImportError: | |
NUMPY_AVAILABLE = False | |
try: | |
import pandas as pd | |
PANDAS_AVAILABLE = True | |
except ImportError: | |
PANDAS_AVAILABLE = False | |
try: | |
import scipy | |
SCIPY_AVAILABLE = True | |
except ImportError: | |
SCIPY_AVAILABLE = False | |
try: | |
import sympy as sp | |
SYMPY_AVAILABLE = True | |
except ImportError: | |
SYMPY_AVAILABLE = False | |
try: | |
import matplotlib | |
matplotlib.use('Agg') # Non-interactive backend | |
import matplotlib.pyplot as plt | |
MATPLOTLIB_AVAILABLE = True | |
except ImportError: | |
MATPLOTLIB_AVAILABLE = False | |
logger = logging.getLogger(__name__) | |
class SecurityError(Exception):
    """Signal that submitted code contains a potentially dangerous operation."""
class ExecutionTimeoutError(Exception):
    """Signal that code execution ran past its configured time limit."""
class CodeSecurityValidator:
    """Screen Python source for risky constructs before it is executed.

    The source is parsed into an AST and every node is checked against three
    deny-lists: imported modules, called/referenced builtin names, and
    attribute names. Any match raises ``SecurityError``.

    This is a static, name-based filter: it only sees identifiers that appear
    literally in the source, so it should be treated as one layer of defence
    rather than a complete sandbox.
    """

    # Dangerous imports and functions to block
    BLOCKED_IMPORTS = {
        'os', 'sys', 'subprocess', 'shutil', 'glob', 'pickle', 'marshal',
        'importlib', '__import__', 'eval', 'exec', 'compile', 'open',
        'file', 'input', 'raw_input', 'reload', 'vars', 'locals', 'globals',
        'dir', 'hasattr', 'getattr', 'setattr', 'delattr', 'callable',
        'socket', 'urllib', 'requests', 'http', 'ftplib', 'smtplib',
        'telnetlib', 'poplib', 'imaplib', 'nntplib', 'ssl', 'hashlib',
        'hmac', 'secrets', 'random', 'tempfile', 'threading', 'multiprocessing'
    }
    BLOCKED_FUNCTIONS = {
        'eval', 'exec', 'compile', '__import__', 'open', 'file', 'input',
        'raw_input', 'reload', 'vars', 'locals', 'globals', 'dir',
        'hasattr', 'getattr', 'setattr', 'delattr', 'callable'
    }
    BLOCKED_ATTRIBUTES = {
        '__class__', '__bases__', '__subclasses__', '__mro__', '__globals__',
        '__code__', '__func__', '__self__', '__module__', '__dict__',
        '__getattribute__', '__setattr__', '__delattr__', '__reduce__',
        '__reduce_ex__', '__getstate__', '__setstate__'
    }

    def validate_code(self, code: str) -> bool:
        """Validate Python code for security risks.

        Args:
            code: Python code string to validate

        Returns:
            True if no blocked construct was found.

        Raises:
            SecurityError: If the code does not parse, uses a blocked import,
                function, name or attribute, or if validation itself fails.
        """
        try:
            # Parse the code into an AST and inspect every node in it
            tree = ast.parse(code)
            for node in ast.walk(tree):
                self._check_node(node)
        except SecurityError:
            # Already carries a precise message; do not re-wrap it as a
            # generic "validation failed" error.
            raise
        except SyntaxError as e:
            raise SecurityError(f"Syntax error in code: {e}") from e
        except Exception as e:
            raise SecurityError(f"Code validation failed: {e}") from e
        return True

    def _check_node(self, node: ast.AST) -> None:
        """Dispatch a single AST node to the check matching its type."""
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            self._check_import(node)
        elif isinstance(node, ast.Call):
            self._check_function_call(node)
        elif isinstance(node, ast.Attribute):
            self._check_attribute_access(node)
        elif isinstance(node, ast.Name):
            self._check_name_access(node)

    def _is_blocked_module(self, dotted_name: str) -> bool:
        """Return True if the module, or the package it lives in, is blocked."""
        # 'os.path' and 'urllib.request' must be rejected because their
        # top-level packages ('os', 'urllib') are on the deny-list.
        top_level = dotted_name.split('.', 1)[0]
        return dotted_name in self.BLOCKED_IMPORTS or top_level in self.BLOCKED_IMPORTS

    def _check_import(self, node: Union[ast.Import, ast.ImportFrom]) -> None:
        """Reject ``import x`` / ``from x import y`` of a blocked module."""
        if isinstance(node, ast.Import):
            for alias in node.names:
                if self._is_blocked_module(alias.name):
                    raise SecurityError(f"Blocked import: {alias.name}")
        elif isinstance(node, ast.ImportFrom):
            # node.module is None for "from . import x"; nothing to compare then
            if node.module and self._is_blocked_module(node.module):
                raise SecurityError(f"Blocked import from: {node.module}")

    def _check_function_call(self, node: ast.Call) -> None:
        """Reject direct calls to a blocked builtin such as ``eval(...)``."""
        if isinstance(node.func, ast.Name):
            if node.func.id in self.BLOCKED_FUNCTIONS:
                raise SecurityError(f"Blocked function call: {node.func.id}")

    def _check_attribute_access(self, node: ast.Attribute) -> None:
        """Reject access to introspection attributes such as ``__class__``."""
        if node.attr in self.BLOCKED_ATTRIBUTES:
            raise SecurityError(f"Blocked attribute access: {node.attr}")

    def _check_name_access(self, node: ast.Name) -> None:
        """Reject reading a blocked builtin name (e.g. aliasing ``f = open``)."""
        if node.id in self.BLOCKED_FUNCTIONS:
            # Allow if it's being assigned to (not called)
            if not isinstance(node.ctx, ast.Store):
                raise SecurityError(f"Blocked name access: {node.id}")
class SecureCodeExecutor:
    """Run validated Python snippets in a child interpreter.

    A snippet is first screened by ``CodeSecurityValidator``, then written to
    a temporary script that pre-imports the available math libraries, and run
    with ``sys.executable`` under a wall-clock timeout.

    NOTE: ``memory_limit_mb`` is stored, logged and reported, but this class
    does not apply it to the child process; only the timeout is enforced.
    """

    # Marker that precedes the serialized value of the snippet's last
    # expression on the final line of the child's stdout.
    _RESULT_PREFIX = 'RESULT:'

    # Import statements injected ahead of the snippet, gated by the matching
    # key in ``self.available_libraries``.
    _OPTIONAL_IMPORTS = (
        ('numpy', ("import numpy as np",)),
        ('pandas', ("import pandas as pd",)),
        ('scipy', (
            "import scipy",
            "from scipy import stats, optimize, integrate, linalg",
        )),
        ('sympy', (
            "import sympy as sp",
            "from sympy import symbols, solve, diff, integrate as sp_integrate, simplify, expand, factor",
        )),
        ('matplotlib', (
            "import matplotlib",
            "matplotlib.use('Agg')",
            "import matplotlib.pyplot as plt",
        )),
    )

    # Basic math and other safe imports that are always injected.
    _ALWAYS_IMPORTS = (
        "import math",
        "import cmath",
        "import decimal",
        "import fractions",
        "import statistics",
        "import itertools",
        "import functools",
        "import operator",
        "import json",
    )

    # Wrapper appended after the injected imports when the result is captured.
    # Raw string: backslashes below must reach the generated script verbatim.
    # The snippet runs in a snapshot of the globals taken *before* the wrapper
    # imports its own helpers, so it sees the math libraries but not ``sys``.
    _CAPTURE_TEMPLATE = r'''
# User code execution
_gaia_namespace = dict(globals())

import ast as _gaia_ast
import json as _gaia_json
import sys as _gaia_sys
from io import StringIO as _gaia_StringIO

# Capture stdout
_gaia_real_stdout = _gaia_sys.stdout
_gaia_sys.stdout = _gaia_buffer = _gaia_StringIO()
try:
    _gaia_tree = _gaia_ast.parse(__USER_CODE__)
    # Split off a trailing expression so it is evaluated exactly once.
    _gaia_tail = None
    if _gaia_tree.body and isinstance(_gaia_tree.body[-1], _gaia_ast.Expr):
        _gaia_tail = _gaia_ast.Expression(_gaia_tree.body.pop().value)
    exec(compile(_gaia_tree, "<string>", "exec"), _gaia_namespace)
    _gaia_value = None
    if _gaia_tail is not None:
        _gaia_value = eval(compile(_gaia_tail, "<string>", "eval"), _gaia_namespace)
finally:
    # Restore stdout and print captured output
    _gaia_sys.stdout = _gaia_real_stdout
    _gaia_text = _gaia_buffer.getvalue()
    if _gaia_text:
        print(_gaia_text, end="")

if _gaia_value is None:
    _gaia_payload = "null"
elif isinstance(_gaia_value, (int, float, str, list, dict, bool)):
    try:
        _gaia_payload = _gaia_json.dumps(_gaia_value)
    except (TypeError, ValueError):
        _gaia_payload = _gaia_json.dumps(str(_gaia_value))
else:
    _gaia_payload = str(_gaia_value)
    # The result must stay on one line; JSON-encode multi-line text.
    if "\n" in _gaia_payload or "\r" in _gaia_payload:
        _gaia_payload = _gaia_json.dumps(_gaia_payload)
if _gaia_text and not _gaia_text.endswith("\n"):
    print()
print(__RESULT_PREFIX__ + _gaia_payload)
'''

    # Word-boundary patterns so that e.g. "resp." is not mistaken for "sp."
    # and "cmath." is not mistaken for "math.".
    _LIBRARY_PATTERNS = (
        ('numpy', re.compile(r'numpy|\bnp\.')),
        ('pandas', re.compile(r'pandas|\bpd\.')),
        ('scipy', re.compile(r'scipy')),
        ('sympy', re.compile(r'sympy|\bsp\.')),
        ('matplotlib', re.compile(r'matplotlib|\bplt\.')),
        ('math', re.compile(r'\bmath\.')),
    )

    def __init__(self, timeout: int = 30, memory_limit_mb: int = 512):
        """Initialize secure code executor.

        Args:
            timeout: Maximum execution time in seconds
            memory_limit_mb: Configured memory budget in MB (recorded only)
        """
        self.timeout = timeout
        self.memory_limit_mb = memory_limit_mb
        self.validator = CodeSecurityValidator()
        # Available libraries status
        self.available_libraries = {
            'numpy': NUMPY_AVAILABLE,
            'pandas': PANDAS_AVAILABLE,
            'scipy': SCIPY_AVAILABLE,
            'sympy': SYMPY_AVAILABLE,
            'matplotlib': MATPLOTLIB_AVAILABLE
        }
        logger.info(
            "SecureCodeExecutor initialized with %ss timeout, %sMB limit",
            timeout, memory_limit_mb,
        )
        logger.info(
            "Available libraries: %s",
            [lib for lib, avail in self.available_libraries.items() if avail],
        )

    def execute_code(self, code: str, return_output: bool = True) -> Dict[str, Any]:
        """Execute Python code securely and return results.

        Args:
            code: Python code to execute
            return_output: Whether to capture the value of the last expression

        Returns:
            Dictionary with keys ``success``, ``result``, ``output``,
            ``error``, ``execution_time`` and ``libraries_used``. Failures
            are reported through ``success``/``error``, never raised.
        """
        start_time = time.time()
        try:
            # Validate code security
            self.validator.validate_code(code)
            execution_result = self._execute_in_subprocess(code, return_output)
            return {
                'success': True,
                'result': execution_result.get('result'),
                'output': execution_result.get('output', ''),
                'error': None,
                'execution_time': time.time() - start_time,
                'libraries_used': self._detect_libraries_used(code)
            }
        except SecurityError as e:
            return self._failure(f"Security violation: {e}", time.time() - start_time)
        except ExecutionTimeoutError as e:
            # The child was killed at the limit, so report the limit itself
            return self._failure(f"Execution timeout: {e}", self.timeout)
        except Exception as e:
            return self._failure(f"Execution error: {e}", time.time() - start_time)

    @staticmethod
    def _failure(message: str, elapsed: float) -> Dict[str, Any]:
        """Build the result dictionary for an unsuccessful run."""
        return {
            'success': False,
            'result': None,
            'output': '',
            'error': message,
            'execution_time': elapsed,
            'libraries_used': []
        }

    def _execute_in_subprocess(self, code: str, return_output: bool) -> Dict[str, Any]:
        """Run the snippet in a child interpreter and parse its stdout.

        Args:
            code: Validated Python code to run
            return_output: Whether the last expression's value is captured

        Returns:
            Dictionary with ``result`` and ``output``.

        Raises:
            ExecutionTimeoutError: If the child exceeds ``self.timeout``.
            RuntimeError: If the child exits with a non-zero status.
        """
        # Build the script first so a failure here cannot leak a temp file.
        script = self._prepare_safe_code(code, return_output)
        # UTF-8 is what the interpreter assumes for source files, whatever
        # the locale encoding happens to be.
        handle = tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', delete=False, encoding='utf-8'
        )
        temp_file = handle.name
        try:
            with handle:
                handle.write(script)
            completed = subprocess.run(
                [sys.executable, temp_file],
                capture_output=True,
                text=True,
                timeout=self.timeout,
                cwd=tempfile.gettempdir()  # Run in temp directory
            )
        except subprocess.TimeoutExpired as e:
            raise ExecutionTimeoutError(
                f"Code execution exceeded {self.timeout} seconds"
            ) from e
        finally:
            # Clean up temporary file (best effort)
            try:
                os.unlink(temp_file)
            except OSError:
                pass
        if completed.returncode != 0:
            raise RuntimeError(f"Code execution failed: {completed.stderr}")
        return self._parse_stdout(completed.stdout, return_output)

    def _parse_stdout(self, stdout: str, return_output: bool) -> Dict[str, Any]:
        """Split child stdout into printed output and the captured result."""
        lines = stdout.strip().split('\n')
        if not (return_output and lines[-1].startswith(self._RESULT_PREFIX)):
            return {'result': None, 'output': stdout}
        raw_result = lines[-1][len(self._RESULT_PREFIX):]
        try:
            # Try to parse as JSON for complex types
            parsed_result = json.loads(raw_result)
        except ValueError:
            # Fall back to string result
            parsed_result = raw_result
        return {'result': parsed_result, 'output': '\n'.join(lines[:-1])}

    def _prepare_safe_code(self, code: str, capture_result: bool) -> str:
        """Build the script run by the child interpreter.

        Args:
            code: User code to embed
            capture_result: When true, wrap the code so the value of its last
                expression is printed on a final ``RESULT:`` line.

        Returns:
            Complete script source: library imports followed by the user
            code, either verbatim or inside the capture wrapper.
        """
        header = []
        for library, statements in self._OPTIONAL_IMPORTS:
            if self.available_libraries.get(library):
                header.extend(statements)
        header.extend(self._ALWAYS_IMPORTS)
        complete_code = '\n'.join(header) + '\n\n'
        if not capture_result:
            return complete_code + code
        # ascii() yields a single-line, fully escaped literal, so quotes,
        # backslashes and non-ASCII text in the snippet survive unchanged.
        wrapper = self._CAPTURE_TEMPLATE.replace(
            '__RESULT_PREFIX__', repr(self._RESULT_PREFIX), 1
        ).replace('__USER_CODE__', ascii(code), 1)
        return complete_code + wrapper

    def _detect_libraries_used(self, code: str) -> List[str]:
        """Detect which mathematical libraries are referenced in the code.

        This is a textual heuristic (library name or its usual alias followed
        by a dot); it does not analyse imports.
        """
        return [
            library
            for library, pattern in self._LIBRARY_PATTERNS
            if pattern.search(code)
        ]
class CodeExecutionTool:
    """AGNO-compatible tool for secure Python code execution.

    Thin facade over ``SecureCodeExecutor``: it generates small Python
    programs for common tasks, runs them through the executor, and formats
    the executor's result dictionary as a human-readable string.
    """

    def __init__(self, timeout: int = 30, memory_limit_mb: int = 512):
        """Initialize the code execution tool.

        Args:
            timeout: Maximum execution time in seconds, passed to the executor
            memory_limit_mb: Memory budget in MB, passed to the executor
        """
        self.executor = SecureCodeExecutor(timeout, memory_limit_mb)
        self.available = True
        logger.info("CodeExecutionTool initialized successfully")

    def execute_python_code(self, code: str) -> str:
        """Execute Python code and return the result.

        Args:
            code: Python code to execute

        Returns:
            Formatted result string: printed output, the result value (when
            not None), libraries used and execution time; or ``"Error: ..."``
            when execution was unsuccessful.
        """
        result = self.executor.execute_code(code, return_output=True)
        if not result['success']:
            return f"Error: {result['error']}"
        output_parts = []
        if result['output']:
            output_parts.append(f"Output:\n{result['output']}")
        if result['result'] is not None:
            output_parts.append(f"Result: {result['result']}")
        if result['libraries_used']:
            output_parts.append(f"Libraries used: {', '.join(result['libraries_used'])}")
        output_parts.append(f"Execution time: {result['execution_time']:.3f}s")
        return '\n'.join(output_parts)

    def run_mathematical_computation(self, expression: str) -> str:
        """Run a mathematical computation using available libraries.

        Args:
            expression: Mathematical expression or computation

        Returns:
            Computation result, formatted by ``execute_python_code``.
        """
        return self.execute_python_code(self._build_computation_code(expression))

    def analyze_numerical_data(self, data: str, operation: str = "basic_stats") -> str:
        """Analyze numerical data using numpy (and scipy when available).

        Args:
            data: Data as string (comma-separated values or JSON)
            operation: Type of analysis to perform: ``"basic_stats"`` or
                ``"advanced_stats"``; anything else yields 'Unknown operation'

        Returns:
            Analysis results, formatted by ``execute_python_code``.
        """
        scipy_available = bool(self.executor.available_libraries.get('scipy', False))
        code = self._build_analysis_code(data, operation, scipy_available)
        return self.execute_python_code(code)

    @staticmethod
    def _build_computation_code(expression: str) -> str:
        """Generate the program that evaluates and prints ``expression``."""
        # The expression is code and is inserted verbatim on the assignment
        # line; for display it is embedded via repr() so that quotes and
        # braces inside it cannot break the generated print statement.
        return '\n'.join([
            "",
            "# Mathematical computation",
            f"result = {expression}",
            f"print('Computation: ' + {expression!r})",
            'print(f"Result: {result}")',
            "result",
            "",
        ])

    @staticmethod
    def _build_analysis_code(data: str, operation: str, scipy_available: bool) -> str:
        """Generate the program that parses ``data`` and computes statistics."""
        # Whether scipy's ``stats`` is usable is decided here, not with a
        # ``globals()`` lookup in the generated code: the security validator
        # rejects any call to ``globals``.
        if scipy_available:
            skewness = "float(stats.skew(data_array))"
            kurtosis = "float(stats.kurtosis(data_array))"
        else:
            skewness = kurtosis = "'N/A'"
        # data/operation are embedded with repr() so they are always plain
        # string literals, whatever quotes or backslashes they contain.
        return '\n'.join([
            "",
            "import json",
            "# Parse data",
            f"raw_data = {data!r}",
            f"operation = {operation!r}",
            "try:",
            "    data = json.loads(raw_data)",
            "except ValueError:",
            "    data = [float(x.strip()) for x in raw_data.split(',') if x.strip()]",
            "# Convert to numpy array for analysis",
            "data_array = np.array(data)",
            "# Perform analysis",
            "if operation == 'basic_stats':",
            "    result = {",
            "        'mean': float(np.mean(data_array)),",
            "        'median': float(np.median(data_array)),",
            "        'std': float(np.std(data_array)),",
            "        'min': float(np.min(data_array)),",
            "        'max': float(np.max(data_array)),",
            "        'sum': float(np.sum(data_array)),",
            "        'count': len(data_array)",
            "    }",
            "elif operation == 'advanced_stats':",
            "    result = {",
            "        'mean': float(np.mean(data_array)),",
            "        'variance': float(np.var(data_array)),",
            f"        'skewness': {skewness},",
            f"        'kurtosis': {kurtosis},",
            "        'percentiles': {",
            "            '25th': float(np.percentile(data_array, 25)),",
            "            '50th': float(np.percentile(data_array, 50)),",
            "            '75th': float(np.percentile(data_array, 75))",
            "        }",
            "    }",
            "else:",
            "    result = 'Unknown operation'",
            'print(f"Data analysis ({operation}):")',
            'print(f"Data: {data}")',
            'print(f"Results: {result}")',
            "result",
            "",
        ])

    def get_status(self) -> Dict[str, Any]:
        """Get tool status and capabilities.

        Returns:
            Dictionary with availability, the executor's configured timeout
            and memory limit, library availability, and a list of security
            feature labels.
        """
        return {
            'available': self.available,
            'timeout': self.executor.timeout,
            'memory_limit_mb': self.executor.memory_limit_mb,
            'available_libraries': self.executor.available_libraries,
            'security_features': [
                'AST-based code validation',
                'Subprocess isolation',
                'Import restrictions',
                'Function call blocking',
                'Attribute access control',
                'Timeout protection',
                'Memory limits'
            ]
        }
# AGNO tool registration functions | |
def get_code_execution_tools():
    """Build the tool descriptors that AGNO registers for code execution.

    A single ``CodeExecutionTool`` is created and each descriptor's
    ``function`` is one of its bound methods.

    Returns:
        List of dictionaries with ``name``, ``function`` and ``description``.
    """
    tool = CodeExecutionTool()
    # (method name, description) for every method exposed to AGNO
    catalog = (
        ('execute_python_code',
         'Execute Python code securely with mathematical libraries'),
        ('run_mathematical_computation',
         'Perform mathematical computations using numpy, scipy, sympy'),
        ('analyze_numerical_data',
         'Analyze numerical data with statistical operations'),
    )
    return [
        {
            'name': method_name,
            'function': getattr(tool, method_name),
            'description': summary,
        }
        for method_name, summary in catalog
    ]
if __name__ == "__main__":
    # Manual smoke test: run one small computation, then dump the tool status.
    demo_tool = CodeExecutionTool()
    separator = "=" * 50
    # Basic mathematical computation used as the sample snippet
    sample_code = """
import math
result = math.sqrt(2) * math.pi
print(f"Square root of 2 times pi: {result}")
result
"""
    print("Testing CodeExecutionTool:")
    print(separator)
    print(demo_tool.execute_python_code(sample_code))
    print(separator)
    # Report status and capabilities
    print("Tool Status:")
    print(json.dumps(demo_tool.get_status(), indent=2))