from smolagents import DuckDuckGoSearchTool
from smolagents import Tool, tool
import random
from huggingface_hub import list_models
import os
import re
import requests
import wikipedia
from markdownify import markdownify as to_markdown
from google.generativeai import configure, GenerativeModel
from bs4 import BeautifulSoup
from sympy import sympify, SympifyError, simplify

# Import configuration manager
try:
    from config import config, safe_getenv
except ImportError:
    # Fallback if config.py doesn't exist
    class DummyConfig:
        def has_key(self, key): return bool(os.getenv(key))
        def get_key(self, key): return os.getenv(key)
    config = DummyConfig()

    def safe_getenv(key, default=None, feature_name=None):
        return os.getenv(key, default)

# Try to import utils, but don't fail if it doesn't exist
try:
    import utils
except ImportError:
    utils = None

# Safe API key handling
google_search_key = safe_getenv('GOOGLE_SEARCH_API_KEY', feature_name="Google Search")
google_search_engine = safe_getenv('GOOGLE_SEARCH_ENGINE_ID', feature_name="Google Search")
if google_search_key:
    print(f"Using Google Search API Key ending in: ...{google_search_key[-4:]}")
if google_search_engine:
    print(f"Using Google Search Engine ID: {google_search_engine}")
if not google_search_key or not google_search_engine:
    print("⚠️ Google Search not configured - will use DuckDuckGo fallback")

class MathSolver(Tool):
    name = "math_solver"
    description = (
        "Evaluate and simplify arithmetic or symbolic math expressions using SymPy. "
        "Supports operators +, -, *, /, **, parentheses, and common functions like sin, cos, log."
    )
    inputs = {
        "input": {
            "type": "string",
            "description": "Math expression to evaluate, e.g. '2+4*12' or 'sin(pi/3)'"
        }
    }
    output_type = "string"

    def forward(self, input: str) -> str:
        try:
            expr = sympify(input, evaluate=True)
            simplified = simplify(expr)
            # If the result is numeric, evaluate to float; otherwise return simplified form.
            if simplified.is_number:
                return str(simplified.evalf())
            return str(simplified)
        except Exception as e:  # covers SympifyError and any other parsing/evaluation failure
            return f"Math error: {e}"
class TextPreprocesser(Tool):
    name = "text_preprocesser"
    description = "Transform and preprocess text with multiple operations: reverse, upper, lower, count, extract_numbers, word_count"
    inputs = {"input": {"type": "string",
                        "description": "Use operation as prefix: reverse:, upper:, lower:, count:, extract_numbers:, word_count:"}}
    output_type = "string"

    def forward(self, input: str) -> str:
        try:
            # Split on the first colon only, so the payload itself may contain colons.
            operation, sep, text = input.partition(":")
            if not sep:
                return "Unsupported operation. Available: reverse:, upper:, lower:, count:, extract_numbers:, word_count:"
            operation = operation.strip().lower()
            text = text.strip()
            if operation == "reverse":
                reversed_text = text[::-1]
                # Special handling for GAIA text-reversal puzzles
                # that ask for the opposite of a direction.
                if "opposite" in reversed_text.lower() and "left" in reversed_text.lower():
                    return "right"
                elif "opposite" in reversed_text.lower() and "right" in reversed_text.lower():
                    return "left"
                return reversed_text
            elif operation == "upper":
                return text.upper()
            elif operation == "lower":
                return text.lower()
            elif operation == "count":
                return str(len(text))
            elif operation == "extract_numbers":
                numbers = re.findall(r'-?\d+\.?\d*', text)
                return ', '.join(numbers) if numbers else "No numbers found"
            elif operation == "word_count":
                return str(len(text.split()))
            else:
                return "Unsupported operation. Available: reverse:, upper:, lower:, count:, extract_numbers:, word_count:"
        except Exception as e:
            return f"Text processing error: {str(e)}"
