Spaces:
Sleeping
Sleeping
File size: 8,060 Bytes
3e8c06e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
import logging
import traceback
from typing import Callable, List, Dict, Any, Optional, Union
import inspect
from aworld.output.artifact import Artifact
class WorkspaceObserver:
    """Interface for objects that want to be told about artifact events.

    Every hook is an ``async`` no-op here; subclasses override the ones
    they care about.
    """

    async def on_create(self, workspace_id: str, artifact: Artifact) -> None:
        """Hook invoked for an artifact creation; does nothing by default."""
        return None

    async def on_update(self, workspace_id: str, artifact: Artifact) -> None:
        """Hook invoked for an artifact update; does nothing by default."""
        return None

    async def on_delete(self, workspace_id: str, artifact: Artifact) -> None:
        """Hook invoked for an artifact deletion; does nothing by default."""
        return None
class Handler:
    """Wrap a callback together with the conditions under which it may run.

    A handler is awaited with an artifact plus arbitrary keyword arguments.
    It first decides whether the artifact passes its workspace and filter
    checks, and only then invokes the wrapped callable.

    Args:
        func: The callback. It may be sync or async and may take zero
            parameters, one parameter (the artifact), or more (the artifact
            followed by the keyword arguments of the event).
        instance: Optional owner object. It is stored for reference only;
            this class never passes it to ``func``, so methods must be
            supplied already bound.
        workspace_id: When set, the handler only runs for events whose
            ``workspace_id`` keyword argument equals this value.
        filters: Extra conditions. ``'artifact_type'`` is compared against
            ``artifact.artifact_type`` and, when present, its ``.value``.
            Any other key is compared against ``artifact.metadata[key]``,
            but only if that key exists in the metadata.
    """

    def __init__(self, func: Callable, instance=None, workspace_id: Optional[str] = None,
                 filters: Optional[Dict[str, Any]] = None):
        self.func = func
        self.instance = instance  # Kept for reference; not used during dispatch
        self.workspace_id = workspace_id  # Specific workspace to monitor
        self.filters = filters or {}  # Additional filters (e.g., artifact_type)

    def _accepts(self, artifact: Artifact, kwargs: Dict[str, Any]) -> bool:
        """Return True when the artifact passes the workspace and filter checks."""
        if self.workspace_id and kwargs.get('workspace_id') != self.workspace_id:
            return False

        for key, expected in self.filters.items():
            if key == 'artifact_type':
                actual = artifact.artifact_type
                # Accept either the raw value or an enum-like object exposing
                # ``.value``; a plain string has no ``.value`` and must not
                # raise AttributeError here.
                if actual != expected and getattr(actual, 'value', actual) != expected:
                    return False
                continue
            # A missing or empty metadata mapping never excludes the artifact.
            metadata = getattr(artifact, 'metadata', None) or {}
            if key in metadata and metadata[key] != expected:
                return False
        return True

    async def __call__(self, artifact: Artifact, **kwargs) -> Any:
        """Run the wrapped callback if the artifact passes all checks.

        Args:
            artifact: The artifact the event refers to.
            **kwargs: Event details (for example ``workspace_id``). They are
                forwarded to the callback only when it declares two or more
                parameters.

        Returns:
            ``None`` when the artifact is filtered out, otherwise whatever
            the callback returns (awaited first if it is awaitable).
        """
        if not self._accepts(artifact, kwargs):
            return None

        # The number of declared parameters decides how much the callback
        # receives. For bound methods ``inspect.signature`` already omits
        # ``self``.
        param_count = len(inspect.signature(self.func).parameters)
        if param_count == 0:
            result = self.func()
        elif param_count == 1:
            result = self.func(artifact)
        else:
            result = self.func(artifact, **kwargs)

        # Await based on the returned object, not on the callable's type, so
        # async ``__call__`` objects and wrappers returning coroutines work.
        if inspect.isawaitable(result):
            result = await result
        return result
