Duibonduil's picture
Upload 2 files
3a3f97e verified
# coding: utf-8
# Copyright (c) 2025 inclusionAI.
import subprocess
import os
import signal
import sys
from typing import Any, Dict, Tuple
from aworld.config.conf import ToolConfig
from examples.tools.tool_action import ShellAction
from aworld.core.common import ActionModel, Observation, ActionResult
from aworld.core.tool.base import Tool, AgentInput, ToolFactory
from aworld.logs.util import logger
from aworld.tools.utils import build_observation
@ToolFactory.register(name='shell',
desc="shell execute tool",
supported_action=ShellAction,
conf_file_name=f'shell_tool.yaml')
class ShellTool(Tool):
"""
used to execute shell commands, providing initialization, execution, and exit functions.
"""
def __init__(self, conf: ToolConfig, **kwargs) -> None:
"""
Initialize the ShellTool
Args:
conf: tool config
**kwargs: -
"""
super(ShellTool, self).__init__(conf, **kwargs)
self.type = "function"
self.working_dir = self.conf.get('working_dir')
self.env = self.conf.get('env') if self.conf.get('env') else os.environ.copy()
self.processes = []
def reset(self, *, seed: int | None = None, options: Dict[str, str] | None = None) -> Tuple[
AgentInput, dict[str, Any]]:
"""
Reset the executor
Args:
seed: -
options: -
Returns:
AgentInput, dict[str, Any]: -
"""
self.working_dir = None
self.env = os.environ.copy()
self.processes = []
self._finished = False
return build_observation(observer=self.name(),
ability=ShellAction.EXECUTE_SCRIPT.value.name), {}
def close(self) -> None:
"""
Close the executor
Returns:
None
"""
try:
for process in self.processes:
# Check whether the process is still running
if process.poll() is None:
try:
# Try to gracefully terminate the process
if sys.platform != "win32":
os.kill(process.pid, signal.SIGTERM)
else:
process.terminate()
except Exception as e:
logger.warning(f"An error occurred while terminating the process. e: {str(e)}")
except Exception as e:
logger.warning(f"Error while exiting Shell Executor. e: {str(e)}")
finally:
# Clear process list
self.processes = []
self._finished = True
def do_step(self,
actions: list[ActionModel],
**kwargs) -> Tuple[Observation, float, bool, bool, dict[str, Any]]:
"""
Step the executor
Args:
actions: actions
**kwargs: -
Returns:
Observation, float, bool, bool, dict[str, Any]: -
"""
self._finished = False
reward = 0
fail_error = ""
observation = build_observation(observer=self.name(),
ability=ShellAction.EXECUTE_SCRIPT.value.name)
try:
if not actions:
return (observation, reward,
kwargs.get("terminated",
False), kwargs.get("truncated", False), {
"exception": "actions is empty"
})
for action in actions:
cmd_string = action.params.get("command", "")
if not cmd_string:
continue
_, output, error = self.execute(cmd_string)
observation.content = output
observation.action_result.append(
ActionResult(is_done=True,
success=False if error else True,
content=output,
error=error,
keep=False))
reward = 1
except Exception as e:
fail_error = str(e)
finally:
self._finished = True
info = {"exception": fail_error}
info.update(kwargs)
return (observation,
reward,
kwargs.get("terminated", False),
kwargs.get("truncated", False),
info)
def execute(self, script: str, capture_output: bool = True, timeout: int = 5):
"""
exec shell script
Args:
script (str): shell script to execute
capture_output (bool): whether to capture the script output
timeout (int, optional): Command execution timeout (seconds)
Returns:
dict: action result
"""
try:
if capture_output:
process_ = subprocess.run(
script,
shell=True,
cwd=self.working_dir,
env=self.env,
timeout=timeout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return {
'success': process_.returncode == 0,
'return_code': process_.returncode,
'stdout': process_.stdout,
'stderr': process_.stderr,
'script': script
}
else:
process_ = subprocess.Popen(
script,
shell=True,
cwd=self.working_dir,
env=self.env
)
self.processes.append(process_)
process_.wait(timeout=timeout)
return {
'success': process_.returncode == 0,
'return_code': process_.returncode,
'script': script
}
except subprocess.TimeoutExpired:
return {
'success': False,
'error': 'Timeout',
'script': script
}
except Exception as e:
return {
'success': False,
'error': str(e),
'script': script
}
def execute_async(self, script: str):
"""
Execute shell script asynchronously (no waiting)
Args:
script (str): The shell script to execute
Returns:
subprocess.Popen: Process object
"""
try:
process_ = subprocess.Popen(
script,
shell=True,
cwd=self.working_dir,
env=self.env
)
self.processes.append(process_)
return process_
except Exception as e:
logger.warning(f"An error occurred while executing the script asynchronously. e: {str(e)}")
return None