# scratch_agent/utils/agent.py
#─── Basic imports ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
import os
import math
import sqlite3
import fitz # PyMuPDF for PDF parsing
import re
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv() # This line ensures .env variables are loaded
from langgraph.graph import START, StateGraph, MessagesState, END
from langgraph.prebuilt import tools_condition
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
from langchain.schema import SystemMessage
#from langchain.chat_models import init_chat_model
#from langgraph.prebuilt import create_react_agent
from langchain.embeddings import HuggingFaceEmbeddings
#from langchain.vectorstores import Pinecone
from langchain.tools.retriever import create_retriever_tool
#import pinecone
#from pinecone import Pinecone as PineconeClient, ServerlessSpec
#from pinecone import Index # the blocking‐call client constructor
#from pinecone import Pinecone as PineconeClient, ServerlessSpec
from langchain_community.vectorstores.pinecone import Pinecone as LC_Pinecone
# ─── Langchain Frameworks ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
#from langchain.tools import Tool
from langchain.chat_models import ChatOpenAI
from langchain_groq import ChatGroq
from langchain_mistralai import ChatMistralAI
from langchain.agents import initialize_agent, AgentType
from langchain.schema import Document
from langchain.chains import RetrievalQA
from langchain.embeddings import OpenAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain_community.document_loaders import TextLoader, PyMuPDFLoader
from langchain_community.document_loaders.wikipedia import WikipediaLoader
from langchain_community.document_loaders.arxiv import ArxivLoader
from langchain_experimental.tools.python.tool import PythonREPLTool
# ─── Memory ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
from langchain.tools import Tool
from typing import List, Callable
from langchain.schema import BaseMemory, AIMessage, HumanMessage, SystemMessage
from langchain.llms.base import LLM
from langchain.memory.chat_memory import BaseChatMemory
from pydantic import PrivateAttr
from langchain_core.messages import get_buffer_string
# ─── Image Processing ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
from PIL import Image
import pytesseract
from transformers import pipeline
from groq import Groq
import requests
from io import BytesIO
from transformers import pipeline, TrOCRProcessor, VisionEncoderDecoderModel
import base64
from PIL import UnidentifiedImageError
# ─── Browser var ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
from typing import List, Dict
import json
#from langchain.tools import tool # or langchain_core.tools
from playwright.sync_api import sync_playwright
from duckduckgo_search import DDGS
import time
import random
import logging
from functools import lru_cache, wraps
from bs4 import BeautifulSoup
import tenacity
from tenacity import retry, stop_after_attempt, wait_exponential
# Initialize logger
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Additional imports for new functionality
import pandas as pd
from PyPDF2 import PdfReader
import docx
import speech_recognition as sr
from pydub import AudioSegment
from pytube import YouTube
from newspaper import Article
from langchain_community.document_loaders.youtube import YoutubeLoader, TranscriptFormat
# Attempt to import Playwright for dynamic page rendering
try:
from playwright.sync_api import sync_playwright
_playwright_available = True
except ImportError:
_playwright_available = False
# Define forbidden keywords for basic NSFW filtering of scraped content
_forbidden = ["nsfw", "porn", "sex", "xxx", "nude", "erotic", "explicit"]
# ─── LLM Setup ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# Load OpenAI API key from environment (required for LLM and embeddings)
# API Keys from .env file
os.environ.setdefault("OPENAI_API_KEY", "<YOUR_OPENAI_KEY>") # Set your own key or env var
os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "default_key_or_placeholder")
os.environ["MISTRAL_API_KEY"] = os.getenv("MISTRAL_API_KEY", "default_key_or_placeholder")
# Tavily API Key
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "default_key_or_placeholder")
# Globals for RAG system
vector_store = None
rag_chain = None
DB_PATH = None # will be set when a .db is uploaded
DOC_PATH = None # will be set when a document is uploaded
IMG_PATH = None # will be set when an image is uploaded
OTH_PATH = None # will be set when an other file is uploaded
# ─── LLMS ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
#llm = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True, temperature=0)
# Import the RetryingChatGroq client
from retry_groq import RetryingChatGroq
# Use the retrying version instead
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False, temperature=0)
#llm = ChatMistralAI(model="mistral-large-latest", streaming=True, temperature=0)
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Tool for multiply ──────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@tool(parse_docstring=True)
def multiply(a: int, b: int) -> int:
"""
Multiply two numbers.
Args:
a (int): The first factor.
b (int): The second factor.
Returns:
int: The product of a and b.
"""
try:
# Direct calculation without relying on LangChain handling
result = a * b
return result
except Exception as e:
return f"Error in multiplication: {str(e)}"
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Tool for add ──────────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@tool(parse_docstring=True)
def add(a: int, b: int) -> int:
"""
Add two numbers.
Args:
        a (int): The first addend.
        b (int): The second addend.
    Returns:
        int: The sum of a and b.
"""
try:
# Direct calculation without relying on LangChain handling
result = a + b
return result
except Exception as e:
return f"Error in addition: {str(e)}"
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Tool for subtract ──────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@tool(parse_docstring=True)
def subtract(a: int, b: int) -> int:
"""
Subtract two numbers.
Args:
        a (int): The number to subtract from (minuend).
        b (int): The number to subtract (subtrahend).
    Returns:
        int: The difference a - b.
"""
try:
# Direct calculation without relying on LangChain handling
result = a - b
return result
except Exception as e:
return f"Error in subtraction: {str(e)}"
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Tool for divide ──────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@tool(parse_docstring=True)
def divide(a: int, b: int) -> float:
    """
    Divide two numbers.
    Args:
        a (int): The numerator.
        b (int): The denominator.
    Returns:
        float: The result of a divided by b, or an error message string if b is zero.
    """
try:
if b == 0:
return "Error: Cannot divide by zero."
# Direct calculation without relying on LangChain handling
result = a / b
return result
except Exception as e:
return f"Error in division: {str(e)}"
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Tool for modulus ──────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@tool(parse_docstring=True)
def modulus(a: int, b: int) -> int:
"""
Get the modulus (remainder) of two numbers.
Args:
a (int): The dividend.
b (int): The divisor.
Returns:
int: The remainder when a is divided by b.
"""
try:
if b == 0:
return "Error: Cannot calculate modulus with zero divisor."
# Direct calculation without relying on LangChain handling
result = a % b
return result
except Exception as e:
return f"Error in modulus calculation: {str(e)}"
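# Example (illustrative): tools created with @tool are invoked through the LangChain Runnable
# interface with a dict of arguments rather than called like plain functions:
#   multiply.invoke({"a": 6, "b": 7})   # -> 42
#   divide.invoke({"a": 1, "b": 0})     # -> "Error: Cannot divide by zero."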
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Tool for browsing ──────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
def with_retry(max_attempts: int = 3, backoff_base: int = 2):
"""
Decorator for retrying a function with exponential backoff on exception.
"""
def decorator(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return fn(*args, **kwargs)
except Exception as e:
wait = backoff_base ** attempt + random.uniform(0, 1)
logger.warning(f"{fn.__name__} failed (attempt {attempt+1}/{max_attempts}): {e}")
if attempt < max_attempts - 1:
time.sleep(wait)
logger.error(f"{fn.__name__} failed after {max_attempts} attempts.")
return []
return wrapper
return decorator
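# Example (hypothetical usage of the decorator): any flaky callable can be wrapped so that
# transient failures are retried with exponential backoff before giving up and returning []:
#   @with_retry(max_attempts=2, backoff_base=1)
#   def flaky_fetch(url: str) -> list:
#       resp = requests.get(url, timeout=5)
#       resp.raise_for_status()
#       return resp.json()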
@with_retry()
@lru_cache(maxsize=128)
def tavily_search(query: str, top_k: int = 3) -> List[Dict]:
"""Call Tavily API and return a list of result dicts."""
if not TAVILY_API_KEY:
logger.info("[Tavily] No API key set. Skipping Tavily search.")
return []
url = "https://api.tavily.com/search"
headers = {
"Authorization": f"Bearer {TAVILY_API_KEY}",
"Content-Type": "application/json",
}
    payload = {"query": query, "max_results": top_k}  # Tavily's /search endpoint expects "max_results"
resp = requests.post(url, headers=headers, json=payload, timeout=10)
resp.raise_for_status()
data = resp.json()
results = []
for item in data.get("results", []):
results.append({
"title": item.get("title", ""),
"url": item.get("url", ""),
"content": item.get("content", "")[:200],
"source": "Tavily"
})
return results
@with_retry()
@lru_cache(maxsize=128)
def duckduckgo_search(query: str, top_k: int = 3) -> List[Dict]:
"""Query DuckDuckGo and return up to top_k raw SERP hits."""
results = []
try:
with DDGS(timeout=15) as ddgs: # Increase timeout from default
            for hit in ddgs.text(query, safesearch="On", max_results=top_k):
results.append({
"title": hit.get("title", ""),
"url": hit.get("href") or hit.get("url", ""),
"content": hit.get("body", ""),
"source": "DuckDuckGo"
})
if len(results) >= top_k:
break
except Exception as e:
logger.warning(f"DuckDuckGo search failed: {e}")
# Don't re-raise - just return empty results to allow fallbacks to work
return results
# Additional fallback search alternative
def simple_google_search(query: str, top_k: int = 3) -> List[Dict]:
"""Simplified Google search as a fallback when other methods fail."""
try:
# Encode the query
import urllib.parse
import bs4
encoded_query = urllib.parse.quote(query)
url = f"https://www.google.com/search?q={encoded_query}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.google.com/",
"Connection": "keep-alive",
}
response = requests.get(url, headers=headers, timeout=20)
response.raise_for_status()
soup = bs4.BeautifulSoup(response.text, "html.parser")
results = []
# Extract search results
for result in soup.select("div.g")[:top_k]:
title_elem = result.select_one("h3")
link_elem = result.select_one("a")
snippet_elem = result.select_one("div.VwiC3b")
if title_elem and link_elem and snippet_elem and "href" in link_elem.attrs:
href = link_elem["href"]
if href.startswith("/url?q="):
href = href.split("/url?q=")[1].split("&")[0]
if href.startswith("http"):
results.append({
"title": title_elem.get_text(),
"url": href,
"content": snippet_elem.get_text(),
"source": "Google"
})
return results
except Exception as e:
logger.warning(f"Simple Google search failed: {e}")
return []
def hybrid_search(query: str, top_k: int = 3) -> List[Dict]:
"""Combine multiple search sources with fallbacks."""
# Try primary search methods first
results = []
# Start with Tavily if API key is available
if TAVILY_API_KEY and TAVILY_API_KEY != "default_key_or_placeholder":
try:
tavily_results = tavily_search(query, top_k)
results.extend(tavily_results)
logger.info(f"Retrieved {len(tavily_results)} results from Tavily")
except Exception as e:
logger.warning(f"Tavily search failed: {e}")
# If we don't have enough results, try DuckDuckGo
if len(results) < top_k:
try:
ddg_results = duckduckgo_search(query, top_k - len(results))
results.extend(ddg_results)
logger.info(f"Retrieved {len(ddg_results)} results from DuckDuckGo")
except Exception as e:
logger.warning(f"DuckDuckGo search failed: {e}")
# If we still don't have enough results, try Google
if len(results) < top_k:
try:
google_results = simple_google_search(query, top_k - len(results))
results.extend(google_results)
logger.info(f"Retrieved {len(google_results)} results from Google")
except Exception as e:
logger.warning(f"Google search failed: {e}")
# If all search methods failed, return a dummy result
if not results:
results.append({
"title": "Search Failed",
"url": "",
"content": f"Sorry, I couldn't find results for '{query}'. Please try refining your search terms or check your internet connection.",
"source": "No results"
})
return results[:top_k] # Ensure we only return top_k results
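# Example (illustrative): hybrid_search tries Tavily, then DuckDuckGo, then Google, and always
# returns at most top_k dicts with "title", "url", "content", and "source" keys:
#   hits = hybrid_search("latest Mars rover findings", top_k=2)
#   urls = [h["url"] for h in hits]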
def format_search_docs(search_docs: List[Dict]) -> Dict[str, str]:
"""
    Turn a list of {source, page, content} dicts into a dict with key "web_results"
    whose value is one big string of <Document ...>…</Document> entries separated by `---`.
    """
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc["source"]}" page="{doc.get("page", "")}">\n'
            f'{doc.get("content", "")}\n'
            f'</Document>'
            for doc in search_docs
        ]
    )
return {"web_results": formatted_search_docs}
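# Example (illustrative) of the shape format_search_docs produces:
#   format_search_docs([{"source": "https://example.com", "page": "", "content": "snippet"}])
#   # -> {"web_results": '<Document source="https://example.com" page="">\nsnippet\n</Document>'}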
@tool(parse_docstring=True)
def web_search(query: str, top_k: int = 3) -> Dict[str, str]:
"""
Perform a hybrid web search combining multiple search engines with robust fallbacks.
Args:
query: The search query string to look up.
top_k: The maximum number of search results to return (default is 3).
Returns:
        A dictionary with a single "web_results" key whose value is a string of XML-like <Document> blocks, each containing:
- source: The URL of the webpage.
- page: Placeholder for page identifier (empty string by default).
- content: The first 200 words of the page text, cleaned of HTML tags.
"""
try:
# Use our robust hybrid search to get initial results
search_results = hybrid_search(query, top_k)
results = []
# Process each search result to get better content
for hit in search_results:
url = hit.get("url")
if not url:
continue
# Start with the snippet from search
content = hit.get("content", "")
title = hit.get("title", "")
# Try to scrape additional content if possible
try:
# Use a random user agent to avoid blocking
headers = {
"User-Agent": random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.62"
]),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.google.com/",
"DNT": "1",
"Connection": "keep-alive"
}
# Higher timeout for better reliability
resp = requests.get(url, timeout=15, headers=headers)
# Only process if successful
if resp.status_code == 200:
soup = BeautifulSoup(resp.text, "html.parser")
# Try to find main content
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
# If we found main content, use it
if main_content:
extracted_text = main_content.get_text(separator=" ", strip=True)
# Take first 200 words
content = " ".join(extracted_text.split()[:200])
else:
# Otherwise use all text
all_text = soup.get_text(separator=" ", strip=True)
content = " ".join(all_text.split()[:200])
# Use content from page only if it's substantial
if len(content) < 50:
content = hit.get("content", "")[:200]
# Random delay between 0.5-1.5 seconds to avoid rate limits
time.sleep(0.5 + random.random())
except requests.exceptions.HTTPError as e:
logger.warning(f"HTTP error when scraping {url}: {e}")
# Keep the search snippet as a fallback
except requests.exceptions.RequestException as e:
logger.warning(f"Request error when scraping {url}: {e}")
# Keep the search snippet as a fallback
except Exception as e:
logger.warning(f"Unexpected error when scraping {url}: {e}")
# Keep the search snippet as a fallback
# Filter out inappropriate content
if any(f in content.lower() for f in _forbidden):
continue
# Add to results
results.append({
"source": url,
"page": "",
"content": content
})
# Return formatted search docs
return format_search_docs(results[:top_k])
except Exception as e:
logger.error(f"Web search failed: {e}")
# Return a helpful error message
return format_search_docs([{
"source": "Error",
"page": "",
"content": f"Search failed with error: {e}. Please try again with different search terms."
}])
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Tool for File System ───────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@tool(parse_docstring=True)
def download_file(url: str, dest_path: str) -> str:
"""
Download a file from a given URL and save it locally.
Args:
url: The direct URL of the file to download.
dest_path: The local path to save the downloaded file.
Returns:
The destination path where the file was saved.
"""
r = requests.get(url, stream=True)
r.raise_for_status()
with open(dest_path, 'wb') as f:
for chunk in r.iter_content(8192):
f.write(chunk)
return dest_path
@tool(parse_docstring=True)
def process_excel_to_text(file_path: str) -> str:
"""
Convert an Excel file into CSV-formatted text.
Args:
file_path: Path to the Excel (.xlsx) file.
Returns:
A string of CSV-formatted content extracted from the Excel file.
"""
try:
# Check if file exists
import os
if not os.path.exists(file_path):
return f"Error: Excel file '{file_path}' does not exist."
        # Try different engines; remember the last failure so it can be reported
        last_error = None
        engines = ['openpyxl', 'xlrd', None]
for engine in engines:
try:
# For engine=None, pandas will try to auto-detect
if engine:
df = pd.read_excel(file_path, engine=engine)
else:
df = pd.read_excel(file_path)
return df.to_csv(index=False)
except Exception as e:
print(f"Excel engine {engine} failed: {e}")
last_error = e
continue
# If we got here, all engines failed
return f"Error processing Excel file: {str(last_error)}"
except Exception as e:
return f"Error with Excel file: {str(e)}"
@tool(parse_docstring=True)
def read_text_from_pdf(file_path: str, question: str = None) -> str:
"""
Extract text from a PDF file, chunking large documents if needed.
Args:
file_path: Path to the PDF file.
question: Optional question to help retrieve relevant parts of long documents.
Returns:
The extracted text content, potentially chunked if the document is large.
"""
try:
# Check if file exists
import os
if not os.path.exists(file_path):
return f"Error: PDF file '{file_path}' does not exist."
reader = PdfReader(file_path)
full_text = "\n".join([page.extract_text() or "" for page in reader.pages])
# If a question is provided, use retrieval to get relevant parts
if question and len(full_text) > 5000: # Only chunk if text is large
return process_large_document(full_text, question)
return full_text
except Exception as e:
return f"Error reading PDF: {str(e)}"
@tool(parse_docstring=True)
def read_text_from_docx(file_path: str, question: str = None) -> str:
"""
Extract text from a DOCX (Word) document, chunking large documents if needed.
Args:
file_path: Path to the DOCX file.
question: Optional question to help retrieve relevant parts of long documents.
Returns:
The extracted text, potentially chunked if the document is large.
"""
try:
# Check if file exists
import os
if not os.path.exists(file_path):
return f"Error: File '{file_path}' does not exist."
try:
doc = docx.Document(file_path)
full_text = "\n".join([para.text for para in doc.paragraphs])
except Exception as docx_err:
# Handle "Package not found" error specifically
if "Package not found" in str(docx_err):
# Try to read raw text if possible
try:
import zipfile
from xml.etree.ElementTree import XML
WORD_NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
PARA = WORD_NAMESPACE + 'p'
TEXT = WORD_NAMESPACE + 't'
with zipfile.ZipFile(file_path) as docx_file:
with docx_file.open('word/document.xml') as document:
tree = XML(document.read())
paragraphs = []
for paragraph in tree.iter(PARA):
texts = [node.text for node in paragraph.iter(TEXT) if node.text]
if texts:
paragraphs.append(''.join(texts))
full_text = '\n'.join(paragraphs)
except Exception as e:
return f"Error reading DOCX file: {str(e)}"
else:
return f"Error reading DOCX file: {str(docx_err)}"
# If a question is provided, use retrieval to get relevant parts
if question and len(full_text) > 5000: # Only chunk if text is large
return process_large_document(full_text, question)
return full_text
except Exception as e:
return f"Error reading DOCX file: {str(e)}"
@tool(parse_docstring=True)
def transcribe_audio(file_path: str) -> str:
"""
Transcribe speech from a local audio file to text.
Args:
file_path: Path to the audio file.
Returns:
Transcribed text using Google Web Speech API.
"""
try:
# Check if file exists
import os
if not os.path.exists(file_path):
return f"Error: Audio file '{file_path}' does not exist."
# For non-WAV files, convert to WAV first
if not file_path.lower().endswith('.wav'):
try:
from pydub import AudioSegment
temp_wav = os.path.splitext(file_path)[0] + "_temp.wav"
audio = AudioSegment.from_file(file_path)
audio.export(temp_wav, format="wav")
file_path = temp_wav
except Exception as e:
return f"Failed to convert audio to WAV format: {str(e)}"
recognizer = sr.Recognizer()
with sr.AudioFile(file_path) as src:
audio = recognizer.record(src)
return recognizer.recognize_google(audio)
except Exception as e:
if "Audio file could not be read" in str(e):
return f"Error: Audio format not supported. Try converting to WAV, MP3, OGG, or FLAC."
return f"Error transcribing audio: {str(e)}"
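# Example (illustrative; "meeting.wav" is a hypothetical local file): tools created with
# @tool take a dict input through .invoke, e.g.
#   transcribe_audio.invoke({"file_path": "meeting.wav"})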
@tool(parse_docstring=True)
def youtube_audio_processing(youtube_url: str) -> str:
"""
Download and transcribe audio from a YouTube video.
Args:
youtube_url: URL of the YouTube video.
Returns:
Transcription text extracted from the video's audio.
"""
yt = YouTube(youtube_url)
audio_stream = yt.streams.filter(only_audio=True).first()
out_file = audio_stream.download(output_path='.', filename='yt_audio')
wav_path = 'yt_audio.wav'
AudioSegment.from_file(out_file).export(wav_path, format='wav')
    return transcribe_audio.invoke({"file_path": wav_path})
@tool(parse_docstring=True)
def extract_article_text(url: str, question: str = None) -> str:
"""
Download and extract the main article content from a webpage, chunking large articles if needed.
Args:
url: The URL of the article to extract.
question: Optional question to help retrieve relevant parts of long articles.
Returns:
The article's textual content, potentially chunked if large.
"""
try:
art = Article(url)
art.download()
art.parse()
full_text = art.text
# If a question is provided, use retrieval to get relevant parts
if question and len(full_text) > 5000: # Only chunk if text is large
return process_large_document(full_text, question)
return full_text
except Exception as e:
return f"Error extracting article: {str(e)}"
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ───────────────────────────────────────────────────────────── Tool for ArXiv ────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@tool(parse_docstring=True)
def arvix_search(query: str) -> Dict[str, str]:
"""
Search for academic papers on ArXiv.
Args:
query: The search term to look for in ArXiv.
Returns:
        A dictionary with key "arvix_results" containing a JSON string of up to 3 relevant paper entries.
"""
papers = ArxivLoader(query=query, load_max_docs=3).load()
results = []
for doc in papers:
try:
# Handle different metadata formats that might be returned
source = doc.metadata.get("source", "ArXiv")
doc_id = doc.metadata.get("id", doc.metadata.get("entry_id", ""))
result = {
"source": source,
"id": doc_id,
"summary": doc.page_content[:1000] if hasattr(doc, "page_content") else str(doc)[:1000],
}
results.append(result)
except Exception as e:
# Add error information as a fallback
results.append({
"source": "ArXiv Error",
"id": "error",
"summary": f"Error processing paper: {str(e)}"
})
return {"arvix_results": json.dumps(results)}
@tool(parse_docstring=True)
def answer_youtube_video_question(
youtube_url: str,
question: str,
chunk_size_seconds: int = 30
) -> str:
"""
Answer a question based on a YouTube video's transcript.
Args:
youtube_url: URL of the YouTube video.
question: The question to be answered using video content.
chunk_size_seconds: Duration of each transcript chunk.
Returns:
The answer to the question generated from the video transcript.
"""
loader = YoutubeLoader.from_youtube_url(
youtube_url,
add_video_info=True,
transcript_format=TranscriptFormat.CHUNKS,
chunk_size_seconds=chunk_size_seconds,
)
documents = loader.load()
embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-mpnet-base-v2')
vectorstore = FAISS.from_documents(documents, embeddings)
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False)
qa_chain = RetrievalQA.from_chain_type(llm=llm, retriever=vectorstore.as_retriever())
return qa_chain.run(question)
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ───────────────────────────────────────────────────────────── Tool for Python REPL tool ────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
python_repl = PythonREPLTool()
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ───────────────────────────────────────────────────────────── Tool for Wiki ────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@tool(parse_docstring=True)
def wiki_search(query: str) -> Dict[str, str]:
    """
    Search Wikipedia for information on a given topic.
    Args:
        query: The search term for Wikipedia.
    Returns:
        A dictionary with key "wiki_results" containing up to 3 formatted summary results.
    """
    # load up to 3 pages
    pages = WikipediaLoader(query=query, load_max_docs=3).load()
    results: List[Dict] = []
    for doc in pages:
        results.append({
            "source": doc.metadata.get("source", "Wikipedia"),
            "page": doc.metadata.get("page", ""),
            "content": doc.page_content[:1000],  # truncate long pages
        })
    return {"wiki_results": format_search_docs(results)["web_results"]}
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ───────────────────────────────────── Tool for Image (understanding, captioning & classification) ─────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
def _load_image(img_path: str, resize_to=(512, 512)) -> Image.Image:
"""
Load, verify, convert, and resize an image.
Raises ValueError on failure.
"""
if not img_path:
raise ValueError("No image path provided.")
try:
with Image.open(img_path) as img:
img.verify()
img = Image.open(img_path).convert("RGB")
img = img.resize(resize_to)
return img
except UnidentifiedImageError:
raise ValueError(f"File at {img_path} is not a valid image.")
except Exception as e:
raise ValueError(f"Failed to load image at {img_path}: {e}")
def _encode_image_to_base64(img_path: str) -> str:
"""
Load an image, save optimized PNG into memory, and base64‑encode it.
"""
img = _load_image(img_path)
buffer = BytesIO()
img.save(buffer, format="PNG", optimize=True)
return base64.b64encode(buffer.getvalue()).decode("utf-8")
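# Example (illustrative; "photo.jpg" is a hypothetical local file): the encoded payload is
# what image_processing embeds in a data URI for the vision model:
#   b64 = _encode_image_to_base64("photo.jpg")
#   data_uri = f"data:image/png;base64,{b64}"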
@tool
def image_processing(prompt: str, img_path: str) -> str:
"""Process an image using a vision LLM, with OCR fallback.
Args:
prompt: Instruction or question related to the image.
img_path: Path to the image file.
Returns:
The model's response or fallback OCR result.
"""
try:
import os
# Check if file exists
if not os.path.exists(img_path):
return f"Error: Image file '{img_path}' does not exist."
try:
            b64 = _encode_image_to_base64(img_path)
            # Send the image as a structured content block so the vision model receives
            # an image input rather than a large inline base64 string of text
            message = HumanMessage(content=[
                {"type": "text", "text": prompt},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}},
            ])
# Use RetryingChatGroq with Llama 4 Maverick for vision
llm = RetryingChatGroq(model="meta-llama/llama-4-maverick-17b-128e-instruct", streaming=False, temperature=0)
try:
resp = llm.invoke([message])
if hasattr(resp, 'content'):
return resp.content.strip()
elif isinstance(resp, str):
return resp.strip()
else:
# Handle dictionary or other response types
return str(resp)
except Exception as invoke_err:
print(f"[LLM invoke error] {invoke_err}")
# Fall back to OCR
raise ValueError("LLM invocation failed")
except Exception as llama_err:
print(f"[LLM vision failed] {llama_err}")
try:
img = _load_image(img_path)
return pytesseract.image_to_string(img).strip()
except Exception as ocr_err:
print(f"[OCR fallback failed] {ocr_err}")
return "Unable to process the image. Please check the file and try again."
except Exception as e:
# Catch any other errors
print(f"[image_processing error] {e}")
return f"Error processing image: {str(e)}"
python_repl_tool = PythonREPLTool()
@tool
def echo(text: str) -> str:
"""Echo back the input text.
Args:
text: The string to be echoed.
Returns:
The same text that was provided as input.
"""
return text
# ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Langgraph Agent ───────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# Build graph function
from langgraph.prebuilt.chat_agent_executor import create_react_agent, AgentState
from langchain.chat_models import init_chat_model
def build_graph(provider: str = "groq"):
"""Construct and compile the multi‑agent GAIA workflow StateGraph.
This graph wires together three React‑style agents into a streamlined pipeline:
PerceptionAgent β†’ ActionAgent β†’ EvaluationAgent (with appropriate entry/exit points)
The agents have the following responsibilities:
- PerceptionAgent: Handles web searches, Wikipedia, ArXiv, and image processing
- ActionAgent: Performs calculations, file operations, and code analysis
- EvaluationAgent: Reviews results and ensures the final answer is properly formatted
Args:
provider: The name of the LLM provider. Must be "groq".
Returns:
CompiledGraph: A compiled LangGraph state machine ready for invocation.
Raises:
ValueError: If `provider` is anything other than "groq".
"""
try:
if provider != "groq":
raise ValueError("Invalid provider. Expected 'groq'.")
# Initialize LLM
try:
logger.info("Initializing LLM with model: deepseek-r1-distill-llama-70b")
api_key = os.getenv("GROQ_API_KEY")
if not api_key or api_key == "default_key_or_placeholder":
logger.error("GROQ_API_KEY is not set or is using placeholder value")
raise ValueError("GROQ_API_KEY environment variable is not set properly. Please set a valid API key.")
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", temperature=0)
logger.info("LLM initialized successfully")
except Exception as e:
logger.error(f"Error initializing LLM: {str(e)}")
raise
# General system message for agents
sys_msg = SystemMessage(content="""
You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template:
FINAL ANSWER: [YOUR FINAL ANSWER]
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma-separated list of numbers and/or strings.
If you are asked for a number, don't use commas or units (e.g., $, %, kg) unless specified otherwise.
If you are asked for a string, don't use articles (a, an, the), and don't use abbreviations (e.g., for states).
If you are asked for a comma-separated list, apply the above rules to each element in the list.
""".strip())
# Special system message for the evaluation agent with stricter formatting requirements
eval_sys_msg = SystemMessage(content="""
You are a specialized evaluation agent. Your job is to review the work done by other agents
and provide a final, properly formatted answer.
IMPORTANT: You MUST ALWAYS format your answer using this exact template:
FINAL ANSWER: [concise answer]
Rules for formatting the answer:
1. The answer must be extremely concise - use as few words as possible
2. For numeric answers, provide only the number without units unless units are specifically requested
3. For text answers, avoid articles (a, an, the) and unnecessary words
4. For list answers, use a comma-separated format
5. NEVER explain your reasoning in the FINAL ANSWER section
6. NEVER skip the "FINAL ANSWER:" prefix
Example good answers:
FINAL ANSWER: 42
FINAL ANSWER: Paris
FINAL ANSWER: 1912, 1945, 1989
Example bad answers (don't do these):
- Based on my analysis, the answer is 42.
- I think it's Paris because that's the capital of France.
- The years were 1912, 1945, and 1989.
Remember: ALWAYS include "FINAL ANSWER:" followed by the most concise answer possible.
""".strip())
# Define tools for each agent
logger.info("Setting up agent tools")
perception_tools = [web_search, wiki_search, news_article_search, arvix_search, image_processing, echo]
execution_tools = [
multiply, add, subtract, divide, modulus,
download_file, process_excel_to_text,
read_text_from_pdf, read_text_from_docx,
transcribe_audio, youtube_audio_processing,
extract_article_text, answer_youtube_video_question,
python_repl_tool, analyze_code, read_code_file, analyze_python_function
]
# ─────────────── Agent Creation ───────────────
logger.info("Creating agents")
try:
# Create agents with proper error handling
PerceptionAgent = create_react_agent(
model=llm,
tools=perception_tools,
prompt=sys_msg,
state_schema=AgentState,
name="PerceptionAgent"
)
logger.info("Created PerceptionAgent successfully")
# Combined Planning and Execution agent for better efficiency
ActionAgent = create_react_agent(
model=llm,
tools=execution_tools, # Has access to all execution tools
prompt=sys_msg,
state_schema=AgentState,
name="ActionAgent"
)
logger.info("Created ActionAgent successfully")
# Evaluation agent with stricter prompt
EvaluationAgent = create_react_agent(
model=llm,
tools=[], # No tools needed for evaluation
prompt=eval_sys_msg, # Use the specialized evaluation prompt
state_schema=AgentState,
name="EvaluationAgent"
)
logger.info("Created EvaluationAgent successfully")
except Exception as e:
logger.error(f"Error creating agent: {str(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
raise
# Build the StateGraph
logger.info("Building StateGraph")
try:
builder = StateGraph(AgentState)
# Add agent nodes first
builder.add_node("PerceptionAgent", PerceptionAgent)
builder.add_node("ActionAgent", ActionAgent)
builder.add_node("EvaluationAgent", EvaluationAgent)
# Define the flow with a starting edge
builder.set_entry_point("PerceptionAgent")
# Add the edges for the simpler linear flow
builder.add_edge("PerceptionAgent", "ActionAgent")
builder.add_edge("ActionAgent", "EvaluationAgent")
# Set EvaluationAgent as the end node
builder.set_finish_point("EvaluationAgent")
logger.info("Compiling StateGraph")
return builder.compile()
except Exception as e:
logger.error(f"Error building graph: {str(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
raise
except Exception as e:
logger.error(f"Overall error in build_graph: {str(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
raise
def get_final_answer(text):
"""Extract just the FINAL ANSWER from the model's response.
Args:
text: The full text response from the LLM
Returns:
str: The extracted answer without the "FINAL ANSWER:" prefix
"""
# Log the raw text for debugging if needed
logger.debug(f"Extracting answer from: {text[:200]}...")
if not text:
logger.warning("Empty response received")
return "No answer provided."
# Method 1: Look for "FINAL ANSWER:" with most comprehensive pattern matching
pattern = r'(?:^|\n)FINAL ANSWER:\s*(.*?)(?:\n\s*$|$)'
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
if match:
# Return just the answer part, cleaned up
logger.debug("Found answer using pattern 1")
return match.group(1).strip()
# Method 2: Try looking for variations on the final answer format
for variant in ["FINAL ANSWER:", "FINAL_ANSWER:", "Final Answer:", "Answer:"]:
lines = text.split('\n')
for i, line in enumerate(reversed(lines)):
if variant in line:
# Extract everything after the variant text
logger.debug(f"Found answer using variant: {variant}")
answer = line[line.find(variant) + len(variant):].strip()
if answer:
return answer
# If the answer is on the next line, return that
if i > 0:
next_line = lines[len(lines) - i]
if next_line.strip():
return next_line.strip()
# Method 3: Look for phrases that suggest an answer
for phrase in ["The answer is", "The result is", "We get", "Therefore,", "In conclusion,"]:
phrase_pos = text.find(phrase)
if phrase_pos != -1:
# Try to extract everything after the phrase until the end of the sentence
sentence_end = text.find(".", phrase_pos)
if sentence_end != -1:
logger.debug(f"Found answer using phrase: {phrase}")
return text[phrase_pos + len(phrase):sentence_end].strip()
# Method 4: Fall back to taking the last paragraph with actual content
paragraphs = text.strip().split('\n\n')
for para in reversed(paragraphs):
para = para.strip()
if para and not para.startswith("I ") and not para.lower().startswith("to "):
logger.debug("Using last meaningful paragraph")
# If paragraph is very long, try to extract a concise answer
if len(para) > 100:
sentences = re.split(r'[.!?]', para)
for sentence in reversed(sentences):
sent = sentence.strip()
if sent and len(sent) > 5 and not sent.startswith("I "):
return sent
return para
# Method 5: Last resort - just return the last line with content
lines = text.strip().split('\n')
for line in reversed(lines):
line = line.strip()
if line and len(line) > 3:
logger.debug("Using last line with content")
return line
# If everything fails, warn and return the truncated response
logger.warning("Could not find a properly formatted answer")
return text[:100] + "..." if len(text) > 100 else text
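# Example (illustrative): the extractor strips the template prefix and keeps only the answer:
#   get_final_answer("Reasoning about the question...\nFINAL ANSWER: Paris")  # -> "Paris"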
# ─────────────────────────────────────────────── Tool for Code Analysis ───────────────────────────────────────────────────────────────
@tool
def analyze_code(code_string: str) -> str:
"""Analyze a string of code to understand its structure, functionality, and potential issues.
Args:
code_string: The code to analyze as a string.
Returns:
A structured analysis of the code including functions, classes, and key operations.
"""
try:
import ast
# Try to parse with Python's AST module
try:
parsed = ast.parse(code_string)
# Extract functions and classes
functions = [node.name for node in ast.walk(parsed) if isinstance(node, ast.FunctionDef)]
classes = [node.name for node in ast.walk(parsed) if isinstance(node, ast.ClassDef)]
imports = [node.names[0].name for node in ast.walk(parsed) if isinstance(node, ast.Import)]
imports.extend([f"{node.module}.{name.name}" if node.module else name.name
for node in ast.walk(parsed) if isinstance(node, ast.ImportFrom)
for name in node.names])
# Count various node types for complexity assessment
num_loops = len([node for node in ast.walk(parsed)
if isinstance(node, (ast.For, ast.While))])
num_conditionals = len([node for node in ast.walk(parsed)
if isinstance(node, (ast.If, ast.IfExp))])
analysis = {
"language": "Python",
"functions": functions,
"classes": classes,
"imports": imports,
"complexity": {
"functions": len(functions),
"classes": len(classes),
"loops": num_loops,
"conditionals": num_conditionals
}
}
return str(analysis)
except SyntaxError:
# If not valid Python, try some simple pattern matching
if "{" in code_string and "}" in code_string:
if "function" in code_string or "=>" in code_string:
language = "JavaScript/TypeScript"
elif "func" in code_string or "struct" in code_string:
language = "Go or Rust"
elif "public" in code_string or "private" in code_string or "class" in code_string:
language = "Java/C#/C++"
else:
language = "Unknown C-like language"
            elif "<" in code_string and ">" in code_string and ("/>" in code_string or "</" in code_string):
language = "HTML/XML/JSX"
else:
language = "Unknown"
return f"Non-Python code detected ({language}). Basic code structure analysis not available."
except Exception as e:
return f"Error analyzing code: {str(e)}"
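# Example (illustrative): analyzing a small Python snippet returns the summary dict as a string:
#   analyze_code.invoke({"code_string": "def greet(name):\n    return 'hi ' + name"})
#   # -> "{'language': 'Python', 'functions': ['greet'], 'classes': [], 'imports': [], ...}"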
@tool
def read_code_file(file_path: str) -> str:
"""Read a code file and return its contents with proper syntax detection.
Args:
file_path: Path to the code file.
Returns:
The file contents and detected language.
"""
try:
# Check if file exists
import os
if not os.path.exists(file_path):
return f"Error: File '{file_path}' does not exist."
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Try to detect language from extension
ext = os.path.splitext(file_path)[1].lower()
language_map = {
'.py': 'Python',
'.js': 'JavaScript',
'.ts': 'TypeScript',
'.html': 'HTML',
'.css': 'CSS',
'.java': 'Java',
'.c': 'C',
'.cpp': 'C++',
'.cs': 'C#',
'.go': 'Go',
'.rs': 'Rust',
'.php': 'PHP',
'.rb': 'Ruby',
'.sh': 'Shell',
'.bat': 'Batch',
'.ps1': 'PowerShell',
'.sql': 'SQL',
'.json': 'JSON',
'.xml': 'XML',
'.yaml': 'YAML',
'.yml': 'YAML',
}
language = language_map.get(ext, 'Unknown')
return f"File content ({language}):\n\n{content}"
except Exception as e:
return f"Error reading file: {str(e)}"
@tool
def analyze_python_function(function_name: str, code_string: str) -> str:
"""Extract and analyze a specific function from Python code.
Args:
function_name: The name of the function to analyze.
code_string: The complete code containing the function.
Returns:
Analysis of the function including parameters, return type, and docstring.
"""
try:
import ast
import inspect
from types import CodeType, FunctionType
# Parse the code string
parsed = ast.parse(code_string)
# Find the function definition
function_def = None
for node in ast.walk(parsed):
if isinstance(node, ast.FunctionDef) and node.name == function_name:
function_def = node
break
if not function_def:
return f"Function '{function_name}' not found in the provided code."
# Extract parameters
params = []
for arg in function_def.args.args:
param_name = arg.arg
# Get annotation if it exists
if arg.annotation:
if isinstance(arg.annotation, ast.Name):
param_type = arg.annotation.id
elif isinstance(arg.annotation, ast.Attribute):
param_type = f"{arg.annotation.value.id}.{arg.annotation.attr}"
else:
param_type = "complex_type"
params.append(f"{param_name}: {param_type}")
else:
params.append(param_name)
# Extract return type if it exists
return_type = None
if function_def.returns:
if isinstance(function_def.returns, ast.Name):
return_type = function_def.returns.id
elif isinstance(function_def.returns, ast.Attribute):
return_type = f"{function_def.returns.value.id}.{function_def.returns.attr}"
else:
return_type = "complex_return_type"
# Extract docstring
docstring = ast.get_docstring(function_def)
# Create a summary
summary = {
"function_name": function_name,
"parameters": params,
"return_type": return_type,
"docstring": docstring,
"decorators": [d.id if isinstance(d, ast.Name) else "complex_decorator" for d in function_def.decorator_list],
"line_count": len(function_def.body)
}
# Create a more explicit string representation that ensures key terms are included
result = f"Function '{function_name}' analysis:\n"
result += f"- Parameters: {', '.join(params)}\n"
result += f"- Return type: {return_type or 'None specified'}\n"
result += f"- Docstring: {docstring or 'None'}\n"
result += f"- Line count: {len(function_def.body)}"
return result
except Exception as e:
return f"Error analyzing function: {str(e)}"
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Tool for News Article Retrieval ──────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@tool
def news_article_search(query: str, top_k: int = 3) -> Dict[str, str]:
"""Search for and retrieve news articles with robust error handling for news sites.
Args:
query: The news topic or keywords to search for.
top_k: Maximum number of articles to retrieve.
Returns:
A dictionary with search results formatted as XML-like document entries.
"""
# First, get URLs from DuckDuckGo with "news" focus
results = []
news_sources = [
"bbc.com", "reuters.com", "apnews.com", "nasa.gov",
"space.com", "universetoday.com", "nature.com", "science.org",
"scientificamerican.com", "nytimes.com", "theguardian.com"
]
# Find news from reliable sources
try:
with DDGS() as ddgs:
search_query = f"{query} site:{' OR site:'.join(news_sources)}"
for hit in ddgs.text(search_query, safesearch="On", max_results=top_k*2):
url = hit.get("href") or hit.get("url", "")
if not url:
continue
# Add the search snippet first as a fallback
result = {
"source": url,
"page": "",
"content": hit.get("body", "")[:250],
"title": hit.get("title", "")
}
# Try to get better content via a more robust method
try:
headers = {
"User-Agent": random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36"
]),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.google.com/",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1"
}
# Add a short delay between requests
time.sleep(1 + random.random())
# Try to use newspaper3k for more reliable article extraction
from newspaper import Article
article = Article(url)
article.download()
article.parse()
# If we got meaningful content, update the result
if article.text and len(article.text) > 100:
# Get a summary - first paragraph + some highlights
paragraphs = article.text.split('\n\n')
first_para = paragraphs[0] if paragraphs else ""
summary = first_para[:300]
if len(paragraphs) > 1:
summary += "... " + paragraphs[1][:200]
result["content"] = summary
if article.title:
result["title"] = article.title
except Exception as article_err:
logger.warning(f"Article extraction failed for {url}: {article_err}")
# Fallback to simple requests-based extraction
try:
resp = requests.get(url, timeout=12, headers=headers)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
# Try to get main content
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
if main_content:
content = " ".join(main_content.get_text(separator=" ", strip=True).split()[:250])
result["content"] = content
except Exception as req_err:
logger.warning(f"Fallback extraction failed for {url}: {req_err}")
# Keep the original snippet as fallback
results.append(result)
if len(results) >= top_k:
break
except Exception as e:
logger.error(f"News search failed: {e}")
return format_search_docs([{
"source": "Error",
"page": "",
"content": f"Failed to retrieve news articles for '{query}': {str(e)}"
}])
if not results:
# Fallback to regular web search
logger.info(f"No news results found, falling back to web_search for {query}")
        return web_search.invoke({"query": query, "top_k": top_k})
return format_search_docs(results[:top_k])
# ───────────────────────────────────────────────────────────── Document Chunking Utilities ──────────────────────────────────────────────────────────
def chunk_document(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
"""
Split a large document into smaller chunks with overlap to maintain context across chunks.
Args:
text: The document text to split into chunks
chunk_size: Maximum size of each chunk in characters
overlap: Number of characters to overlap between chunks
Returns:
List of text chunks
"""
# If text is smaller than chunk_size, return it as is
if len(text) <= chunk_size:
return [text]
chunks = []
start = 0
while start < len(text):
# Get chunk with overlap
end = min(start + chunk_size, len(text))
# Try to find sentence boundary for cleaner breaks
if end < len(text):
# Look for sentence endings: period, question mark, or exclamation followed by space
for sentence_end in ['. ', '? ', '! ']:
last_period = text[start:end].rfind(sentence_end)
if last_period != -1:
end = start + last_period + 2 # +2 to include the period and space
break
# Add chunk to list
chunks.append(text[start:end])
        # Move start position, accounting for overlap (always advance to avoid an infinite loop)
        start = max(end - overlap, start + 1) if end < len(text) else len(text)
return chunks
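# Example (illustrative): splitting a long text with the defaults yields overlapping chunks,
# each at most chunk_size characters and ending on a sentence boundary where possible:
#   parts = chunk_document("one sentence. " * 200, chunk_size=1000, overlap=100)
#   assert all(len(p) <= 1000 for p in parts)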
# Document processing utility that uses chunking
def process_large_document(text: str, question: str, llm=None) -> str:
"""
Process a large document by chunking it and using retrieval to find relevant parts.
Args:
text: The document text to process
question: The question being asked about the document
llm: Optional language model to use (defaults to agent's LLM)
Returns:
Summarized answer based on relevant chunks
"""
if not llm:
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False, temperature=0)
# Split document into chunks
chunks = chunk_document(text)
# If document is small enough, don't bother with retrieval
if len(chunks) <= 1:
return text
# For larger documents, create embeddings to find relevant chunks
try:
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.schema import Document
# Create documents with chunk content
documents = [Document(page_content=chunk, metadata={"chunk_id": i}) for i, chunk in enumerate(chunks)]
# Create embeddings and vector store
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
vectorstore = FAISS.from_documents(documents, embeddings)
# Get most relevant chunks
relevant_chunks = vectorstore.similarity_search(question, k=2) # Get top 2 most relevant chunks
# Join the relevant chunks
relevant_text = "\n\n".join([doc.page_content for doc in relevant_chunks])
# Option 1: Return relevant chunks directly
return relevant_text
# Option 2: Summarize with LLM (commented out for now)
# prompt = f"Using only the following information, answer the question: '{question}'\n\nInformation:\n{relevant_text}"
# response = llm.invoke([HumanMessage(content=prompt)])
# return response.content
except Exception as e:
# Fall back to first chunk if retrieval fails
logger.warning(f"Retrieval failed: {e}. Falling back to first chunk.")
return chunks[0]
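
# ─────────────────────────────────────────────── Script entry point (quick test) ───────────────────────────────────────────────────────────
if __name__ == "__main__":
    question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?"
    # Build the graph (all tools above are defined by this point)
    graph = build_graph(provider="groq")
    # Run the graph
    messages = [HumanMessage(content=question)]
    messages = graph.invoke({"messages": messages})
    for m in messages["messages"]:
        m.pretty_print()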