# devmodetest2 / perm / agent_system.py
# tengel's picture
# Upload 56 files
# 9c9a39f verified
from langgraph.graph import StateGraph, END
from agents.agent_state import AgentState
class AgentSystem:
    """Build a supervisor/worker LangGraph workflow.

    A ``StateGraph`` over ``AgentState`` is created from the supplied nodes,
    every member is wired back to the ``"supervisor"`` node, and the
    supervisor routes onward through :meth:`check_step`.
    """

    def __init__(self, nodes, members):
        """Create the graph and wire its edges.

        Args:
            nodes: Iterable of mappings, each with a ``"name"`` key and an
                ``"instance"`` key, registered as graph nodes.
            members: Names of the worker nodes the supervisor may route to.
                NOTE(review): a node named "supervisor" is assumed to be
                present in ``nodes`` -- confirm with the caller.
        """
        # Assign before wiring the graph: the routing callback registered in
        # setup_graph reads self.members.
        self.members = members
        self.workflow = self.setup_nodes(nodes)
        self.setup_graph(members)

    def get_workflow(self):
        """Return the (uncompiled) workflow graph."""
        # The original returned the undefined bare name `workflow`, which
        # raised NameError on every call.
        return self.workflow

    def compile(self):
        """Compile the workflow and return the result of ``compile()``."""
        return self.workflow.compile()

    def setup_nodes(self, nodes):
        """Create a ``StateGraph(AgentState)`` and add each node to it.

        Args:
            nodes: Iterable of mappings with ``"name"`` and ``"instance"``.

        Returns:
            The new graph with all nodes added.
        """
        workflow = StateGraph(AgentState)
        for node in nodes:
            workflow.add_node(node["name"], node["instance"])
        return workflow

    def check_step(self, x):
        """Pick the next node from the ``"next"`` field of the state.

        Args:
            x: Graph state; ``x["next"]`` holds the supervisor's decision.

        Returns:
            The member named by ``x["next"]``, or ``"FINISH"`` when no
            member matches.
        """
        requested = x["next"]
        # Exact match first, so a member whose name is contained in another
        # member's name (e.g. "coder" vs "coder_reviewer") cannot take the
        # route merely because it appears earlier in self.members.
        for member in self.members:
            if member == requested:
                return member
        # Fall back to containment: the decision may embed the member name
        # in surrounding text.
        for member in self.members:
            if member in requested:
                return member
        return "FINISH"

    def setup_graph(self, members):
        """Wire the edges between the supervisor and the members.

        Args:
            members: Names of the worker nodes.
        """
        for member in members:
            # We want our workers to ALWAYS "report back" to the supervisor
            # when done.
            print(f"Add return path to supervisor for {member}")
            self.workflow.add_edge(member, "supervisor")
        # The supervisor populates the "next" field in the graph state,
        # which routes to a node or finishes.
        conditional_map = {k: k for k in members}
        conditional_map["FINISH"] = END
        self.workflow.add_conditional_edges(
            "supervisor", lambda x: self.check_step(x), conditional_map
        )
        # Finally, add the entry point.
        self.workflow.set_entry_point("supervisor")