File size: 8,060 Bytes
3e8c06e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import logging
import traceback
from typing import Callable, List, Dict, Any, Optional, Union
import inspect

from aworld.output.artifact import Artifact


class WorkspaceObserver:
    """Base class for workspace observers"""

    async def on_create(self, workspace_id: str, artifact: Artifact) -> None:
        pass

    async def on_update(self, workspace_id: str, artifact: Artifact) -> None:
        pass

    async def on_delete(self, workspace_id: str, artifact: Artifact) -> None:
        pass

class Handler:
    """Handler wrapper to support both functions and class methods"""
    def __init__(self, func: Callable, instance=None, workspace_id: Optional[str] = None, filters: Optional[Dict[str, Any]] = None):
        self.func = func
        self.instance = instance  # Class instance if method
        self.workspace_id = workspace_id  # Specific workspace to monitor
        self.filters = filters or {}  # Additional filters (e.g., artifact_type)
    
    async def __call__(self, artifact: Artifact, **kwargs) -> Any:
        """Call the handler with appropriate arguments"""
        # Check if this handler should process the artifact
        if self.workspace_id and kwargs.get('workspace_id') != self.workspace_id:
            return None
            
        # Check additional filters
        for key, value in self.filters.items():
            if key == 'artifact_type':
                if artifact.artifact_type != value and artifact.artifact_type.value != value:
                    return None
            elif key in artifact.metadata and artifact.metadata[key] != value:
                return None

        # Get function signature to determine what arguments it expects
        sig = inspect.signature(self.func)
        param_count = len(sig.parameters)
        
        # Call based on whether it's a method or function, and parameter count
        if self.instance:
            if param_count == 0:  # Just self
                return await self.func() if inspect.iscoroutinefunction(self.func) else self.func()
            elif param_count == 1:  # Self + artifact
                return await self.func(artifact) if inspect.iscoroutinefunction(self.func) else self.func(artifact)
            else:  # Self + artifact + kwargs
                return await self.func(artifact, **kwargs) if inspect.iscoroutinefunction(self.func) else self.func(artifact, **kwargs)
        else:
            if param_count == 0:  # No parameters
                return await self.func() if inspect.iscoroutinefunction(self.func) else self.func()
            elif param_count == 1:  # Just artifact
                return await self.func(artifact) if inspect.iscoroutinefunction(self.func) else self.func(artifact)
            else:  # Artifact + kwargs
                return await self.func(artifact, **kwargs) if inspect.iscoroutinefunction(self.func) else self.func(artifact, **kwargs)


class DecoratorBasedObserver(WorkspaceObserver):
    """Enhanced decorator-based observer implementation"""
    def __init__(self):
        self.create_handlers: List[Handler] = []
        self.update_handlers: List[Handler] = []
        self.delete_handlers: List[Handler] = []
    
    async def on_create(self, workspace_id: str, artifact: Artifact, **kwargs) -> List[Any]:
        """Process artifact creation with all handlers"""
        results = []
        for handler in self.create_handlers:
            try:
                result = await handler(workspace_id=workspace_id, artifact=artifact, **kwargs)
                if result is not None:
                    results.append(result)
            except Exception as e:
                print(f"Create handler failed:  error is {e}: {traceback.format_exc()}")
        return results
    
    async def on_update(self, artifact: Artifact, **kwargs) -> List[Any]:
        """Process artifact update with all handlers"""
        results = []
        for handler in self.update_handlers:
            try:
                result = await handler(artifact, **kwargs)
                if result is not None:
                    results.append(result)
            except Exception as e:
                print(f"Update handler failed: {e}")
        return results
    
    async def on_delete(self, artifact: Artifact, **kwargs) -> List[Any]:
        """Process artifact deletion with all handlers"""
        results = []
        for handler in self.delete_handlers:
            try:
                result = await handler(artifact, **kwargs)
                if result is not None:
                    results.append(result)
            except Exception as e:
                print(f"Delete handler failed: {e}")
        return results
    
    def register_create_handler(self, func, instance=None, workspace_id=None, filters=None):
        """Register a handler for artifact creation"""
        logging.info(f"[📂WORKSPACE]✨ Registering create handler for {func}")
        self.create_handlers.append(Handler(func, instance, workspace_id, filters))
        return func

    def un_register_create_handler(self, func, instance=None, workspace_id=None):
        """Register a handler for artifact creation"""
        logging.info(f"[📂WORKSPACE] UnRegister create handler for {func}")
        for handler in self.create_handlers:
            if handler.func == func:
                self.create_handlers.remove(handler)
                logging.info(f"[📂WORKSPACE] UnRegister create handler for {func} success")

    def register_update_handler(self, func, instance=None, workspace_id=None, filters=None):
        """Register a handler for artifact update"""
        logging.info(f"[📂WORKSPACE]✨ Registering update handler for {func}")
        self.update_handlers.append(Handler(func, instance, workspace_id, filters))
        return func
        
    def register_delete_handler(self, func, instance=None, workspace_id=None, filters=None):
        """Register a handler for artifact deletion"""
        logging.info(f"[📂WORKSPACE]✨ Registering delete handler for {func}")
        self.delete_handlers.append(Handler(func, instance, workspace_id, filters))
        return func

# Global observer instance
_observer = DecoratorBasedObserver()

def get_observer() -> DecoratorBasedObserver:
    """Get the global observer instance"""
    return _observer

def on_artifact_create(func=None, workspace_id=None, filters=None):
    """
    Decorator for artifact creation handlers
    
    Can be used as a simple decorator (@on_artifact_create) or with parameters:
    @on_artifact_create(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'})
    """
    if func is None:
        # Called with parameters
        def decorator(f):
            return _observer.register_create_handler(f, None, workspace_id, filters)
        return decorator
    
    # Called as simple decorator
    return _observer.register_create_handler(func)

def on_artifact_update(func=None, workspace_id=None, filters=None):
    """
    Decorator for artifact update handlers
    
    Can be used as a simple decorator (@on_artifact_update) or with parameters:
    @on_artifact_update(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'})
    """
    if func is None:
        # Called with parameters
        def decorator(f):
            return _observer.register_update_handler(f, None, workspace_id, filters)
        return decorator
    
    # Called as simple decorator
    return _observer.register_update_handler(func)

def on_artifact_delete(func=None, workspace_id=None, filters=None):
    """
    Decorator for artifact deletion handlers
    
    Can be used as a simple decorator (@on_artifact_delete) or with parameters:
    @on_artifact_delete(workspace_id='abc', filters={'artifact_type': 'WEB_PAGES'})
    """
    if func is None:
        # Called with parameters
        def decorator(f):
            return _observer.register_delete_handler(f, None, workspace_id, filters)
        return decorator
    
    # Called as simple decorator
    return _observer.register_delete_handler(func)