Yongkang ZOU
add retriever
3fba19d
raw
history blame
5.88 kB
import os

from dotenv import load_dotenv
from langchain_community.document_loaders import ArxivLoader, WikipediaLoader
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.vectorstores import SupabaseVectorStore
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_huggingface import (
    ChatHuggingFace,
    HuggingFaceEmbeddings,
    HuggingFaceEndpoint,
)
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition
from supabase import create_client
# Load variables from a local .env file (if any) into the process environment
# before anything below reads API keys or service URLs via os.getenv().
load_dotenv()
# ------------------- TOOL DEFINITIONS -------------------
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    # Named intermediate makes the returned quantity explicit.
    product = a * b
    return product
@tool
def add(a: int, b: int) -> int:
    """Add two numbers."""
    # Named intermediate makes the returned quantity explicit.
    total = a + b
    return total
@tool
def subtract(a: int, b: int) -> int:
    """Subtract two numbers."""
    # Order matters: the second argument is taken away from the first.
    difference = a - b
    return difference
@tool
def divide(a: int, b: int) -> float:
    """Divide two numbers."""
    # Happy path first: any non-zero divisor yields a true (float) quotient.
    if b != 0:
        return a / b
    # A zero divisor is reported as a ValueError with an explicit message.
    raise ValueError("Cannot divide by zero.")
@tool
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers."""
    # Python's % operator: the result takes the sign of the divisor `b`.
    remainder = a % b
    return remainder
@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query (max 2 results)."""
    loader = WikipediaLoader(query=query, load_max_docs=2)
    pages = loader.load()
    # Page texts are concatenated in the order returned, separated by a blank line.
    return "\n\n".join(page.page_content for page in pages)
@tool
def web_search(query: str) -> str:
    """Search the web using Tavily (max 3 results)."""
    search = TavilySearchResults(max_results=3)
    hits = search.invoke(query)
    # Only dict-shaped hits are used; prefer "content" and fall back to "text"
    # when "content" is missing or empty.
    snippets = [
        hit.get("content", "") or hit.get("text", "")
        for hit in hits
        if isinstance(hit, dict)
    ]
    return "\n\n".join(snippets)
@tool
def arvix_search(query: str) -> str:
    """Search Arxiv for academic papers (max 3)."""
    loader = ArxivLoader(query=query, load_max_docs=3)
    excerpts = []
    for paper in loader.load():
        # Keep only the first 1000 characters of each paper's text.
        excerpts.append(paper.page_content[:1000])
    return "\n\n".join(excerpts)
# Every tool exposed to the agent; build_graph binds this list to the LLM and
# passes it to the ToolNode.
tools = [
    multiply,
    add,
    subtract,
    divide,
    modulus,
    wiki_search,
    web_search,
    arvix_search,
]
# ------------------- SYSTEM PROMPT -------------------
system_prompt_path = "system_prompt.txt"
if not os.path.exists(system_prompt_path):
    # No prompt file in the working directory: use the built-in default.
    system_prompt = (
        "You are an intelligent AI agent who can solve math, science, factual, and research-based problems. "
        "You can use tools like Wikipedia, Web search, or Arxiv when needed. Always give precise and helpful answers."
    )
else:
    # A prompt file on disk takes precedence over the default text.
    with open(system_prompt_path, "r", encoding="utf-8") as prompt_file:
        system_prompt = prompt_file.read()