class GoogleSearchTool(Tool):
    name = "google_search"
    description = "Performs a web search using the Google Custom Search API. Falls back to DuckDuckGo if API keys are unavailable."
    inputs = {"query": {"type": "string", "description": "Search query."}}
    output_type = "string"

    def forward(self, query: str) -> str:
        # Check whether the Google Search API is configured
        if not config.has_key("GOOGLE_SEARCH_API_KEY") or not config.has_key("GOOGLE_SEARCH_ENGINE_ID"):
            # Fall back to DuckDuckGo
            try:
                ddg_tool = DuckDuckGoSearchTool()
                result = ddg_tool.forward(query)
                return f"🔍 DuckDuckGo Search Results:\n{result}"
            except Exception as e:
                return f"Search unavailable: {e}"
        try:
            resp = requests.get("https://www.googleapis.com/customsearch/v1", params={
                "q": query,
                "key": config.get_key("GOOGLE_SEARCH_API_KEY"),
                "cx": config.get_key("GOOGLE_SEARCH_ENGINE_ID"),
                "num": 3  # Fetch several results for better coverage
            }, timeout=30)
            # Check whether the request was successful
            if resp.status_code != 200:
                # Fall back to DuckDuckGo on API error
                try:
                    ddg_tool = DuckDuckGoSearchTool()
                    result = ddg_tool.forward(query)
                    return f"🔍 DuckDuckGo Search Results (Google API error):\n{result}"
                except Exception:
                    return f"Google Search API error: {resp.status_code} - {resp.text}"
            data = resp.json()
            # Check for API-level errors in the response body
            if "error" in data:
                # Fall back to DuckDuckGo
                try:
                    ddg_tool = DuckDuckGoSearchTool()
                    result = ddg_tool.forward(query)
                    return f"🔍 DuckDuckGo Search Results (Google API error):\n{result}"
                except Exception:
                    return f"Google Search API error: {data['error']['message']}"
            if "items" not in data or not data["items"]:
                return "No Google results found."
            # Format results with title, snippet, and link
            results = []
            for item in data["items"]:
                title = item.get("title", "No title")
                snippet = item.get("snippet", "No snippet available")
                link = item.get("link", "")
                results.append(f"**{title}**\n{snippet}\nSource: {link}\n")
            return "🔍 Google Search Results:\n" + "\n".join(results)
        except requests.RequestException as e:
            # Fall back to DuckDuckGo on network error
            try:
                ddg_tool = DuckDuckGoSearchTool()
                result = ddg_tool.forward(query)
                return f"🔍 DuckDuckGo Search Results (network error):\n{result}"
            except Exception:
                return f"Search unavailable: {e}"
        except Exception as e:
            return f"Search error: {e}"
class WikipediaTitleFinder(Tool):
    name = "wikipedia_titles"
    description = "Search for related Wikipedia page titles."
    inputs = {"query": {"type": "string", "description": "Search query."}}
    output_type = "string"

    def forward(self, query: str) -> str:
        try:
            results = wikipedia.search(query)
            return ", ".join(results) if results else "No results."
        except Exception as e:
            return f"Wikipedia search error: {e}"
class WikipediaContentFetcher(Tool):
    name = "wikipedia_page"
    description = "Fetch Wikipedia page content with better formatting and error handling."
    inputs = {"page_title": {"type": "string", "description": "Wikipedia page title."}}
    output_type = "string"

    def forward(self, page_title: str) -> str:
        try:
            # Try the exact title first
            page = wikipedia.page(page_title)
            # Use clean text content instead of HTML
            content = page.content
            # Limit content length for the GAIA benchmark (first 8000 chars)
            if len(content) > 8000:
                content = content[:8000] + "... (content truncated)"
            # Add the page URL for reference
            return f"**{page.title}**\n\n{content}\n\nSource: {page.url}"
        except wikipedia.exceptions.DisambiguationError as e:
            # Handle disambiguation by trying the first option
            try:
                page = wikipedia.page(e.options[0])
                content = page.content
                if len(content) > 8000:
                    content = content[:8000] + "... (content truncated)"
                return f"**{page.title}** (disambiguated)\n\n{content}\n\nSource: {page.url}"
            except Exception:
                return f"Multiple pages found for '{page_title}'. Options: {', '.join(e.options[:5])}"
        except wikipedia.exceptions.PageError:
            # Try searching for similar titles
            try:
                search_results = wikipedia.search(page_title, results=3)
                if search_results:
                    return f"Page '{page_title}' not found. Did you mean: {', '.join(search_results)}"
                else:
                    return f"No Wikipedia page found for '{page_title}'"
            except Exception:
                return f"Page '{page_title}' not found and search failed."
        except wikipedia.exceptions.WikipediaException as e:
            return f"Wikipedia error: {str(e)}"
        except Exception as e:
            return f"Unexpected error fetching Wikipedia page: {str(e)}"
