Spaces:
Sleeping
Sleeping

Rename AWorld-main/aworlddistributed/aworldspace/utils/loader.py to aworlddistributed/aworldspace/utils/loader.py
2d61dd2
verified
import importlib.util | |
import json | |
import logging | |
import os | |
import subprocess | |
import sys | |
import traceback | |
from aworldspace.base import AGENT_SPACE | |
import aworld.trace as trace # noqa | |
from config import AGENTS_DIR | |
if not os.path.exists(AGENTS_DIR): | |
os.makedirs(AGENTS_DIR) | |
PIPELINES = {} | |
PIPELINE_MODULES = {} | |
def get_all_pipelines(): | |
pipelines = {} | |
for pipeline_id in PIPELINE_MODULES.keys(): | |
pipeline = PIPELINE_MODULES[pipeline_id] | |
if hasattr(pipeline, "type"): | |
if pipeline.type == "manifold": | |
manifold_pipelines = [] | |
# Check if pipelines is a function or a list | |
if callable(pipeline.pipelines): | |
manifold_pipelines = pipeline.pipelines() | |
else: | |
manifold_pipelines = pipeline.pipelines | |
for p in manifold_pipelines: | |
manifold_pipeline_id = f'{pipeline_id}.{p["id"]}' | |
manifold_pipeline_name = p["name"] | |
if hasattr(pipeline, "name"): | |
manifold_pipeline_name = ( | |
f"{pipeline.name}{manifold_pipeline_name}" | |
) | |
pipelines[manifold_pipeline_id] = { | |
"module": pipeline_id, | |
"type": pipeline.type if hasattr(pipeline, "type") else "pipe", | |
"id": manifold_pipeline_id, | |
"name": manifold_pipeline_name, | |
"valves": ( | |
pipeline.valves if hasattr(pipeline, "valves") else None | |
), | |
} | |
if pipeline.type == "filter": | |
pipelines[pipeline_id] = { | |
"module": pipeline_id, | |
"type": (pipeline.type if hasattr(pipeline, "type") else "pipe"), | |
"id": pipeline_id, | |
"name": ( | |
pipeline.name if hasattr(pipeline, "name") else pipeline_id | |
), | |
"pipelines": ( | |
pipeline.valves.pipelines | |
if hasattr(pipeline, "valves") | |
and hasattr(pipeline.valves, "pipelines") | |
else [] | |
), | |
"priority": ( | |
pipeline.valves.priority | |
if hasattr(pipeline, "valves") | |
and hasattr(pipeline.valves, "priority") | |
else 0 | |
), | |
"valves": pipeline.valves if hasattr(pipeline, "valves") else None, | |
} | |
else: | |
pipelines[pipeline_id] = { | |
"module": pipeline_id, | |
"type": (pipeline.type if hasattr(pipeline, "type") else "pipe"), | |
"id": pipeline_id, | |
"name": (pipeline.name if hasattr(pipeline, "name") else pipeline_id), | |
"valves": pipeline.valves if hasattr(pipeline, "valves") else None, | |
} | |
return pipelines | |
def parse_frontmatter(content): | |
frontmatter = {} | |
for line in content.split("\n"): | |
if ":" in line: | |
key, value = line.split(":", 1) | |
frontmatter[key.strip().lower()] = value.strip() | |
return frontmatter | |
def install_frontmatter_requirements(requirements): | |
if requirements: | |
req_list = [req.strip() for req in requirements.split(",")] | |
for req in req_list: | |
print(f"Installing requirement: {req}") | |
subprocess.check_call([sys.executable, "-m", "pip", "install", req]) | |
else: | |
print("No requirements found in frontmatter.") | |
async def load_module_from_path(module_name, module_path): | |
try: | |
# Read the module content | |
with open(module_path, "r") as file: | |
content = file.read() | |
# Parse frontmatter | |
frontmatter = {} | |
if content.startswith('"""'): | |
end = content.find('"""', 3) | |
if end != -1: | |
frontmatter_content = content[3:end] | |
frontmatter = parse_frontmatter(frontmatter_content) | |
# Install requirements if specified | |
if "requirements" in frontmatter: | |
install_frontmatter_requirements(frontmatter["requirements"]) | |
# Load the module | |
spec = importlib.util.spec_from_file_location(module_name, module_path) | |
module = importlib.util.module_from_spec(spec) | |
spec.loader.exec_module(module) | |
logging.info(f"Loaded module start: {module.__name__}") | |
if hasattr(module, "Pipeline"): | |
return module.Pipeline() | |
else: | |
logging.info(f"Loaded module failed: {module.__name__ } No Pipeline class found") | |
raise Exception("No Pipeline class found") | |
except Exception as e: | |
logging.info(f"Error loading module: {module_name}, error is {e}") | |
traceback.print_exc() | |
# Move the file to the error folder | |
failed_pipelines_folder = os.path.join(AGENTS_DIR, "failed") | |
if not os.path.exists(failed_pipelines_folder): | |
os.makedirs(failed_pipelines_folder) | |
# failed_file_path = os.path.join(failed_pipelines_folder, f"{module_name}.py") | |
# if module_path.__contains__(PIPELINES_DIR): | |
# os.rename(module_path, failed_file_path) | |
print(e) | |
return None | |
async def load_modules_from_directory(directory): | |
logging.info(f"load_modules_from_directory: {directory}") | |
global PIPELINE_MODULES | |
for filename in os.listdir(directory): | |
if filename.endswith(".py"): | |
module_name = filename[:-3] # Remove the .py extension | |
module_path = os.path.join(directory, filename) | |
# Create subfolder matching the filename without the .py extension | |
subfolder_path = os.path.join(directory, module_name) | |
if not os.path.exists(subfolder_path): | |
os.makedirs(subfolder_path) | |
logging.info(f"Created subfolder: {subfolder_path}") | |
# Create a valves.json file if it doesn't exist | |
valves_json_path = os.path.join(subfolder_path, "valves.json") | |
if not os.path.exists(valves_json_path): | |
with open(valves_json_path, "w") as f: | |
json.dump({}, f) | |
logging.info(f"Created valves.json in: {subfolder_path}") | |
pipeline = await load_module_from_path(module_name, module_path) | |
if pipeline: | |
# Overwrite pipeline.valves with values from valves.json | |
if os.path.exists(valves_json_path): | |
with open(valves_json_path, "r") as f: | |
valves_json = json.load(f) | |
if hasattr(pipeline, "valves"): | |
ValvesModel = pipeline.valves.__class__ | |
# Create a ValvesModel instance using default values and overwrite with valves_json | |
combined_valves = { | |
**pipeline.valves.model_dump(), | |
**valves_json, | |
} | |
valves = ValvesModel(**combined_valves) | |
pipeline.valves = valves | |
logging.info(f"Updated valves for module: {module_name}") | |
pipeline_id = pipeline.id if hasattr(pipeline, "id") else module_name | |
PIPELINE_MODULES[pipeline_id] = pipeline | |
logging.info(f"Loaded module success: {module_name}") | |
else: | |
logging.warning(f"No Pipeline class found in {module_name}") | |
AGENT_SPACE.agent_modules = PIPELINE_MODULES | |
AGENT_SPACE.agents_meta = get_all_pipelines() | |