Update langgraph_agent.py

langgraph_agent.py  CHANGED  (+43 -58)
@@ -4,50 +4,44 @@ import contextlib
 import pandas as pd
 from typing import Dict, List, Union
 
-
-from PIL import Image as PILImage  # Used for type checking/potential future local processing
+from PIL import Image as PILImage
 from huggingface_hub import InferenceClient
 
 from langgraph.graph import START, StateGraph, MessagesState
 from langgraph.prebuilt import tools_condition, ToolNode
 from langchain_openai import ChatOpenAI
 from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
-from langchain_community.tools.tavily_search import TavilySearchResults
 from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
 from langchain_core.messages import SystemMessage, HumanMessage
 from langchain_google_genai import ChatGoogleGenerativeAI
 from langchain_core.tools import tool
 
+from langchain_google_community.tools.google_search import GoogleSearchResults
+
 @tool
 def multiply(a: int, b: int) -> int:
-    """Multiply two integers."""
     return a * b
 
 @tool
 def add(a: int, b: int) -> int:
-    """Add two integers."""
     return a + b
 
 @tool
 def subtract(a: int, b: int) -> int:
-    """Subtract the second integer from the first."""
     return a - b
 
 @tool
 def divide(a: int, b: int) -> float:
-    """Divide first integer by second; error if divisor is zero."""
     if b == 0:
         raise ValueError("Cannot divide by zero.")
     return a / b
 
 @tool
 def modulus(a: int, b: int) -> int:
-    """Return the remainder of dividing first integer by second."""
     return a % b
 
 @tool
 def wiki_search(query: str) -> dict:
-    """Search Wikipedia for a query and return up to 2 documents."""
     try:
         docs = WikipediaLoader(query=query, load_max_docs=2, lang="en").load()
         if not docs:
@@ -61,23 +55,21 @@ def wiki_search(query: str) -> dict:
         print(f"Error in wiki_search tool: {e}")
         return {"wiki_results": f"Error occurred while searching Wikipedia for '{query}'. Details: {str(e)}"}
 
+google_search_tool = GoogleSearchResults(
+    api_key=os.getenv("GOOGLE_API_KEY"),
+    engine_id=os.getenv("GOOGLE_CSE_ID")
+)
 @tool
-def web_search(query: str) -> dict:
-    """Perform a web search (via Tavily) and return up to 3 results."""
+def google_web_search(query: str) -> dict:
     try:
-        docs = TavilySearchResults(max_results=3).invoke(query)
-        formatted = "\n\n---\n\n".join(
-            f'<Document source="{d.metadata["source"]}"/>\n{d.page_content}'
-            for d in docs
-        )
-        return {"web_results": formatted}
+        docs = google_search_tool.invoke(query)
+        return {"google_web_results": docs}
     except Exception as e:
-        print(f"Error in web_search tool: {e}")
-        return {"web_results": f"Error occurred while searching the web for '{query}'. Details: {str(e)}"}
+        print(f"Error in google_web_search tool: {e}")
+        return {"google_web_results": f"Error occurred while searching the web for '{query}'. Details: {str(e)}"}
 
 @tool
 def arvix_search(query: str) -> dict:
-    """Search arXiv for a query and return up to 3 paper excerpts."""
     docs = ArxivLoader(query=query, load_max_docs=3).load()
     formatted = "\n\n---\n\n".join(
         f'<Document source="{d.metadata["source"]}"/>\n{d.page_content[:1000]}'
@@ -85,7 +77,6 @@ def arvix_search(query: str) -> dict:
     )
     return {"arvix_results": formatted}
 
-# Initialize Hugging Face Inference Client
 HF_API_TOKEN = os.getenv("HF_API_TOKEN")
 HF_INFERENCE_CLIENT = None
 if HF_API_TOKEN:
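Note on the new search tool: the import path (langchain_google_community.tools.google_search) and the api_key / engine_id constructor arguments recorded in this diff do not obviously match the published langchain-google-community API, where GoogleSearchResults is importable from the package root and is configured through a GoogleSearchAPIWrapper. A minimal sketch of that wiring, under that assumption (the env-var names follow the ones the commit reads):

    import os
    from langchain_google_community import GoogleSearchAPIWrapper, GoogleSearchResults

    # The wrapper holds the credentials; it also falls back to the
    # GOOGLE_API_KEY / GOOGLE_CSE_ID environment variables when the
    # keyword arguments are omitted.
    wrapper = GoogleSearchAPIWrapper(
        google_api_key=os.getenv("GOOGLE_API_KEY"),
        google_cse_id=os.getenv("GOOGLE_CSE_ID"),
    )
    google_search_tool = GoogleSearchResults(api_wrapper=wrapper, num_results=3)

num_results=3 mirrors the three-result convention of the other search tools here; the tool returns its results as a string, which google_web_search passes through unformatted.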
@@ -95,10 +86,6 @@ else:
 
 @tool
 def read_file_content(file_path: str) -> Dict[str, str]:
-    """
-    Reads the content of a file and returns its primary information.
-    For text/code/excel, returns content. For media, returns a prompt to use specific tools.
-    """
     try:
         _, file_extension = os.path.splitext(file_path)
         file_extension = file_extension.lower()
@@ -108,14 +95,12 @@ def read_file_content(file_path: str) -> Dict[str, str]:
                 content = f.read()
             return {"file_type": "text/code", "file_name": file_path, "file_content": content}
         elif file_extension == ".xlsx":
-
-
-
+            df = pd.read_excel(file_path)
+            content = df.to_string()
+            return {"file_type": "excel", "file_name": file_path, "file_content": content}
         elif file_extension in (".jpeg", ".jpg", ".png"):
-            # Indicate that it's an image and needs to be described by a specific tool
             return {"file_type": "image", "file_name": file_path, "file_content": f"Image file '{file_path}' detected. Use 'describe_image' tool to get a textual description."}
         elif file_extension == ".mp3":
-            # Indicate that it's an audio file and needs to be transcribed by a specific tool
             return {"file_type": "audio", "file_name": file_path, "file_content": f"Audio file '{file_path}' detected. Use 'transcribe_audio' tool to get the text transcription."}
         else:
             return {"file_type": "unsupported", "file_name": file_path, "file_content": f"Unsupported file type: {file_extension}. Only .txt, .py, .xlsx, .jpeg, .jpg, .png, .mp3 files are recognized."}
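The new .xlsx branch loads only the first worksheet, which is pandas' read_excel default. If multi-sheet workbooks need to be supported, a small variant would do it (standard pandas API; the helper name is hypothetical):

    import pandas as pd

    def read_all_sheets(file_path: str) -> str:
        # sheet_name=None loads every worksheet into a {sheet_name: DataFrame} dict.
        sheets = pd.read_excel(file_path, sheet_name=None)
        return "\n\n".join(
            f"Sheet: {name}\n{df.to_string()}" for name, df in sheets.items()
        )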
@@ -126,10 +111,6 @@ def read_file_content(file_path: str) -> Dict[str, str]:
 
 @tool
 def python_interpreter(code: str) -> Dict[str, str]:
-    """
-    Executes Python code and returns its standard output.
-    If there's an error during execution, it returns the error message.
-    """
     old_stdout = io.StringIO()
     with contextlib.redirect_stdout(old_stdout):
         try:
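The rest of python_interpreter is unchanged and hidden by the hunk, but the io.StringIO / contextlib.redirect_stdout setup shown here is the standard stdout-capture pattern. A self-contained sketch of that pattern (the exec call and error handling are assumptions, not the commit's exact code, and note that exec runs unsandboxed):

    import contextlib
    import io
    from typing import Dict

    def run_code(code: str) -> Dict[str, str]:
        buffer = io.StringIO()
        with contextlib.redirect_stdout(buffer):  # capture anything the code prints
            try:
                exec(code, {})  # hypothetical body; the real one is outside the hunk
            except Exception as e:
                return {"status": "error", "output": str(e)}
        return {"status": "ok", "output": buffer.getvalue()}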
@@ -143,10 +124,6 @@ def python_interpreter(code: str) -> Dict[str, str]:
 
 @tool
 def describe_image(image_path: str) -> Dict[str, str]:
-    """
-    Generates a textual description for an image file (JPEG, JPG, PNG) using an image-to-text model
-    from the Hugging Face Inference API. Requires HF_API_TOKEN environment variable to be set.
-    """
     if not HF_INFERENCE_CLIENT:
         return {"error": "Hugging Face API token not configured for image description. Cannot use this tool."}
     try:
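describe_image's call into the Inference API is likewise outside the hunk; given the HF_INFERENCE_CLIENT guard, it presumably uses the image-to-text task. A hedged sketch of that call (the model id is an illustrative choice, not taken from the commit):

    from huggingface_hub import InferenceClient

    client = InferenceClient(token="hf_...")  # stands in for HF_INFERENCE_CLIENT

    def caption(image_path: str) -> str:
        with open(image_path, "rb") as f:
            out = client.image_to_text(
                f.read(), model="Salesforce/blip-image-captioning-base"
            )
        # Recent huggingface_hub releases return an ImageToTextOutput object;
        # older ones return a plain string. Handle both.
        return getattr(out, "generated_text", out)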
@@ -159,38 +136,48 @@
     except Exception as e:
         return {"error": f"Error describing image {image_path}: {str(e)}"}
 
-
+@tool
+def transcribe_audio(audio_path: str) -> Dict[str, str]:
+    if not HF_INFERENCE_CLIENT:
+        return {"error": "Hugging Face API token not configured for audio transcription. Cannot use this tool."}
+    try:
+        with open(audio_path, "rb") as f:
+            audio_bytes = f.read()
+        transcription = HF_INFERENCE_CLIENT.automatic_speech_recognition(audio_bytes)
+        return {"audio_transcription": transcription, "audio_path": audio_path}
+    except FileNotFoundError:
+        return {"error": f"Audio file not found: {audio_path}. Please ensure the file exists."}
+    except Exception as e:
+        return {"error": f"Error transcribing audio {audio_path}: {str(e)}"}
 
 API_KEY = os.getenv("GEMINI_API_KEY")
 HF_API_TOKEN = os.getenv("HF_SPACE_TOKEN")
 GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
 
-
 tools = [
     multiply, add, subtract, divide, modulus,
-    wiki_search, web_search, arvix_search,
+    wiki_search,
+    google_web_search,
+    arvix_search,
     read_file_content,
     python_interpreter,
     describe_image,
+    transcribe_audio,
 ]
 
-
 with open("prompt.txt", "r", encoding="utf-8") as f:
     system_prompt = f.read()
 sys_msg = SystemMessage(content=system_prompt)
 
-
 def build_graph(provider: str = "gemini"):
-    """Build the LangGraph agent with chosen LLM (default: Gemini)."""
     if provider == "gemini":
         llm = ChatGoogleGenerativeAI(
-
-
-
-
-
-        )
-
+            model="gemini-2.5-pro-preview-05-06",
+            temperature=1.0,
+            max_retries=2,
+            api_key=GEMINI_API_KEY,
+            max_tokens=5000
+        )
     elif provider == "huggingface":
         llm = ChatHuggingFace(
             llm=HuggingFaceEndpoint(
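One caveat on the new transcribe_audio tool: in current huggingface_hub releases, automatic_speech_recognition returns a structured AutomaticSpeechRecognitionOutput rather than a plain string, so the dict above may carry an object instead of text. A hedged normalization (the helper name is hypothetical):

    from huggingface_hub import InferenceClient

    def transcribe(client: InferenceClient, audio_path: str) -> str:
        with open(audio_path, "rb") as f:
            result = client.automatic_speech_recognition(f.read())
        # Newer huggingface_hub returns an output object with a .text field;
        # some older releases returned the transcription string directly.
        return getattr(result, "text", result)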
@@ -204,10 +191,10 @@ def build_graph(provider: str = "gemini"):
     llm_with_tools = llm.bind_tools(tools)
 
     def assistant(state: MessagesState):
-
-
-
-
+        messages_to_send = [sys_msg] + state["messages"]
+        llm_response = llm_with_tools.invoke(messages_to_send)
+        print(f"LLM Raw Response: {llm_response}")
+        return {"messages": [llm_response]}
 
     builder = StateGraph(MessagesState)
     builder.add_node("assistant", assistant)
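The edge wiring between this hunk and the next is unchanged and therefore not shown. Given the tools_condition / ToolNode imports at the top of the file, it very likely follows LangGraph's standard tool loop; a sketch under that assumption:

    # Standard LangGraph tool loop implied by the hidden context lines.
    builder = StateGraph(MessagesState)
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))
    builder.add_edge(START, "assistant")
    # Route to the tool node when the LLM emitted tool calls, otherwise end.
    builder.add_conditional_edges("assistant", tools_condition)
    builder.add_edge("tools", "assistant")
    graph = builder.compile()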
@@ -219,6 +206,4 @@ def build_graph(provider: str = "gemini"):
     return builder.compile()
 
 if __name__ == "__main__":
-
-    # Your agent will interact with the graph by invoking it with messages.
-    pass
+    pass
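With __main__ reduced to pass, here is a minimal, hypothetical driver showing how the compiled graph would be invoked (message shape per LangGraph's MessagesState; the question text is illustrative):

    from langchain_core.messages import HumanMessage

    graph = build_graph(provider="gemini")
    result = graph.invoke({"messages": [HumanMessage(content="What is 6 * 7?")]})
    # The final message in the state holds the assistant's answer.
    print(result["messages"][-1].content)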