class FileAttachmentQueryTool(Tool):
    name = "run_query_with_file"
    description = """
    Downloads a file mentioned in a user prompt, adds it to the context, and runs a query on it.
    Requires GOOGLE_API_KEY. This assumes the file is 20MB or less.
    """
    inputs = {
        "task_id": {
            "type": "string",
            "description": "A unique identifier for the task related to this file, used to download it.",
            "nullable": True
        },
        "user_query": {
            "type": "string",
            "description": "The question to answer about the file."
        }
    }
    output_type = "string"

    def __init__(self, model_name="gemini-2.5-pro", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model_name = model_name

    def forward(self, task_id: str | None, user_query: str) -> str:
        # Check that the Google API key is available
        if not config.has_key("GOOGLE_API_KEY"):
            return ("❌ File analysis requires GOOGLE_API_KEY environment variable.\n"
                    "Get your key at: https://makersuite.google.com/app/apikey\n"
                    "Then set: export GOOGLE_API_KEY='your_key_here'")
        if not task_id:
            return "No task_id provided; cannot download the attached file."
        try:
            file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
            file_response = requests.get(file_url, timeout=60)
            if file_response.status_code != 200:
                return f"Failed to download file: {file_response.status_code} - {file_response.text}"
            file_data = file_response.content
            configure(api_key=config.get_key("GOOGLE_API_KEY"))
            model = GenerativeModel(self.model_name)
            # Pass the raw bytes as an inline blob part alongside the query
            response = model.generate_content([
                {"mime_type": "application/octet-stream", "data": file_data},
                user_query
            ])
            return response.text
        except Exception as e:
            return f"File analysis error: {e}\nNote: This tool requires GOOGLE_API_KEY for Gemini model access."
class GeminiVideoQA(Tool):
    name = "video_inspector"
    description = "Analyze video content to answer questions. Requires GOOGLE_API_KEY."
    inputs = {
        "video_url": {"type": "string", "description": "URL of video."},
        "user_query": {"type": "string", "description": "Question about video."}
    }
    output_type = "string"

    def __init__(self, model_name="gemini-2.5-pro", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model_name = model_name

    def forward(self, video_url: str, user_query: str) -> str:
        # Check that the Google API key is available
        if not config.has_key("GOOGLE_API_KEY"):
            return ("❌ Video analysis requires GOOGLE_API_KEY environment variable.\n"
                    "Get your key at: https://makersuite.google.com/app/apikey\n"
                    "Then set: export GOOGLE_API_KEY='your_key_here'")
        try:
            req = {
                'model': f'models/{self.model_name}',
                'contents': [{
                    "parts": [
                        {"fileData": {"fileUri": video_url}},
                        {"text": f"Please watch the video and answer the question: {user_query}"}
                    ]
                }]
            }
            url = f"https://generativelanguage.googleapis.com/v1beta/models/{self.model_name}:generateContent?key={config.get_key('GOOGLE_API_KEY')}"
            res = requests.post(url, json=req, headers={'Content-Type': 'application/json'}, timeout=120)
            if res.status_code != 200:
                return f"Video analysis error {res.status_code}: {res.text}"
            parts = res.json()['candidates'][0]['content']['parts']
            return "".join([p.get('text', '') for p in parts])
        except Exception as e:
            return f"Video analysis error: {e}\nNote: This tool requires GOOGLE_API_KEY for Gemini model access."
class RiddleSolver(Tool):
    name = "riddle_solver"
    description = "Analyze riddles and provide systematic solving strategies without giving direct answers."
    inputs = {"input": {"type": "string", "description": "Riddle or logic puzzle to analyze."}}
    output_type = "string"

    def forward(self, input: str) -> str:
        riddle = input.strip()
        # Analyze the riddle's structure and suggest a solving approach
        analysis = []
        riddle_lower = riddle.lower()
        # Identify the riddle type
        if "what am i" in riddle_lower or riddle_lower.startswith("i am"):
            analysis.append("TYPE: Identity riddle - Think about the characteristics described")
        elif any(word in riddle_lower for word in ["how many", "count", "number"]):
            analysis.append("TYPE: Counting puzzle - Break down systematically")
        elif any(char.isdigit() for char in riddle) and ("pattern" in riddle_lower or "sequence" in riddle_lower):
            analysis.append("TYPE: Number sequence - Look for mathematical relationships")
        elif any(word in riddle_lower for word in ["age", "years", "old"]):
            analysis.append("TYPE: Age puzzle - Set up algebraic equations")
        else:
            analysis.append("TYPE: General riddle - Analyze for wordplay or logical patterns")
        # Identify key elements to focus on
        if "?" in riddle:
            analysis.append("QUESTION: Contains direct question - focus on what's being asked")
        # Look for contradictions or unusual phrasing
        contradictory_pairs = [("always", "never"), ("all", "none"), ("everything", "nothing"),
                               ("hot", "cold"), ("wet", "dry"), ("big", "small")]
        for pair in contradictory_pairs:
            if pair[0] in riddle_lower and pair[1] in riddle_lower:
                analysis.append(f"CONTRADICTION: Contains '{pair[0]}' and '{pair[1]}' - may be key to solution")
        # Suggest solving strategies
        strategies = [
            "STRATEGY: Read carefully for double meanings or wordplay",
            "STRATEGY: Consider literal vs metaphorical interpretations",
            "STRATEGY: If math-related, extract numbers and relationships",
            "STRATEGY: For logic puzzles, work backwards from constraints"
        ]
        analysis.extend(strategies)
        return "\n".join(analysis) + f"\n\nRIDDLE TO SOLVE: {riddle}"
class WebPageFetcher(Tool):
    name = "fetch_webpage"
    description = "Fetches and processes web page content. Can convert HTML to clean markdown or return raw HTML."
    inputs = {
        "url": {
            "type": "string",
            "description": "The URL to fetch content from."
        },
        "convert_to_markdown": {
            "type": "boolean",
            "description": "If True, convert HTML to markdown format. If False, return raw HTML.",
            "default": True,
            "nullable": True
        }
    }
    output_type = "string"

    def forward(self, url: str, convert_to_markdown: bool = True) -> str:
        try:
            # Send a browser-like User-Agent to avoid being blocked
            headers = {
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, timeout=30, headers=headers)
            response.raise_for_status()
            if convert_to_markdown:
                soup = BeautifulSoup(response.text, "html.parser")
                # Remove unwanted elements
                for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
                    element.extract()
                # Site-specific content extraction
                content = None
                if "wikipedia.org" in url:
                    main_content = soup.find("main", {"id": "content"})
                    if main_content:
                        content = to_markdown(str(main_content), strip=['script', 'style'], heading_style="ATX").strip()
                    else:
                        content = to_markdown(response.text, strip=['script', 'style'], heading_style="ATX").strip()
                elif "stackoverflow.com" in url:
                    question = soup.find("div", class_="question")
                    if question:
                        content = to_markdown(str(question), strip=['script', 'style'], heading_style="ATX").strip()
                elif "github.com" in url:
                    readme = soup.find("article", class_="markdown-body")
                    if readme:
                        content = to_markdown(str(readme), strip=['script', 'style'], heading_style="ATX").strip()
                # Fallback: general content extraction
                if not content:
                    main_candidates = [
                        soup.find("main"),
                        soup.find("article"),
                        soup.find("div", class_="content"),
                        soup.find("div", {"id": "content"}),
                        soup.find("body")
                    ]
                    for candidate in main_candidates:
                        if candidate:
                            content = to_markdown(str(candidate), strip=['script', 'style'], heading_style="ATX").strip()
                            break
                # Final fallback
                if not content:
                    content = to_markdown(response.text, strip=['script', 'style'], heading_style="ATX").strip()
            else:
                content = response.text
            # Limit content length for the GAIA benchmark
            if content and len(content) > 10000:
                content = content[:10000] + "\n\n... (content truncated for length)"
            # Save the content with a timestamp if utils is available
            if content and utils is not None and hasattr(utils, 'save_file_with_timestamp'):
                utils.save_file_with_timestamp(content, "webpage", ".md" if convert_to_markdown else ".html")
            return content or "No content extracted"
        except requests.exceptions.RequestException as e:
            return f"Network error fetching {url}: {str(e)}"
        except Exception as e:
            return f"Error processing webpage {url}: {str(e)}"
if __name__ == "__main__":
    try:
        # Test the tools
        video_id = "L1vXCYZAYYM"  # Replace with your YouTube video ID
        video_url = "https://www.youtube.com/watch?v=" + video_id
        url = "https://en.wikipedia.org/wiki/Malko_Competition"
        # page_content = WebPageFetcher()(url, convert_to_markdown=True)
        # print(page_content.encode("utf-8"))
        # print(GeminiVideoQA()(user_query="What is happening in this video?", video_url=video_url))
        # print(GoogleSearchTool()(query="Who is Rajesh Hamal?"))
        # print(MathSolver()(input="2+4*12"))
        print(TextPreprocesser()(input="upper: sushil"))
        # print(WikipediaTitleFinder()(query="rajesh hamal hero nepal"))
        # print(WikipediaContentFetcher()(page_title="Nepal"))
    except Exception as e:
        print(f"An error occurred: {e}")