"""Gemini-powered agent for answering GAIA benchmark questions.

The module defines a small tool abstraction (:class:`GeminiTool`) with three
concrete tools (calculator, DuckDuckGo web search, local file analyzer) and
:class:`GeminiGAIAAgent`, which gathers tool output, builds a prompt, calls a
Gemini model and post-processes the answer.
"""

import ast
import asyncio
import base64
import json
import logging
import math
import mimetypes
import os
import re
import statistics
import tempfile
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import google.generativeai as genai
import numpy as np
import pandas as pd
import requests
from duckduckgo_search import DDGS
from google.generativeai.types import HarmBlockThreshold, HarmCategory
from PIL import Image

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# File suffixes that are forwarded to Gemini as inline image data.
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp')


class GAIAQuestion:
    """GAIA benchmark question structure.

    Plain value holder; every constructor argument is stored unchanged on the
    attribute of the same name.
    """

    def __init__(self,
                 question_id: str,
                 question: str,
                 level: int,
                 final_answer: Optional[str] = None,
                 file_name: Optional[str] = None,
                 file_path: Optional[str] = None,
                 annotator_metadata: Optional[Dict] = None):
        self.question_id = question_id
        self.question = question
        self.level = level
        self.final_answer = final_answer
        self.file_name = file_name
        self.file_path = file_path
        self.annotator_metadata = annotator_metadata


class GeminiTool:
    """Base class for Gemini agent tools.

    Subclasses provide a ``name``, a human-readable ``description`` and
    override :meth:`execute`.
    """

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    def execute(self, input_data: str) -> str:
        """Run the tool on ``input_data``; must be overridden by subclasses."""
        raise NotImplementedError


