|
import os |
|
import io |
|
import contextlib |
|
import pandas as pd |
|
from typing import Dict, List, Union |
|
|
|
from PIL import Image as PILImage |
|
from huggingface_hub import InferenceClient |
|
from langgraph.graph import START, StateGraph, MessagesState |
|
from langgraph.prebuilt import tools_condition, ToolNode |
|
from langchain_openai import ChatOpenAI |
|
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint |
|
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader |
|
from langchain_core.messages import SystemMessage, HumanMessage |
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
from langchain_core.tools import tool |
|
|
|
|
|
from langchain_google_community import GoogleSearchAPIWrapper |
|
|
|
@tool |
|
def multiply(a: int, b: int) -> int: |
|
return a * b |
|
|
|
@tool |
|
def add(a: int, b: int) -> int: |
|
return a + b |
|
|
|
@tool |
|
def subtract(a: int, b: int) -> int: |
|
return a - b |
|
|
|
@tool |
|
def divide(a: int, b: int) -> float: |
|
if b == 0: |
|
raise ValueError("Cannot divide by zero.") |
|
return a / b |
|
|
|
@tool |
|
def modulus(a: int, b: int) -> int: |
|
return a % b |
|
|
|
@tool |
|
def wiki_search(query: str) -> dict: |
|
try: |
|
docs = WikipediaLoader(query=query, load_max_docs=2, lang="en").load() |
|
if not docs: |
|
return {"wiki_results": f"No documents found on Wikipedia for '{query}'."} |
|
formatted = "\n\n---\n\n".join( |
|
f'<Document source="{d.metadata.get("source", "N/A")}"/>\n{d.page_content}' |
|
for d in docs |
|
) |
|
return {"wiki_results": formatted} |
|
except Exception as e: |
|
print(f"Error in wiki_search tool: {e}") |
|
return {"wiki_results": f"Error occurred while searching Wikipedia for '{query}'. Details: {str(e)}"} |
|
|
|
|
|
search = GoogleSearchAPIWrapper() |
|
|
|
@tool |
|
def google_web_search(query: str) -> str: |
|
try: |
|
|
|
return search.run(query) |
|
except Exception as e: |
|
print(f"Error in google_web_search tool: {e}") |
|
return f"Error occurred while searching the web for '{query}'. Details: {str(e)}" |
|
|
|
@tool |
|
def arvix_search(query: str) -> dict: |
|
docs = ArxivLoader(query=query, load_max_docs=3).load() |
|
formatted = "\n\n---\n\n".join( |
|
f'<Document source="{d.metadata["source"]}"/>\n{d.page_content[:1000]}' |
|
for d in docs |
|
) |
|
return {"arvix_results": formatted} |
|
|
|
HF_API_TOKEN = os.getenv("HF_API_TOKEN") |
|
HF_INFERENCE_CLIENT = None |
|
if HF_API_TOKEN: |
|
HF_INFERENCE_CLIENT = InferenceClient(token=HF_API_TOKEN) |
|
else: |
|
print("WARNING: HF_API_TOKEN not set. Image and Audio tools will not function.") |
|
|
|
@tool |
|
def read_file_content(file_path: str) -> Dict[str, str]: |
|
try: |
|
_, file_extension = os.path.splitext(file_path) |
|
file_extension = file_extension.lower() |
|
|
|
if file_extension in (".txt", ".py"): |
|
with open(file_path, "r", encoding="utf-8") as f: |
|
content = f.read() |
|
return {"file_type": "text/code", "file_name": file_path, "file_content": content} |
|
elif file_extension == ".xlsx": |
|
df = pd.read_excel(file_path) |
|
content = df.to_string() |
|
return {"file_type": "excel", "file_name": file_path, "file_content": content} |
|
elif file_extension in (".jpeg", ".jpg", ".png"): |
|
return {"file_type": "image", "file_name": file_path, "file_content": f"Image file '{file_path}' detected. Use 'describe_image' tool to get a textual description."} |
|
elif file_extension == ".mp3": |
|
return {"file_type": "audio", "file_name": file_path, "file_content": f"Audio file '{file_path}' detected. Use 'transcribe_audio' tool to get the text transcription."} |
|
else: |
|
return {"file_type": "unsupported", "file_name": file_path, "file_content": f"Unsupported file type: {file_extension}. Only .txt, .py, .xlsx, .jpeg, .jpg, .png, .mp3 files are recognized."} |
|
except FileNotFoundError: |
|
return {"file_error": f"File not found: {file_path}. Please ensure the file exists in the environment."} |
|
except Exception as e: |
|
return {"file_error": f"Error reading file {file_path}: {e}"} |
|
|
|
@tool |
|
def python_interpreter(code: str) -> Dict[str, str]: |
|
old_stdout = io.StringIO() |
|
with contextlib.redirect_stdout(old_stdout): |
|
try: |
|
exec_globals = {} |
|
exec_locals = {} |
|
exec(code, exec_globals, exec_locals) |
|
output = old_stdout.getvalue() |
|
return {"execution_result": output.strip()} |
|
except Exception as e: |
|
return {"execution_error": str(e)} |
|
|
|
@tool |
|
def describe_image(image_path: str) -> Dict[str, str]: |
|
if not HF_INFERENCE_CLIENT: |
|
return {"error": "Hugging Face API token not configured for image description. Cannot use this tool."} |
|
try: |
|
with open(image_path, "rb") as f: |
|
image_bytes = f.read() |
|
description = HF_INFERENCE_CLIENT.image_to_text(image_bytes) |
|
return {"image_description": description, "image_path": image_path} |
|
except FileNotFoundError: |
|
return {"error": f"Image file not found: {image_path}. Please ensure the file exists."} |
|
except Exception as e: |
|
return {"error": f"Error describing image {image_path}: {str(e)}"} |
|
|
|
@tool |
|
def transcribe_audio(audio_path: str) -> Dict[str, str]: |
|
if not HF_INFERENCE_CLIENT: |
|
return {"error": "Hugging Face API token not configured for audio transcription. Cannot use this tool."} |
|
try: |
|
with open(audio_path, "rb") as f: |
|
audio_bytes = f.read() |
|
transcription = HF_INFERENCE_CLIENT.automatic_speech_recognition(audio_bytes) |
|
return {"audio_transcription": transcription, "audio_path": audio_path} |
|
except FileNotFoundError: |
|
return {"error": f"Audio file not found: {audio_path}. Please ensure the file exists."} |
|
except Exception as e: |
|
return {"error": f"Error transcribing audio {audio_path}: {str(e)}"} |
|
|
|
API_KEY = os.getenv("GEMINI_API_KEY") |
|
HF_API_TOKEN = os.getenv("HF_SPACE_TOKEN") |
|
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") |
|
|
|
tools = [ |
|
multiply, add, subtract, divide, modulus, |
|
wiki_search, |
|
google_web_search, |
|
arvix_search, |
|
read_file_content, |
|
python_interpreter, |
|
describe_image, |
|
transcribe_audio, |
|
] |
|
|
|
with open("prompt.txt", "r", encoding="utf-8") as f: |
|
system_prompt = f.read() |
|
sys_msg = SystemMessage(content=system_prompt) |
|
|
|
def build_graph(provider: str = "gemini"): |
|
if provider == "gemini": |
|
llm = ChatGoogleGenerativeAI( |
|
model="gemini-2.5-pro-preview-05-06", |
|
temperature=1.0, |
|
max_retries=2, |
|
api_key=GEMINI_API_KEY, |
|
max_tokens=5000 |
|
) |
|
elif provider == "huggingface": |
|
llm = ChatHuggingFace( |
|
llm=HuggingFaceEndpoint( |
|
url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf", |
|
), |
|
temperature=0, |
|
) |
|
else: |
|
raise ValueError("Invalid provider. Choose 'gemini' or 'huggingface'.") |
|
|
|
llm_with_tools = llm.bind_tools(tools) |
|
|
|
def assistant(state: MessagesState): |
|
messages_to_send = [sys_msg] + state["messages"] |
|
llm_response = llm_with_tools.invoke(messages_to_send) |
|
print(f"LLM Raw Response: {llm_response}") |
|
return {"messages": [llm_response]} |
|
|
|
builder = StateGraph(MessagesState) |
|
builder.add_node("assistant", assistant) |
|
builder.add_node("tools", ToolNode(tools)) |
|
builder.add_edge(START, "assistant") |
|
builder.add_conditional_edges("assistant", tools_condition) |
|
builder.add_edge("tools", "assistant") |
|
|
|
return builder.compile() |
|
|
|
if __name__ == "__main__": |
|
pass |