Spaces:
Paused
Paused
from typing import Any, Iterable, Mapping, Sequence

from langgraph.graph import StateGraph, END

from agents.agent_state import AgentState
class AgentSystem:
    """Supervisor-routed agent graph built on a LangGraph ``StateGraph``.

    Construction registers every supplied node on a ``StateGraph(AgentState)``,
    adds an edge from each member back to the ``"supervisor"`` node, installs a
    conditional edge out of ``"supervisor"`` that is routed by
    :meth:`check_step`, and makes ``"supervisor"`` the entry point.

    NOTE(review): the wiring refers to a node literally named ``"supervisor"``;
    presumably ``nodes`` must contain it -- confirm against the caller.
    """

    def __init__(
        self,
        nodes: Iterable[Mapping[str, Any]],
        members: Sequence[str],
    ) -> None:
        """Build the graph.

        Args:
            nodes: Node descriptors; each must provide ``"name"`` and
                ``"instance"`` keys, which are passed to ``add_node``.
            members: Names of the worker nodes the supervisor can route to.
                It is iterated more than once, so pass a re-iterable
                collection (list/tuple), not a one-shot generator.
        """
        # Set before wiring the graph so the instance is fully initialised by
        # the time ``check_step`` is registered as the routing callback.
        self.members = members
        self.workflow = self.setup_nodes(nodes)
        self.setup_graph(members)

    def get_workflow(self) -> "StateGraph":
        """Return the underlying (uncompiled) graph."""
        return self.workflow

    def compile(self):
        """Compile the graph and return the result of ``workflow.compile()``."""
        return self.workflow.compile()

    def setup_nodes(self, nodes: Iterable[Mapping[str, Any]]) -> "StateGraph":
        """Create a ``StateGraph(AgentState)`` and register every node on it.

        Args:
            nodes: Node descriptors with ``"name"`` and ``"instance"`` keys.

        Returns:
            The new graph with all nodes added (no edges yet).
        """
        workflow = StateGraph(AgentState)
        for node in nodes:
            workflow.add_node(node["name"], node["instance"])
        return workflow

    def check_step(self, x: Mapping[str, Any]) -> str:
        """Pick the routing key for the supervisor's conditional edge.

        Args:
            x: Graph state; its ``"next"`` entry holds the supervisor's
                decision.

        Returns:
            The member the decision refers to, or ``"FINISH"`` when the
            decision is empty/``None`` or mentions no known member.

        Raises:
            KeyError: If the state has no ``"next"`` entry.
        """
        target = x["next"]
        if not target:
            # No decision recorded: nothing to route to.
            return "FINISH"

        # Exact match wins, so a member whose name is a prefix/substring of
        # another (e.g. "Code" vs "CodeReview") cannot shadow it.
        for member in self.members:
            if member == target:
                return member

        # Fallback: the decision may embed the member name in surrounding
        # text; take the first member (in ``members`` order) that it contains.
        for member in self.members:
            if member in target:
                return member

        return "FINISH"

    def setup_graph(self, members: Sequence[str]) -> None:
        """Add the edges and the entry point to ``self.workflow``.

        Args:
            members: Names of the worker nodes to connect to the supervisor.
        """
        for member in members:
            # Workers always report back to the supervisor when done.
            print(f"Add return path to supervisor for {member}")
            self.workflow.add_edge(member, "supervisor")

        # The supervisor populates the "next" field in the graph state;
        # ``check_step`` turns it into one of these keys, which selects either
        # a member node or the end of the graph.
        routes = {member: member for member in members}
        routes["FINISH"] = END
        self.workflow.add_conditional_edges("supervisor", self.check_step, routes)

        # Finally, add the entry point.
        self.workflow.set_entry_point("supervisor")