class CalculatorTool(GeminiTool):
    """Advanced calculator tool for mathematical operations."""

    # Natural-language patterns used by the compound-interest parser.
    _THOUSANDS_SEPARATOR = re.compile(r'(?<=\d),(?=\d{3}\b)')
    _DOLLAR_AMOUNT = re.compile(r'\$\s*(\d+(?:\.\d+)?)')
    # A number that is neither part of a longer number nor followed by
    # "%" / "year(s)", so rates and durations are not mistaken for money.
    _BARE_AMOUNT = re.compile(
        r'(?<![\d.])(\d+(?:\.\d+)?)(?!\.?\d|[\s-]*(?:%|years?))'
    )
    _RATE = re.compile(r'(\d+(?:\.\d+)?)\s*%')
    _YEARS = re.compile(r'(\d+(?:\.\d+)?)[\s-]*years?')

    def __init__(self):
        super().__init__(
            name="calculator",
            description=(
                "Performs mathematical calculations including:\n"
                "- Basic arithmetic (+, -, *, /, %)\n"
                "- Advanced math (sqrt, log, sin, cos, tan, exp, etc.)\n"
                "- Financial calculations (compound interest, annuities, etc.)\n"
                "- Statistical operations (mean, median, std, etc.)\n"
                "\n"
                "Examples:\n"
                '- "sqrt(144)" → 12\n'
                '- "log10(100)" → 2.0 (log and ln are the natural logarithm)\n'
                '- "sin(pi/2)" → 1.0\n'
                '- "compound_interest(1000, 0.05, 3)" → compound interest calculation\n'
            ),
        )

    def _build_namespace(self) -> Dict[str, Any]:
        """Return the only names an expression is allowed to reference."""
        return {
            "__builtins__": {},
            # Basic operations
            "abs": abs, "round": round, "min": min, "max": max,
            "sum": sum, "pow": pow, "divmod": divmod,
            # Math functions
            "sqrt": math.sqrt, "log": math.log, "log10": math.log10,
            "ln": math.log, "exp": math.exp,
            "sin": math.sin, "cos": math.cos, "tan": math.tan,
            "asin": math.asin, "acos": math.acos, "atan": math.atan,
            "sinh": math.sinh, "cosh": math.cosh, "tanh": math.tanh,
            "pi": math.pi, "e": math.e,
            "floor": math.floor, "ceil": math.ceil,
            "factorial": math.factorial, "gcd": math.gcd,
            # Statistical functions
            "mean": statistics.mean, "median": statistics.median,
            "mode": statistics.mode, "stdev": statistics.stdev,
            # Financial functions
            "compound_interest": self._compound_interest,
            "simple_interest": self._simple_interest,
            "present_value": self._present_value,
            "future_value": self._future_value,
        }

    @staticmethod
    def _evaluate(expression: str, namespace: Dict[str, Any]) -> Any:
        """Evaluate ``expression`` against ``namespace`` after vetting its AST.

        Raises:
            SyntaxError: if the text is not a single Python expression.
            ValueError: if the expression uses attribute access.
        """
        tree = ast.parse(expression.strip(), mode="eval")
        for node in ast.walk(tree):
            # Attribute access is how an empty-__builtins__ sandbox is
            # normally escaped (e.g. ().__class__.__base__), and no
            # whitelisted name needs it.
            if isinstance(node, ast.Attribute):
                raise ValueError("attribute access is not allowed")
        # SECURITY NOTE: this is still eval(). The input can originate from
        # model output; the AST check blocks the known escape route but does
        # not bound CPU or memory (e.g. factorial or ** with huge operands).
        return eval(compile(tree, "<calculator>", "eval"), namespace)

    def execute(self, expression: str) -> str:
        """Evaluate a math expression and return a human-readable result.

        The expression is evaluated as restricted Python first. If that fails
        and the text mentions "compound", it is treated as a natural-language
        compound-interest request instead.

        Args:
            expression: e.g. ``"sqrt(144)"`` or
                ``"$1000 at 5% compound interest for 3 years"``.

        Returns:
            ``"Calculation result: ..."`` on success, otherwise an error
            message; this method never raises.
        """
        if not isinstance(expression, str) or not expression.strip():
            return ("Calculation error: empty expression. "
                    "Please check your mathematical expression.")
        try:
            result = self._evaluate(expression, self._build_namespace())
        except Exception as e:  # tool boundary: report, never raise
            if "compound" in expression.lower():
                return self._handle_financial_calculation(expression)
            return f"Calculation error: {str(e)}. Please check your mathematical expression."
        return f"Calculation result: {result}"

    def _compound_interest(self, principal: float, rate: float, time: float, n: int = 1) -> float:
        """Calculate compound interest: A = P(1 + r/n)^(nt)"""
        return principal * (1 + rate / n) ** (n * time)

    def _simple_interest(self, principal: float, rate: float, time: float) -> float:
        """Calculate simple interest: A = P(1 + rt)"""
        return principal * (1 + rate * time)

    def _present_value(self, future_value: float, rate: float, time: float) -> float:
        """Calculate present value: PV = FV / (1 + r)^t"""
        return future_value / (1 + rate) ** time

    def _future_value(self, present_value: float, rate: float, time: float) -> float:
        """Calculate future value: FV = PV * (1 + r)^t"""
        return present_value * (1 + rate) ** time

    def _handle_financial_calculation(self, expression: str) -> str:
        """Parse a natural-language compound-interest request and compute it.

        Looks for a principal (a ``$`` amount if present, otherwise the first
        number that is not a percentage or a year count), a rate written as
        ``N%`` and a duration written as ``N years``. Compounding is monthly
        when the text contains "monthly", otherwise annual.

        Returns:
            A multi-line report, or a hint describing the accepted format
            when the three values cannot be found.
        """
        unparseable = ("Unable to parse financial calculation. "
                       "Please use format like: compound_interest(1000, 0.05, 3)")
        lowered = expression.lower()
        if "compound" not in lowered:
            return unparseable

        # "1,000" -> "1000" so the amount is read as one number.
        text = self._THOUSANDS_SEPARATOR.sub('', lowered)
        money_match = self._DOLLAR_AMOUNT.search(text) or self._BARE_AMOUNT.search(text)
        rate_match = self._RATE.search(text)
        time_match = self._YEARS.search(text)
        if not (money_match and rate_match and time_match):
            return unparseable

        principal = float(money_match.group(1))
        rate_percent = float(rate_match.group(1))
        years = float(time_match.group(1))
        # Default to annual compounding
        periods = 12 if "monthly" in lowered else 1

        try:
            result = self._compound_interest(principal, rate_percent / 100, years, periods)
        except (OverflowError, ZeroDivisionError, ValueError) as e:
            return f"Financial calculation error: {str(e)}"

        return "\n".join([
            "Financial Calculation - Compound Interest:",
            f"- Principal: ${principal:,.2f}",
            # Print the parsed percentage; rate * 100 reintroduces float noise.
            f"- Interest Rate: {rate_percent:g}% per year",
            f"- Time Period: {years:g} years",
            f"- Compounding: {'Monthly' if periods == 12 else 'Annually'}",
            f"- Final Amount: ${result:,.2f}",
            f"- Interest Earned: ${result - principal:,.2f}",
        ])


