import ast
import contextlib
import importlib
import importlib.util
import io
import os
import sys
import traceback
from typing import List, Set, Optional, Union, Dict

from pydantic import Field

from .interpreter_base import BaseInterpreter
from .tool import Tool, Toolkit
from .storage_handler import FileStorageHandler
| |
|
| | |
# Default text encoding name. Nothing in the visible part of this module
# references it.
DEFAULT_ENCODING = 'utf-8'
| |
|
class PythonInterpreter(BaseInterpreter):
    """Interpreter that vets the imports of a code snippet and then runs it with ``exec``.

    Imports are checked statically (via ``ast``) against ``allowed_imports``.
    Imports whose top-level package is listed in ``directory_names`` are treated
    as project-local modules: their source is read through ``storage_handler``,
    analyzed recursively, and only then loaded from ``project_path``.
    """

    project_path: Optional[str] = Field(default=".", description="Path to the project directory")
    directory_names: Optional[List[str]] = Field(default_factory=list, description="List of directory names to check for imports")
    allowed_imports: Optional[Set[str]] = Field(default_factory=set, description="Set of allowed imports")

    def __init__(
        self,
        name: str = 'PythonInterpreter',
        project_path: Optional[str] = ".",
        directory_names: Optional[List[str]] = None,
        allowed_imports: Optional[Set[str]] = None,
        storage_handler: FileStorageHandler = None,
        **kwargs
    ):
        """
        Initialize a Python interpreter for executing code in a controlled environment.

        Args:
            name (str): The name of the interpreter
            project_path (Optional[str]): Path to the project directory for module resolution
            directory_names (Optional[List[str]]): List of directory names to check for imports.
                ``None`` (the default) means "no project directories".
            allowed_imports (Optional[Set[str]]): Set of allowed module imports to enforce security
            storage_handler (Optional[FileStorageHandler]): Storage handler for file operations
            **kwargs: Additional data to pass to the parent class
        """
        # A literal ``[]`` default would be shared between every instance, and
        # an explicit ``None`` would later break ``in self.directory_names``.
        if directory_names is None:
            directory_names = []

        super().__init__(
            name=name,
            project_path=project_path,
            directory_names=directory_names,
            allowed_imports=allowed_imports,
            **kwargs
        )
        self.allowed_imports = allowed_imports or set()
        self.namespace = {}
        self.visited_modules = {}

        if storage_handler is None:
            from .storage_handler import LocalStorageHandler
            self.storage_handler = LocalStorageHandler(base_path="./workplace/interpreter")
        else:
            self.storage_handler = storage_handler

    def _get_file_and_folder_names(self, target_path: str) -> List[str]:
        """Retrieves the names of files and folders (without extensions) in a given directory.

        Args:
            target_path (str): Path to the target directory.

        Returns:
            List[str]: List of file and folder names (excluding extensions).
        """
        return [os.path.splitext(entry)[0] for entry in os.listdir(target_path)]

    @staticmethod
    def _load_module(module_name: str, path: str):
        """Load and execute the module stored at ``path`` and return the module object.

        Raises:
            ImportError: If no import spec/loader can be built for ``path``.
            Exception: Anything raised by the module's top-level code.
        """
        spec = importlib.util.spec_from_file_location(module_name, path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot load module '{module_name}' from {path}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    def _bind_definitions(
        self,
        module_name: str,
        module,
        defined: Set[str],
        potential_names: Optional[Set[str]],
    ) -> List[str]:
        """Copy top-level functions/classes of ``module`` into ``self.namespace``.

        Args:
            module_name (str): Module name used in violation messages.
            module: The loaded module object.
            defined (Set[str]): Names of top-level ``def``/``class`` statements in the source.
            potential_names (Optional[Set[str]]): Names requested by a ``from ... import``;
                ``None`` binds every defined name.

        Returns:
            List[str]: One violation per requested name that is not defined in the module.
        """
        if potential_names is None:
            for name in defined:
                if hasattr(module, name):
                    self.namespace[name] = getattr(module, name)
            return []

        violations = []
        # Sorted so that the order of violation messages is deterministic.
        for name in sorted(potential_names):
            if name in defined and hasattr(module, name):
                self.namespace[name] = getattr(module, name)
            else:
                violations.append(f"Function or class '{name}' not found in {module_name}")
        return violations

    def _extract_definitions(self, module_name: str, path: str, potential_names: Optional[Set[str]] = None) -> List[str]:
        """Extracts function and class definitions from a module file while ensuring safety.

        The module source is analyzed for import violations *before* it is
        executed, and it is executed at most once per ``execute`` call.

        Args:
            module_name (str): The name of the module.
            path (str): The file path of the module.
            potential_names (Optional[Set[str]]): The specific functions/classes to import (for ImportFrom).

        Returns:
            List[str]: A list of violations found during analysis. An empty list indicates no issues.

        Raises:
            FileNotFoundError: If the storage handler cannot read ``path``.
        """
        if path in self.visited_modules:
            cached = self.visited_modules[path]
            if cached is None:
                # The module is still being processed further up the call stack
                # (circular import) or already failed; do not recurse into it again.
                return []
            module, defined = cached
            return self._bind_definitions(module_name, module, defined, potential_names)

        # Mark as "in progress" so circular project imports terminate.
        self.visited_modules[path] = None

        result = self.storage_handler.read(path)
        if not result["success"]:
            raise FileNotFoundError(f"Could not read file {path}: {result.get('error', 'Unknown error')}")
        code = result["content"]

        # Safety analysis first: never run module code that has violations.
        violations = self._analyze_code(code)
        if violations:
            return violations

        defined = {
            node.name
            for node in ast.parse(code).body
            if isinstance(node, (ast.FunctionDef, ast.ClassDef))
        }

        try:
            module = self._load_module(module_name, path)
        except Exception:
            # Top-level boundary for arbitrary module code: report, don't crash.
            return [traceback.format_exc()]

        self.namespace[module_name] = module
        self.visited_modules[path] = (module, defined)
        return self._bind_definitions(module_name, module, defined, potential_names)

    def _check_project(self, module: Union[ast.Import, ast.ImportFrom, ast.alias]) -> List[str]:
        """Checks and imports a local project module while ensuring safety.

        Args:
            module (Union[ast.Import, ast.ImportFrom, ast.alias]): The AST node naming the
                module. ``_execute_import`` passes the individual ``ast.alias`` entries of
                an ``import`` statement; a whole ``ast.Import`` is also accepted.

        Returns:
            List[str]: A list of violations found during analysis.
        """
        if isinstance(module, ast.Import):
            violations = []
            for alias in module.names:
                violations += self._check_project(alias)
            return violations

        if isinstance(module, ast.ImportFrom):
            module_name = module.module
            potential_names = {alias.name for alias in module.names}
        else:
            # ast.alias from "import a.b [as c]"
            module_name = module.name
            potential_names = None

        if module_name is None:
            return ["'from . import' is not supported."]

        # NOTE(review): project_path is joined as-is. When it is relative and not
        # ".", the result is resolved against the working directory, which
        # execute() has already changed to project_path - confirm whether callers
        # rely on such paths.
        module_path = os.path.join(self.project_path, *module_name.split(".")) + ".py"

        if not os.path.exists(module_path):
            return [f"Module not found: {module_name}"]

        # _extract_definitions analyzes, loads and registers the module, so the
        # module must not be executed again here.
        return self._extract_definitions(module_name, module_path, potential_names)

    def _execute_import(self, import_module: ast.Import) -> List[str]:
        """Processes an import statement, verifying permissions and adding modules to the namespace.

        Args:
            import_module (ast.Import): The AST node representing an import statement.

        Returns:
            List[str]: A list of violations found during import handling.
        """
        violations = []

        for alias in import_module.names:
            # Project-local modules are vetted recursively instead of whitelisted.
            if alias.name.split(".")[0] in self.directory_names:
                violations += self._check_project(alias)
                continue

            if alias.name not in self.allowed_imports:
                violations.append(f"Unauthorized import: {alias.name}")
                continue

            try:
                self.namespace[alias.asname or alias.name] = importlib.import_module(alias.name)
            except ImportError:
                violations.append(traceback.format_exc())

        return violations

    def _execute_import_from(self, import_from: ast.ImportFrom) -> List[str]:
        """Processes a 'from module import name' statement, ensuring safety and adding modules to the namespace.

        Args:
            import_from (ast.ImportFrom): The AST node representing an 'import from' statement.

        Returns:
            List[str]: A list of violations found during import handling.
        """
        if import_from.module is None:
            return ["'from . import' is not supported."]

        if import_from.module.split(".")[0] in self.directory_names:
            return self._check_project(import_from)

        if import_from.module not in self.allowed_imports:
            return [f"Unauthorized import: {import_from.module}"]

        # Import once; the module does not change between the imported names.
        try:
            imported_module = importlib.import_module(import_from.module)
        except ImportError:
            return [traceback.format_exc()]

        violations = []
        for alias in import_from.names:
            if alias.name == "*":
                # Star imports are resolved when the code itself runs; there is
                # no single attribute to bind here.
                continue
            try:
                value = getattr(imported_module, alias.name)
            except AttributeError:
                # "from package import submodule": the submodule is not an
                # attribute of the package until it has been imported.
                try:
                    value = importlib.import_module(f"{import_from.module}.{alias.name}")
                except ImportError:
                    violations.append(
                        f"cannot import name '{alias.name}' from '{import_from.module}'"
                    )
                    continue
            self.namespace[alias.asname or alias.name] = value
        return violations

    def _analyze_code(self, code: str) -> List[str]:
        """Parses and analyzes the code for import violations before execution.

        Args:
            code (str): The raw Python code to analyze.

        Returns:
            List[str]: A list of violations detected in the code. A syntax error is
            reported as a violation containing the formatted traceback.
        """
        violations = []

        try:
            tree = ast.parse(code)
        except SyntaxError:
            violations.append(traceback.format_exc())
            return violations

        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                violations += self._execute_import(node)
            elif isinstance(node, ast.ImportFrom):
                violations += self._execute_import_from(node)

        return violations

    def execute(self, code: str, language: str = "python") -> str:
        """
        Analyzes and executes the provided Python code in a controlled environment.

        NOTE: This method only returns content printed to stdout during execution.
        It does not return any values from the code itself. To see results, use
        print statements in your code.

        WARNING: This method uses Python's exec() function internally, which executes
        code with full privileges. While safety checks are performed, there is still
        a security risk. Do not use with untrusted code.

        While the code runs, the working directory is ``project_path`` and that
        directory is first on ``sys.path``; both are restored before returning.

        Args:
            code (str): The Python code to execute.
            language (str, optional): The programming language of the code. Defaults to "python".

        Returns:
            str: The output of the executed code (printed content only), or a list of violations if found.

        Raises:
            ValueError: If ``project_path`` is unset, missing, or not a directory.
        """
        if language.lower() != "python":
            return f"Error: This interpreter only supports Python language. Received: {language}"

        self.visited_modules = {}
        self.namespace = {}

        if not self.project_path:
            raise ValueError("Project path (project_path) is not set")

        if not os.path.exists(self.project_path):
            raise ValueError(f"Project path '{self.project_path}' does not exist")

        if not os.path.isdir(self.project_path):
            raise ValueError(f"Project path '{self.project_path}' is not a directory")

        previous_cwd = os.getcwd()
        # Absolute, so the sys.path entry stays valid after the chdir below.
        project_dir = os.path.abspath(self.project_path)
        os.chdir(self.project_path)
        sys.path.insert(0, project_dir)

        try:
            # An empty allow-list disables import checking altogether.
            if self.allowed_imports:
                violations = self._analyze_code(code)
                if violations:
                    return "\n".join(violations)

            stdout_capture = io.StringIO()
            with contextlib.redirect_stdout(stdout_capture):
                try:
                    exec(code, {"__builtins__": __builtins__})
                except Exception:
                    return traceback.format_exc()

            return stdout_capture.getvalue().strip()
        finally:
            # Undo the process-wide changes so repeated calls start from the
            # same state and sys.path does not grow without bound.
            with contextlib.suppress(ValueError):
                sys.path.remove(project_dir)
            os.chdir(previous_cwd)

    def execute_script(self, file_path: str, language: str = "python") -> str:
        """
        Reads Python code from a file and executes it using the `execute` method.

        NOTE: This method only returns content printed to stdout during execution.
        It does not return any values from the code itself. To see results, use
        print statements in your code.

        WARNING: This method uses Python's exec() function internally, which executes
        code with full privileges. While safety checks are performed, there is still
        a security risk. Do not use with untrusted code.

        Args:
            file_path (str): The path to the Python file to be executed.
            language (str, optional): The programming language of the code. Defaults to "python".

        Returns:
            str: The output of the executed code (printed content only), or an error message if the execution fails.
        """
        result = self.storage_handler.read(file_path)
        if not result["success"]:
            return f"Error: Could not read file '{file_path}': {result.get('error', 'Unknown error')}"

        return self.execute(result["content"], language)
| | |
| |
|
class PythonExecuteTool(Tool):
    # Tool wrapper that forwards a code string to PythonInterpreter.execute.
    name: str = "python_execute"
    description: str = "Execute Python code in a controlled environment with safety checks"
    inputs: Dict[str, Dict[str, str]] = {
        "code": {
            "type": "string",
            "description": "The Python code to execute",
        },
        "language": {
            "type": "string",
            "description": "The programming language of the code (only 'python' is supported)",
        },
    }
    required: Optional[List[str]] = ["code"]

    def __init__(self, python_interpreter: PythonInterpreter = None):
        super().__init__()
        # Interpreter that performs the actual execution; may be left unset,
        # in which case calling the tool raises RuntimeError.
        self.python_interpreter = python_interpreter

    def __call__(self, code: str, language: str = "python") -> str:
        """Run ``code`` through the wrapped interpreter.

        Args:
            code (str): The Python code to execute.
            language (str): Language tag forwarded to the interpreter.

        Returns:
            str: Whatever the interpreter's ``execute`` returns, or an
            "Error executing code: ..." message if it raises.

        Raises:
            RuntimeError: If no interpreter was supplied.
        """
        interpreter = self.python_interpreter
        if not interpreter:
            raise RuntimeError("Python interpreter not initialized")

        try:
            output = interpreter.execute(code, language)
        except Exception as exc:
            return f"Error executing code: {exc!s}"
        return output
| |
|
| |
|
class PythonExecuteScriptTool(Tool):
    # Tool wrapper that forwards a file path to PythonInterpreter.execute_script.
    name: str = "python_execute_script"
    description: str = "Execute Python code from a file in a controlled environment with safety checks"
    inputs: Dict[str, Dict[str, str]] = {
        "file_path": {
            "type": "string",
            "description": "The path to the Python file to be executed",
        },
        "language": {
            "type": "string",
            "description": "The programming language of the code (only 'python' is supported)",
        },
    }
    required: Optional[List[str]] = ["file_path"]

    def __init__(self, python_interpreter: PythonInterpreter = None):
        super().__init__()
        # Interpreter that performs the actual execution; may be left unset,
        # in which case calling the tool raises RuntimeError.
        self.python_interpreter = python_interpreter

    def __call__(self, file_path: str, language: str = "python") -> str:
        """Run the script at ``file_path`` through the wrapped interpreter.

        Args:
            file_path (str): The path to the Python file to be executed.
            language (str): Language tag forwarded to the interpreter.

        Returns:
            str: Whatever the interpreter's ``execute_script`` returns, or an
            "Error executing script: ..." message if it raises.

        Raises:
            RuntimeError: If no interpreter was supplied.
        """
        interpreter = self.python_interpreter
        if not interpreter:
            raise RuntimeError("Python interpreter not initialized")

        try:
            output = interpreter.execute_script(file_path, language)
        except Exception as exc:
            return f"Error executing script: {exc!s}"
        return output
| |
|
| |
|
class PythonInterpreterToolkit(Toolkit):
    # Bundles one PythonInterpreter with the two tools that drive it.

    def __init__(
        self,
        name: str = "PythonInterpreterToolkit",
        project_path: Optional[str] = ".",
        directory_names: Optional[List[str]] = None,
        allowed_imports: Optional[Set[str]] = None,
        storage_handler: FileStorageHandler = None,
        **kwargs
    ):
        """Create the shared interpreter and register the execute/execute-script tools.

        Args:
            name (str): Name passed to the ``Toolkit`` base class.
            project_path (Optional[str]): Forwarded to ``PythonInterpreter``.
            directory_names (Optional[List[str]]): Forwarded to ``PythonInterpreter``;
                a falsy value is replaced by an empty list.
            allowed_imports (Optional[Set[str]]): Forwarded to ``PythonInterpreter``.
            storage_handler (FileStorageHandler): Storage backend; when ``None`` a
                ``LocalStorageHandler`` rooted at "./workplace/python" is created.
            **kwargs: Extra keyword arguments forwarded to ``PythonInterpreter``.
        """
        if storage_handler is None:
            from .storage_handler import LocalStorageHandler
            storage_handler = LocalStorageHandler(base_path="./workplace/python")

        # Both tools share this single interpreter instance.
        interpreter = PythonInterpreter(
            name="PythonInterpreter",
            project_path=project_path,
            directory_names=directory_names if directory_names else [],
            allowed_imports=allowed_imports,
            storage_handler=storage_handler,
            **kwargs
        )

        super().__init__(
            name=name,
            tools=[
                PythonExecuteTool(python_interpreter=interpreter),
                PythonExecuteScriptTool(python_interpreter=interpreter),
            ],
        )

        # Set after the base initializer, as in the original ordering.
        self.python_interpreter = interpreter
| | |
| |
|
| | |
| |
|