#─── Basic imports ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── import os import math import sqlite3 import fitz # PyMuPDF for PDF parsing import re from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # This line ensures .env variables are loaded from langgraph.graph import START, StateGraph, MessagesState, END from langgraph.prebuilt import tools_condition from langgraph.prebuilt import ToolNode from langgraph.constants import START from langchain_core.tools import tool from langchain.schema import SystemMessage #from langchain.chat_models import init_chat_model #from langgraph.prebuilt import create_react_agent from langchain.embeddings import HuggingFaceEmbeddings #from langchain.vectorstores import Pinecone from langchain.tools.retriever import create_retriever_tool #import pinecone #from pinecone import Pinecone as PineconeClient, ServerlessSpec #from pinecone import Index # the blocking‐call client constructor #from pinecone import Pinecone as PineconeClient, ServerlessSpec from langchain.embeddings import HuggingFaceEmbeddings from langchain_community.vectorstores.pinecone import Pinecone as LC_Pinecone # ─── Langchain Frameworks ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── #from langchain.tools import Tool from langchain.chat_models import ChatOpenAI from langchain_groq import ChatGroq from langchain_mistralai import ChatMistralAI from langchain.agents import initialize_agent, AgentType from langchain.schema import Document from langchain.chains import RetrievalQA from langchain.embeddings import OpenAIEmbeddings from langchain_community.embeddings import HuggingFaceEmbeddings from langchain.vectorstores import FAISS from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.prompts import PromptTemplate from langchain_community.document_loaders import TextLoader, PyMuPDFLoader from langchain_community.document_loaders.wikipedia import WikipediaLoader from langchain_community.document_loaders.arxiv import ArxivLoader from langchain_experimental.tools.python.tool import PythonREPLTool # ─── Memory ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── from langchain.agents import initialize_agent, AgentType from langchain.tools import Tool from typing import List, Callable from langchain.schema import BaseMemory, AIMessage, HumanMessage, SystemMessage from langchain.schema import HumanMessage, SystemMessage from langchain.llms.base import LLM from langchain.memory.chat_memory import BaseChatMemory from pydantic import PrivateAttr from langchain_core.messages import get_buffer_string # ─── Image Processing ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────── from PIL import Image import pytesseract from transformers import pipeline from groq import Groq import requests from io import BytesIO from transformers import pipeline, TrOCRProcessor, VisionEncoderDecoderModel import requests import base64 from PIL import UnidentifiedImageError # ─── Browser var ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── from typing import List, Dict import json from io import BytesIO #from langchain.tools import tool # or langchain_core.tools from playwright.sync_api import sync_playwright from duckduckgo_search import DDGS import time import random import logging from functools import lru_cache, wraps import requests from playwright.sync_api import sync_playwright from bs4 import BeautifulSoup import tenacity from tenacity import retry, stop_after_attempt, wait_exponential # Initialize logger logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # Additional imports for new functionality import pandas as pd from PyPDF2 import PdfReader import docx import pytesseract import speech_recognition as sr from pydub import AudioSegment from pytube import YouTube from newspaper import Article from langchain.document_loaders import ArxivLoader from langchain_community.document_loaders.youtube import YoutubeLoader, TranscriptFormat from playwright.sync_api import sync_playwright # Attempt to import Playwright for dynamic page rendering try: from playwright.sync_api import sync_playwright _playwright_available = True except ImportError: _playwright_available = False # Define forbidden keywords for basic NSFW filtering _forbidden = ["porn", "sex", "xxx", "nude", "erotic"] # ─── LLM Setup ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # Load OpenAI API key from environment (required for LLM and embeddings) # API Keys from .env file os.environ.setdefault("OPENAI_API_KEY", "") # Set your own key or env var os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "default_key_or_placeholder") os.environ["MISTRAL_API_KEY"] = os.getenv("MISTRAL_API_KEY", "default_key_or_placeholder") # Tavily API Key TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "default_key_or_placeholder") _forbidden = ["nsfw", "porn", "sex", "explicit"] _playwright_available = True # set False to disable Playwright # Globals for RAG system vector_store = None rag_chain = None DB_PATH = None # will be set when a .db is uploaded DOC_PATH = None # will be set when a document is uploaded IMG_PATH = None # will be set when an image is uploaded OTH_PATH = None # will be set when an other file is uploaded # ─── LLMS ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── #llm = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True, temperature=0) from tenacity import retry, stop_after_attempt, wait_exponential # Import the RetryingChatGroq client from retry_groq import RetryingChatGroq # Use the retrying version instead llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False, temperature=0) #llm = ChatMistralAI(model="mistral-large-latest", streaming=True, temperature=0) # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────── Tool for multiply ────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── @tool(parse_docstring=True) def multiply(a: int, b: int) -> int: """ Multiply two numbers. Args: a (int): The first factor. b (int): The second factor. Returns: int: The product of a and b. """ try: # Direct calculation without relying on LangChain handling result = a * b return result except Exception as e: return f"Error in multiplication: {str(e)}" # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────── Tool for add ────────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── @tool(parse_docstring=True) def add(a: int, b: int) -> int: """ Add two numbers. Args: a (int): The first factor. b (int): The second factor. Returns: int: The addition of a and b. """ try: # Direct calculation without relying on LangChain handling result = a + b return result except Exception as e: return f"Error in addition: {str(e)}" # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────── Tool for subtract ────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── @tool(parse_docstring=True) def subtract(a: int, b: int) -> int: """ Subtract two numbers. Args: a (int): The first factor. b (int): The second factor. Returns: int: The subtraction of a and b. """ try: # Direct calculation without relying on LangChain handling result = a - b return result except Exception as e: return f"Error in subtraction: {str(e)}" # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────── Tool for divide ────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── @tool(parse_docstring=True) def divide(a: int, b: int) -> int: """ Divide two numbers. Args: a (int): The numerator. b (int): The denominator. Returns: float: The result of a divided by b. Raises: ValueError: If b is zero. """ try: if b == 0: return "Error: Cannot divide by zero." # Direct calculation without relying on LangChain handling result = a / b return result except Exception as e: return f"Error in division: {str(e)}" # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────── Tool for modulus ────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── @tool(parse_docstring=True) def modulus(a: int, b: int) -> int: """ Get the modulus (remainder) of two numbers. Args: a (int): The dividend. b (int): The divisor. Returns: int: The remainder when a is divided by b. """ try: if b == 0: return "Error: Cannot calculate modulus with zero divisor." # Direct calculation without relying on LangChain handling result = a % b return result except Exception as e: return f"Error in modulus calculation: {str(e)}" # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────── Tool for browsing ────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── def with_retry(max_attempts: int = 3, backoff_base: int = 2): """ Decorator for retrying a function with exponential backoff on exception. """ def decorator(fn): @wraps(fn) def wrapper(*args, **kwargs): for attempt in range(max_attempts): try: return fn(*args, **kwargs) except Exception as e: wait = backoff_base ** attempt + random.uniform(0, 1) logger.warning(f"{fn.__name__} failed (attempt {attempt+1}/{max_attempts}): {e}") if attempt < max_attempts - 1: time.sleep(wait) logger.error(f"{fn.__name__} failed after {max_attempts} attempts.") return [] return wrapper return decorator @with_retry() @lru_cache(maxsize=128) def tavily_search(query: str, top_k: int = 3) -> List[Dict]: """Call Tavily API and return a list of result dicts.""" if not TAVILY_API_KEY: logger.info("[Tavily] No API key set. Skipping Tavily search.") return [] url = "https://api.tavily.com/search" headers = { "Authorization": f"Bearer {TAVILY_API_KEY}", "Content-Type": "application/json", } payload = {"query": query, "num_results": top_k} resp = requests.post(url, headers=headers, json=payload, timeout=10) resp.raise_for_status() data = resp.json() results = [] for item in data.get("results", []): results.append({ "title": item.get("title", ""), "url": item.get("url", ""), "content": item.get("content", "")[:200], "source": "Tavily" }) return results @with_retry() @lru_cache(maxsize=128) def duckduckgo_search(query: str, top_k: int = 3) -> List[Dict]: """Query DuckDuckGo and return up to top_k raw SERP hits.""" results = [] try: with DDGS(timeout=15) as ddgs: # Increase timeout from default for hit in ddgs.text(query, safesearch="On", max_results=top_k, timeout=15): results.append({ "title": hit.get("title", ""), "url": hit.get("href") or hit.get("url", ""), "content": hit.get("body", ""), "source": "DuckDuckGo" }) if len(results) >= top_k: break except Exception as e: logger.warning(f"DuckDuckGo search failed: {e}") # Don't re-raise - just return empty results to allow fallbacks to work return results # Additional fallback search alternative def simple_google_search(query: str, top_k: int = 3) -> List[Dict]: """Simplified Google search as a fallback when other methods fail.""" try: # Encode the query import urllib.parse import bs4 encoded_query = urllib.parse.quote(query) url = f"https://www.google.com/search?q={encoded_query}" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Referer": "https://www.google.com/", "Connection": "keep-alive", } response = requests.get(url, headers=headers, timeout=20) response.raise_for_status() soup = bs4.BeautifulSoup(response.text, "html.parser") results = [] # Extract search results for result in soup.select("div.g")[:top_k]: title_elem = result.select_one("h3") link_elem = result.select_one("a") snippet_elem = result.select_one("div.VwiC3b") if title_elem and link_elem and snippet_elem and "href" in link_elem.attrs: href = link_elem["href"] if href.startswith("/url?q="): href = href.split("/url?q=")[1].split("&")[0] if href.startswith("http"): results.append({ "title": title_elem.get_text(), "url": href, "content": snippet_elem.get_text(), "source": "Google" }) return results except Exception as e: logger.warning(f"Simple Google search failed: {e}") return [] def hybrid_search(query: str, top_k: int = 3) -> List[Dict]: """Combine multiple search sources with fallbacks.""" # Try primary search methods first results = [] # Start with Tavily if API key is available if TAVILY_API_KEY and TAVILY_API_KEY != "default_key_or_placeholder": try: tavily_results = tavily_search(query, top_k) results.extend(tavily_results) logger.info(f"Retrieved {len(tavily_results)} results from Tavily") except Exception as e: logger.warning(f"Tavily search failed: {e}") # If we don't have enough results, try DuckDuckGo if len(results) < top_k: try: ddg_results = duckduckgo_search(query, top_k - len(results)) results.extend(ddg_results) logger.info(f"Retrieved {len(ddg_results)} results from DuckDuckGo") except Exception as e: logger.warning(f"DuckDuckGo search failed: {e}") # If we still don't have enough results, try Google if len(results) < top_k: try: google_results = simple_google_search(query, top_k - len(results)) results.extend(google_results) logger.info(f"Retrieved {len(google_results)} results from Google") except Exception as e: logger.warning(f"Google search failed: {e}") # If all search methods failed, return a dummy result if not results: results.append({ "title": "Search Failed", "url": "", "content": f"Sorry, I couldn't find results for '{query}'. Please try refining your search terms or check your internet connection.", "source": "No results" }) return results[:top_k] # Ensure we only return top_k results def format_search_docs(search_docs: List[Dict]) -> Dict[str, str]: """ Turn a list of {source, page, content} dicts into one big string with entries separated by `---`. """ formatted_search_docs = "\n\n---\n\n".join( [ f'\n' f'{doc.get("content", "")}\n' f'' for doc in search_docs ] ) return {"web_results": formatted_search_docs} @tool(parse_docstring=True) def web_search(query: str, top_k: int = 3) -> Dict[str, str]: """ Perform a hybrid web search combining multiple search engines with robust fallbacks. Args: query: The search query string to look up. top_k: The maximum number of search results to return (default is 3). Returns: A dictionary mapping result indices to XML-like blocks, each containing: - source: The URL of the webpage. - page: Placeholder for page identifier (empty string by default). - content: The first 200 words of the page text, cleaned of HTML tags. """ try: # Use our robust hybrid search to get initial results search_results = hybrid_search(query, top_k) results = [] # Process each search result to get better content for hit in search_results: url = hit.get("url") if not url: continue # Start with the snippet from search content = hit.get("content", "") title = hit.get("title", "") # Try to scrape additional content if possible try: # Use a random user agent to avoid blocking headers = { "User-Agent": random.choice([ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.62" ]), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Referer": "https://www.google.com/", "DNT": "1", "Connection": "keep-alive" } # Higher timeout for better reliability resp = requests.get(url, timeout=15, headers=headers) # Only process if successful if resp.status_code == 200: soup = BeautifulSoup(resp.text, "html.parser") # Try to find main content main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content') # If we found main content, use it if main_content: extracted_text = main_content.get_text(separator=" ", strip=True) # Take first 200 words content = " ".join(extracted_text.split()[:200]) else: # Otherwise use all text all_text = soup.get_text(separator=" ", strip=True) content = " ".join(all_text.split()[:200]) # Use content from page only if it's substantial if len(content) < 50: content = hit.get("content", "")[:200] # Random delay between 0.5-1.5 seconds to avoid rate limits time.sleep(0.5 + random.random()) except requests.exceptions.HTTPError as e: logger.warning(f"HTTP error when scraping {url}: {e}") # Keep the search snippet as a fallback except requests.exceptions.RequestException as e: logger.warning(f"Request error when scraping {url}: {e}") # Keep the search snippet as a fallback except Exception as e: logger.warning(f"Unexpected error when scraping {url}: {e}") # Keep the search snippet as a fallback # Filter out inappropriate content if any(f in content.lower() for f in _forbidden): continue # Add to results results.append({ "source": url, "page": "", "content": content }) # Return formatted search docs return format_search_docs(results[:top_k]) except Exception as e: logger.error(f"Web search failed: {e}") # Return a helpful error message return format_search_docs([{ "source": "Error", "page": "", "content": f"Search failed with error: {e}. Please try again with different search terms." }]) # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────── Tool for File System ─────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── @tool(parse_docstring=True) def download_file(url: str, dest_path: str) -> str: """ Download a file from a given URL and save it locally. Args: url: The direct URL of the file to download. dest_path: The local path to save the downloaded file. Returns: The destination path where the file was saved. """ r = requests.get(url, stream=True) r.raise_for_status() with open(dest_path, 'wb') as f: for chunk in r.iter_content(8192): f.write(chunk) return dest_path @tool(parse_docstring=True) def process_excel_to_text(file_path: str) -> str: """ Convert an Excel file into CSV-formatted text. Args: file_path: Path to the Excel (.xlsx) file. Returns: A string of CSV-formatted content extracted from the Excel file. """ try: # Check if file exists import os if not os.path.exists(file_path): return f"Error: Excel file '{file_path}' does not exist." # Try different engines engines = ['openpyxl', 'xlrd', None] for engine in engines: try: # For engine=None, pandas will try to auto-detect if engine: df = pd.read_excel(file_path, engine=engine) else: df = pd.read_excel(file_path) return df.to_csv(index=False) except Exception as e: print(f"Excel engine {engine} failed: {e}") last_error = e continue # If we got here, all engines failed return f"Error processing Excel file: {str(last_error)}" except Exception as e: return f"Error with Excel file: {str(e)}" @tool(parse_docstring=True) def read_text_from_pdf(file_path: str, question: str = None) -> str: """ Extract text from a PDF file, chunking large documents if needed. Args: file_path: Path to the PDF file. question: Optional question to help retrieve relevant parts of long documents. Returns: The extracted text content, potentially chunked if the document is large. """ try: # Check if file exists import os if not os.path.exists(file_path): return f"Error: PDF file '{file_path}' does not exist." reader = PdfReader(file_path) full_text = "\n".join([page.extract_text() or "" for page in reader.pages]) # If a question is provided, use retrieval to get relevant parts if question and len(full_text) > 5000: # Only chunk if text is large return process_large_document(full_text, question) return full_text except Exception as e: return f"Error reading PDF: {str(e)}" @tool(parse_docstring=True) def read_text_from_docx(file_path: str, question: str = None) -> str: """ Extract text from a DOCX (Word) document, chunking large documents if needed. Args: file_path: Path to the DOCX file. question: Optional question to help retrieve relevant parts of long documents. Returns: The extracted text, potentially chunked if the document is large. """ try: # Check if file exists import os if not os.path.exists(file_path): return f"Error: File '{file_path}' does not exist." try: doc = docx.Document(file_path) full_text = "\n".join([para.text for para in doc.paragraphs]) except Exception as docx_err: # Handle "Package not found" error specifically if "Package not found" in str(docx_err): # Try to read raw text if possible try: import zipfile from xml.etree.ElementTree import XML WORD_NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}' PARA = WORD_NAMESPACE + 'p' TEXT = WORD_NAMESPACE + 't' with zipfile.ZipFile(file_path) as docx_file: with docx_file.open('word/document.xml') as document: tree = XML(document.read()) paragraphs = [] for paragraph in tree.iter(PARA): texts = [node.text for node in paragraph.iter(TEXT) if node.text] if texts: paragraphs.append(''.join(texts)) full_text = '\n'.join(paragraphs) except Exception as e: return f"Error reading DOCX file: {str(e)}" else: return f"Error reading DOCX file: {str(docx_err)}" # If a question is provided, use retrieval to get relevant parts if question and len(full_text) > 5000: # Only chunk if text is large return process_large_document(full_text, question) return full_text except Exception as e: return f"Error reading DOCX file: {str(e)}" @tool(parse_docstring=True) def transcribe_audio(file_path: str) -> str: """ Transcribe speech from a local audio file to text. Args: file_path: Path to the audio file. Returns: Transcribed text using Google Web Speech API. """ try: # Check if file exists import os if not os.path.exists(file_path): return f"Error: Audio file '{file_path}' does not exist." # For non-WAV files, convert to WAV first if not file_path.lower().endswith('.wav'): try: from pydub import AudioSegment temp_wav = os.path.splitext(file_path)[0] + "_temp.wav" audio = AudioSegment.from_file(file_path) audio.export(temp_wav, format="wav") file_path = temp_wav except Exception as e: return f"Failed to convert audio to WAV format: {str(e)}" recognizer = sr.Recognizer() with sr.AudioFile(file_path) as src: audio = recognizer.record(src) return recognizer.recognize_google(audio) except Exception as e: if "Audio file could not be read" in str(e): return f"Error: Audio format not supported. Try converting to WAV, MP3, OGG, or FLAC." return f"Error transcribing audio: {str(e)}" @tool(parse_docstring=True) def youtube_audio_processing(youtube_url: str) -> str: """ Download and transcribe audio from a YouTube video. Args: youtube_url: URL of the YouTube video. Returns: Transcription text extracted from the video's audio. """ yt = YouTube(youtube_url) audio_stream = yt.streams.filter(only_audio=True).first() out_file = audio_stream.download(output_path='.', filename='yt_audio') wav_path = 'yt_audio.wav' AudioSegment.from_file(out_file).export(wav_path, format='wav') return transcribe_audio(wav_path) @tool(parse_docstring=True) def extract_article_text(url: str, question: str = None) -> str: """ Download and extract the main article content from a webpage, chunking large articles if needed. Args: url: The URL of the article to extract. question: Optional question to help retrieve relevant parts of long articles. Returns: The article's textual content, potentially chunked if large. """ try: art = Article(url) art.download() art.parse() full_text = art.text # If a question is provided, use retrieval to get relevant parts if question and len(full_text) > 5000: # Only chunk if text is large return process_large_document(full_text, question) return full_text except Exception as e: return f"Error extracting article: {str(e)}" # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ───────────────────────────────────────────────────────────── Tool for ArXiv ──────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── @tool(parse_docstring=True) def arvix_search(query: str) -> Dict[str, str]: """ Search for academic papers on ArXiv. Args: query: The search term to look for in ArXiv. Returns: A dictionary of up to 3 relevant paper entries in JSON format. """ papers = ArxivLoader(query=query, load_max_docs=3).load() results = [] for doc in papers: try: # Handle different metadata formats that might be returned source = doc.metadata.get("source", "ArXiv") doc_id = doc.metadata.get("id", doc.metadata.get("entry_id", "")) result = { "source": source, "id": doc_id, "summary": doc.page_content[:1000] if hasattr(doc, "page_content") else str(doc)[:1000], } results.append(result) except Exception as e: # Add error information as a fallback results.append({ "source": "ArXiv Error", "id": "error", "summary": f"Error processing paper: {str(e)}" }) return {"arvix_results": json.dumps(results)} @tool(parse_docstring=True) def answer_youtube_video_question( youtube_url: str, question: str, chunk_size_seconds: int = 30 ) -> str: """ Answer a question based on a YouTube video's transcript. Args: youtube_url: URL of the YouTube video. question: The question to be answered using video content. chunk_size_seconds: Duration of each transcript chunk. Returns: The answer to the question generated from the video transcript. """ loader = YoutubeLoader.from_youtube_url( youtube_url, add_video_info=True, transcript_format=TranscriptFormat.CHUNKS, chunk_size_seconds=chunk_size_seconds, ) documents = loader.load() embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-mpnet-base-v2') vectorstore = FAISS.from_documents(documents, embeddings) llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False) qa_chain = RetrievalQA.from_chain_type(llm=llm, retriever=vectorstore.as_retriever()) return qa_chain.run(question) # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ───────────────────────────────────────────────────────────── Tool for Python REPL tool ──────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── python_repl = PythonREPLTool() # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ───────────────────────────────────────────────────────────── Tool for Wiki ──────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── @tool(parse_docstring=True) def wiki_search(query: str) -> str: """ Search Wikipedia for information on a given topic. Args: query: The search term for Wikipedia. Returns: A JSON string with up to 3 summary results. """ # load up to top_k pages pages = WikipediaLoader(query=query, load_max_docs=3).load() results: List[Dict] = [] for doc in pages: results.append({ "source": doc.metadata["source"], "page": doc.metadata.get("page", ""), "content": doc.page_content[:1000], # truncate if you like }) return {"wiki_results": format_search_docs(results)} # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ───────────────────────────────────── Tool for Image (understading, captioning & classification) ───────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── def _load_image(img_path: str, resize_to=(512, 512)) -> Image.Image: """ Load, verify, convert, and resize an image. Raises ValueError on failure. """ if not img_path: raise ValueError("No image path provided.") try: with Image.open(img_path) as img: img.verify() img = Image.open(img_path).convert("RGB") img = img.resize(resize_to) return img except UnidentifiedImageError: raise ValueError(f"File at {img_path} is not a valid image.") except Exception as e: raise ValueError(f"Failed to load image at {img_path}: {e}") def _encode_image_to_base64(img_path: str) -> str: """ Load an image, save optimized PNG into memory, and base64‑encode it. """ img = _load_image(img_path) buffer = BytesIO() img.save(buffer, format="PNG", optimize=True) return base64.b64encode(buffer.getvalue()).decode("utf-8") @tool def image_processing(prompt: str, img_path: str) -> str: """Process an image using a vision LLM, with OCR fallback. Args: prompt: Instruction or question related to the image. img_path: Path to the image file. Returns: The model's response or fallback OCR result. """ try: import os # Check if file exists if not os.path.exists(img_path): return f"Error: Image file '{img_path}' does not exist." try: b64 = _encode_image_to_base64(img_path) # Build a single markdown string with inline base64 image md = f"{prompt}\n\n![](data:image/png;base64,{b64})" message = HumanMessage(content=md) # Use RetryingChatGroq with Llama 4 Maverick for vision llm = RetryingChatGroq(model="meta-llama/llama-4-maverick-17b-128e-instruct", streaming=False, temperature=0) try: resp = llm.invoke([message]) if hasattr(resp, 'content'): return resp.content.strip() elif isinstance(resp, str): return resp.strip() else: # Handle dictionary or other response types return str(resp) except Exception as invoke_err: print(f"[LLM invoke error] {invoke_err}") # Fall back to OCR raise ValueError("LLM invocation failed") except Exception as llama_err: print(f"[LLM vision failed] {llama_err}") try: img = _load_image(img_path) return pytesseract.image_to_string(img).strip() except Exception as ocr_err: print(f"[OCR fallback failed] {ocr_err}") return "Unable to process the image. Please check the file and try again." except Exception as e: # Catch any other errors print(f"[image_processing error] {e}") return f"Error processing image: {str(e)}" python_repl_tool = PythonREPLTool() @tool def echo(text: str) -> str: """Echo back the input text. Args: text: The string to be echoed. Returns: The same text that was provided as input. """ return text # ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────── Langgraph Agent ─────────────────────────────────────────────────────────────────────── # ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # Build graph function from langchain_core.tools import tool from langchain.chat_models import ChatOpenAI from langgraph.prebuilt.chat_agent_executor import create_react_agent, AgentState from langchain.chat_models import init_chat_model def build_graph(provider: str = "groq"): """Construct and compile the multi‑agent GAIA workflow StateGraph. This graph wires together three React‑style agents into a streamlined pipeline: PerceptionAgent → ActionAgent → EvaluationAgent (with appropriate entry/exit points) The agents have the following responsibilities: - PerceptionAgent: Handles web searches, Wikipedia, ArXiv, and image processing - ActionAgent: Performs calculations, file operations, and code analysis - EvaluationAgent: Reviews results and ensures the final answer is properly formatted Args: provider: The name of the LLM provider. Must be "groq". Returns: CompiledGraph: A compiled LangGraph state machine ready for invocation. Raises: ValueError: If `provider` is anything other than "groq". """ try: if provider != "groq": raise ValueError("Invalid provider. Expected 'groq'.") # Initialize LLM try: logger.info("Initializing LLM with model: deepseek-r1-distill-llama-70b") api_key = os.getenv("GROQ_API_KEY") if not api_key or api_key == "default_key_or_placeholder": logger.error("GROQ_API_KEY is not set or is using placeholder value") raise ValueError("GROQ_API_KEY environment variable is not set properly. Please set a valid API key.") llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", temperature=0) logger.info("LLM initialized successfully") except Exception as e: logger.error(f"Error initializing LLM: {str(e)}") raise # General system message for agents sys_msg = SystemMessage(content=""" You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER] YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma-separated list of numbers and/or strings. If you are asked for a number, don't use commas or units (e.g., $, %, kg) unless specified otherwise. If you are asked for a string, don't use articles (a, an, the), and don't use abbreviations (e.g., for states). If you are asked for a comma-separated list, apply the above rules to each element in the list. """.strip()) # Special system message for the evaluation agent with stricter formatting requirements eval_sys_msg = SystemMessage(content=""" You are a specialized evaluation agent. Your job is to review the work done by other agents and provide a final, properly formatted answer. IMPORTANT: You MUST ALWAYS format your answer using this exact template: FINAL ANSWER: [concise answer] Rules for formatting the answer: 1. The answer must be extremely concise - use as few words as possible 2. For numeric answers, provide only the number without units unless units are specifically requested 3. For text answers, avoid articles (a, an, the) and unnecessary words 4. For list answers, use a comma-separated format 5. NEVER explain your reasoning in the FINAL ANSWER section 6. NEVER skip the "FINAL ANSWER:" prefix Example good answers: FINAL ANSWER: 42 FINAL ANSWER: Paris FINAL ANSWER: 1912, 1945, 1989 Example bad answers (don't do these): - Based on my analysis, the answer is 42. - I think it's Paris because that's the capital of France. - The years were 1912, 1945, and 1989. Remember: ALWAYS include "FINAL ANSWER:" followed by the most concise answer possible. """.strip()) # Define tools for each agent logger.info("Setting up agent tools") perception_tools = [web_search, wiki_search, news_article_search, arvix_search, image_processing, echo] execution_tools = [ multiply, add, subtract, divide, modulus, download_file, process_excel_to_text, read_text_from_pdf, read_text_from_docx, transcribe_audio, youtube_audio_processing, extract_article_text, answer_youtube_video_question, python_repl_tool, analyze_code, read_code_file, analyze_python_function ] # ─────────────── Agent Creation ─────────────── logger.info("Creating agents") try: # Create agents with proper error handling PerceptionAgent = create_react_agent( model=llm, tools=perception_tools, prompt=sys_msg, state_schema=AgentState, name="PerceptionAgent" ) logger.info("Created PerceptionAgent successfully") # Combined Planning and Execution agent for better efficiency ActionAgent = create_react_agent( model=llm, tools=execution_tools, # Has access to all execution tools prompt=sys_msg, state_schema=AgentState, name="ActionAgent" ) logger.info("Created ActionAgent successfully") # Evaluation agent with stricter prompt EvaluationAgent = create_react_agent( model=llm, tools=[], # No tools needed for evaluation prompt=eval_sys_msg, # Use the specialized evaluation prompt state_schema=AgentState, name="EvaluationAgent" ) logger.info("Created EvaluationAgent successfully") except Exception as e: logger.error(f"Error creating agent: {str(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") raise # Build the StateGraph logger.info("Building StateGraph") try: builder = StateGraph(AgentState) # Add agent nodes first builder.add_node("PerceptionAgent", PerceptionAgent) builder.add_node("ActionAgent", ActionAgent) builder.add_node("EvaluationAgent", EvaluationAgent) # Define the flow with a starting edge builder.set_entry_point("PerceptionAgent") # Add the edges for the simpler linear flow builder.add_edge("PerceptionAgent", "ActionAgent") builder.add_edge("ActionAgent", "EvaluationAgent") # Set EvaluationAgent as the end node builder.set_finish_point("EvaluationAgent") logger.info("Compiling StateGraph") return builder.compile() except Exception as e: logger.error(f"Error building graph: {str(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") raise except Exception as e: logger.error(f"Overall error in build_graph: {str(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") raise def get_final_answer(text): """Extract just the FINAL ANSWER from the model's response. Args: text: The full text response from the LLM Returns: str: The extracted answer without the "FINAL ANSWER:" prefix """ # Log the raw text for debugging if needed logger.debug(f"Extracting answer from: {text[:200]}...") if not text: logger.warning("Empty response received") return "No answer provided." # Method 1: Look for "FINAL ANSWER:" with most comprehensive pattern matching pattern = r'(?:^|\n)FINAL ANSWER:\s*(.*?)(?:\n\s*$|$)' match = re.search(pattern, text, re.DOTALL | re.IGNORECASE) if match: # Return just the answer part, cleaned up logger.debug("Found answer using pattern 1") return match.group(1).strip() # Method 2: Try looking for variations on the final answer format for variant in ["FINAL ANSWER:", "FINAL_ANSWER:", "Final Answer:", "Answer:"]: lines = text.split('\n') for i, line in enumerate(reversed(lines)): if variant in line: # Extract everything after the variant text logger.debug(f"Found answer using variant: {variant}") answer = line[line.find(variant) + len(variant):].strip() if answer: return answer # If the answer is on the next line, return that if i > 0: next_line = lines[len(lines) - i] if next_line.strip(): return next_line.strip() # Method 3: Look for phrases that suggest an answer for phrase in ["The answer is", "The result is", "We get", "Therefore,", "In conclusion,"]: phrase_pos = text.find(phrase) if phrase_pos != -1: # Try to extract everything after the phrase until the end of the sentence sentence_end = text.find(".", phrase_pos) if sentence_end != -1: logger.debug(f"Found answer using phrase: {phrase}") return text[phrase_pos + len(phrase):sentence_end].strip() # Method 4: Fall back to taking the last paragraph with actual content paragraphs = text.strip().split('\n\n') for para in reversed(paragraphs): para = para.strip() if para and not para.startswith("I ") and not para.lower().startswith("to "): logger.debug("Using last meaningful paragraph") # If paragraph is very long, try to extract a concise answer if len(para) > 100: sentences = re.split(r'[.!?]', para) for sentence in reversed(sentences): sent = sentence.strip() if sent and len(sent) > 5 and not sent.startswith("I "): return sent return para # Method 5: Last resort - just return the last line with content lines = text.strip().split('\n') for line in reversed(lines): line = line.strip() if line and len(line) > 3: logger.debug("Using last line with content") return line # If everything fails, warn and return the truncated response logger.warning("Could not find a properly formatted answer") return text[:100] + "..." if len(text) > 100 else text # test if __name__ == "__main__": question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?" # Build the graph graph = build_graph(provider="groq") # Run the graph messages = [HumanMessage(content=question)] messages = graph.invoke({"messages": messages}) for m in messages["messages"]: m.pretty_print() # ─────────────────────────────────────────────── Tool for Code Analysis ─────────────────────────────────────────────────────────────── @tool def analyze_code(code_string: str) -> str: """Analyze a string of code to understand its structure, functionality, and potential issues. Args: code_string: The code to analyze as a string. Returns: A structured analysis of the code including functions, classes, and key operations. """ try: import ast # Try to parse with Python's AST module try: parsed = ast.parse(code_string) # Extract functions and classes functions = [node.name for node in ast.walk(parsed) if isinstance(node, ast.FunctionDef)] classes = [node.name for node in ast.walk(parsed) if isinstance(node, ast.ClassDef)] imports = [node.names[0].name for node in ast.walk(parsed) if isinstance(node, ast.Import)] imports.extend([f"{node.module}.{name.name}" if node.module else name.name for node in ast.walk(parsed) if isinstance(node, ast.ImportFrom) for name in node.names]) # Count various node types for complexity assessment num_loops = len([node for node in ast.walk(parsed) if isinstance(node, (ast.For, ast.While))]) num_conditionals = len([node for node in ast.walk(parsed) if isinstance(node, (ast.If, ast.IfExp))]) analysis = { "language": "Python", "functions": functions, "classes": classes, "imports": imports, "complexity": { "functions": len(functions), "classes": len(classes), "loops": num_loops, "conditionals": num_conditionals } } return str(analysis) except SyntaxError: # If not valid Python, try some simple pattern matching if "{" in code_string and "}" in code_string: if "function" in code_string or "=>" in code_string: language = "JavaScript/TypeScript" elif "func" in code_string or "struct" in code_string: language = "Go or Rust" elif "public" in code_string or "private" in code_string or "class" in code_string: language = "Java/C#/C++" else: language = "Unknown C-like language" elif "<" in code_string and ">" in code_string and ("/>" in code_string or " str: """Read a code file and return its contents with proper syntax detection. Args: file_path: Path to the code file. Returns: The file contents and detected language. """ try: # Check if file exists import os if not os.path.exists(file_path): return f"Error: File '{file_path}' does not exist." with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Try to detect language from extension ext = os.path.splitext(file_path)[1].lower() language_map = { '.py': 'Python', '.js': 'JavaScript', '.ts': 'TypeScript', '.html': 'HTML', '.css': 'CSS', '.java': 'Java', '.c': 'C', '.cpp': 'C++', '.cs': 'C#', '.go': 'Go', '.rs': 'Rust', '.php': 'PHP', '.rb': 'Ruby', '.sh': 'Shell', '.bat': 'Batch', '.ps1': 'PowerShell', '.sql': 'SQL', '.json': 'JSON', '.xml': 'XML', '.yaml': 'YAML', '.yml': 'YAML', } language = language_map.get(ext, 'Unknown') return f"File content ({language}):\n\n{content}" except Exception as e: return f"Error reading file: {str(e)}" @tool def analyze_python_function(function_name: str, code_string: str) -> str: """Extract and analyze a specific function from Python code. Args: function_name: The name of the function to analyze. code_string: The complete code containing the function. Returns: Analysis of the function including parameters, return type, and docstring. """ try: import ast import inspect from types import CodeType, FunctionType # Parse the code string parsed = ast.parse(code_string) # Find the function definition function_def = None for node in ast.walk(parsed): if isinstance(node, ast.FunctionDef) and node.name == function_name: function_def = node break if not function_def: return f"Function '{function_name}' not found in the provided code." # Extract parameters params = [] for arg in function_def.args.args: param_name = arg.arg # Get annotation if it exists if arg.annotation: if isinstance(arg.annotation, ast.Name): param_type = arg.annotation.id elif isinstance(arg.annotation, ast.Attribute): param_type = f"{arg.annotation.value.id}.{arg.annotation.attr}" else: param_type = "complex_type" params.append(f"{param_name}: {param_type}") else: params.append(param_name) # Extract return type if it exists return_type = None if function_def.returns: if isinstance(function_def.returns, ast.Name): return_type = function_def.returns.id elif isinstance(function_def.returns, ast.Attribute): return_type = f"{function_def.returns.value.id}.{function_def.returns.attr}" else: return_type = "complex_return_type" # Extract docstring docstring = ast.get_docstring(function_def) # Create a summary summary = { "function_name": function_name, "parameters": params, "return_type": return_type, "docstring": docstring, "decorators": [d.id if isinstance(d, ast.Name) else "complex_decorator" for d in function_def.decorator_list], "line_count": len(function_def.body) } # Create a more explicit string representation that ensures key terms are included result = f"Function '{function_name}' analysis:\n" result += f"- Parameters: {', '.join(params)}\n" result += f"- Return type: {return_type or 'None specified'}\n" result += f"- Docstring: {docstring or 'None'}\n" result += f"- Line count: {len(function_def.body)}" return result except Exception as e: return f"Error analyzing function: {str(e)}" # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────── Tool for News Article Retrieval ────────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── @tool def news_article_search(query: str, top_k: int = 3) -> Dict[str, str]: """Search for and retrieve news articles with robust error handling for news sites. Args: query: The news topic or keywords to search for. top_k: Maximum number of articles to retrieve. Returns: A dictionary with search results formatted as XML-like document entries. """ # First, get URLs from DuckDuckGo with "news" focus results = [] news_sources = [ "bbc.com", "reuters.com", "apnews.com", "nasa.gov", "space.com", "universetoday.com", "nature.com", "science.org", "scientificamerican.com", "nytimes.com", "theguardian.com" ] # Find news from reliable sources try: with DDGS() as ddgs: search_query = f"{query} site:{' OR site:'.join(news_sources)}" for hit in ddgs.text(search_query, safesearch="On", max_results=top_k*2): url = hit.get("href") or hit.get("url", "") if not url: continue # Add the search snippet first as a fallback result = { "source": url, "page": "", "content": hit.get("body", "")[:250], "title": hit.get("title", "") } # Try to get better content via a more robust method try: headers = { "User-Agent": random.choice([ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36" ]), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Referer": "https://www.google.com/", "DNT": "1", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1" } # Add a short delay between requests time.sleep(1 + random.random()) # Try to use newspaper3k for more reliable article extraction from newspaper import Article article = Article(url) article.download() article.parse() # If we got meaningful content, update the result if article.text and len(article.text) > 100: # Get a summary - first paragraph + some highlights paragraphs = article.text.split('\n\n') first_para = paragraphs[0] if paragraphs else "" summary = first_para[:300] if len(paragraphs) > 1: summary += "... " + paragraphs[1][:200] result["content"] = summary if article.title: result["title"] = article.title except Exception as article_err: logger.warning(f"Article extraction failed for {url}: {article_err}") # Fallback to simple requests-based extraction try: resp = requests.get(url, timeout=12, headers=headers) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") # Try to get main content main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content') if main_content: content = " ".join(main_content.get_text(separator=" ", strip=True).split()[:250]) result["content"] = content except Exception as req_err: logger.warning(f"Fallback extraction failed for {url}: {req_err}") # Keep the original snippet as fallback results.append(result) if len(results) >= top_k: break except Exception as e: logger.error(f"News search failed: {e}") return format_search_docs([{ "source": "Error", "page": "", "content": f"Failed to retrieve news articles for '{query}': {str(e)}" }]) if not results: # Fallback to regular web search logger.info(f"No news results found, falling back to web_search for {query}") return web_search(query, top_k) return format_search_docs(results[:top_k]) # ───────────────────────────────────────────────────────────── Document Chunking Utilities ────────────────────────────────────────────────────────── def chunk_document(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]: """ Split a large document into smaller chunks with overlap to maintain context across chunks. Args: text: The document text to split into chunks chunk_size: Maximum size of each chunk in characters overlap: Number of characters to overlap between chunks Returns: List of text chunks """ # If text is smaller than chunk_size, return it as is if len(text) <= chunk_size: return [text] chunks = [] start = 0 while start < len(text): # Get chunk with overlap end = min(start + chunk_size, len(text)) # Try to find sentence boundary for cleaner breaks if end < len(text): # Look for sentence endings: period, question mark, or exclamation followed by space for sentence_end in ['. ', '? ', '! ']: last_period = text[start:end].rfind(sentence_end) if last_period != -1: end = start + last_period + 2 # +2 to include the period and space break # Add chunk to list chunks.append(text[start:end]) # Move start position, accounting for overlap start = end - overlap if end < len(text) else len(text) return chunks # Document processing utility that uses chunking def process_large_document(text: str, question: str, llm=None) -> str: """ Process a large document by chunking it and using retrieval to find relevant parts. Args: text: The document text to process question: The question being asked about the document llm: Optional language model to use (defaults to agent's LLM) Returns: Summarized answer based on relevant chunks """ if not llm: llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False, temperature=0) # Split document into chunks chunks = chunk_document(text) # If document is small enough, don't bother with retrieval if len(chunks) <= 1: return text # For larger documents, create embeddings to find relevant chunks try: from langchain_community.embeddings import HuggingFaceEmbeddings from langchain.vectorstores import FAISS from langchain.schema import Document # Create documents with chunk content documents = [Document(page_content=chunk, metadata={"chunk_id": i}) for i, chunk in enumerate(chunks)] # Create embeddings and vector store embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") vectorstore = FAISS.from_documents(documents, embeddings) # Get most relevant chunks relevant_chunks = vectorstore.similarity_search(question, k=2) # Get top 2 most relevant chunks # Join the relevant chunks relevant_text = "\n\n".join([doc.page_content for doc in relevant_chunks]) # Option 1: Return relevant chunks directly return relevant_text # Option 2: Summarize with LLM (commented out for now) # prompt = f"Using only the following information, answer the question: '{question}'\n\nInformation:\n{relevant_text}" # response = llm.invoke([HumanMessage(content=prompt)]) # return response.content except Exception as e: # Fall back to first chunk if retrieval fails logger.warning(f"Retrieval failed: {e}. Falling back to first chunk.") return chunks[0]