class WebSearchTool(GeminiTool):
    """Web search tool using DuckDuckGo."""

    MAX_RESULTS = 5
    SNIPPET_LENGTH = 200

    def __init__(self):
        super().__init__(
            name="web_search",
            description=(
                "Searches the web for current information using DuckDuckGo.\n"
                "Returns relevant, up-to-date search results with summaries.\n"
                "\n"
                "Best for:\n"
                "- Current events and news\n"
                "- Recent statistics and data\n"
                "- Current prices, populations, etc.\n"
                "- Latest information on any topic\n"
                "\n"
                'Example: "current population of Tokyo 2024"\n'
            ),
        )
        self.ddgs = DDGS()

    def execute(self, query: str) -> str:
        """Search DuckDuckGo for ``query`` and format up to five hits.

        Returns:
            A numbered list of title / snippet / source URL, a "no results"
            message, or an error message; this method never raises.
        """
        try:
            results = list(self.ddgs.text(query, max_results=self.MAX_RESULTS))
        except Exception as e:  # tool boundary: network/library failures
            return f"Web search error: {str(e)}. Unable to perform search at this time."

        if not results:
            return f"No search results found for: {query}"

        pieces = [f"Web search results for '{query}':\n\n"]
        for index, hit in enumerate(results, 1):
            # `or` also covers keys that are present but None/empty.
            title = hit.get('title') or 'No title'
            snippet = hit.get('body') or 'No description'
            url = hit.get('href') or 'No URL'
            if len(snippet) > self.SNIPPET_LENGTH:
                snippet = snippet[:self.SNIPPET_LENGTH] + "..."
            pieces.append(f"{index}. **{title}**\n")
            pieces.append(f"   {snippet}\n")
            pieces.append(f"   Source: {url}\n\n")
        return "".join(pieces)


