# File size: 4,814 Bytes
# 664c17b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import os
import io
import sys
import uuid
import base64
import traceback
import contextlib
import tempfile
import subprocess
from typing import Dict, List, Any, Optional, Union
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from smolagents.tools import Tool

class CodeExecutionTool(Tool):
    """Tool that executes a Python snippet in-process and reports its results.

    The snippet runs through ``exec`` in a fresh namespace pre-populated with
    ``np``, ``pd``, ``plt`` and ``Image``. The returned dictionary always has
    the keys ``output`` (captured stdout), ``error`` (captured stderr and/or a
    formatted exception), ``images`` (base64-encoded PNGs of open matplotlib
    figures) and ``dataframes`` (records of every ``DataFrame`` left in the
    namespace).

    SECURITY: the code is executed with the full privileges of the host
    process. Nothing in this class sandboxes it; only feed it trusted code or
    run the whole tool inside an isolated environment.
    """

    name = "code_execution"
    description = "Execute Python code with support for data analysis, plotting, and file operations. Returns text output and base64-encoded images for plots."
    inputs = {
        'code': {'type': 'string', 'description': 'The Python code to execute'},
        'input_data': {'type': 'object', 'description': 'Optional input data for the code execution', 'nullable': True}
    }
    output_type = "object"

    def __init__(self, work_dir="code_execution"):
        """Create the working directory and select a headless plot backend.

        Args:
            work_dir: Directory that is created if missing. ``__del__``
                removes it together with everything inside it.
        """
        super().__init__()
        self.work_dir = work_dir
        os.makedirs(work_dir, exist_ok=True)
        self._setup_matplotlib()

    def _setup_matplotlib(self):
        """Configure matplotlib for non-interactive backend"""
        plt.switch_backend('Agg')

    @staticmethod
    def _empty_result(error: str = "") -> Dict[str, Any]:
        """Return a result dictionary with every expected key present."""
        return {
            'output': '',
            'error': error,
            'images': [],
            'dataframes': []
        }

    def _capture_output(self, code: str, variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Execute code and capture stdout, stderr, plots and DataFrames.

        Args:
            code: Python source to execute.
            variables: Optional extra names to place in the execution
                namespace before the code runs. They are applied after the
                built-in helpers, so they may shadow ``np``/``pd``/``plt``/
                ``Image``.

        Returns:
            A dictionary with the keys ``output``, ``error``, ``images`` and
            ``dataframes``. When the code raises, ``output`` still holds
            whatever was printed before the failure and ``error`` starts with
            ``"Execution error: "`` followed by the traceback and any text the
            code had written to stderr.
        """
        result = self._empty_result()

        # Fresh namespace per call so snippets cannot see each other's names.
        namespace = {
            'np': np,
            'pd': pd,
            'plt': plt,
            'Image': Image,
        }
        if variables:
            namespace.update(variables)

        with io.StringIO() as stdout_buffer, io.StringIO() as stderr_buffer:
            try:
                with contextlib.redirect_stdout(stdout_buffer), \
                     contextlib.redirect_stderr(stderr_buffer):

                    # SECURITY: exec runs arbitrary code with the privileges
                    # of this process; this is not a sandbox.
                    exec(code, namespace)

                    # Render every figure that is still open to a PNG.
                    for fig_num in plt.get_fignums():
                        fig = plt.figure(fig_num)
                        with io.BytesIO() as img_buffer:
                            fig.savefig(img_buffer, format='png')
                            img_data = base64.b64encode(img_buffer.getvalue()).decode()
                        result['images'].append(img_data)
                        plt.close(fig)

                    # Report every DataFrame the snippet left behind.
                    for var_name, var_value in namespace.items():
                        if isinstance(var_value, pd.DataFrame):
                            result['dataframes'].append({
                                'name': var_name,
                                'data': var_value.to_dict(orient='records')
                            })

                result['error'] = stderr_buffer.getvalue()

            except (Exception, SystemExit) as e:
                # SystemExit is caught on purpose: a snippet calling
                # sys.exit() must not terminate the host process.
                message = f"Execution error: {str(e)}\n{traceback.format_exc()}"
                captured_stderr = stderr_buffer.getvalue()
                if captured_stderr:
                    message = f"{message}\n{captured_stderr}"
                result['error'] = message

            finally:
                # Keep whatever was printed, even when the snippet failed.
                result['output'] = stdout_buffer.getvalue()
                # Figures left open by a failed run would otherwise be
                # reported as images of the next call.
                plt.close('all')

        return result

    def forward(self, code: str, input_data: Optional[Dict] = None) -> Dict[str, Any]:
        """Run ``code`` and return its captured output.

        Args:
            code: Python source to execute.
            input_data: Optional mapping of variable name to value. Entries
                whose value is a ``str``, ``int``, ``float``, ``list`` or
                ``dict`` are made available to the code as variables; other
                entries are ignored.

        Returns:
            A dictionary with the keys ``output``, ``error``, ``images`` and
            ``dataframes``. Problems with the arguments are reported through
            ``error`` rather than raised.
        """
        if not code:
            return self._empty_result("Error: No code provided to execute.")

        # Local import: the module's import block does not provide ``copy``.
        import copy

        try:
            variables: Dict[str, Any] = {}
            if input_data:
                if not hasattr(input_data, 'items'):
                    return self._empty_result(
                        "Error: input_data must be a mapping of variable names to values."
                    )
                for var_name, var_value in input_data.items():
                    if not isinstance(var_value, (str, int, float, list, dict)):
                        continue
                    if not (isinstance(var_name, str) and var_name.isidentifier()):
                        return self._empty_result(
                            f"Error: Invalid variable name in input_data: {var_name!r}"
                        )
                    # Values go straight into the namespace instead of being
                    # turned into source text: repr() of nan/inf is not a
                    # valid literal, and prepended lines would shift the line
                    # numbers in user tracebacks. The copy keeps the caller's
                    # objects safe from mutation by the executed code.
                    variables[var_name] = copy.deepcopy(var_value)

            # Execute the code and capture all output
            return self._capture_output(code, variables)
        except Exception as e:
            return self._empty_result(f"Error executing code: {str(e)}")

    def __del__(self):
        """Remove the working directory and everything in it (best effort).

        NOTE(review): this also deletes files that were already present in
        ``work_dir`` before the tool was created — confirm that is intended.
        """
        try:
            if os.path.exists(self.work_dir):
                # Bottom-up walk so directories are empty before rmdir.
                for root, dirs, files in os.walk(self.work_dir, topdown=False):
                    for name in files:
                        os.remove(os.path.join(root, name))
                    for name in dirs:
                        os.rmdir(os.path.join(root, name))
                os.rmdir(self.work_dir)
        except Exception:
            # Finalizers must never raise; failures here (including a missing
            # ``work_dir`` attribute or interpreter shutdown) are ignored.
            pass