Spaces:
Sleeping
Sleeping
"""LangGraph Agent""" | |
import os | |
from dotenv import load_dotenv | |
from langgraph.graph import START, StateGraph, MessagesState | |
from langgraph.prebuilt import tools_condition | |
from langgraph.prebuilt import ToolNode | |
from langchain_google_genai import ChatGoogleGenerativeAI | |
from langchain_groq import ChatGroq | |
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings | |
from langchain_community.tools.tavily_search import TavilySearchResults | |
from langchain_community.document_loaders import WikipediaLoader | |
from langchain_community.document_loaders import ArxivLoader | |
from langchain_core.messages import SystemMessage, HumanMessage | |
from langchain_core.tools import tool | |
from langchain.tools.retriever import create_retriever_tool | |
from langchain_community.vectorstores import Chroma | |
from langchain_core.documents import Document | |
import shutil | |
import pandas as pd # Ny import för pandas | |
import json # För att parsa metadata-kolumnen | |
load_dotenv() | |
def multiply(a: int, b: int) -> int:
    """Multiply two numbers and return their product.

    Args:
        a: first int
        b: second int
    """
    product = a * b
    return product
def add(a: int, b: int) -> int:
    """Add two numbers and return their sum.

    Args:
        a: first int
        b: second int
    """
    total = a + b
    return total
def subtract(a: int, b: int) -> int:
    """Subtract the second number from the first.

    Args:
        a: first int
        b: second int
    """
    difference = a - b
    return difference
