# Duibonduil's picture
# Upload 3 files
# 6c4f72f verified
import logging
import asyncio
import uuid
from typing import Dict, List, Any, Optional
from aworld.sandbox.common import BaseSandbox
from aworld.sandbox.api.super.sandbox_api import SuperSandboxApi
from aworld.sandbox.models import SandboxStatus, SandboxEnvType, SandboxInfo
from aworld.sandbox.run.mcp_servers import McpServers
class SuperSandbox(BaseSandbox, SuperSandboxApi):
    """
    Supercomputer sandbox implementation that runs on a supercomputer environment.
    """

    def __init__(
        self,
        sandbox_id: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
        mcp_servers: Optional[List[str]] = None,
        mcp_config: Optional[Any] = None,
        **kwargs
    ):
        """
        Initialize a new SuperSandbox instance.

        Args:
            sandbox_id: Unique identifier for the sandbox. If None, one will be generated.
            metadata: Additional metadata for the sandbox.
            timeout: Timeout for sandbox operations.
            mcp_servers: List of MCP servers to use.
            mcp_config: Configuration for MCP servers.
            **kwargs: Additional parameters for specific sandbox types
                (accepted but not used by this initializer).

        Raises:
            ValueError: If ``sandbox_id`` is given and ``self._metadata`` is
                truthy once the base-class initializer has run.
        """
        super().__init__(
            sandbox_id=sandbox_id,
            env_type=SandboxEnvType.SUPERCOMPUTER,
            metadata=metadata,
            timeout=timeout,
            mcp_servers=mcp_servers,
            mcp_config=mcp_config
        )

        if sandbox_id:
            # NOTE(review): this condition looks inverted (empty metadata is
            # accepted, non-empty metadata is rejected) -- confirm the intent
            # with the owner. The branch behaviour is kept as it was.
            if not self._metadata:
                # __init__ must return None; ``return self`` raised TypeError
                # and made this early-exit path unusable.
                return
            else:
                raise ValueError("sandbox_id is not exist")

        # Initialize properties
        self._status = SandboxStatus.INIT
        self._timeout = timeout or self.default_sandbox_timeout
        self._metadata = metadata or {}
        self._env_type = SandboxEnvType.SUPERCOMPUTER
        self._mcp_servers = mcp_servers
        self._mcp_config = mcp_config

        # Ensure sandbox_id has a value in all cases
        self._sandbox_id = sandbox_id or str(uuid.uuid4())

        # If no sandbox_id provided, create a new sandbox
        if not sandbox_id:
            response = self._create_sandbox(
                env_type=self._env_type,
                env_config=None,
                mcp_servers=mcp_servers,
                mcp_config=mcp_config,
            )

            if not response:
                self._status = SandboxStatus.ERROR
                # If creation fails, keep the generated UUID as the ID
                logging.warning(f"Failed to create super sandbox, using generated ID: {self._sandbox_id}")
            else:
                # Adopt the identifier and configuration reported by the
                # creation response; fields missing on it become None.
                self._sandbox_id = response.sandbox_id
                self._status = SandboxStatus.RUNNING
                self._metadata = {
                    "status": getattr(response, 'status', None),
                    "host": getattr(response, 'host', None),
                    "mcp_config": getattr(response, 'mcp_config', None),
                    "env_type": getattr(response, 'env_type', None),
                }
                self._mcp_config = getattr(response, 'mcp_config', None)

        # Initialize McpServers (done even when sandbox creation failed)
        self._mcpservers = McpServers(
            mcp_servers,
            self._mcp_config,
            sandbox=self
        )

    async def remove(self) -> None:
        """
        Remove sandbox.

        Awaits ``_remove_sandbox`` with this instance's id, metadata and
        environment type.
        """
        await self._remove_sandbox(
            sandbox_id=self.sandbox_id,
            metadata=self._metadata,
            env_type=self._env_type
        )

    async def cleanup(self) -> None:
        """
        Clean up Sandbox resources, including MCP server connections.

        Both steps are best-effort: a failure in either one is logged as a
        warning and does not propagate to the caller.
        """
        try:
            # _mcpservers may be absent if __init__ exited before creating it.
            mcpservers = getattr(self, '_mcpservers', None)
            if mcpservers:
                await mcpservers.cleanup()
                logging.info(f"Cleaned up MCP servers for sandbox {self.sandbox_id}")
        except Exception as e:
            logging.warning(f"Failed to cleanup MCP servers: {e}")

        # Call the original remove method
        try:
            await self.remove()
        except Exception as e:
            logging.warning(f"Failed to remove sandbox: {e}")

    def __del__(self):
        """
        Ensure resources are cleaned up when the object is garbage collected.

        When an event loop is already running in this thread, cleanup is
        skipped with a warning. Otherwise ``cleanup()`` is run to completion
        on a private, temporary event loop.
        """
        try:
            # Handle the case where an event loop already exists
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # No running event loop; fall through and run cleanup below.
                pass
            else:
                logging.warning("Cannot clean up sandbox in __del__ when event loop is already running")
                return

            # Use a private loop and never install it as the thread's current
            # loop: installing it would leave a closed loop behind as the
            # global current loop once we are done.
            loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(self.cleanup())
            finally:
                # Close the loop even if cleanup raised, so it is not leaked.
                loop.close()
        except Exception as e:
            logging.warning(f"Failed to cleanup sandbox resources during garbage collection: {e}")