class FileAnalyzerTool(GeminiTool):
    """Tool for analyzing various file types."""

    TEXT_EXTENSIONS = ('.txt', '.md', '.py', '.js', '.html', '.css')
    PREVIEW_CHARS = 500

    def __init__(self):
        super().__init__(
            name="file_analyzer",
            description=(
                "Analyzes various file types including:\n"
                "- Text files (.txt, .md, .json, .csv)\n"
                "- Data files (CSV, Excel, JSON)\n"
                "- Image files (PNG, JPG, GIF, etc.)\n"
                "- Documents and structured data\n"
                "\n"
                "Provides summaries, statistics, and insights from file contents.\n"
            ),
        )

    def execute(self, file_path: str) -> str:
        """Dispatch on the file suffix and return a textual analysis.

        Returns:
            The analysis report, or a message for a missing file, an
            unsupported suffix or any error raised while reading the file.
        """
        try:
            if not os.path.exists(file_path):
                return f"File not found: {file_path}"

            file_extension = Path(file_path).suffix.lower()
            if file_extension in self.TEXT_EXTENSIONS:
                return self._analyze_text_file(file_path)
            if file_extension == '.json':
                return self._analyze_json_file(file_path)
            if file_extension == '.csv':
                return self._analyze_csv_file(file_path)
            if file_extension in IMAGE_EXTENSIONS:
                return self._analyze_image_file(file_path)
            return f"Unsupported file type: {file_extension}"
        except Exception as e:  # tool boundary: report, never raise
            return f"Error analyzing file: {str(e)}"

    def _analyze_text_file(self, file_path: str) -> str:
        """Report line/word/character counts and a 500-character preview."""
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()

        # splitlines() does not count a phantom line after a trailing newline
        # and yields zero lines for an empty file.
        lines = content.splitlines()
        words = content.split()
        chars = len(content)

        # Basic text statistics
        avg_line_length = sum(len(line) for line in lines) / len(lines) if lines else 0
        avg_word_length = sum(len(word) for word in words) / len(words) if words else 0

        preview = content[:self.PREVIEW_CHARS]
        if len(content) > self.PREVIEW_CHARS:
            preview += '...'

        return "\n".join([
            "📄 Text File Analysis:",
            f"- File: {Path(file_path).name}",
            f"- Lines: {len(lines):,}",
            f"- Words: {len(words):,}",
            f"- Characters: {chars:,}",
            f"- Average line length: {avg_line_length:.1f} characters",
            f"- Average word length: {avg_word_length:.1f} characters",
            "",
            "📝 Content Preview:",
            preview,
        ])

    def _analyze_json_file(self, file_path: str) -> str:
        """Report the top-level JSON type, its keys or length, and a preview."""
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        data_type = type(data).__name__
        if isinstance(data, dict):
            keys_info = f"Keys ({len(data)}): {list(data.keys())[:10]}"
            if len(data) > 10:
                keys_info += "..."
        elif isinstance(data, list):
            keys_info = f"List with {len(data)} items"
        else:
            keys_info = f"Single {data_type} value"

        # Decide on the ellipsis from the text that is actually previewed.
        dumped = json.dumps(data, indent=2)
        preview = dumped[:self.PREVIEW_CHARS]
        if len(dumped) > self.PREVIEW_CHARS:
            preview += "..."

        return "\n".join([
            "🔧 JSON File Analysis:",
            f"- File: {Path(file_path).name}",
            f"- Data type: {data_type}",
            f"- {keys_info}",
            f"- File size: {os.path.getsize(file_path):,} bytes",
            "",
            "📊 Content Preview:",
            preview,
        ])

    def _analyze_csv_file(self, file_path: str) -> str:
        """Load a CSV with pandas and report shape, column types and a preview."""
        try:
            df = pd.read_csv(file_path)

            # Basic statistics
            rows, cols = df.shape
            numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
            text_cols = df.select_dtypes(include=['object']).columns.tolist()
            missing_data = df.isnull().sum()

            # Summary statistics for numeric columns
            numeric_summary = ""
            if numeric_cols:
                summary_lines = ["\n📊 Numeric Columns Summary:\n"]
                for col in numeric_cols[:5]:  # Show first 5 numeric columns
                    col_data = df[col]
                    summary_lines.append(
                        f"  {col}: mean={col_data.mean():.2f}, std={col_data.std():.2f}, "
                        f"min={col_data.min()}, max={col_data.max()}\n"
                    )
                numeric_summary = "".join(summary_lines)

            preview = df.head(3).to_string(max_cols=6)

            return "\n".join([
                "📊 CSV File Analysis:",
                f"- File: {Path(file_path).name}",
                f"- Dimensions: {rows:,} rows × {cols} columns",
                f"- Numeric columns: {len(numeric_cols)} ({numeric_cols[:5]})",
                f"- Text columns: {len(text_cols)} ({text_cols[:5]})",
                f"- Missing values: {missing_data.sum()} total",
                f"- File size: {os.path.getsize(file_path):,} bytes",
                numeric_summary,
                "📋 Data Preview (first 3 rows):",
                preview,
            ])
        except Exception as e:
            return f"Error analyzing CSV file: {str(e)}"

    def _analyze_image_file(self, file_path: str) -> str:
        """Report format, pixel dimensions, colour mode and orientation."""
        try:
            with Image.open(file_path) as img:
                width, height = img.size
                mode = img.mode
                format_name = img.format

            file_size = os.path.getsize(file_path)

            # Calculate aspect ratio
            aspect_ratio = width / height

            # Determine image orientation (within 0.1 of 1.0 counts as square)
            if abs(aspect_ratio - 1) < 0.1:
                orientation = "Square"
            elif aspect_ratio > 1:
                orientation = "Landscape"
            else:
                orientation = "Portrait"

            return "\n".join([
                "🖼️ Image File Analysis:",
                f"- File: {Path(file_path).name}",
                f"- Format: {format_name}",
                f"- Dimensions: {width} × {height} pixels",
                f"- Color mode: {mode}",
                f"- Aspect ratio: {aspect_ratio:.2f} ({orientation})",
                f"- File size: {file_size:,} bytes ({file_size/1024:.1f} KB)",
                "",
                "Note: For detailed image content analysis, the image will be "
                "processed by Gemini's vision capabilities.",
            ])
        except Exception as e:
            return f"Error analyzing image: {str(e)}"


