File size: 4,098 Bytes
297ed13
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import os
import importlib
import subprocess
import sys
import traceback
import logging
from typing import List, Dict
from .. import AgentModel

logger = logging.getLogger(__name__)

_agent_cache: Dict[str, AgentModel] = {}


def list_agents() -> List[AgentModel]:
    """
    List all cached agents

    Returns:
        List[AgentModel]: The list of agent models
    """
    if len(_agent_cache) == 0:
        for m in _list_agents():
            _agent_cache[m.id] = m
    return _agent_cache


def get_agent(agent_id) -> AgentModel:
    """
    Get the agent model by agent name

    Args:
        agent_id: The name of the agent

    Returns:
        AgentModel: The agent model
    """
    if len(_agent_cache) == 0:
        list_agents()
    if agent_id not in _agent_cache:
        raise Exception(f"Agent {agent_id} not found")
    return _agent_cache[agent_id]


def _list_agents() -> List[AgentModel]:
    agents_dir = os.path.join(os.getcwd(), "agent_deploy")

    if not os.path.exists(agents_dir):
        logger.warning(f"Agents directory {agents_dir} does not exist")
        return []

    if agents_dir not in sys.path:
        sys.path.append(agents_dir)

    agents = []
    for agent_id in os.listdir(agents_dir):
        try:
            agent_path = os.path.join(agents_dir, agent_id)
            if os.path.isdir(agent_path):
                requirements_file = os.path.join(agent_path, "requirements.txt")
                if os.path.exists(requirements_file):
                    p = subprocess.Popen(
                        ["pip", "install", "-r", requirements_file],
                        cwd=agent_path,
                    )
                    p.wait()
                    if p.returncode != 0:
                        logger.error(
                            f"Error installing requirements for agent {agent_id}, path {agent_path}"
                        )
                        continue

                agent_file = os.path.join(agent_path, "agent.py")
                if os.path.exists(agent_file):
                    try:
                        instance = _get_agent_instance(agent_id)
                        if hasattr(instance, "name"):
                            name = instance.name()
                        else:
                            name = agent_id
                        if hasattr(instance, "description"):
                            description = instance.description()
                        else:
                            description = ""
                        agent_model = AgentModel(
                            id=agent_id,
                            name=name,
                            description=description,
                            path=agent_path,
                            instance=instance,
                        )

                        agents.append(agent_model)
                        logger.info(
                            f"Loaded agent {agent_id} successfully, path {agent_path}"
                        )
                    except Exception as e:
                        logger.error(
                            f"Error loading agent {agent_id}: {traceback.format_exc()}"
                        )
                        continue
                else:
                    logger.warning(f"Agent {agent_id} does not have agent.py file")
        except Exception as e:
            logger.error(
                f"Error loading agent {agent_id}, path {agent_path} : {traceback.format_exc()}"
            )
            continue

    return agents


def _get_agent_instance(agent_name):
    try:
        agent_module = importlib.import_module(
            name=f"{agent_name}.agent",
        )
    except Exception as e:
        msg = f"Error loading agent {agent_name}, cwd:{os.getcwd()}, sys.path:{sys.path}: {traceback.format_exc()}"
        logger.error(msg)
        raise Exception(msg)

    if hasattr(agent_module, "AWorldAgent"):
        agent = agent_module.AWorldAgent()
        return agent
    else:
        raise Exception(f"Agent {agent_name} does not have AWorldAgent class")