Spaces:
Paused
Paused
File size: 1,800 Bytes
9c9a39f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
from langgraph.graph import StateGraph, END
from agents.agent_state import AgentState
class AgentSystem:
    """Build a supervisor-routed workflow on top of a ``StateGraph``.

    Every entry of ``nodes`` is registered on a ``StateGraph(AgentState)``.
    Each member gets an edge back to the node named ``"supervisor"``, and the
    supervisor gets conditional edges that route either to a member or to
    ``END``, based on the ``"next"`` field of the graph state.
    """

    def __init__(self, nodes, members):
        """Create the graph, register the nodes and wire the edges.

        Args:
            nodes: Iterable of mappings, each with a ``"name"`` key and an
                ``"instance"`` key; both are passed to ``add_node``.
                NOTE(review): the wiring refers to a node named
                ``"supervisor"``, so presumably one entry must use that
                name -- confirm against the caller.
            members: Collection of member node names. It is iterated several
                times, so it must not be a one-shot iterator.
        """
        # Assign before wiring so the routing callback registered in
        # setup_graph() never refers to a half-initialised instance.
        self.members = members
        self.workflow = self.setup_nodes(nodes)
        self.setup_graph(members)

    def get_workflow(self):
        """Return the underlying (uncompiled) graph object."""
        # The original returned the bare name ``workflow``, which is not
        # defined in this scope and raised NameError on every call.
        return self.workflow

    def compile(self):
        """Compile the graph and return the result of ``workflow.compile()``."""
        return self.workflow.compile()

    def setup_nodes(self, nodes):
        """Create a ``StateGraph(AgentState)`` and add every node to it.

        Args:
            nodes: Iterable of mappings with ``"name"`` and ``"instance"``
                keys.

        Returns:
            The newly created graph with all nodes added.
        """
        workflow = StateGraph(AgentState)
        for node in nodes:
            workflow.add_node(node["name"], node["instance"])
        return workflow

    def check_step(self, x):
        """Pick the routing key for the supervisor's conditional edges.

        Args:
            x: Graph state; its ``"next"`` entry is inspected.

        Returns:
            The member that equals ``x["next"]`` if there is one; otherwise
            the first member (in ``self.members`` order) contained in
            ``x["next"]``; otherwise the string ``"FINISH"``.
        """
        target = x["next"]

        # Exact match first, so that a member whose name is contained in
        # another member's name (e.g. "Coder" vs "CoderReview") cannot
        # shadow the longer one.
        for member in self.members:
            if member == target:
                return member

        # Fall back to containment: "next" may carry extra text around the
        # member name.
        for member in self.members:
            if member in target:
                return member

        return "FINISH"

    def setup_graph(self, members):
        """Wire member -> supervisor edges, conditional routing and entry point.

        Args:
            members: Collection of member node names (iterated twice).
        """
        for member in members:
            # We want our workers to ALWAYS "report back" to the supervisor
            # when done.
            print(f"Add return path to supervisor for {member}")
            self.workflow.add_edge(member, "supervisor")

        # The supervisor populates the "next" field in the graph state,
        # which routes to a node or finishes.
        conditional_map = {member: member for member in members}
        conditional_map["FINISH"] = END
        self.workflow.add_conditional_edges(
            "supervisor", self.check_step, conditional_map
        )

        # Finally, add the entry point.
        self.workflow.set_entry_point("supervisor")
|