class GeminiGAIAAgent:
    """
    Advanced GAIA benchmark agent using Google Gemini
    Optimized for multimodal understanding and complex reasoning
    """

    # Keyword groups used by _identify_required_tools.
    _MATH_KEYWORDS = ('calculate', 'compute', 'math', 'formula', 'equation',
                      'interest', 'percentage', 'average', 'sum', 'multiply',
                      'divide', 'square root', 'logarithm', 'statistics')
    _CURRENT_KEYWORDS = ('current', 'latest', 'recent', 'today', '2024', '2025',
                         'now', 'present', 'up-to-date', 'newest')
    _SEARCH_KEYWORDS = ('population', 'price', 'news', 'event', 'happening')
    _FILE_KEYWORDS = ('file', 'document', 'image', 'data', 'csv', 'analyze',
                      'uploaded')

    _TOOL_REQUEST_PREFIX = "USE_TOOL:"
    _TOOL_INPUT_MARKER = "with input:"

    def __init__(self,
                 model_name: str = "gemini-2.5-flash",
                 api_key: Optional[str] = None,
                 temperature: float = 0.1,
                 max_tokens: int = 2048,
                 verbose: bool = True):
        """Configure the API, create the model and instantiate the tools.

        Args:
            model_name: Gemini model identifier passed to the SDK.
            api_key: API key; when omitted, ``GOOGLE_API_KEY`` is read from
                the environment.
            temperature: Sampling temperature for generation.
            max_tokens: Maximum number of output tokens.
            verbose: Stored on the instance; not consulted in this module.
        """
        self.model_name = model_name
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.verbose = verbose

        # Configure Gemini API
        self._configure_gemini(api_key)

        # Initialize model (None when initialization failed)
        self.model = self._initialize_model()

        # Initialize tools
        self.tools = self._initialize_tools()

        # Conversation history: one result dict per solved question
        self.conversation_history = []

        logger.info(f"Gemini GAIA Agent initialized with model: {model_name}")

    def _configure_gemini(self, api_key: Optional[str]):
        """Configure Gemini API from the argument or ``GOOGLE_API_KEY``."""
        resolved_key = api_key or os.getenv("GOOGLE_API_KEY")
        if resolved_key:
            genai.configure(api_key=resolved_key)
        else:
            logger.warning("No Google API key provided. Please set GOOGLE_API_KEY "
                           "environment variable or pass api_key parameter.")

    def _initialize_model(self):
        """Initialize the Gemini model; returns ``None`` if creation fails."""
        try:
            # Configure safety settings for more permissive responses
            safety_settings = {
                HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
            }

            # Generation configuration
            generation_config = genai.types.GenerationConfig(
                temperature=self.temperature,
                max_output_tokens=self.max_tokens,
                top_p=0.8,
                top_k=40,
            )

            return genai.GenerativeModel(
                model_name=self.model_name,
                generation_config=generation_config,
                safety_settings=safety_settings,
            )
        except Exception as e:
            logger.error(f"Failed to initialize Gemini model: {str(e)}")
            return None

    def _initialize_tools(self) -> Dict[str, GeminiTool]:
        """Initialize all available tools, keyed by tool name."""
        return {
            "calculator": CalculatorTool(),
            "web_search": WebSearchTool(),
            "file_analyzer": FileAnalyzerTool(),
        }

    def _create_system_prompt(self) -> str:
        """Create the system prompt for the agent (includes current UTC time)."""
        current_time = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

        return f"""You are an advanced AI assistant designed to solve GAIA benchmark questions with exceptional accuracy and reasoning.

GAIA (General AI Assistants) benchmark tests your ability to:
1. 🧠 **Complex Reasoning**: Multi-step problem solving and logical inference
2. 🔧 **Tool Usage**: Effective use of calculators, web search, and file analysis
3. 🖼️ **Multimodal Understanding**: Processing text, images, data files, and documents
4. 🎯 **Accuracy**: Providing precise, well-researched answers

AVAILABLE TOOLS:
- **calculator**: Advanced mathematical operations, financial calculations, statistics
- **web_search**: Current information from the web using DuckDuckGo
- **file_analyzer**: Analysis of text files, CSV data, JSON, and images

INSTRUCTIONS:
1. **Think Step-by-Step**: Break down complex problems into logical steps
2. **Use Tools Strategically**: Choose the right tools for each task
3. **Verify Information**: Double-check calculations and search for current data when needed
4. **Be Precise**: Provide exact, accurate answers with proper reasoning
5. **Show Your Work**: Explain your thought process clearly
6. **Handle Files**: Analyze uploaded files as part of your solution process

RESPONSE FORMAT:
When using tools, clearly indicate:
- Which tool you're using and why
- The input you're providing to the tool
- How the tool's output contributes to your final answer

Current Date/Time (UTC): {current_time}
User: AdilzhanB

Remember: Your goal is to provide the most accurate and well-reasoned answer possible for each GAIA question."""

    @staticmethod
    def _mentions_any(text: str, keywords) -> bool:
        """Return True if any keyword occurs in ``text`` at the start of a word.

        Only the leading edge is anchored so inflections still match
        ("files", "calculated") while mid-word hits do not ("know" for
        "now", "assume" for "sum").
        """
        pattern = r'\b(?:' + '|'.join(re.escape(keyword) for keyword in keywords) + ')'
        return re.search(pattern, text) is not None

    def _identify_required_tools(self, question: str, file_path: Optional[str] = None) -> List[str]:
        """Identify which tools might be needed for a question.

        Returns:
            A list drawn from ``'calculator'``, ``'web_search'`` and
            ``'file_analyzer'`` (in that order) chosen by keyword heuristics;
            ``'file_analyzer'`` is also selected whenever ``file_path`` is set.
        """
        required_tools = []
        question_lower = question.lower()

        # Mathematical operations
        if self._mentions_any(question_lower, self._MATH_KEYWORDS):
            required_tools.append('calculator')

        # Current/recent information
        if self._mentions_any(question_lower,
                              self._CURRENT_KEYWORDS + self._SEARCH_KEYWORDS):
            required_tools.append('web_search')

        # File analysis
        if file_path or self._mentions_any(question_lower, self._FILE_KEYWORDS):
            required_tools.append('file_analyzer')

        return required_tools

    def _use_tool(self, tool_name: str, input_data: str) -> str:
        """Execute a specific tool with given input; never raises."""
        if tool_name not in self.tools:
            return f"Tool '{tool_name}' not available."

        try:
            return self.tools[tool_name].execute(input_data)
        except Exception as e:
            return f"Error using {tool_name}: {str(e)}"

    def _process_image_for_gemini(self, file_path: str) -> Optional[dict]:
        """Read an image file into the ``{'mime_type', 'data'}`` dict sent to Gemini.

        Returns ``None`` (and logs) if the file cannot be read. The MIME type
        is guessed from the file name and defaults to ``image/jpeg``.
        """
        try:
            with open(file_path, 'rb') as f:
                image_data = f.read()
        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            return None

        mime_type, _ = mimetypes.guess_type(file_path)
        return {
            'mime_type': mime_type or 'image/jpeg',
            'data': image_data,
        }

    def solve_gaia_question(self, gaia_question: GAIAQuestion) -> Dict[str, Any]:
        """
        Main method to solve a GAIA benchmark question.

        Runs the heuristically selected tools, sends the question plus tool
        output (and the image, for image attachments) to Gemini, executes any
        ``USE_TOOL:`` lines in the reply and returns a result dict.

        Returns:
            On success a dict with the answer, reasoning steps, tool results,
            confidence score and timing. On failure a dict whose ``error``
            entry is truthy and whose ``agent_response`` carries the message.
            Timestamps are timezone-aware UTC ISO-8601 strings.
        """
        start_time = datetime.now(timezone.utc)

        logger.info(f"Solving GAIA Question {gaia_question.question_id} (Level {gaia_question.level})")

        if not self.model:
            message = "Model not initialized. Please check your Google API key."
            # Same keys as the exception path below so callers can read
            # 'question' / 'agent_response' unconditionally.
            return {
                "question_id": gaia_question.question_id,
                "question": gaia_question.question,
                "level": gaia_question.level,
                "agent_response": f"Error: {message}",
                "error": message,
                "timestamp": start_time.isoformat(),
                "model_used": self.model_name,
            }

        try:
            # Step 1: Analyze question and identify required tools
            required_tools = self._identify_required_tools(gaia_question.question,
                                                           gaia_question.file_path)

            # Step 2: Gather context from tools
            tool_results = {}
            reasoning_steps = []

            # File analysis first (if applicable)
            if gaia_question.file_path and os.path.exists(gaia_question.file_path):
                reasoning_steps.append(f"📎 Analyzing uploaded file: {gaia_question.file_name}")
                tool_results["file_analyzer"] = self._use_tool("file_analyzer",
                                                               gaia_question.file_path)
                reasoning_steps.append("✅ File analysis completed")

            # Use other tools as needed
            for tool_name in required_tools:
                if tool_name == "file_analyzer":  # Already handled above
                    continue
                reasoning_steps.append(f"🔧 Using {tool_name} tool")

                if tool_name == "calculator":
                    # For now, we'll let Gemini decide what to calculate
                    tool_result = "Calculator tool available for mathematical operations"
                else:
                    # web_search (and any future tool) receives the raw question
                    tool_result = self._use_tool(tool_name, gaia_question.question)

                tool_results[tool_name] = tool_result
                reasoning_steps.append(f"✅ {tool_name} completed")

            # Step 3: Prepare content for Gemini
            if tool_results:
                # ensure_ascii=False keeps emoji/non-ASCII tool output readable
                tool_results_text = json.dumps(tool_results, indent=2, ensure_ascii=False)
            else:
                tool_results_text = "No tools used yet."

            prompt = "\n".join([
                self._create_system_prompt(),
                "",
                f"GAIA BENCHMARK QUESTION (Level {gaia_question.level}):",
                f"Question ID: {gaia_question.question_id}",
                f"Question: {gaia_question.question}",
                "",
                "AVAILABLE TOOL RESULTS:",
                tool_results_text,
                "",
                "TASK: Solve this GAIA question step by step. You may request "
                "specific tool usage if needed by clearly stating:",
                '"USE_TOOL: [tool_name] with input: [input_data]"',
                "",
                "Provide your complete reasoning and final answer.",
            ])
            content_parts = [prompt]

            # Add image if it's an image file
            if (gaia_question.file_path and
                    Path(gaia_question.file_path).suffix.lower() in IMAGE_EXTENSIONS):
                image_data = self._process_image_for_gemini(gaia_question.file_path)
                if image_data:
                    content_parts.append(image_data)
                    reasoning_steps.append("🖼️ Image included for visual analysis")

            # Step 4: Generate response with Gemini
            reasoning_steps.append("🤖 Generating response with Gemini...")

            response = self.model.generate_content(content_parts)

            if not response or not response.text:
                raise Exception("Empty response from Gemini model")

            agent_response = response.text
            reasoning_steps.append("✅ Response generated successfully")

            # Step 5: Process any additional tool requests
            if self._TOOL_REQUEST_PREFIX in agent_response:
                reasoning_steps.append("🔧 Processing additional tool requests...")
                agent_response = self._process_tool_requests(agent_response, reasoning_steps)

            # Step 6: Calculate confidence and metrics
            confidence_score = self._calculate_confidence(agent_response, tool_results)

            end_time = datetime.now(timezone.utc)
            processing_time = (end_time - start_time).total_seconds()

            # Step 7: Prepare final result
            result = {
                "question_id": gaia_question.question_id,
                "question": gaia_question.question,
                "level": gaia_question.level,
                "agent_response": agent_response,
                "reasoning_steps": reasoning_steps,
                "tools_used": list(tool_results.keys()),
                "tool_results": tool_results,
                "confidence_score": confidence_score,
                "processing_time_seconds": processing_time,
                "timestamp": end_time.isoformat(),
                "model_used": self.model_name,
                "agent_version": "1.0-gemini",
            }

            # Add to conversation history
            self.conversation_history.append(result)

            logger.info(f"Question {gaia_question.question_id} solved successfully in {processing_time:.2f}s")
            return result

        except Exception as e:
            error_msg = f"Error solving question: {str(e)}"
            logger.error(error_msg)

            return {
                "question_id": gaia_question.question_id,
                "question": gaia_question.question,
                "level": gaia_question.level,
                "agent_response": f"Error: {error_msg}",
                "error": True,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "model_used": self.model_name,
            }

    def _process_tool_requests(self, response: str, reasoning_steps: List[str]) -> str:
        """Process tool usage requests from Gemini's response.

        Each line of the form ``USE_TOOL: <tool> with input: <input>`` is
        replaced by the tool's output (or a ``Tool Error`` line when the
        request is malformed). All other lines are kept unchanged.
        ``reasoning_steps`` is appended to in place.
        """
        processed_response = []

        for line in response.split('\n'):
            stripped = line.strip()
            if not stripped.startswith(self._TOOL_REQUEST_PREFIX):
                processed_response.append(line)
                continue

            # Parse tool request: "USE_TOOL: calculator with input: 2+2"
            request = stripped[len(self._TOOL_REQUEST_PREFIX):].strip()
            name_part, marker, tool_input = request.partition(self._TOOL_INPUT_MARKER)
            # The prompt shows the name as "[tool_name]"; tolerate a model
            # that copies the brackets literally.
            tool_name = name_part.strip().strip('[]').strip()
            tool_input = tool_input.strip()

            if not marker or not tool_name:
                message = ('malformed tool request, expected '
                           '"USE_TOOL: [tool_name] with input: [input_data]"')
                processed_response.append(f"Tool Error: {message}")
                reasoning_steps.append(f"❌ Tool execution failed: {message}")
                continue

            reasoning_steps.append(f"🔧 Executing {tool_name} with input: {tool_input}")

            # Execute the tool and replace the request with the result
            tool_result = self._use_tool(tool_name, tool_input)
            processed_response.append(f"Tool Result ({tool_name}): {tool_result}")

            if tool_name in self.tools:
                reasoning_steps.append(f"✅ {tool_name} executed successfully")
            else:
                reasoning_steps.append(f"❌ Tool execution failed: {tool_result}")

        return '\n'.join(processed_response)

    def _calculate_confidence(self, response: str, tool_results: Dict) -> float:
        """Calculate a heuristic confidence score in ``[0.0, 1.0]``.

        Starts at 0.5 and adjusts for response length, tool usage, structural
        markers, uncertainty words and the presence of digits.
        """
        confidence = 0.5  # Base confidence

        # Increase confidence for detailed responses
        if len(response) > 200:
            confidence += 0.1

        # Increase confidence for tool usage
        if tool_results:
            confidence += 0.2

        # Increase confidence for structured responses
        structure_markers = ('Step', 'Analysis:', 'Result:', 'Conclusion:')
        if any(marker in response for marker in structure_markers):
            confidence += 0.1

        # Decrease confidence for uncertainty indicators
        uncertainty_words = ('uncertain', 'unclear', 'might', 'possibly',
                             'approximately', 'estimate')
        response_lower = response.lower()
        if any(word in response_lower for word in uncertainty_words):
            confidence -= 0.1

        # Increase confidence for numerical precision
        if any(char.isdigit() for char in response):
            confidence += 0.1

        return max(0.0, min(1.0, confidence))

    def get_available_tools(self) -> List[str]:
        """Get list of available tool names."""
        return list(self.tools)

    def test_tools(self) -> Dict[str, str]:
        """Test all tools to ensure they're working.

        Note that the web search test performs a real search. Returns a
        mapping of tool name to a short status string.
        """
        test_results = {}

        for tool_name, tool in self.tools.items():
            try:
                if tool_name == "calculator":
                    result = tool.execute("sqrt(16)")
                elif tool_name == "web_search":
                    result = tool.execute("test search query")
                elif tool_name == "file_analyzer":
                    # Create a temporary test file
                    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt',
                                                     delete=False) as f:
                        f.write("Test file content")
                        temp_path = f.name
                    try:
                        result = tool.execute(temp_path)
                    finally:
                        os.unlink(temp_path)  # Clean up even if the tool raises
                else:
                    result = "Tool available"

                test_results[tool_name] = f"✅ Working: {result[:100]}..."
            except Exception as e:
                test_results[tool_name] = f"❌ Error: {str(e)}"

        return test_results

    def get_conversation_history(self, limit: int = 5) -> List[Dict]:
        """Get the ``limit`` most recent results (empty list if ``limit <= 0``)."""
        # Guard explicitly: history[-0:] would return the entire list.
        if limit <= 0:
            return []
        return self.conversation_history[-limit:]


