Final_Assignment_Template / langgraph_agent.py
philincloud's picture
Update langgraph_agent.py
830a3ef verified
raw
history blame
9.48 kB
import os
import io
import contextlib
import pandas as pd
from typing import Dict, List, Union
# New imports for image and audio processing
from PIL import Image as PILImage # Used for type checking/potential future local processing
from huggingface_hub import InferenceClient
from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_openai import ChatOpenAI
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.tools import tool
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    # Name the intermediate so the returned quantity is explicit.
    product = a * b
    return product
@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    # Name the intermediate so the returned quantity is explicit.
    total = a + b
    return total
@tool
def subtract(a: int, b: int) -> int:
    """Subtract the second integer from the first."""
    # Order matters: the result is a minus b, never b minus a.
    difference = a - b
    return difference
@tool
def divide(a: int, b: int) -> float:
    """Divide first integer by second; error if divisor is zero."""
    # Happy path first; a zero divisor falls through to an explicit error
    # with a readable message.
    if b != 0:
        return a / b
    raise ValueError("Cannot divide by zero.")
@tool
def modulus(a: int, b: int) -> int:
    """Return the remainder of dividing first integer by second; error if divisor is zero."""
    # Mirror `divide`: reject a zero divisor with an explicit ValueError and
    # message instead of surfacing a bare ZeroDivisionError from `%`.
    if b == 0:
        raise ValueError("Cannot take modulus by zero.")
    return a % b
@tool
def wiki_search(query: str) -> dict:
    """Search Wikipedia for a query and return up to 2 documents."""
    # Every outcome (hits, no hits, failure) is reported under the single
    # "wiki_results" key so the caller always sees the same shape.
    try:
        pages = WikipediaLoader(query=query, load_max_docs=2, lang="en").load()
        if pages:
            sections = [
                f'<Document source="{page.metadata.get("source", "N/A")}"/>\n{page.page_content}'
                for page in pages
            ]
            return {"wiki_results": "\n\n---\n\n".join(sections)}
        return {"wiki_results": f"No documents found on Wikipedia for '{query}'."}
    except Exception as e:
        # Best-effort tool: log the failure and hand the details back as text
        # instead of raising.
        print(f"Error in wiki_search tool: {e}")
        return {"wiki_results": f"Error occurred while searching Wikipedia for '{query}'. Details: {str(e)}"}
@tool
def web_search(query: str) -> dict:
    """Perform a web search (via Tavily) and return up to 3 results."""
    # Every outcome (hits, no hits, failure) is reported under the single
    # "web_results" key, matching the other search tools in this file.
    try:
        # The tool input must be passed positionally; `invoke(query=...)`
        # leaves the required `input` argument unset and raises TypeError.
        hits = TavilySearchResults(max_results=3).invoke({"query": query})

        # NOTE(review): the Tavily tool appears to report its own failures as
        # a plain string rather than raising — confirm against the installed
        # langchain_community version.
        if isinstance(hits, str):
            return {"web_results": f"Error occurred while searching the web for '{query}'. Details: {hits}"}
        if not hits:
            return {"web_results": f"No results found on the web for '{query}'."}

        sections = []
        for hit in hits:
            if isinstance(hit, dict):
                # Tavily results are plain dicts carrying "url" and "content".
                source = hit.get("url", "N/A")
                text = hit.get("content", "")
            else:
                # Tolerate Document-like results so the formatting never
                # fails on an unexpected item type.
                source = getattr(hit, "metadata", {}).get("source", "N/A")
                text = getattr(hit, "page_content", str(hit))
            sections.append(f'<Document source="{source}"/>\n{text}')
        return {"web_results": "\n\n---\n\n".join(sections)}
    except Exception as e:
        # Best-effort tool: log the failure and hand the details back as text
        # instead of raising.
        print(f"Error in web_search tool: {e}")
        return {"web_results": f"Error occurred while searching the web for '{query}'. Details: {str(e)}"}
@tool
def arvix_search(query: str) -> dict:
    """Search arXiv for a query and return up to 3 paper excerpts."""
    # Every outcome (hits, no hits, failure) is reported under the single
    # "arvix_results" key, matching wiki_search and web_search.
    try:
        docs = ArxivLoader(query=query, load_max_docs=3).load()
        if not docs:
            return {"arvix_results": f"No documents found on arXiv for '{query}'."}

        sections = []
        for d in docs:
            # NOTE(review): ArxivLoader metadata may not carry a "source" key
            # (it appears to expose "Title" instead — confirm); fall back
            # rather than raising KeyError.
            source = d.metadata.get("source") or d.metadata.get("Title", "N/A")
            # Only the first 1000 characters of each paper are returned.
            sections.append(f'<Document source="{source}"/>\n{d.page_content[:1000]}')
        return {"arvix_results": "\n\n---\n\n".join(sections)}
    except Exception as e:
        # Best-effort tool: log the failure and hand the details back as text
        # instead of raising.
        print(f"Error in arvix_search tool: {e}")
        return {"arvix_results": f"Error occurred while searching arXiv for '{query}'. Details: {str(e)}"}