class DecoratorBasedObserver(WorkspaceObserver):
    """Observer that forwards artifact events to registered ``Handler`` objects.

    Handlers are kept in three lists (create / update / delete). Each event
    method awaits every handler of the matching list in registration order
    and collects the non-``None`` results. A failing handler is logged and
    skipped so that it cannot prevent the remaining handlers from running.
    """

    def __init__(self):
        self.create_handlers: List[Handler] = []
        self.update_handlers: List[Handler] = []
        self.delete_handlers: List[Handler] = []

    async def _dispatch(self, handlers: List[Handler], event: str, artifact: Artifact,
                        kwargs: Dict[str, Any]) -> List[Any]:
        """Await every handler in ``handlers`` and gather non-``None`` results.

        ``kwargs`` is taken as a plain dict so event keyword names can never
        collide with this helper's own parameters.
        """
        results = []
        # Iterate over a snapshot so handlers that (un)register other handlers
        # while running cannot disturb this loop.
        for handler in tuple(handlers):
            try:
                result = await handler(artifact, **kwargs)
            except Exception as e:
                # Best effort: report with traceback and continue with the rest.
                logging.exception(f"{event} handler failed: error is {e}")
            else:
                if result is not None:
                    results.append(result)
        return results

    async def on_create(self, workspace_id: str, artifact: Artifact, **kwargs) -> List[Any]:
        """Run all create handlers and return their non-``None`` results.

        ``workspace_id`` is forwarded to the handlers as a keyword argument.
        """
        return await self._dispatch(
            self.create_handlers, "Create", artifact, {"workspace_id": workspace_id, **kwargs}
        )

    async def on_update(self, artifact: Artifact, **kwargs) -> List[Any]:
        """Run all update handlers and return their non-``None`` results.

        Unlike ``on_create``, the workspace id is not a positional parameter
        here; pass it as ``workspace_id=...`` if handlers filter on it.
        """
        return await self._dispatch(self.update_handlers, "Update", artifact, kwargs)

    async def on_delete(self, artifact: Artifact, **kwargs) -> List[Any]:
        """Run all delete handlers and return their non-``None`` results.

        Unlike ``on_create``, the workspace id is not a positional parameter
        here; pass it as ``workspace_id=...`` if handlers filter on it.
        """
        return await self._dispatch(self.delete_handlers, "Delete", artifact, kwargs)

    def register_create_handler(self, func, instance=None, workspace_id=None, filters=None):
        """Register a handler for artifact creation and return ``func`` unchanged."""
        logging.info(f"[📂WORKSPACE]✨ Registering create handler for {func}")
        self.create_handlers.append(Handler(func, instance, workspace_id, filters))
        return func

    def un_register_create_handler(self, func, instance=None, workspace_id=None):
        """Unregister every create handler that wraps ``func``.

        ``instance`` and ``workspace_id`` are accepted for signature symmetry
        with ``register_create_handler`` but do not narrow the match.
        """
        logging.info(f"[📂WORKSPACE] UnRegister create handler for {func}")
        # Build a new list instead of removing while iterating, which would
        # skip the element following each removal.
        remaining = [handler for handler in self.create_handlers if not (handler.func == func)]
        if len(remaining) != len(self.create_handlers):
            self.create_handlers = remaining
            logging.info(f"[📂WORKSPACE] UnRegister create handler for {func} success")

    def register_update_handler(self, func, instance=None, workspace_id=None, filters=None):
        """Register a handler for artifact update and return ``func`` unchanged."""
        logging.info(f"[📂WORKSPACE]✨ Registering update handler for {func}")
        self.update_handlers.append(Handler(func, instance, workspace_id, filters))
        return func

    def register_delete_handler(self, func, instance=None, workspace_id=None, filters=None):
        """Register a handler for artifact deletion and return ``func`` unchanged."""
        logging.info(f"[📂WORKSPACE]✨ Registering delete handler for {func}")
        self.delete_handlers.append(Handler(func, instance, workspace_id, filters))
        return func
# Global observer instance: created once at import time and shared by
# get_observer() and the on_artifact_* decorators defined in this module.
_observer = DecoratorBasedObserver()
def get_observer() -> DecoratorBasedObserver:
    """Return the module-level observer shared by all decorators in this file."""
    shared_observer = _observer
    return shared_observer
def on_artifact_create(func=None, workspace_id=None, filters=None):
    """
    Decorator for artifact creation handlers

    Can be used as a simple decorator (@on_artifact_create) or with parameters:
    @on_artifact_create(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'})

    It may also be called directly, e.g.
    on_artifact_create(handler, workspace_id='abc'); in that form the
    workspace_id and filters are honoured as well.

    Returns the decorated function unchanged, or, when called without
    ``func``, a decorator that does so.
    """
    def decorator(f):
        return _observer.register_create_handler(f, None, workspace_id, filters)

    if func is None:
        # Called with parameters: hand back the decorator to apply later.
        return decorator
    # Called as a simple decorator or directly with a function. Going through
    # the same path keeps workspace_id / filters from being silently dropped.
    return decorator(func)
def on_artifact_update(func=None, workspace_id=None, filters=None):
    """
    Decorator for artifact update handlers

    Can be used as a simple decorator (@on_artifact_update) or with parameters:
    @on_artifact_update(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'})

    It may also be called directly, e.g.
    on_artifact_update(handler, workspace_id='abc'); in that form the
    workspace_id and filters are honoured as well.

    Returns the decorated function unchanged, or, when called without
    ``func``, a decorator that does so.
    """
    def decorator(f):
        return _observer.register_update_handler(f, None, workspace_id, filters)

    if func is None:
        # Called with parameters: hand back the decorator to apply later.
        return decorator
    # Called as a simple decorator or directly with a function. Going through
    # the same path keeps workspace_id / filters from being silently dropped.
    return decorator(func)
def on_artifact_delete(func=None, workspace_id=None, filters=None):
    """
    Decorator for artifact deletion handlers

    Can be used as a simple decorator (@on_artifact_delete) or with parameters:
    @on_artifact_delete(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'})

    It may also be called directly, e.g.
    on_artifact_delete(handler, workspace_id='abc'); in that form the
    workspace_id and filters are honoured as well.

    Returns the decorated function unchanged, or, when called without
    ``func``, a decorator that does so.
    """
    def decorator(f):
        return _observer.register_delete_handler(f, None, workspace_id, filters)

    if func is None:
        # Called with parameters: hand back the decorator to apply later.
        return decorator
    # Called as a simple decorator or directly with a function. Going through
    # the same path keeps workspace_id / filters from being silently dropped.
    return decorator(func)
|