Spaces:
No application file
No application file
#βββ Basic imports βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
import os | |
import math | |
import sqlite3 | |
import fitz # PyMuPDF for PDF parsing | |
import re | |
from dotenv import load_dotenv | |
# Load environment variables from .env file | |
load_dotenv() # This line ensures .env variables are loaded | |
from langgraph.graph import START, StateGraph, MessagesState, END | |
from langgraph.prebuilt import tools_condition | |
from langgraph.prebuilt import ToolNode | |
from langgraph.constants import START | |
from langchain_core.tools import tool | |
from langchain.schema import SystemMessage | |
#from langchain.chat_models import init_chat_model | |
#from langgraph.prebuilt import create_react_agent | |
from langchain.embeddings import HuggingFaceEmbeddings | |
#from langchain.vectorstores import Pinecone | |
from langchain.tools.retriever import create_retriever_tool | |
#import pinecone | |
#from pinecone import Pinecone as PineconeClient, ServerlessSpec | |
#from pinecone import Index # the blockingβcall client constructor | |
#from pinecone import Pinecone as PineconeClient, ServerlessSpec | |
from langchain.embeddings import HuggingFaceEmbeddings | |
from langchain_community.vectorstores.pinecone import Pinecone as LC_Pinecone | |
# βββ Langchain Frameworks βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
#from langchain.tools import Tool | |
from langchain.chat_models import ChatOpenAI | |
from langchain_groq import ChatGroq | |
from langchain_mistralai import ChatMistralAI | |
from langchain.agents import initialize_agent, AgentType | |
from langchain.schema import Document | |
from langchain.chains import RetrievalQA | |
from langchain.embeddings import OpenAIEmbeddings | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain.vectorstores import FAISS | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.prompts import PromptTemplate | |
from langchain_community.document_loaders import TextLoader, PyMuPDFLoader | |
from langchain_community.document_loaders.wikipedia import WikipediaLoader | |
from langchain_community.document_loaders.arxiv import ArxivLoader | |
from langchain_experimental.tools.python.tool import PythonREPLTool | |
# βββ Memory βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
from langchain.agents import initialize_agent, AgentType | |
from langchain.tools import Tool | |
from typing import List, Callable | |
from langchain.schema import BaseMemory, AIMessage, HumanMessage, SystemMessage | |
from langchain.schema import HumanMessage, SystemMessage | |
from langchain.llms.base import LLM | |
from langchain.memory.chat_memory import BaseChatMemory | |
from pydantic import PrivateAttr | |
from langchain_core.messages import get_buffer_string | |
# βββ Image Processing ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
from PIL import Image | |
import pytesseract | |
from transformers import pipeline | |
from groq import Groq | |
import requests | |
from io import BytesIO | |
from transformers import pipeline, TrOCRProcessor, VisionEncoderDecoderModel | |
import requests | |
import base64 | |
from PIL import UnidentifiedImageError | |
# βββ Browser var βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
from typing import List, Dict | |
import json | |
from io import BytesIO | |
#from langchain.tools import tool # or langchain_core.tools | |
from playwright.sync_api import sync_playwright | |
from duckduckgo_search import DDGS | |
import time | |
import random | |
import logging | |
from functools import lru_cache, wraps | |
import requests | |
from playwright.sync_api import sync_playwright | |
from bs4 import BeautifulSoup | |
import tenacity | |
from tenacity import retry, stop_after_attempt, wait_exponential | |
# Initialize logger | |
logger = logging.getLogger(__name__) | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
# Additional imports for new functionality | |
import pandas as pd | |
from PyPDF2 import PdfReader | |
import docx | |
import pytesseract | |
import speech_recognition as sr | |
from pydub import AudioSegment | |
from pytube import YouTube | |
from newspaper import Article | |
from langchain.document_loaders import ArxivLoader | |
from langchain_community.document_loaders.youtube import YoutubeLoader, TranscriptFormat | |
from playwright.sync_api import sync_playwright | |
# Attempt to import Playwright for dynamic page rendering | |
try: | |
from playwright.sync_api import sync_playwright | |
_playwright_available = True | |
except ImportError: | |
_playwright_available = False | |
# Define forbidden keywords for basic NSFW filtering | |
_forbidden = ["porn", "sex", "xxx", "nude", "erotic"] | |
# βββ LLM Setup βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Load OpenAI API key from environment (required for LLM and embeddings) | |
# API Keys from .env file | |
os.environ.setdefault("OPENAI_API_KEY", "<YOUR_OPENAI_KEY>") # Set your own key or env var | |
os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "default_key_or_placeholder") | |
os.environ["MISTRAL_API_KEY"] = os.getenv("MISTRAL_API_KEY", "default_key_or_placeholder") | |
# Tavily API Key | |
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "default_key_or_placeholder") | |
_forbidden = ["nsfw", "porn", "sex", "explicit"] | |
_playwright_available = True # set False to disable Playwright | |
# Globals for RAG system | |
vector_store = None | |
rag_chain = None | |
DB_PATH = None # will be set when a .db is uploaded | |
DOC_PATH = None # will be set when a document is uploaded | |
IMG_PATH = None # will be set when an image is uploaded | |
OTH_PATH = None # will be set when an other file is uploaded | |
# βββ LLMS ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
#llm = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True, temperature=0) | |
from tenacity import retry, stop_after_attempt, wait_exponential | |
# Import the RetryingChatGroq client | |
from retry_groq import RetryingChatGroq | |
# Use the retrying version instead | |
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False, temperature=0) | |
#llm = ChatMistralAI(model="mistral-large-latest", streaming=True, temperature=0) | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββ Tool for multiply ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def multiply(a: int, b: int) -> int: | |
""" | |
Multiply two numbers. | |
Args: | |
a (int): The first factor. | |
b (int): The second factor. | |
Returns: | |
int: The product of a and b. | |
""" | |
try: | |
# Direct calculation without relying on LangChain handling | |
result = a * b | |
return result | |
except Exception as e: | |
return f"Error in multiplication: {str(e)}" | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββ Tool for add ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def add(a: int, b: int) -> int: | |
""" | |
Add two numbers. | |
Args: | |
a (int): The first factor. | |
b (int): The second factor. | |
Returns: | |
int: The addition of a and b. | |
""" | |
try: | |
# Direct calculation without relying on LangChain handling | |
result = a + b | |
return result | |
except Exception as e: | |
return f"Error in addition: {str(e)}" | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββ Tool for subtract ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def subtract(a: int, b: int) -> int: | |
""" | |
Subtract two numbers. | |
Args: | |
a (int): The first factor. | |
b (int): The second factor. | |
Returns: | |
int: The subtraction of a and b. | |
""" | |
try: | |
# Direct calculation without relying on LangChain handling | |
result = a - b | |
return result | |
except Exception as e: | |
return f"Error in subtraction: {str(e)}" | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββ Tool for divide ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def divide(a: int, b: int) -> int: | |
""" | |
Divide two numbers. | |
Args: | |
a (int): The numerator. | |
b (int): The denominator. | |
Returns: | |
float: The result of a divided by b. | |
Raises: | |
ValueError: If b is zero. | |
""" | |
try: | |
if b == 0: | |
return "Error: Cannot divide by zero." | |
# Direct calculation without relying on LangChain handling | |
result = a / b | |
return result | |
except Exception as e: | |
return f"Error in division: {str(e)}" | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββ Tool for modulus ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def modulus(a: int, b: int) -> int: | |
""" | |
Get the modulus (remainder) of two numbers. | |
Args: | |
a (int): The dividend. | |
b (int): The divisor. | |
Returns: | |
int: The remainder when a is divided by b. | |
""" | |
try: | |
if b == 0: | |
return "Error: Cannot calculate modulus with zero divisor." | |
# Direct calculation without relying on LangChain handling | |
result = a % b | |
return result | |
except Exception as e: | |
return f"Error in modulus calculation: {str(e)}" | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββ Tool for browsing ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def with_retry(max_attempts: int = 3, backoff_base: int = 2): | |
""" | |
Decorator for retrying a function with exponential backoff on exception. | |
""" | |
def decorator(fn): | |
def wrapper(*args, **kwargs): | |
for attempt in range(max_attempts): | |
try: | |
return fn(*args, **kwargs) | |
except Exception as e: | |
wait = backoff_base ** attempt + random.uniform(0, 1) | |
logger.warning(f"{fn.__name__} failed (attempt {attempt+1}/{max_attempts}): {e}") | |
if attempt < max_attempts - 1: | |
time.sleep(wait) | |
logger.error(f"{fn.__name__} failed after {max_attempts} attempts.") | |
return [] | |
return wrapper | |
return decorator | |
def tavily_search(query: str, top_k: int = 3) -> List[Dict]: | |
"""Call Tavily API and return a list of result dicts.""" | |
if not TAVILY_API_KEY: | |
logger.info("[Tavily] No API key set. Skipping Tavily search.") | |
return [] | |
url = "https://api.tavily.com/search" | |
headers = { | |
"Authorization": f"Bearer {TAVILY_API_KEY}", | |
"Content-Type": "application/json", | |
} | |
payload = {"query": query, "num_results": top_k} | |
resp = requests.post(url, headers=headers, json=payload, timeout=10) | |
resp.raise_for_status() | |
data = resp.json() | |
results = [] | |
for item in data.get("results", []): | |
results.append({ | |
"title": item.get("title", ""), | |
"url": item.get("url", ""), | |
"content": item.get("content", "")[:200], | |
"source": "Tavily" | |
}) | |
return results | |
def duckduckgo_search(query: str, top_k: int = 3) -> List[Dict]: | |
"""Query DuckDuckGo and return up to top_k raw SERP hits.""" | |
results = [] | |
try: | |
with DDGS(timeout=15) as ddgs: # Increase timeout from default | |
for hit in ddgs.text(query, safesearch="On", max_results=top_k, timeout=15): | |
results.append({ | |
"title": hit.get("title", ""), | |
"url": hit.get("href") or hit.get("url", ""), | |
"content": hit.get("body", ""), | |
"source": "DuckDuckGo" | |
}) | |
if len(results) >= top_k: | |
break | |
except Exception as e: | |
logger.warning(f"DuckDuckGo search failed: {e}") | |
# Don't re-raise - just return empty results to allow fallbacks to work | |
return results | |
# Additional fallback search alternative | |
def simple_google_search(query: str, top_k: int = 3) -> List[Dict]: | |
"""Simplified Google search as a fallback when other methods fail.""" | |
try: | |
# Encode the query | |
import urllib.parse | |
import bs4 | |
encoded_query = urllib.parse.quote(query) | |
url = f"https://www.google.com/search?q={encoded_query}" | |
headers = { | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", | |
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", | |
"Accept-Language": "en-US,en;q=0.5", | |
"Referer": "https://www.google.com/", | |
"Connection": "keep-alive", | |
} | |
response = requests.get(url, headers=headers, timeout=20) | |
response.raise_for_status() | |
soup = bs4.BeautifulSoup(response.text, "html.parser") | |
results = [] | |
# Extract search results | |
for result in soup.select("div.g")[:top_k]: | |
title_elem = result.select_one("h3") | |
link_elem = result.select_one("a") | |
snippet_elem = result.select_one("div.VwiC3b") | |
if title_elem and link_elem and snippet_elem and "href" in link_elem.attrs: | |
href = link_elem["href"] | |
if href.startswith("/url?q="): | |
href = href.split("/url?q=")[1].split("&")[0] | |
if href.startswith("http"): | |
results.append({ | |
"title": title_elem.get_text(), | |
"url": href, | |
"content": snippet_elem.get_text(), | |
"source": "Google" | |
}) | |
return results | |
except Exception as e: | |
logger.warning(f"Simple Google search failed: {e}") | |
return [] | |
def hybrid_search(query: str, top_k: int = 3) -> List[Dict]: | |
"""Combine multiple search sources with fallbacks.""" | |
# Try primary search methods first | |
results = [] | |
# Start with Tavily if API key is available | |
if TAVILY_API_KEY and TAVILY_API_KEY != "default_key_or_placeholder": | |
try: | |
tavily_results = tavily_search(query, top_k) | |
results.extend(tavily_results) | |
logger.info(f"Retrieved {len(tavily_results)} results from Tavily") | |
except Exception as e: | |
logger.warning(f"Tavily search failed: {e}") | |
# If we don't have enough results, try DuckDuckGo | |
if len(results) < top_k: | |
try: | |
ddg_results = duckduckgo_search(query, top_k - len(results)) | |
results.extend(ddg_results) | |
logger.info(f"Retrieved {len(ddg_results)} results from DuckDuckGo") | |
except Exception as e: | |
logger.warning(f"DuckDuckGo search failed: {e}") | |
# If we still don't have enough results, try Google | |
if len(results) < top_k: | |
try: | |
google_results = simple_google_search(query, top_k - len(results)) | |
results.extend(google_results) | |
logger.info(f"Retrieved {len(google_results)} results from Google") | |
except Exception as e: | |
logger.warning(f"Google search failed: {e}") | |
# If all search methods failed, return a dummy result | |
if not results: | |
results.append({ | |
"title": "Search Failed", | |
"url": "", | |
"content": f"Sorry, I couldn't find results for '{query}'. Please try refining your search terms or check your internet connection.", | |
"source": "No results" | |
}) | |
return results[:top_k] # Ensure we only return top_k results | |
def format_search_docs(search_docs: List[Dict]) -> Dict[str, str]: | |
""" | |
Turn a list of {source, page, content} dicts into one big | |
string with <Document ...>β¦</Document> entries separated by `---`. | |
""" | |
formatted_search_docs = "\n\n---\n\n".join( | |
[ | |
f'<Document source="{doc["source"]}" page="{doc.get("page", "")}"/>\n' | |
f'{doc.get("content", "")}\n' | |
f'</Document>' | |
for doc in search_docs | |
] | |
) | |
return {"web_results": formatted_search_docs} | |
def web_search(query: str, top_k: int = 3) -> Dict[str, str]: | |
""" | |
Perform a hybrid web search combining multiple search engines with robust fallbacks. | |
Args: | |
query: The search query string to look up. | |
top_k: The maximum number of search results to return (default is 3). | |
Returns: | |
A dictionary mapping result indices to XML-like <Document> blocks, each containing: | |
- source: The URL of the webpage. | |
- page: Placeholder for page identifier (empty string by default). | |
- content: The first 200 words of the page text, cleaned of HTML tags. | |
""" | |
try: | |
# Use our robust hybrid search to get initial results | |
search_results = hybrid_search(query, top_k) | |
results = [] | |
# Process each search result to get better content | |
for hit in search_results: | |
url = hit.get("url") | |
if not url: | |
continue | |
# Start with the snippet from search | |
content = hit.get("content", "") | |
title = hit.get("title", "") | |
# Try to scrape additional content if possible | |
try: | |
# Use a random user agent to avoid blocking | |
headers = { | |
"User-Agent": random.choice([ | |
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", | |
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15", | |
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36", | |
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.62" | |
]), | |
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", | |
"Accept-Language": "en-US,en;q=0.5", | |
"Referer": "https://www.google.com/", | |
"DNT": "1", | |
"Connection": "keep-alive" | |
} | |
# Higher timeout for better reliability | |
resp = requests.get(url, timeout=15, headers=headers) | |
# Only process if successful | |
if resp.status_code == 200: | |
soup = BeautifulSoup(resp.text, "html.parser") | |
# Try to find main content | |
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content') | |
# If we found main content, use it | |
if main_content: | |
extracted_text = main_content.get_text(separator=" ", strip=True) | |
# Take first 200 words | |
content = " ".join(extracted_text.split()[:200]) | |
else: | |
# Otherwise use all text | |
all_text = soup.get_text(separator=" ", strip=True) | |
content = " ".join(all_text.split()[:200]) | |
# Use content from page only if it's substantial | |
if len(content) < 50: | |
content = hit.get("content", "")[:200] | |
# Random delay between 0.5-1.5 seconds to avoid rate limits | |
time.sleep(0.5 + random.random()) | |
except requests.exceptions.HTTPError as e: | |
logger.warning(f"HTTP error when scraping {url}: {e}") | |
# Keep the search snippet as a fallback | |
except requests.exceptions.RequestException as e: | |
logger.warning(f"Request error when scraping {url}: {e}") | |
# Keep the search snippet as a fallback | |
except Exception as e: | |
logger.warning(f"Unexpected error when scraping {url}: {e}") | |
# Keep the search snippet as a fallback | |
# Filter out inappropriate content | |
if any(f in content.lower() for f in _forbidden): | |
continue | |
# Add to results | |
results.append({ | |
"source": url, | |
"page": "", | |
"content": content | |
}) | |
# Return formatted search docs | |
return format_search_docs(results[:top_k]) | |
except Exception as e: | |
logger.error(f"Web search failed: {e}") | |
# Return a helpful error message | |
return format_search_docs([{ | |
"source": "Error", | |
"page": "", | |
"content": f"Search failed with error: {e}. Please try again with different search terms." | |
}]) | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββ Tool for File System βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def download_file(url: str, dest_path: str) -> str: | |
""" | |
Download a file from a given URL and save it locally. | |
Args: | |
url: The direct URL of the file to download. | |
dest_path: The local path to save the downloaded file. | |
Returns: | |
The destination path where the file was saved. | |
""" | |
r = requests.get(url, stream=True) | |
r.raise_for_status() | |
with open(dest_path, 'wb') as f: | |
for chunk in r.iter_content(8192): | |
f.write(chunk) | |
return dest_path | |
def process_excel_to_text(file_path: str) -> str: | |
""" | |
Convert an Excel file into CSV-formatted text. | |
Args: | |
file_path: Path to the Excel (.xlsx) file. | |
Returns: | |
A string of CSV-formatted content extracted from the Excel file. | |
""" | |
try: | |
# Check if file exists | |
import os | |
if not os.path.exists(file_path): | |
return f"Error: Excel file '{file_path}' does not exist." | |
# Try different engines | |
engines = ['openpyxl', 'xlrd', None] | |
for engine in engines: | |
try: | |
# For engine=None, pandas will try to auto-detect | |
if engine: | |
df = pd.read_excel(file_path, engine=engine) | |
else: | |
df = pd.read_excel(file_path) | |
return df.to_csv(index=False) | |
except Exception as e: | |
print(f"Excel engine {engine} failed: {e}") | |
last_error = e | |
continue | |
# If we got here, all engines failed | |
return f"Error processing Excel file: {str(last_error)}" | |
except Exception as e: | |
return f"Error with Excel file: {str(e)}" | |
def read_text_from_pdf(file_path: str, question: str = None) -> str: | |
""" | |
Extract text from a PDF file, chunking large documents if needed. | |
Args: | |
file_path: Path to the PDF file. | |
question: Optional question to help retrieve relevant parts of long documents. | |
Returns: | |
The extracted text content, potentially chunked if the document is large. | |
""" | |
try: | |
# Check if file exists | |
import os | |
if not os.path.exists(file_path): | |
return f"Error: PDF file '{file_path}' does not exist." | |
reader = PdfReader(file_path) | |
full_text = "\n".join([page.extract_text() or "" for page in reader.pages]) | |
# If a question is provided, use retrieval to get relevant parts | |
if question and len(full_text) > 5000: # Only chunk if text is large | |
return process_large_document(full_text, question) | |
return full_text | |
except Exception as e: | |
return f"Error reading PDF: {str(e)}" | |
def read_text_from_docx(file_path: str, question: str = None) -> str: | |
""" | |
Extract text from a DOCX (Word) document, chunking large documents if needed. | |
Args: | |
file_path: Path to the DOCX file. | |
question: Optional question to help retrieve relevant parts of long documents. | |
Returns: | |
The extracted text, potentially chunked if the document is large. | |
""" | |
try: | |
# Check if file exists | |
import os | |
if not os.path.exists(file_path): | |
return f"Error: File '{file_path}' does not exist." | |
try: | |
doc = docx.Document(file_path) | |
full_text = "\n".join([para.text for para in doc.paragraphs]) | |
except Exception as docx_err: | |
# Handle "Package not found" error specifically | |
if "Package not found" in str(docx_err): | |
# Try to read raw text if possible | |
try: | |
import zipfile | |
from xml.etree.ElementTree import XML | |
WORD_NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}' | |
PARA = WORD_NAMESPACE + 'p' | |
TEXT = WORD_NAMESPACE + 't' | |
with zipfile.ZipFile(file_path) as docx_file: | |
with docx_file.open('word/document.xml') as document: | |
tree = XML(document.read()) | |
paragraphs = [] | |
for paragraph in tree.iter(PARA): | |
texts = [node.text for node in paragraph.iter(TEXT) if node.text] | |
if texts: | |
paragraphs.append(''.join(texts)) | |
full_text = '\n'.join(paragraphs) | |
except Exception as e: | |
return f"Error reading DOCX file: {str(e)}" | |
else: | |
return f"Error reading DOCX file: {str(docx_err)}" | |
# If a question is provided, use retrieval to get relevant parts | |
if question and len(full_text) > 5000: # Only chunk if text is large | |
return process_large_document(full_text, question) | |
return full_text | |
except Exception as e: | |
return f"Error reading DOCX file: {str(e)}" | |
def transcribe_audio(file_path: str) -> str: | |
""" | |
Transcribe speech from a local audio file to text. | |
Args: | |
file_path: Path to the audio file. | |
Returns: | |
Transcribed text using Google Web Speech API. | |
""" | |
try: | |
# Check if file exists | |
import os | |
if not os.path.exists(file_path): | |
return f"Error: Audio file '{file_path}' does not exist." | |
# For non-WAV files, convert to WAV first | |
if not file_path.lower().endswith('.wav'): | |
try: | |
from pydub import AudioSegment | |
temp_wav = os.path.splitext(file_path)[0] + "_temp.wav" | |
audio = AudioSegment.from_file(file_path) | |
audio.export(temp_wav, format="wav") | |
file_path = temp_wav | |
except Exception as e: | |
return f"Failed to convert audio to WAV format: {str(e)}" | |
recognizer = sr.Recognizer() | |
with sr.AudioFile(file_path) as src: | |
audio = recognizer.record(src) | |
return recognizer.recognize_google(audio) | |
except Exception as e: | |
if "Audio file could not be read" in str(e): | |
return f"Error: Audio format not supported. Try converting to WAV, MP3, OGG, or FLAC." | |
return f"Error transcribing audio: {str(e)}" | |
def youtube_audio_processing(youtube_url: str) -> str: | |
""" | |
Download and transcribe audio from a YouTube video. | |
Args: | |
youtube_url: URL of the YouTube video. | |
Returns: | |
Transcription text extracted from the video's audio. | |
""" | |
yt = YouTube(youtube_url) | |
audio_stream = yt.streams.filter(only_audio=True).first() | |
out_file = audio_stream.download(output_path='.', filename='yt_audio') | |
wav_path = 'yt_audio.wav' | |
AudioSegment.from_file(out_file).export(wav_path, format='wav') | |
return transcribe_audio(wav_path) | |
def extract_article_text(url: str, question: str = None) -> str: | |
""" | |
Download and extract the main article content from a webpage, chunking large articles if needed. | |
Args: | |
url: The URL of the article to extract. | |
question: Optional question to help retrieve relevant parts of long articles. | |
Returns: | |
The article's textual content, potentially chunked if large. | |
""" | |
try: | |
art = Article(url) | |
art.download() | |
art.parse() | |
full_text = art.text | |
# If a question is provided, use retrieval to get relevant parts | |
if question and len(full_text) > 5000: # Only chunk if text is large | |
return process_large_document(full_text, question) | |
return full_text | |
except Exception as e: | |
return f"Error extracting article: {str(e)}" | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ Tool for ArXiv ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def arvix_search(query: str) -> Dict[str, str]: | |
""" | |
Search for academic papers on ArXiv. | |
Args: | |
query: The search term to look for in ArXiv. | |
Returns: | |
A dictionary of up to 3 relevant paper entries in JSON format. | |
""" | |
papers = ArxivLoader(query=query, load_max_docs=3).load() | |
results = [] | |
for doc in papers: | |
try: | |
# Handle different metadata formats that might be returned | |
source = doc.metadata.get("source", "ArXiv") | |
doc_id = doc.metadata.get("id", doc.metadata.get("entry_id", "")) | |
result = { | |
"source": source, | |
"id": doc_id, | |
"summary": doc.page_content[:1000] if hasattr(doc, "page_content") else str(doc)[:1000], | |
} | |
results.append(result) | |
except Exception as e: | |
# Add error information as a fallback | |
results.append({ | |
"source": "ArXiv Error", | |
"id": "error", | |
"summary": f"Error processing paper: {str(e)}" | |
}) | |
return {"arvix_results": json.dumps(results)} | |
def answer_youtube_video_question( | |
youtube_url: str, | |
question: str, | |
chunk_size_seconds: int = 30 | |
) -> str: | |
""" | |
Answer a question based on a YouTube video's transcript. | |
Args: | |
youtube_url: URL of the YouTube video. | |
question: The question to be answered using video content. | |
chunk_size_seconds: Duration of each transcript chunk. | |
Returns: | |
The answer to the question generated from the video transcript. | |
""" | |
loader = YoutubeLoader.from_youtube_url( | |
youtube_url, | |
add_video_info=True, | |
transcript_format=TranscriptFormat.CHUNKS, | |
chunk_size_seconds=chunk_size_seconds, | |
) | |
documents = loader.load() | |
embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-mpnet-base-v2') | |
vectorstore = FAISS.from_documents(documents, embeddings) | |
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False) | |
qa_chain = RetrievalQA.from_chain_type(llm=llm, retriever=vectorstore.as_retriever()) | |
return qa_chain.run(question) | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ Tool for Python REPL tool ββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
python_repl = PythonREPLTool() | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ Tool for Wiki ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def wiki_search(query: str) -> str: | |
""" | |
Search Wikipedia for information on a given topic. | |
Args: | |
query: The search term for Wikipedia. | |
Returns: | |
A JSON string with up to 3 summary results. | |
""" | |
# load up to top_k pages | |
pages = WikipediaLoader(query=query, load_max_docs=3).load() | |
results: List[Dict] = [] | |
for doc in pages: | |
results.append({ | |
"source": doc.metadata["source"], | |
"page": doc.metadata.get("page", ""), | |
"content": doc.page_content[:1000], # truncate if you like | |
}) | |
return {"wiki_results": format_search_docs(results)} | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββ Tool for Image (understading, captioning & classification) βββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _load_image(img_path: str, resize_to=(512, 512)) -> Image.Image: | |
""" | |
Load, verify, convert, and resize an image. | |
Raises ValueError on failure. | |
""" | |
if not img_path: | |
raise ValueError("No image path provided.") | |
try: | |
with Image.open(img_path) as img: | |
img.verify() | |
img = Image.open(img_path).convert("RGB") | |
img = img.resize(resize_to) | |
return img | |
except UnidentifiedImageError: | |
raise ValueError(f"File at {img_path} is not a valid image.") | |
except Exception as e: | |
raise ValueError(f"Failed to load image at {img_path}: {e}") | |
def _encode_image_to_base64(img_path: str) -> str: | |
""" | |
Load an image, save optimized PNG into memory, and base64βencode it. | |
""" | |
img = _load_image(img_path) | |
buffer = BytesIO() | |
img.save(buffer, format="PNG", optimize=True) | |
return base64.b64encode(buffer.getvalue()).decode("utf-8") | |
def image_processing(prompt: str, img_path: str) -> str: | |
"""Process an image using a vision LLM, with OCR fallback. | |
Args: | |
prompt: Instruction or question related to the image. | |
img_path: Path to the image file. | |
Returns: | |
The model's response or fallback OCR result. | |
""" | |
try: | |
import os | |
# Check if file exists | |
if not os.path.exists(img_path): | |
return f"Error: Image file '{img_path}' does not exist." | |
try: | |
b64 = _encode_image_to_base64(img_path) | |
# Build a single markdown string with inline base64 image | |
md = f"{prompt}\n\n" | |
message = HumanMessage(content=md) | |
# Use RetryingChatGroq with Llama 4 Maverick for vision | |
llm = RetryingChatGroq(model="meta-llama/llama-4-maverick-17b-128e-instruct", streaming=False, temperature=0) | |
try: | |
resp = llm.invoke([message]) | |
if hasattr(resp, 'content'): | |
return resp.content.strip() | |
elif isinstance(resp, str): | |
return resp.strip() | |
else: | |
# Handle dictionary or other response types | |
return str(resp) | |
except Exception as invoke_err: | |
print(f"[LLM invoke error] {invoke_err}") | |
# Fall back to OCR | |
raise ValueError("LLM invocation failed") | |
except Exception as llama_err: | |
print(f"[LLM vision failed] {llama_err}") | |
try: | |
img = _load_image(img_path) | |
return pytesseract.image_to_string(img).strip() | |
except Exception as ocr_err: | |
print(f"[OCR fallback failed] {ocr_err}") | |
return "Unable to process the image. Please check the file and try again." | |
except Exception as e: | |
# Catch any other errors | |
print(f"[image_processing error] {e}") | |
return f"Error processing image: {str(e)}" | |
python_repl_tool = PythonREPLTool() | |
def echo(text: str) -> str: | |
"""Echo back the input text. | |
Args: | |
text: The string to be echoed. | |
Returns: | |
The same text that was provided as input. | |
""" | |
return text | |
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββ Langgraph Agent βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Build graph function | |
from langchain_core.tools import tool | |
from langchain.chat_models import ChatOpenAI | |
from langgraph.prebuilt.chat_agent_executor import create_react_agent, AgentState | |
from langchain.chat_models import init_chat_model | |
def build_graph(provider: str = "groq"): | |
"""Construct and compile the multiβagent GAIA workflow StateGraph. | |
This graph wires together three Reactβstyle agents into a streamlined pipeline: | |
PerceptionAgent β ActionAgent β EvaluationAgent (with appropriate entry/exit points) | |
The agents have the following responsibilities: | |
- PerceptionAgent: Handles web searches, Wikipedia, ArXiv, and image processing | |
- ActionAgent: Performs calculations, file operations, and code analysis | |
- EvaluationAgent: Reviews results and ensures the final answer is properly formatted | |
Args: | |
provider: The name of the LLM provider. Must be "groq". | |
Returns: | |
CompiledGraph: A compiled LangGraph state machine ready for invocation. | |
Raises: | |
ValueError: If `provider` is anything other than "groq". | |
""" | |
try: | |
if provider != "groq": | |
raise ValueError("Invalid provider. Expected 'groq'.") | |
# Initialize LLM | |
try: | |
logger.info("Initializing LLM with model: deepseek-r1-distill-llama-70b") | |
api_key = os.getenv("GROQ_API_KEY") | |
if not api_key or api_key == "default_key_or_placeholder": | |
logger.error("GROQ_API_KEY is not set or is using placeholder value") | |
raise ValueError("GROQ_API_KEY environment variable is not set properly. Please set a valid API key.") | |
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", temperature=0) | |
logger.info("LLM initialized successfully") | |
except Exception as e: | |
logger.error(f"Error initializing LLM: {str(e)}") | |
raise | |
# General system message for agents | |
sys_msg = SystemMessage(content=""" | |
You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template: | |
FINAL ANSWER: [YOUR FINAL ANSWER] | |
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma-separated list of numbers and/or strings. | |
If you are asked for a number, don't use commas or units (e.g., $, %, kg) unless specified otherwise. | |
If you are asked for a string, don't use articles (a, an, the), and don't use abbreviations (e.g., for states). | |
If you are asked for a comma-separated list, apply the above rules to each element in the list. | |
""".strip()) | |
# Special system message for the evaluation agent with stricter formatting requirements | |
eval_sys_msg = SystemMessage(content=""" | |
You are a specialized evaluation agent. Your job is to review the work done by other agents | |
and provide a final, properly formatted answer. | |
IMPORTANT: You MUST ALWAYS format your answer using this exact template: | |
FINAL ANSWER: [concise answer] | |
Rules for formatting the answer: | |
1. The answer must be extremely concise - use as few words as possible | |
2. For numeric answers, provide only the number without units unless units are specifically requested | |
3. For text answers, avoid articles (a, an, the) and unnecessary words | |
4. For list answers, use a comma-separated format | |
5. NEVER explain your reasoning in the FINAL ANSWER section | |
6. NEVER skip the "FINAL ANSWER:" prefix | |
Example good answers: | |
FINAL ANSWER: 42 | |
FINAL ANSWER: Paris | |
FINAL ANSWER: 1912, 1945, 1989 | |
Example bad answers (don't do these): | |
- Based on my analysis, the answer is 42. | |
- I think it's Paris because that's the capital of France. | |
- The years were 1912, 1945, and 1989. | |
Remember: ALWAYS include "FINAL ANSWER:" followed by the most concise answer possible. | |
""".strip()) | |
# Define tools for each agent | |
logger.info("Setting up agent tools") | |
perception_tools = [web_search, wiki_search, news_article_search, arvix_search, image_processing, echo] | |
execution_tools = [ | |
multiply, add, subtract, divide, modulus, | |
download_file, process_excel_to_text, | |
read_text_from_pdf, read_text_from_docx, | |
transcribe_audio, youtube_audio_processing, | |
extract_article_text, answer_youtube_video_question, | |
python_repl_tool, analyze_code, read_code_file, analyze_python_function | |
] | |
# βββββββββββββββ Agent Creation βββββββββββββββ | |
logger.info("Creating agents") | |
try: | |
# Create agents with proper error handling | |
PerceptionAgent = create_react_agent( | |
model=llm, | |
tools=perception_tools, | |
prompt=sys_msg, | |
state_schema=AgentState, | |
name="PerceptionAgent" | |
) | |
logger.info("Created PerceptionAgent successfully") | |
# Combined Planning and Execution agent for better efficiency | |
ActionAgent = create_react_agent( | |
model=llm, | |
tools=execution_tools, # Has access to all execution tools | |
prompt=sys_msg, | |
state_schema=AgentState, | |
name="ActionAgent" | |
) | |
logger.info("Created ActionAgent successfully") | |
# Evaluation agent with stricter prompt | |
EvaluationAgent = create_react_agent( | |
model=llm, | |
tools=[], # No tools needed for evaluation | |
prompt=eval_sys_msg, # Use the specialized evaluation prompt | |
state_schema=AgentState, | |
name="EvaluationAgent" | |
) | |
logger.info("Created EvaluationAgent successfully") | |
except Exception as e: | |
logger.error(f"Error creating agent: {str(e)}") | |
import traceback | |
logger.error(f"Traceback: {traceback.format_exc()}") | |
raise | |
# Build the StateGraph | |
logger.info("Building StateGraph") | |
try: | |
builder = StateGraph(AgentState) | |
# Add agent nodes first | |
builder.add_node("PerceptionAgent", PerceptionAgent) | |
builder.add_node("ActionAgent", ActionAgent) | |
builder.add_node("EvaluationAgent", EvaluationAgent) | |
# Define the flow with a starting edge | |
builder.set_entry_point("PerceptionAgent") | |
# Add the edges for the simpler linear flow | |
builder.add_edge("PerceptionAgent", "ActionAgent") | |
builder.add_edge("ActionAgent", "EvaluationAgent") | |
# Set EvaluationAgent as the end node | |
builder.set_finish_point("EvaluationAgent") | |
logger.info("Compiling StateGraph") | |
return builder.compile() | |
except Exception as e: | |
logger.error(f"Error building graph: {str(e)}") | |
import traceback | |
logger.error(f"Traceback: {traceback.format_exc()}") | |
raise | |
except Exception as e: | |
logger.error(f"Overall error in build_graph: {str(e)}") | |
import traceback | |
logger.error(f"Traceback: {traceback.format_exc()}") | |
raise | |
def get_final_answer(text): | |
"""Extract just the FINAL ANSWER from the model's response. | |
Args: | |
text: The full text response from the LLM | |
Returns: | |
str: The extracted answer without the "FINAL ANSWER:" prefix | |
""" | |
# Log the raw text for debugging if needed | |
logger.debug(f"Extracting answer from: {text[:200]}...") | |
if not text: | |
logger.warning("Empty response received") | |
return "No answer provided." | |
# Method 1: Look for "FINAL ANSWER:" with most comprehensive pattern matching | |
pattern = r'(?:^|\n)FINAL ANSWER:\s*(.*?)(?:\n\s*$|$)' | |
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE) | |
if match: | |
# Return just the answer part, cleaned up | |
logger.debug("Found answer using pattern 1") | |
return match.group(1).strip() | |
# Method 2: Try looking for variations on the final answer format | |
for variant in ["FINAL ANSWER:", "FINAL_ANSWER:", "Final Answer:", "Answer:"]: | |
lines = text.split('\n') | |
for i, line in enumerate(reversed(lines)): | |
if variant in line: | |
# Extract everything after the variant text | |
logger.debug(f"Found answer using variant: {variant}") | |
answer = line[line.find(variant) + len(variant):].strip() | |
if answer: | |
return answer | |
# If the answer is on the next line, return that | |
if i > 0: | |
next_line = lines[len(lines) - i] | |
if next_line.strip(): | |
return next_line.strip() | |
# Method 3: Look for phrases that suggest an answer | |
for phrase in ["The answer is", "The result is", "We get", "Therefore,", "In conclusion,"]: | |
phrase_pos = text.find(phrase) | |
if phrase_pos != -1: | |
# Try to extract everything after the phrase until the end of the sentence | |
sentence_end = text.find(".", phrase_pos) | |
if sentence_end != -1: | |
logger.debug(f"Found answer using phrase: {phrase}") | |
return text[phrase_pos + len(phrase):sentence_end].strip() | |
# Method 4: Fall back to taking the last paragraph with actual content | |
paragraphs = text.strip().split('\n\n') | |
for para in reversed(paragraphs): | |
para = para.strip() | |
if para and not para.startswith("I ") and not para.lower().startswith("to "): | |
logger.debug("Using last meaningful paragraph") | |
# If paragraph is very long, try to extract a concise answer | |
if len(para) > 100: | |
sentences = re.split(r'[.!?]', para) | |
for sentence in reversed(sentences): | |
sent = sentence.strip() | |
if sent and len(sent) > 5 and not sent.startswith("I "): | |
return sent | |
return para | |
# Method 5: Last resort - just return the last line with content | |
lines = text.strip().split('\n') | |
for line in reversed(lines): | |
line = line.strip() | |
if line and len(line) > 3: | |
logger.debug("Using last line with content") | |
return line | |
# If everything fails, warn and return the truncated response | |
logger.warning("Could not find a properly formatted answer") | |
return text[:100] + "..." if len(text) > 100 else text | |
# test | |
if __name__ == "__main__": | |
question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?" | |
# Build the graph | |
graph = build_graph(provider="groq") | |
# Run the graph | |
messages = [HumanMessage(content=question)] | |
messages = graph.invoke({"messages": messages}) | |
for m in messages["messages"]: | |
m.pretty_print() | |
# βββββββββββββββββββββββββββββββββββββββββββββββ Tool for Code Analysis βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def analyze_code(code_string: str) -> str: | |
"""Analyze a string of code to understand its structure, functionality, and potential issues. | |
Args: | |
code_string: The code to analyze as a string. | |
Returns: | |
A structured analysis of the code including functions, classes, and key operations. | |
""" | |
try: | |
import ast | |
# Try to parse with Python's AST module | |
try: | |
parsed = ast.parse(code_string) | |
# Extract functions and classes | |
functions = [node.name for node in ast.walk(parsed) if isinstance(node, ast.FunctionDef)] | |
classes = [node.name for node in ast.walk(parsed) if isinstance(node, ast.ClassDef)] | |
imports = [node.names[0].name for node in ast.walk(parsed) if isinstance(node, ast.Import)] | |
imports.extend([f"{node.module}.{name.name}" if node.module else name.name | |
for node in ast.walk(parsed) if isinstance(node, ast.ImportFrom) | |
for name in node.names]) | |
# Count various node types for complexity assessment | |
num_loops = len([node for node in ast.walk(parsed) | |
if isinstance(node, (ast.For, ast.While))]) | |
num_conditionals = len([node for node in ast.walk(parsed) | |
if isinstance(node, (ast.If, ast.IfExp))]) | |
analysis = { | |
"language": "Python", | |
"functions": functions, | |
"classes": classes, | |
"imports": imports, | |
"complexity": { | |
"functions": len(functions), | |
"classes": len(classes), | |
"loops": num_loops, | |
"conditionals": num_conditionals | |
} | |
} | |
return str(analysis) | |
except SyntaxError: | |
# If not valid Python, try some simple pattern matching | |
if "{" in code_string and "}" in code_string: | |
if "function" in code_string or "=>" in code_string: | |
language = "JavaScript/TypeScript" | |
elif "func" in code_string or "struct" in code_string: | |
language = "Go or Rust" | |
elif "public" in code_string or "private" in code_string or "class" in code_string: | |
language = "Java/C#/C++" | |
else: | |
language = "Unknown C-like language" | |
elif "<" in code_string and ">" in code_string and ("/>" in code_string or "</"): | |
language = "HTML/XML/JSX" | |
else: | |
language = "Unknown" | |
return f"Non-Python code detected ({language}). Basic code structure analysis not available." | |
except Exception as e: | |
return f"Error analyzing code: {str(e)}" | |
def read_code_file(file_path: str) -> str: | |
"""Read a code file and return its contents with proper syntax detection. | |
Args: | |
file_path: Path to the code file. | |
Returns: | |
The file contents and detected language. | |
""" | |
try: | |
# Check if file exists | |
import os | |
if not os.path.exists(file_path): | |
return f"Error: File '{file_path}' does not exist." | |
with open(file_path, 'r', encoding='utf-8') as f: | |
content = f.read() | |
# Try to detect language from extension | |
ext = os.path.splitext(file_path)[1].lower() | |
language_map = { | |
'.py': 'Python', | |
'.js': 'JavaScript', | |
'.ts': 'TypeScript', | |
'.html': 'HTML', | |
'.css': 'CSS', | |
'.java': 'Java', | |
'.c': 'C', | |
'.cpp': 'C++', | |
'.cs': 'C#', | |
'.go': 'Go', | |
'.rs': 'Rust', | |
'.php': 'PHP', | |
'.rb': 'Ruby', | |
'.sh': 'Shell', | |
'.bat': 'Batch', | |
'.ps1': 'PowerShell', | |
'.sql': 'SQL', | |
'.json': 'JSON', | |
'.xml': 'XML', | |
'.yaml': 'YAML', | |
'.yml': 'YAML', | |
} | |
language = language_map.get(ext, 'Unknown') | |
return f"File content ({language}):\n\n{content}" | |
except Exception as e: | |
return f"Error reading file: {str(e)}" | |
def analyze_python_function(function_name: str, code_string: str) -> str: | |
"""Extract and analyze a specific function from Python code. | |
Args: | |
function_name: The name of the function to analyze. | |
code_string: The complete code containing the function. | |
Returns: | |
Analysis of the function including parameters, return type, and docstring. | |
""" | |
try: | |
import ast | |
import inspect | |
from types import CodeType, FunctionType | |
# Parse the code string | |
parsed = ast.parse(code_string) | |
# Find the function definition | |
function_def = None | |
for node in ast.walk(parsed): | |
if isinstance(node, ast.FunctionDef) and node.name == function_name: | |
function_def = node | |
break | |
if not function_def: | |
return f"Function '{function_name}' not found in the provided code." | |
# Extract parameters | |
params = [] | |
for arg in function_def.args.args: | |
param_name = arg.arg | |
# Get annotation if it exists | |
if arg.annotation: | |
if isinstance(arg.annotation, ast.Name): | |
param_type = arg.annotation.id | |
elif isinstance(arg.annotation, ast.Attribute): | |
param_type = f"{arg.annotation.value.id}.{arg.annotation.attr}" | |
else: | |
param_type = "complex_type" | |
params.append(f"{param_name}: {param_type}") | |
else: | |
params.append(param_name) | |
# Extract return type if it exists | |
return_type = None | |
if function_def.returns: | |
if isinstance(function_def.returns, ast.Name): | |
return_type = function_def.returns.id | |
elif isinstance(function_def.returns, ast.Attribute): | |
return_type = f"{function_def.returns.value.id}.{function_def.returns.attr}" | |
else: | |
return_type = "complex_return_type" | |
# Extract docstring | |
docstring = ast.get_docstring(function_def) | |
# Create a summary | |
summary = { | |
"function_name": function_name, | |
"parameters": params, | |
"return_type": return_type, | |
"docstring": docstring, | |
"decorators": [d.id if isinstance(d, ast.Name) else "complex_decorator" for d in function_def.decorator_list], | |
"line_count": len(function_def.body) | |
} | |
# Create a more explicit string representation that ensures key terms are included | |
result = f"Function '{function_name}' analysis:\n" | |
result += f"- Parameters: {', '.join(params)}\n" | |
result += f"- Return type: {return_type or 'None specified'}\n" | |
result += f"- Docstring: {docstring or 'None'}\n" | |
result += f"- Line count: {len(function_def.body)}" | |
return result | |
except Exception as e: | |
return f"Error analyzing function: {str(e)}" | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# βββββββββββββββββββββββββββββββββββββββββββββββ Tool for News Article Retrieval ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def news_article_search(query: str, top_k: int = 3) -> Dict[str, str]: | |
"""Search for and retrieve news articles with robust error handling for news sites. | |
Args: | |
query: The news topic or keywords to search for. | |
top_k: Maximum number of articles to retrieve. | |
Returns: | |
A dictionary with search results formatted as XML-like document entries. | |
""" | |
# First, get URLs from DuckDuckGo with "news" focus | |
results = [] | |
news_sources = [ | |
"bbc.com", "reuters.com", "apnews.com", "nasa.gov", | |
"space.com", "universetoday.com", "nature.com", "science.org", | |
"scientificamerican.com", "nytimes.com", "theguardian.com" | |
] | |
# Find news from reliable sources | |
try: | |
with DDGS() as ddgs: | |
search_query = f"{query} site:{' OR site:'.join(news_sources)}" | |
for hit in ddgs.text(search_query, safesearch="On", max_results=top_k*2): | |
url = hit.get("href") or hit.get("url", "") | |
if not url: | |
continue | |
# Add the search snippet first as a fallback | |
result = { | |
"source": url, | |
"page": "", | |
"content": hit.get("body", "")[:250], | |
"title": hit.get("title", "") | |
} | |
# Try to get better content via a more robust method | |
try: | |
headers = { | |
"User-Agent": random.choice([ | |
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", | |
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15", | |
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36" | |
]), | |
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", | |
"Accept-Language": "en-US,en;q=0.5", | |
"Referer": "https://www.google.com/", | |
"DNT": "1", | |
"Connection": "keep-alive", | |
"Upgrade-Insecure-Requests": "1" | |
} | |
# Add a short delay between requests | |
time.sleep(1 + random.random()) | |
# Try to use newspaper3k for more reliable article extraction | |
from newspaper import Article | |
article = Article(url) | |
article.download() | |
article.parse() | |
# If we got meaningful content, update the result | |
if article.text and len(article.text) > 100: | |
# Get a summary - first paragraph + some highlights | |
paragraphs = article.text.split('\n\n') | |
first_para = paragraphs[0] if paragraphs else "" | |
summary = first_para[:300] | |
if len(paragraphs) > 1: | |
summary += "... " + paragraphs[1][:200] | |
result["content"] = summary | |
if article.title: | |
result["title"] = article.title | |
except Exception as article_err: | |
logger.warning(f"Article extraction failed for {url}: {article_err}") | |
# Fallback to simple requests-based extraction | |
try: | |
resp = requests.get(url, timeout=12, headers=headers) | |
resp.raise_for_status() | |
soup = BeautifulSoup(resp.text, "html.parser") | |
# Try to get main content | |
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content') | |
if main_content: | |
content = " ".join(main_content.get_text(separator=" ", strip=True).split()[:250]) | |
result["content"] = content | |
except Exception as req_err: | |
logger.warning(f"Fallback extraction failed for {url}: {req_err}") | |
# Keep the original snippet as fallback | |
results.append(result) | |
if len(results) >= top_k: | |
break | |
except Exception as e: | |
logger.error(f"News search failed: {e}") | |
return format_search_docs([{ | |
"source": "Error", | |
"page": "", | |
"content": f"Failed to retrieve news articles for '{query}': {str(e)}" | |
}]) | |
if not results: | |
# Fallback to regular web search | |
logger.info(f"No news results found, falling back to web_search for {query}") | |
return web_search(query, top_k) | |
return format_search_docs(results[:top_k]) | |
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ Document Chunking Utilities ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def chunk_document(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]: | |
""" | |
Split a large document into smaller chunks with overlap to maintain context across chunks. | |
Args: | |
text: The document text to split into chunks | |
chunk_size: Maximum size of each chunk in characters | |
overlap: Number of characters to overlap between chunks | |
Returns: | |
List of text chunks | |
""" | |
# If text is smaller than chunk_size, return it as is | |
if len(text) <= chunk_size: | |
return [text] | |
chunks = [] | |
start = 0 | |
while start < len(text): | |
# Get chunk with overlap | |
end = min(start + chunk_size, len(text)) | |
# Try to find sentence boundary for cleaner breaks | |
if end < len(text): | |
# Look for sentence endings: period, question mark, or exclamation followed by space | |
for sentence_end in ['. ', '? ', '! ']: | |
last_period = text[start:end].rfind(sentence_end) | |
if last_period != -1: | |
end = start + last_period + 2 # +2 to include the period and space | |
break | |
# Add chunk to list | |
chunks.append(text[start:end]) | |
# Move start position, accounting for overlap | |
start = end - overlap if end < len(text) else len(text) | |
return chunks | |
# Document processing utility that uses chunking | |
def process_large_document(text: str, question: str, llm=None) -> str: | |
""" | |
Process a large document by chunking it and using retrieval to find relevant parts. | |
Args: | |
text: The document text to process | |
question: The question being asked about the document | |
llm: Optional language model to use (defaults to agent's LLM) | |
Returns: | |
Summarized answer based on relevant chunks | |
""" | |
if not llm: | |
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False, temperature=0) | |
# Split document into chunks | |
chunks = chunk_document(text) | |
# If document is small enough, don't bother with retrieval | |
if len(chunks) <= 1: | |
return text | |
# For larger documents, create embeddings to find relevant chunks | |
try: | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain.vectorstores import FAISS | |
from langchain.schema import Document | |
# Create documents with chunk content | |
documents = [Document(page_content=chunk, metadata={"chunk_id": i}) for i, chunk in enumerate(chunks)] | |
# Create embeddings and vector store | |
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") | |
vectorstore = FAISS.from_documents(documents, embeddings) | |
# Get most relevant chunks | |
relevant_chunks = vectorstore.similarity_search(question, k=2) # Get top 2 most relevant chunks | |
# Join the relevant chunks | |
relevant_text = "\n\n".join([doc.page_content for doc in relevant_chunks]) | |
# Option 1: Return relevant chunks directly | |
return relevant_text | |
# Option 2: Summarize with LLM (commented out for now) | |
# prompt = f"Using only the following information, answer the question: '{question}'\n\nInformation:\n{relevant_text}" | |
# response = llm.invoke([HumanMessage(content=prompt)]) | |
# return response.content | |
except Exception as e: | |
# Fall back to first chunk if retrieval fails | |
logger.warning(f"Retrieval failed: {e}. Falling back to first chunk.") | |
return chunks[0] |