Spaces:
Sleeping
Sleeping
import asyncio | |
import os | |
from dotenv import load_dotenv | |
from aworld.logs.util import logger | |
from aworld.output import WorkSpace, ArtifactType | |
from aworld.output.observer import on_artifact_create, get_observer | |
async def handle_artifact_create(artifact):
    """Log the ID of a newly created artifact.

    Args:
        artifact: Object exposing an ``artifact_id`` attribute.

    NOTE(review): nothing in the visible source registers this coroutine
    with the observer (``on_artifact_create`` is imported but unused) --
    confirm whether a decorator was dropped.
    """
    created_id = artifact.artifact_id
    logger.info(f"Artifact created: {created_id}")
async def handle_specific_artifacts(artifact, **kwargs):
    """Log a created artifact together with the workspace it was created in.

    Args:
        artifact: Object exposing an ``artifact_id`` attribute.
        **kwargs: Must contain ``workspace_id``.

    Raises:
        KeyError: If ``workspace_id`` is not among the keyword arguments.

    NOTE(review): nothing in the visible source registers this coroutine
    with the observer -- confirm whether a decorator was dropped.
    """
    workspace_id = kwargs['workspace_id']
    created_id = artifact.artifact_id
    logger.info(f"text artifact created in specific workspace {workspace_id}: {created_id}")
class DemoClass:
    """Demo object that registers one of its own bound methods as a create handler."""

    def __init__(self):
        """Register ``artifact_create`` with the observer for workspace "demo"."""
        get_observer().register_create_handler(
            self.artifact_create,
            instance=self,
            workspace_id="demo",
        )

    async def artifact_create(self, artifact, **kwargs):
        """Log a created artifact together with its workspace ID.

        Args:
            artifact: Object exposing an ``artifact_id`` attribute.
            **kwargs: Must contain ``workspace_id``.

        Raises:
            KeyError: If ``workspace_id`` is not among the keyword arguments.
        """
        workspace_id = kwargs['workspace_id']
        created_id = artifact.artifact_id
        logger.info(f"DemoClass : text artifact created in specific workspace {workspace_id}: {created_id}")
async def run():
    """Drive the demo: register the handler, then create, update and complete an artifact.

    Calls ``load_dotenv()``, instantiates ``DemoClass`` (whose ``__init__``
    registers its create handler), opens the "demo" workspace and runs a
    short sequence of artifact operations against it.
    """
    load_dotenv()

    # Instantiated only for its registration side effect; the instance is
    # not kept. NOTE(review): confirm the observer keeps the handler alive.
    DemoClass()

    artifact_id = "artifact_001"
    workspace = WorkSpace.from_local_storages(workspace_id="demo")

    # Two create calls are issued for the same ID with different content.
    # NOTE(review): confirm the duplicate create is intentional.
    for initial_content in ("123", "456"):
        await workspace.create_artifact(ArtifactType.TEXT, artifact_id, content=initial_content)

    await workspace.update_artifact(artifact_id, content="7890")
    await workspace.mark_as_completed(artifact_id)
    # Deletion is left disabled:
    # await workspace.delete_artifact("artifact_001")
# Script entry point: run the async demo only when executed directly.
if __name__ == "__main__":
    asyncio.run(run())