Duibonduil's picture
Upload 9 files
3e8c06e verified
import logging
import traceback
from typing import Callable, List, Dict, Any, Optional, Union
import inspect
from aworld.output.artifact import Artifact
class WorkspaceObserver:
"""Base class for workspace observers"""
async def on_create(self, workspace_id: str, artifact: Artifact) -> None:
pass
async def on_update(self, workspace_id: str, artifact: Artifact) -> None:
pass
async def on_delete(self, workspace_id: str, artifact: Artifact) -> None:
pass
class Handler:
"""Handler wrapper to support both functions and class methods"""
def __init__(self, func: Callable, instance=None, workspace_id: Optional[str] = None, filters: Optional[Dict[str, Any]] = None):
self.func = func
self.instance = instance # Class instance if method
self.workspace_id = workspace_id # Specific workspace to monitor
self.filters = filters or {} # Additional filters (e.g., artifact_type)
async def __call__(self, artifact: Artifact, **kwargs) -> Any:
"""Call the handler with appropriate arguments"""
# Check if this handler should process the artifact
if self.workspace_id and kwargs.get('workspace_id') != self.workspace_id:
return None
# Check additional filters
for key, value in self.filters.items():
if key == 'artifact_type':
if artifact.artifact_type != value and artifact.artifact_type.value != value:
return None
elif key in artifact.metadata and artifact.metadata[key] != value:
return None
# Get function signature to determine what arguments it expects
sig = inspect.signature(self.func)
param_count = len(sig.parameters)
# Call based on whether it's a method or function, and parameter count
if self.instance:
if param_count == 0: # Just self
return await self.func() if inspect.iscoroutinefunction(self.func) else self.func()
elif param_count == 1: # Self + artifact
return await self.func(artifact) if inspect.iscoroutinefunction(self.func) else self.func(artifact)
else: # Self + artifact + kwargs
return await self.func(artifact, **kwargs) if inspect.iscoroutinefunction(self.func) else self.func(artifact, **kwargs)
else:
if param_count == 0: # No parameters
return await self.func() if inspect.iscoroutinefunction(self.func) else self.func()
elif param_count == 1: # Just artifact
return await self.func(artifact) if inspect.iscoroutinefunction(self.func) else self.func(artifact)
else: # Artifact + kwargs
return await self.func(artifact, **kwargs) if inspect.iscoroutinefunction(self.func) else self.func(artifact, **kwargs)
class DecoratorBasedObserver(WorkspaceObserver):
"""Enhanced decorator-based observer implementation"""
def __init__(self):
self.create_handlers: List[Handler] = []
self.update_handlers: List[Handler] = []
self.delete_handlers: List[Handler] = []
async def on_create(self, workspace_id: str, artifact: Artifact, **kwargs) -> List[Any]:
"""Process artifact creation with all handlers"""
results = []
for handler in self.create_handlers:
try:
result = await handler(workspace_id=workspace_id, artifact=artifact, **kwargs)
if result is not None:
results.append(result)
except Exception as e:
print(f"Create handler failed: error is {e}: {traceback.format_exc()}")
return results
async def on_update(self, artifact: Artifact, **kwargs) -> List[Any]:
"""Process artifact update with all handlers"""
results = []
for handler in self.update_handlers:
try:
result = await handler(artifact, **kwargs)
if result is not None:
results.append(result)
except Exception as e:
print(f"Update handler failed: {e}")
return results
async def on_delete(self, artifact: Artifact, **kwargs) -> List[Any]:
"""Process artifact deletion with all handlers"""
results = []
for handler in self.delete_handlers:
try:
result = await handler(artifact, **kwargs)
if result is not None:
results.append(result)
except Exception as e:
print(f"Delete handler failed: {e}")
return results
def register_create_handler(self, func, instance=None, workspace_id=None, filters=None):
"""Register a handler for artifact creation"""
logging.info(f"[📂WORKSPACE]✨ Registering create handler for {func}")
self.create_handlers.append(Handler(func, instance, workspace_id, filters))
return func
def un_register_create_handler(self, func, instance=None, workspace_id=None):
"""Register a handler for artifact creation"""
logging.info(f"[📂WORKSPACE] UnRegister create handler for {func}")
for handler in self.create_handlers:
if handler.func == func:
self.create_handlers.remove(handler)
logging.info(f"[📂WORKSPACE] UnRegister create handler for {func} success")
def register_update_handler(self, func, instance=None, workspace_id=None, filters=None):
"""Register a handler for artifact update"""
logging.info(f"[📂WORKSPACE]✨ Registering update handler for {func}")
self.update_handlers.append(Handler(func, instance, workspace_id, filters))
return func
def register_delete_handler(self, func, instance=None, workspace_id=None, filters=None):
"""Register a handler for artifact deletion"""
logging.info(f"[📂WORKSPACE]✨ Registering delete handler for {func}")
self.delete_handlers.append(Handler(func, instance, workspace_id, filters))
return func
# Global observer instance
_observer = DecoratorBasedObserver()
def get_observer() -> DecoratorBasedObserver:
"""Get the global observer instance"""
return _observer
def on_artifact_create(func=None, workspace_id=None, filters=None):
"""
Decorator for artifact creation handlers
Can be used as a simple decorator (@on_artifact_create) or with parameters:
@on_artifact_create(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'})
"""
if func is None:
# Called with parameters
def decorator(f):
return _observer.register_create_handler(f, None, workspace_id, filters)
return decorator
# Called as simple decorator
return _observer.register_create_handler(func)
def on_artifact_update(func=None, workspace_id=None, filters=None):
"""
Decorator for artifact update handlers
Can be used as a simple decorator (@on_artifact_update) or with parameters:
@on_artifact_update(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'})
"""
if func is None:
# Called with parameters
def decorator(f):
return _observer.register_update_handler(f, None, workspace_id, filters)
return decorator
# Called as simple decorator
return _observer.register_update_handler(func)
def on_artifact_delete(func=None, workspace_id=None, filters=None):
"""
Decorator for artifact deletion handlers
Can be used as a simple decorator (@on_artifact_delete) or with parameters:
@on_artifact_delete(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'})
"""
if func is None:
# Called with parameters
def decorator(f):
return _observer.register_delete_handler(f, None, workspace_id, filters)
return decorator
# Called as simple decorator
return _observer.register_delete_handler(func)