# (Removed Hugging Face Spaces page banner accidentally pasted into the file:
#  "Spaces: Sleeping" — not Python code.)
"""LangGraph Agent - Complete bypass of problematic vector store""" | |
import os | |
import json | |
from dotenv import load_dotenv | |
from langgraph.graph import START, StateGraph, MessagesState | |
from langgraph.prebuilt import tools_condition | |
from langgraph.prebuilt import ToolNode | |
from langchain_groq import ChatGroq | |
from langchain_community.tools.tavily_search import TavilySearchResults | |
from langchain_community.document_loaders import WikipediaLoader | |
from langchain_community.document_loaders import ArxivLoader | |
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
from langchain_core.tools import tool | |
from supabase.client import Client, create_client | |
load_dotenv() | |
def multiply(a: int, b: int) -> int:
    """Return the product of *a* and *b*."""
    product = a * b
    return product
def add(a: int, b: int) -> int:
    """Return the sum of *a* and *b*."""
    total = a + b
    return total
def subtract(a: int, b: int) -> int:
    """Return *a* minus *b*."""
    difference = a - b
    return difference
def divide(a: int, b: int) -> float:
    """Divide *a* by *b* using true division.

    Args:
        a: dividend.
        b: divisor; must be non-zero.

    Returns:
        The quotient as a float (fix: the original annotated ``-> int``,
        but ``/`` always yields a float in Python 3).

    Raises:
        ValueError: if *b* is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b
def modulus(a: int, b: int) -> int:
    """Return *a* modulo *b*."""
    remainder = a % b
    return remainder
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return maximum 2 results."""
    try:
        docs = WikipediaLoader(query=query, load_max_docs=2).load()
        chunks = []
        for doc in docs:
            # Prefer the metadata source URL when the loader provides one.
            meta = getattr(doc, 'metadata', None)
            source = meta.get('source', 'Wikipedia') if isinstance(meta, dict) else "Wikipedia"
            chunks.append(f"Source: {source}\n{doc.page_content[:1000]}...")
        return "\n\n---\n\n".join(chunks)
    except Exception as e:
        return f"Error searching Wikipedia: {str(e)}"
def web_search(query: str) -> str:
    """Search the web using Tavily."""
    try:
        results = TavilySearchResults(max_results=3).invoke(query)
        if not isinstance(results, list):
            # Tavily occasionally returns a plain string; pass it through.
            return str(results)
        entries = [
            f"Source: {item.get('url', 'Unknown')}\n{item.get('content', '')[:1000]}..."
            for item in results
            if isinstance(item, dict)
        ]
        return "\n\n---\n\n".join(entries)
    except Exception as e:
        return f"Error searching web: {str(e)}"
def arxiv_search(query: str) -> str:
    """Search Arxiv for academic papers."""
    try:
        docs = ArxivLoader(query=query, load_max_docs=3).load()
        chunks = []
        for doc in docs:
            # Use the metadata source when present, else a generic label.
            meta = getattr(doc, 'metadata', None)
            source = meta.get('source', 'ArXiv') if isinstance(meta, dict) else "ArXiv"
            chunks.append(f"Source: {source}\n{doc.page_content[:1000]}...")
        return "\n\n---\n\n".join(chunks)
    except Exception as e:
        return f"Error searching ArXiv: {str(e)}"
# Raw Supabase search helper that bypasses LangChain entirely.
def raw_supabase_search(query: str, supabase_client):
    """Direct Supabase search without any LangChain components"""
    try:
        # Primary path: Postgres full-text search over the `content` column.
        # (Assumes the database exposes a text-search-capable `documents` table.)
        hits = (
            supabase_client.table('documents')
            .select('content')
            .text_search('content', query)
            .limit(1)
            .execute()
        )
        if hits.data:
            return hits.data[0]['content']
        # Fallback: fetch any single document (useful while testing).
        hits = supabase_client.table('documents').select('content').limit(1).execute()
        if hits.data:
            return hits.data[0]['content']
        return "No documents found in database"
    except Exception as e:
        return f"Database search error: {str(e)}"
# Alternative: simple SQL-based search executed through a database RPC.
def simple_sql_search(query: str, supabase_client):
    """Case-insensitive substring search over ``documents.content`` via raw SQL.

    Args:
        query: user search text; single quotes are escaped before being
            interpolated into the SQL string.
        supabase_client: client exposing ``.rpc(name, params).execute()``.

    Returns:
        The first matching document's content, a "no match" message, or an
        error string (this function never raises).
    """
    try:
        # SECURITY FIX: the original interpolated `query` directly into the
        # SQL text, allowing SQL injection. The `execute_sql` RPC takes a
        # single SQL string, so we at least escape single quotes using the
        # standard SQL '' doubling. A parameterized RPC would be the proper
        # long-term fix.
        safe_query = query.replace("'", "''")
        sql_query = f"""
        SELECT content
        FROM documents
        WHERE content ILIKE '%{safe_query}%'
        LIMIT 1
        """
        result = supabase_client.rpc('execute_sql', {'query': sql_query}).execute()
        if result.data:
            return result.data[0]['content']
        return "No matching documents found"
    except Exception as e:
        return f"SQL search error: {str(e)}"
# Load the system prompt from disk, falling back to a generic assistant
# prompt when the file is absent.
def _read_system_prompt() -> str:
    """Return the contents of system_prompt.txt, or a default prompt."""
    try:
        with open("system_prompt.txt", "r", encoding="utf-8") as fh:
            return fh.read()
    except FileNotFoundError:
        return "You are a helpful AI assistant."