# System message wrapping the prompt text loaded above; referenced by the
# assistant node inside build_graph.
sys_msg = SystemMessage(content=system_prompt)
# ------------------- GRAPH CONSTRUCTION -------------------
from langchain_openai import ChatOpenAI # ✅ 新增导入
def build_graph(provider: str = "groq"):
    """Build a LangGraph agent with a QA-retriever shortcut and a tool-using assistant.

    Graph layout::

        START -> retriever --(hit)--> END
                     |
                   (miss)
                     v
                 assistant <-> tools   (loops until no tool call is requested)

    Args:
        provider: Which chat model backend to use: ``"google"``, ``"groq"``,
            ``"huggingface"`` or ``"openai"``.

    Returns:
        The compiled graph; invoke it with ``{"messages": [...]}``.

    Raises:
        ValueError: If ``provider`` is not recognised, or if an environment
            variable required by the chosen provider or by Supabase is unset.
    """
    # --- Chat model selection ---
    if provider == "google":
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    elif provider == "groq":
        groq_key = os.getenv("GROQ_API_KEY")
        if not groq_key:
            raise ValueError("GROQ_API_KEY is not set.")
        llm = ChatGroq(model="qwen-qwq-32b", temperature=0, api_key=groq_key)
    elif provider == "huggingface":
        llm = ChatHuggingFace(
            llm=HuggingFaceEndpoint(
                url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf",
                temperature=0
            )
        )
    elif provider == "openai":
        openai_key = os.getenv("OPENAI_API_KEY")
        if not openai_key:
            raise ValueError("OPENAI_API_KEY is not set.")
        llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key=openai_key)
    else:
        raise ValueError("Invalid provider")

    # --- Tool binding ---
    llm_with_tools = llm.bind_tools(tools)

    def assistant(state: MessagesState):
        """Call the tool-enabled model on the conversation so far."""
        # The system prompt is prepended to the model *input* only. Returning it
        # as part of the update would append another SystemMessage to the
        # history on every turn, and the model would never see it first.
        response = llm_with_tools.invoke([sys_msg] + state["messages"])
        return {"messages": [response]}

    # --- Supabase-backed QA retriever ---
    supabase_url = os.getenv("SUPABASE_URL")
    supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
    # Fail early with a clear message, matching the API-key checks above.
    if not supabase_url or not supabase_key:
        raise ValueError("SUPABASE_URL and SUPABASE_SERVICE_KEY must be set.")
    supabase = create_client(supabase_url, supabase_key)
    embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
    vectorstore = SupabaseVectorStore(
        client=supabase,
        embedding=embedding_model,
        table_name="QA_db"
    )
    # NOTE(review): with k=1 and no score threshold the retriever presumably
    # returns its nearest row even for unrelated questions, in which case the
    # assistant branch is rarely reached -- confirm this is intended.
    retriever = vectorstore.as_retriever(search_kwargs={"k": 1})

    def qa_retriever_node(state: MessagesState):
        """Look up the latest message in the QA store and answer from it on a hit."""
        user_question = state["messages"][-1].content
        docs = retriever.invoke(user_question)
        if docs:
            # Return only the new message; the MessagesState reducer appends it.
            return {"messages": [AIMessage(content=docs[0].page_content)]}
        return {"messages": []}

    def route_after_retriever(state: MessagesState):
        """Finish when the retriever answered; otherwise hand over to the assistant."""
        # Assumes the incoming conversation ends with the user's question, so a
        # trailing AIMessage can only have been added by qa_retriever_node.
        if isinstance(state["messages"][-1], AIMessage):
            return END
        return "assistant"

    # --- Graph wiring ---
    builder = StateGraph(MessagesState)
    builder.add_node("retriever", qa_retriever_node)
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))
    builder.add_edge(START, "retriever")
    # add_conditional_edges needs a routing callable; the mapping only lists
    # the possible destinations.
    builder.add_conditional_edges(
        "retriever",
        route_after_retriever,
        {"assistant": "assistant", END: END},
    )
    builder.add_conditional_edges("assistant", tools_condition)
    builder.add_edge("tools", "assistant")
    return builder.compile()
# ------------------- LOCAL TEST -------------------
if __name__ == "__main__":
    # Manual smoke test: run one sample question through the OpenAI-backed
    # agent and print every message of the resulting conversation.
    sample_question = (
        "When was a picture of St. Thomas Aquinas first added to the "
        "Wikipedia page on the Principle of double effect?"
    )
    agent = build_graph(provider="openai")
    final_state = agent.invoke({"messages": [HumanMessage(content=sample_question)]})
    print("=== AI Agent Response ===")
    for message in final_state["messages"]:
        message.pretty_print()