def main() -> None:
    """Example usage: test the tools, then solve three sample questions."""
    import sys

    # Check for API key
    if not os.getenv("GOOGLE_API_KEY"):
        print("⚠️ Please set your GOOGLE_API_KEY environment variable")
        print("You can get one from: https://makersuite.google.com/app/apikey")
        sys.exit(1)

    # Initialize agent
    print("🚀 Initializing Gemini GAIA Agent...")
    agent = GeminiGAIAAgent(verbose=True)

    # Test tools
    print("\n🔧 Testing tools...")
    tool_results = agent.test_tools()
    for tool, result in tool_results.items():
        print(f"  {tool}: {result}")

    # Test with sample questions
    sample_questions = [
        GAIAQuestion(
            question_id="test_001",
            question="What is the square root of 144?",
            level=1,
        ),
        GAIAQuestion(
            question_id="test_002",
            question="If I invest $1000 at 5% annual compound interest, how much will I have after 3 years?",
            level=2,
        ),
        GAIAQuestion(
            question_id="test_003",
            question="What is the current population of Tokyo according to the latest data?",
            level=2,
        ),
    ]

    print("\n📝 Testing sample questions...")
    for question in sample_questions:
        print(f"\n{'='*60}")
        result = agent.solve_gaia_question(question)

        print(f"Question: {result['question']}")
        print(f"Level: {result['level']}")
        print(f"Tools Used: {result.get('tools_used', [])}")
        print(f"Confidence: {result.get('confidence_score', 0):.2f}")
        print(f"Answer: {result['agent_response'][:300]}...")

        if result.get('error'):
            print(f"❌ Error occurred: {result.get('agent_response')}")


# Example usage and testing
if __name__ == "__main__":
    main()