# Hugging Face Inference client shared by the image and audio tools.
# It stays None when no token is configured; those tools check for that.
HF_API_TOKEN = os.getenv("HF_API_TOKEN")
HF_INFERENCE_CLIENT = InferenceClient(token=HF_API_TOKEN) if HF_API_TOKEN else None
if not HF_API_TOKEN:
    print("WARNING: HF_API_TOKEN not set. Image and Audio tools will not function.")
@tool
def read_file_content(file_path: str) -> Dict[str, str]:
    """
    Reads the content of a file and returns its primary information.
    For text/code/excel, returns content. For media, returns a prompt to use specific tools.
    """
    # Dispatch is purely on the lower-cased file extension. Failures are
    # returned under "file_error" rather than raised.
    try:
        suffix = os.path.splitext(file_path)[1].lower()

        if suffix in (".txt", ".py"):
            with open(file_path, "r", encoding="utf-8") as handle:
                return {"file_type": "text/code", "file_name": file_path, "file_content": handle.read()}

        if suffix == ".xlsx":
            # The spreadsheet is rendered as one plain-text table.
            table_text = pd.read_excel(file_path).to_string()
            return {"file_type": "excel", "file_name": file_path, "file_content": table_text}

        if suffix in (".jpeg", ".jpg", ".png"):
            # Images are not read here; the caller is pointed at describe_image.
            return {"file_type": "image", "file_name": file_path, "file_content": f"Image file '{file_path}' detected. Use 'describe_image' tool to get a textual description."}

        if suffix == ".mp3":
            # Audio is not read here; the caller is pointed at transcribe_audio.
            return {"file_type": "audio", "file_name": file_path, "file_content": f"Audio file '{file_path}' detected. Use 'transcribe_audio' tool to get the text transcription."}

        return {"file_type": "unsupported", "file_name": file_path, "file_content": f"Unsupported file type: {suffix}. Only .txt, .py, .xlsx, .jpeg, .jpg, .png, .mp3 files are recognized."}
    except FileNotFoundError:
        return {"file_error": f"File not found: {file_path}. Please ensure the file exists in the environment."}
    except Exception as e:
        return {"file_error": f"Error reading file {file_path}: {e}"}
@tool
def python_interpreter(code: str) -> Dict[str, str]:
    """
    Executes Python code and returns its standard output.
    If there's an error during execution, it returns the error message.
    """
    # SECURITY NOTE(review): exec() runs the supplied code with the full
    # privileges of this process. The code comes from the model, so treat it
    # as untrusted and run the agent in a sandboxed environment.
    captured_stdout = io.StringIO()

    # Use ONE namespace for both globals and locals. With separate dicts the
    # code runs as if inside a class body: functions defined by the snippet
    # cannot see its own imports or top-level names and fail with NameError.
    # "__name__" is set so that `if __name__ == "__main__":` guards in
    # script-style code actually execute.
    namespace = {"__name__": "__main__"}

    try:
        with contextlib.redirect_stdout(captured_stdout):
            exec(code, namespace)
    except Exception as e:
        # Include the exception type: str(e) alone is often ambiguous
        # (for example, a KeyError renders as just the quoted key).
        return {"execution_error": f"{type(e).__name__}: {e}"}

    return {"execution_result": captured_stdout.getvalue().strip()}
@tool
def describe_image(image_path: str) -> Dict[str, str]:
    """
    Generates a textual description for an image file (JPEG, JPG, PNG) using an image-to-text model
    from the Hugging Face Inference API. Requires HF_API_TOKEN environment variable to be set.
    """
    # All failures are returned under the "error" key rather than raised.
    if not HF_INFERENCE_CLIENT:
        return {"error": "Hugging Face API token not configured for image description. Cannot use this tool."}
    try:
        with open(image_path, "rb") as f:
            image_bytes = f.read()
        result = HF_INFERENCE_CLIENT.image_to_text(image_bytes)
        # Depending on the huggingface_hub version the call returns either a
        # plain string or an output object exposing `generated_text`.
        # Normalise to text so the declared Dict[str, str] return type holds.
        description = getattr(result, "generated_text", result)
        if not isinstance(description, str):
            description = str(description)
        return {"image_description": description, "image_path": image_path}
    except FileNotFoundError:
        return {"error": f"Image file not found: {image_path}. Please ensure the file exists."}
    except Exception as e:
        return {"error": f"Error describing image {image_path}: {str(e)}"}