system_prompt = _read_system_prompt()
sys_msg = SystemMessage(content=system_prompt)
# Initialize the Supabase client (no vector store).
# SECURITY FIX: the previous revision hard-coded the project URL and an anon
# key directly in source (a leaked credential). Credentials now come from the
# environment, which load_dotenv() above populates from a local .env file.
supabase_url = os.environ.get("SUPABASE_URL", "")
supabase_key = os.environ.get("SUPABASE_KEY", "")
try:
    if not supabase_url or not supabase_key:
        raise ValueError("SUPABASE_URL / SUPABASE_KEY not set in environment")
    supabase_client = create_client(supabase_url, supabase_key)
except Exception as e:
    # Keep the module importable even without database access; callers
    # already handle supabase_client being None.
    print(f"Warning: Could not initialize Supabase client: {e}")
    supabase_client = None
# Tool belt handed to the LLM: the five arithmetic helpers plus the three
# search tools defined above.
tools = [
    multiply, add, subtract, divide, modulus,
    wiki_search, web_search, arxiv_search,
]
def build_graph(provider: str = "groq"):
    """Build a retrieval-only graph that bypasses vector-store operations.

    The graph has a single node that answers directly from the Supabase
    `documents` table via a plain ILIKE text search.

    Args:
        provider: only "groq" is accepted (kept for interface compatibility).

    Returns:
        A compiled LangGraph state graph.

    Raises:
        ValueError: if *provider* is not "groq".
    """
    if provider != "groq":
        raise ValueError("Invalid provider. Choose 'groq'.")
    # FIX: the original constructed a ChatGroq instance here with a
    # hard-coded (leaked) API key, then never used it — removed. Any LLM
    # needed should read its key from the GROQ_API_KEY environment variable.

    def retriever(state: MessagesState):
        """Answer the latest message from the documents table (text search)."""
        try:
            query = state["messages"][-1].content.lower()
            if supabase_client is None:
                return {"messages": [AIMessage(content="I don't have access to my knowledge base right now. Let me help you using my general knowledge or search tools instead. What would you like to know?")]}
            print(f"Searching for: {query}")  # Debug print
            try:
                # Case-insensitive substring match on document content.
                result = supabase_client.table('documents').select('content')\
                    .ilike('content', f'%{query}%')\
                    .limit(3).execute()
                if result.data and len(result.data) > 0:
                    print(f"Found {len(result.data)} results")  # Debug print
                    content = result.data[0].get('content', '')
                    # Stored documents may embed a "Final answer :" marker;
                    # prefer the text after it when present.
                    if "Final answer :" in content:
                        answer = content.split("Final answer :")[-1].strip()
                    else:
                        # Otherwise return a truncated excerpt.
                        answer = content.strip()[:800]
                        if len(content) > 800:
                            answer += "..."
                    return {"messages": [AIMessage(content=answer)]}
                else:
                    print("No matching documents found")  # Debug print
            except Exception as e:
                print(f"Text search failed: {e}")
            # Fallback: helpful response instead of repeating one document.
            return {"messages": [AIMessage(content=f"I couldn't find specific information about '{query}' in my knowledge base. Let me try to help you with my general knowledge, or would you like me to search the web for current information?")]}
        except Exception as e:
            # FIX: dropped a useless f-prefix on this constant string.
            return {"messages": [AIMessage(content="I'm having trouble accessing my knowledge base right now. How can I help you using web search or my general knowledge instead?")]}

    # Single-node graph: retriever is both entry and finish point.
    builder = StateGraph(MessagesState)
    builder.add_node("retriever", retriever)
    builder.set_entry_point("retriever")
    builder.set_finish_point("retriever")
    return builder.compile()
# RECOMMENDED: Use this function instead of build_graph()
def build_working_graph(provider: str = "groq"):
    """Build a fully functional agent graph (LLM + tools loop).

    Args:
        provider: only "groq" is accepted.

    Returns:
        A compiled LangGraph graph that routes between the assistant node
        and a ToolNode until the LLM stops requesting tools.

    Raises:
        ValueError: if *provider* is not "groq".
    """
    if provider == "groq":
        # SECURITY FIX: the API key was hard-coded (and therefore leaked);
        # it is now read from the GROQ_API_KEY environment variable, which
        # load_dotenv() populates from a local .env file.
        llm = ChatGroq(
            model="qwen-qwq-32b",
            api_key=os.environ.get("GROQ_API_KEY"),
            temperature=0,
        )
    else:
        raise ValueError("Invalid provider.")
    llm_with_tools = llm.bind_tools(tools)

    def assistant(state: MessagesState):
        """Prepend the system prompt and let the tool-bound LLM respond."""
        messages = [sys_msg] + state["messages"]
        response = llm_with_tools.invoke(messages)
        return {"messages": [response]}

    # assistant -> (tools if tool calls requested) -> assistant -> ... -> END
    builder = StateGraph(MessagesState)
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))
    builder.set_entry_point("assistant")
    builder.add_conditional_edges("assistant", tools_condition)
    builder.add_edge("tools", "assistant")
    return builder.compile()
# Smoke-test helper: try the recommended graph first, then the fallback.
def test_graph():
    """Test the graph builds successfully"""
    print("Building working graph (recommended)...")
    try:
        compiled = build_working_graph()
        print("β Working graph built successfully!")
        return compiled
    except Exception as first_err:
        print(f"β Working graph failed: {first_err}")
    print("Testing retriever-based graph...")
    try:
        fallback = build_graph()
        print("β Retriever graph built successfully!")
        return fallback
    except Exception as second_err:
        print(f"β Retriever graph failed: {second_err}")
        return None
if __name__ == "__main__": | |
question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?" | |
graph = test_graph() | |
messages = [HumanMessage(content=question)] | |
messages = graph.invoke({"messages": messages}) | |
for m in messages["messages"]: | |
m.pretty_print() |