def divide(a: int, b: int) -> float:
    """Divide two numbers using true division.

    Args:
        a: first int (the dividend)
        b: second int (the divisor)

    Returns:
        The quotient ``a / b``. Because ``/`` is true division the result is
        a float even when the inputs divide evenly (``divide(6, 3)`` is
        ``2.0``), which is why the return annotation is ``float``.

    Raises:
        ValueError: if ``b`` is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b
def modulus(a: int, b: int) -> int:
    """Get the modulus (remainder) of the first number by the second.

    Args:
        a: first int
        b: second int
    """
    remainder = a % b
    return remainder
def wiki_search(query: str) -> dict[str, str]:
    """Search Wikipedia for a query and return maximum 2 results.

    Args:
        query: The search query.

    Returns:
        A dict with the single key ``"wiki_results"``. Its value is one
        string containing every loaded document rendered as a
        ``<Document ...>`` block, with blocks separated by ``---``.
    """
    search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        for doc in search_docs
    )
    return {"wiki_results": formatted_search_docs}
def web_search(query: str) -> dict[str, str]:
    """Search Tavily for a query and return maximum 3 results.

    Args:
        query: The search query.

    Returns:
        A dict with the single key ``"web_results"``. Its value is one
        string containing every hit rendered as a ``<Document ...>`` block,
        with blocks separated by ``---``.
    """
    # The tool input must be passed positionally: ``invoke`` has a required
    # ``input`` parameter, so ``invoke(query=query)`` fails with a TypeError.
    results = TavilySearchResults(max_results=3).invoke(query)

    # NOTE(review): the Tavily tool appears to hand back an error string
    # instead of a result list when the API call fails; pass it through
    # rather than crashing while formatting. Confirm against the installed
    # langchain_community version.
    if isinstance(results, str):
        return {"web_results": results}

    # Tavily hits are plain dicts (URL under "url", text under "content"),
    # not Document objects, so they have no ``metadata``/``page_content``.
    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{hit.get("url", "")}" page=""/>\n{hit.get("content", "")}\n</Document>'
        for hit in results
    )
    return {"web_results": formatted_search_docs}
def arvix_search(query: str) -> dict[str, str]:
    """Search Arxiv for a query and return maximum 3 results.

    Args:
        query: The search query.

    Returns:
        A dict with the single key ``"arvix_results"``. Its value is one
        string containing every loaded paper rendered as a
        ``<Document ...>`` block (content truncated to the first 1000
        characters), with blocks separated by ``---``.
    """
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()

    rendered = []
    for doc in search_docs:
        meta = doc.metadata
        # NOTE(review): ArxivLoader metadata does not seem to include a
        # "source" key by default (expected keys look like Published /
        # Title / Authors / Summary), so indexing it directly can raise
        # KeyError. Fall back to the entry id or title; confirm the key
        # names against the installed loader version.
        source = meta.get("source") or meta.get("entry_id") or meta.get("Title", "")
        page = meta.get("page", "")
        rendered.append(
            f'<Document source="{source}" page="{page}"/>\n{doc.page_content[:1000]}\n</Document>'
        )

    return {"arvix_results": "\n\n---\n\n".join(rendered)}
# Read the agent's system prompt text from disk (UTF-8 encoded).
with open("system_prompt.txt", encoding="utf-8") as prompt_file:
    system_prompt = prompt_file.read()

# Wrap the prompt text in a SystemMessage.
sys_msg = SystemMessage(content=system_prompt)
# --- Start ChromaDB Setup ---
# Directory where ChromaDB persists its data.
CHROMA_DB_DIR = "./chroma_db"
# CSV file providing the documents; read via its "content" and "metadata" columns.
CSV_FILE_PATH = "./supabase_docs.csv"
# Marker separating the question text from the answer inside "content".
_ANSWER_MARKER = "Final answer :"

# Embedding model shared by loading and querying.
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")  # dim=768


def _parse_metadata(raw) -> dict:
    """Turn a CSV metadata cell into a dict, returning ``{}`` if unusable.

    The cell is tried as JSON first, so valid JSON whose values contain
    apostrophes survives. Only if that fails is the single-quote to
    double-quote replacement applied, which handles cells stored as a
    Python-style dict string. Non-string cells (e.g. NaN for an empty cell)
    and payloads that are not a JSON object yield an empty dict.
    """
    if not isinstance(raw, str):
        return {}
    for candidate in (raw, raw.replace("'", '"')):
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        return parsed if isinstance(parsed, dict) else {}
    return {}


def _row_to_document(row):
    """Build a Document from one CSV row, or return None if it has no text.

    The page content is the text before the first answer marker (the
    question, used for similarity search). The text after the last marker is
    stored in the metadata under "final_answer" ("" if there is no marker).
    """
    content = row["content"]
    if not isinstance(content, str):
        # Empty cells come back as NaN (a float); there is nothing to embed.
        return None
    question_part = content.partition(_ANSWER_MARKER)[0].strip()
    _, marker, tail = content.rpartition(_ANSWER_MARKER)
    final_answer_part = tail.strip() if marker else ""
    metadata = _parse_metadata(row.get("metadata"))
    metadata["final_answer"] = final_answer_part
    return Document(page_content=question_part, metadata=metadata)


# Initialize ChromaDB.
# If the directory exists and contains data, load the existing vector store.
# Otherwise, create a new one and add documents from the CSV file.
if os.path.exists(CHROMA_DB_DIR) and os.listdir(CHROMA_DB_DIR):
    print(f"Loading existing ChromaDB from {CHROMA_DB_DIR}")
    vector_store = Chroma(
        persist_directory=CHROMA_DB_DIR,
        embedding_function=embeddings,
    )
else:
    print(f"Creating new ChromaDB at {CHROMA_DB_DIR} and loading documents from {CSV_FILE_PATH}.")
    # Ensure the directory is clean before creating a new store.
    if os.path.exists(CHROMA_DB_DIR):
        shutil.rmtree(CHROMA_DB_DIR)
    os.makedirs(CHROMA_DB_DIR)

    if not os.path.exists(CSV_FILE_PATH):
        raise FileNotFoundError(f"CSV file not found at {CSV_FILE_PATH}. Please ensure it's in the root directory.")
    df = pd.read_csv(CSV_FILE_PATH)

    documents = []
    for _, row in df.iterrows():
        document = _row_to_document(row)
        if document is not None:
            documents.append(document)

    if not documents:
        print("No documents loaded from CSV. ChromaDB will be empty.")
        # Create an empty ChromaDB if no documents are found.
        vector_store = Chroma(
            persist_directory=CHROMA_DB_DIR,
            embedding_function=embeddings,
        )
    else:
        vector_store = Chroma.from_documents(
            documents=documents,
            embedding=embeddings,
            persist_directory=CHROMA_DB_DIR,
        )
        # NOTE(review): explicit persist() is believed to be deprecated on
        # recent Chroma versions (data is persisted automatically) - confirm
        # before removing.
        vector_store.persist()  # Save the new vector store to disk
        print(f"ChromaDB initialized and persisted with {len(documents)} documents from CSV.")
# Expose the Chroma vector store to the LLM as a named retriever tool.
_question_retriever = vector_store.as_retriever()
retriever_tool = create_retriever_tool(
    name="Question_Search",
    description="A tool to retrieve similar questions from a vector store. The retrieved document's metadata contains the 'final_answer' to the question.",
    retriever=_question_retriever,
)
# Tools bound to the LLM: arithmetic helpers, external search functions,
# and the vector-store retriever tool.
tools = [
    multiply, add, subtract, divide, modulus,
    wiki_search, web_search, arvix_search,
    retriever_tool,
]
# Build graph function
def build_graph(provider: str = "google"):
    """Build and compile the agent's state graph.

    The compiled graph consists of a single ``retriever`` node that is both
    entry and finish point: it looks up the stored question most similar to
    the last message and replies with the stored answer.

    NOTE: the chat model selected by ``provider`` is instantiated and bound
    to ``tools``, and an ``assistant`` node function is defined, but neither
    is currently added to the graph. They are kept so provider validation
    and model construction behave as before.

    Args:
        provider: Which chat model backend to instantiate; one of
            ``"google"``, ``"groq"`` or ``"huggingface"``.

    Returns:
        The compiled graph returned by ``StateGraph.compile()``.

    Raises:
        ValueError: if ``provider`` is not one of the supported names.
    """
    # Function-scope import: AIMessage is not among the module-level imports.
    from langchain_core.messages import AIMessage

    if provider == "google":
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    elif provider == "groq":
        llm = ChatGroq(model="qwen-qwq-32b", temperature=0)
    elif provider == "huggingface":
        llm = ChatHuggingFace(
            llm=HuggingFaceEndpoint(
                # The endpoint address is passed as ``endpoint_url``;
                # ``url`` is not a HuggingFaceEndpoint field.
                endpoint_url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf",
                temperature=0,
            ),
        )
    else:
        raise ValueError("Invalid provider. Choose 'google', 'groq' or 'huggingface'.")

    llm_with_tools = llm.bind_tools(tools)

    def assistant(state: MessagesState):
        """Assistant node (defined but not registered in the graph below)."""
        return {"messages": [llm_with_tools.invoke(state["messages"])]}

    def retriever(state: MessagesState):
        """Answer with the stored answer of the most similar known question."""
        query = state["messages"][-1].content
        # Query the vector store directly to get the full Document object,
        # including its metadata.
        similar_docs = vector_store.similarity_search(query, k=1)
        if not similar_docs:
            return {"messages": [AIMessage(content="No similar questions found in the knowledge base.")]}

        similar_doc = similar_docs[0]
        # Prefer the answer stored in metadata. Otherwise take the text after
        # the last "Final answer :" marker in the page content; when the
        # marker is absent, split(...)[-1] is simply the whole page content.
        answer = similar_doc.metadata.get("final_answer")
        if not answer:
            answer = similar_doc.page_content.split("Final answer :")[-1].strip()
        # Only the bare answer text is returned; no "FINAL ANSWER:" prefix is
        # added here.
        return {"messages": [AIMessage(content=answer)]}

    builder = StateGraph(MessagesState)
    builder.add_node("retriever", retriever)
    builder.set_entry_point("retriever")
    builder.set_finish_point("retriever")
    return builder.compile()