@tool
def transcribe_audio(audio_path: str) -> Dict[str, str]:
    """
    Transcribes an audio file (e.g., MP3) to text using an automatic speech recognition model
    from the Hugging Face Inference API. Requires HF_API_TOKEN environment variable to be set.
    """
    # All failures are returned under the "error" key rather than raised.
    if not HF_INFERENCE_CLIENT:
        return {"error": "Hugging Face API token not configured for audio transcription. Cannot use this tool."}
    try:
        with open(audio_path, "rb") as f:
            audio_bytes = f.read()
        result = HF_INFERENCE_CLIENT.automatic_speech_recognition(audio_bytes)
        # Depending on the huggingface_hub version the call returns either a
        # plain string or an output object exposing `text`. Normalise to text
        # so the declared Dict[str, str] return type holds.
        transcription = getattr(result, "text", result)
        if not isinstance(transcription, str):
            transcription = str(transcription)
        return {"audio_transcription": transcription, "audio_path": audio_path}
    except FileNotFoundError:
        return {"error": f"Audio file not found: {audio_path}. Please ensure the file exists."}
    except Exception as e:
        return {"error": f"Error transcribing audio {audio_path}: {str(e)}"}
# Credentials are read once from the environment at import time.
# API_KEY and GEMINI_API_KEY are both read from the same variable.
API_KEY = os.environ.get("GEMINI_API_KEY")
HF_SPACE_TOKEN = os.environ.get("HF_SPACE_TOKEN")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
# Every tool exposed to the agent; the same list is bound to the LLM and
# handed to the graph's ToolNode in build_graph.
tools = [
    # Arithmetic
    multiply,
    add,
    subtract,
    divide,
    modulus,
    # Search
    wiki_search,
    web_search,
    arvix_search,
    # Files and code
    read_file_content,
    python_interpreter,
    # Media
    describe_image,
    transcribe_audio,  # Re-added tool
]
# The system prompt is loaded once at import time. "prompt.txt" is resolved
# against the current working directory.
with open("prompt.txt", encoding="utf-8") as f:
    system_prompt = f.read()

# Prepended to the conversation on every model call in build_graph.
sys_msg = SystemMessage(content=system_prompt)
def build_graph(provider: str = "gemini"):
    """Build the LangGraph agent with chosen LLM (default: Gemini).

    Args:
        provider: Which chat model backs the agent, either ``"gemini"`` or
            ``"huggingface"``.

    Returns:
        The compiled graph: an assistant node that calls the tool-bound LLM,
        a tool node, and edges looping tool output back to the assistant.

    Raises:
        ValueError: If ``provider`` is not one of the supported names.
    """
    if provider == "gemini":
        # NOTE(review): confirm this model id exists for the Gemini API.
        gemini_options = {
            "model": "gemini-1.5-flash-preview-05-20",
            "temperature": 1.0,
            "max_retries": 2,
            "api_key": GEMINI_API_KEY,
            "max_tokens": 5000,
        }
        llm = ChatGoogleGenerativeAI(**gemini_options)
    elif provider == "huggingface":
        # NOTE(review): confirm the `url` keyword and the model path against
        # the installed langchain_huggingface version.
        endpoint = HuggingFaceEndpoint(
            url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf",
        )
        llm = ChatHuggingFace(llm=endpoint, temperature=0)
    else:
        raise ValueError("Invalid provider. Choose 'gemini' or 'huggingface'.")

    tool_aware_llm = llm.bind_tools(tools)

    def assistant(state: MessagesState):
        # The system prompt is prepended on every call; it is not stored in
        # the graph state itself.
        conversation = [sys_msg] + state["messages"]
        reply = tool_aware_llm.invoke(conversation)
        return {"messages": [reply]}

    graph = StateGraph(MessagesState)
    graph.add_node("assistant", assistant)
    graph.add_node("tools", ToolNode(tools))
    graph.add_edge(START, "assistant")
    # tools_condition decides where to go after each assistant turn.
    graph.add_conditional_edges("assistant", tools_condition)
    graph.add_edge("tools", "assistant")
    return graph.compile()
if __name__ == "__main__":
    # Intentionally a no-op: the example invocations were removed on request.
    # Running this file as a script does nothing beyond the import-time setup
    # above; callers use build_graph() and invoke the compiled graph with
    # messages.
    pass