Duibonduil's picture
Rename AWorld-main/aworlddistributed/aworldspace/utils/loader.py to aworlddistributed/aworldspace/utils/loader.py
2d61dd2 verified
import importlib.util
import json
import logging
import os
import subprocess
import sys
import traceback
from aworldspace.base import AGENT_SPACE
import aworld.trace as trace # noqa
from config import AGENTS_DIR
if not os.path.exists(AGENTS_DIR):
os.makedirs(AGENTS_DIR)
PIPELINES = {}
PIPELINE_MODULES = {}
def get_all_pipelines():
pipelines = {}
for pipeline_id in PIPELINE_MODULES.keys():
pipeline = PIPELINE_MODULES[pipeline_id]
if hasattr(pipeline, "type"):
if pipeline.type == "manifold":
manifold_pipelines = []
# Check if pipelines is a function or a list
if callable(pipeline.pipelines):
manifold_pipelines = pipeline.pipelines()
else:
manifold_pipelines = pipeline.pipelines
for p in manifold_pipelines:
manifold_pipeline_id = f'{pipeline_id}.{p["id"]}'
manifold_pipeline_name = p["name"]
if hasattr(pipeline, "name"):
manifold_pipeline_name = (
f"{pipeline.name}{manifold_pipeline_name}"
)
pipelines[manifold_pipeline_id] = {
"module": pipeline_id,
"type": pipeline.type if hasattr(pipeline, "type") else "pipe",
"id": manifold_pipeline_id,
"name": manifold_pipeline_name,
"valves": (
pipeline.valves if hasattr(pipeline, "valves") else None
),
}
if pipeline.type == "filter":
pipelines[pipeline_id] = {
"module": pipeline_id,
"type": (pipeline.type if hasattr(pipeline, "type") else "pipe"),
"id": pipeline_id,
"name": (
pipeline.name if hasattr(pipeline, "name") else pipeline_id
),
"pipelines": (
pipeline.valves.pipelines
if hasattr(pipeline, "valves")
and hasattr(pipeline.valves, "pipelines")
else []
),
"priority": (
pipeline.valves.priority
if hasattr(pipeline, "valves")
and hasattr(pipeline.valves, "priority")
else 0
),
"valves": pipeline.valves if hasattr(pipeline, "valves") else None,
}
else:
pipelines[pipeline_id] = {
"module": pipeline_id,
"type": (pipeline.type if hasattr(pipeline, "type") else "pipe"),
"id": pipeline_id,
"name": (pipeline.name if hasattr(pipeline, "name") else pipeline_id),
"valves": pipeline.valves if hasattr(pipeline, "valves") else None,
}
return pipelines
def parse_frontmatter(content):
frontmatter = {}
for line in content.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
frontmatter[key.strip().lower()] = value.strip()
return frontmatter
def install_frontmatter_requirements(requirements):
if requirements:
req_list = [req.strip() for req in requirements.split(",")]
for req in req_list:
print(f"Installing requirement: {req}")
subprocess.check_call([sys.executable, "-m", "pip", "install", req])
else:
print("No requirements found in frontmatter.")
async def load_module_from_path(module_name, module_path):
try:
# Read the module content
with open(module_path, "r") as file:
content = file.read()
# Parse frontmatter
frontmatter = {}
if content.startswith('"""'):
end = content.find('"""', 3)
if end != -1:
frontmatter_content = content[3:end]
frontmatter = parse_frontmatter(frontmatter_content)
# Install requirements if specified
if "requirements" in frontmatter:
install_frontmatter_requirements(frontmatter["requirements"])
# Load the module
spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
logging.info(f"Loaded module start: {module.__name__}")
if hasattr(module, "Pipeline"):
return module.Pipeline()
else:
logging.info(f"Loaded module failed: {module.__name__ } No Pipeline class found")
raise Exception("No Pipeline class found")
except Exception as e:
logging.info(f"Error loading module: {module_name}, error is {e}")
traceback.print_exc()
# Move the file to the error folder
failed_pipelines_folder = os.path.join(AGENTS_DIR, "failed")
if not os.path.exists(failed_pipelines_folder):
os.makedirs(failed_pipelines_folder)
# failed_file_path = os.path.join(failed_pipelines_folder, f"{module_name}.py")
# if module_path.__contains__(PIPELINES_DIR):
# os.rename(module_path, failed_file_path)
print(e)
return None
async def load_modules_from_directory(directory):
logging.info(f"load_modules_from_directory: {directory}")
global PIPELINE_MODULES
for filename in os.listdir(directory):
if filename.endswith(".py"):
module_name = filename[:-3] # Remove the .py extension
module_path = os.path.join(directory, filename)
# Create subfolder matching the filename without the .py extension
subfolder_path = os.path.join(directory, module_name)
if not os.path.exists(subfolder_path):
os.makedirs(subfolder_path)
logging.info(f"Created subfolder: {subfolder_path}")
# Create a valves.json file if it doesn't exist
valves_json_path = os.path.join(subfolder_path, "valves.json")
if not os.path.exists(valves_json_path):
with open(valves_json_path, "w") as f:
json.dump({}, f)
logging.info(f"Created valves.json in: {subfolder_path}")
pipeline = await load_module_from_path(module_name, module_path)
if pipeline:
# Overwrite pipeline.valves with values from valves.json
if os.path.exists(valves_json_path):
with open(valves_json_path, "r") as f:
valves_json = json.load(f)
if hasattr(pipeline, "valves"):
ValvesModel = pipeline.valves.__class__
# Create a ValvesModel instance using default values and overwrite with valves_json
combined_valves = {
**pipeline.valves.model_dump(),
**valves_json,
}
valves = ValvesModel(**combined_valves)
pipeline.valves = valves
logging.info(f"Updated valves for module: {module_name}")
pipeline_id = pipeline.id if hasattr(pipeline, "id") else module_name
PIPELINE_MODULES[pipeline_id] = pipeline
logging.info(f"Loaded module success: {module_name}")
else:
logging.warning(f"No Pipeline class found in {module_name}")
AGENT_SPACE.agent_modules = PIPELINE_MODULES
AGENT_SPACE.agents_meta = get_all_pipelines()