""" Assessment analyzer for the GAIA agent. This module provides functionality to analyze assessment results, categorize questions, calculate success rates, identify error patterns, and generate recommendations for further improvements. """ import logging from typing import Dict, Any, List, Optional, Set, Tuple import re from collections import Counter, defaultdict logger = logging.getLogger("gaia_agent.assessment_analyzer") class AnalysisResult: """Class to store assessment analysis results.""" def __init__( self, category: str, success_rate: float, successful_questions: Dict[str, Dict[str, Any]], failed_questions: Dict[str, Dict[str, Any]], error_patterns: Dict[str, int], recommendations: List[str] ): """ Initialize the analysis result. Args: category: The category of questions analyzed success_rate: The success rate (0.0 to 1.0) successful_questions: Dictionary of successfully answered questions failed_questions: Dictionary of failed questions error_patterns: Dictionary of error patterns and their frequencies recommendations: List of recommendations for improvement """ self.category = category self.success_rate = success_rate self.successful_questions = successful_questions self.failed_questions = failed_questions self.error_patterns = error_patterns self.recommendations = recommendations def __str__(self) -> str: """Return a string representation of the analysis result.""" return ( f"Analysis Result for {self.category} questions:\n" f"Success Rate: {self.success_rate:.2%}\n" f"Successful Questions: {len(self.successful_questions)}\n" f"Failed Questions: {len(self.failed_questions)}\n" f"Top Error Patterns: {dict(sorted(self.error_patterns.items(), key=lambda x: x[1], reverse=True)[:3])}\n" f"Recommendations: {self.recommendations}" ) def categorize_question(question_id: str, question_data: Dict[str, Any]) -> str: """ Categorize a question based on its content and ID. Args: question_id: The ID of the question question_data: The question data Returns: The category of the question """ question_text = question_data.get("question", "").lower() # Web search questions web_search_keywords = [ "mercedes sosa", "wikipedia", "featured article", "malko competition", "yankees", "olympics", "universe today", "polish tv actor" ] # Multimodal questions multimodal_keywords = [ "video", "youtube", "watch?v=", "audio", "listen", "chess position", "image" ] # File processing questions file_processing_keywords = [ "excel", "spreadsheet", "python code", "execute", "run this code" ] # Academic paper questions academic_paper_keywords = [ "arxiv", "paper", "scientific", "research", "publication", "journal" ] # Mathematical reasoning questions math_keywords = [ "math", "calculate", "commutativity", "table", "operation", "subset" ] # Check for web search questions for keyword in web_search_keywords: if keyword in question_text: return "web_search" # Check for multimodal questions for keyword in multimodal_keywords: if keyword in question_text: return "multimodal" # Check for file processing questions for keyword in file_processing_keywords: if keyword in question_text: return "file_processing" # Check for academic paper questions for keyword in academic_paper_keywords: if keyword in question_text: return "academic_paper" # Check for mathematical reasoning questions for keyword in math_keywords: if keyword in question_text: return "mathematical_reasoning" # Default to "other" if no category matches return "other" def identify_error_patterns(failed_questions: Dict[str, Dict[str, Any]]) -> Dict[str, int]: """ Identify common error patterns in failed questions. Args: failed_questions: Dictionary of failed questions Returns: Dictionary of error patterns and their frequencies """ error_patterns = Counter() for question_id, question_data in failed_questions.items(): error = question_data.get("error", "") answer = question_data.get("answer", "") expected_answer = question_data.get("expected_answer", "") # Check for specific error types if error: # Check for timeout-related errors if any(term in error.lower() for term in ["timeout", "timed out", "time limit"]): error_patterns["timeout"] += 1 # Check for API-related errors elif any(term in error.lower() for term in ["api", "rate limit", "quota"]): error_patterns["api_error"] += 1 # Check for unhashable type errors elif "unhashable" in error.lower(): error_patterns["unhashable_type"] += 1 else: error_patterns["other_error"] += 1 # Check for incorrect answers elif answer and expected_answer and answer != expected_answer: # Check for partial matches if any(word in answer.lower() for word in expected_answer.lower().split()): error_patterns["partial_match"] += 1 else: error_patterns["completely_wrong_answer"] += 1 else: error_patterns["unknown_failure"] += 1 return error_patterns def generate_recommendations( category: str, success_rate: float, error_patterns: Dict[str, int], failed_questions: Dict[str, Dict[str, Any]] ) -> List[str]: """ Generate recommendations for improvement based on analysis. Args: category: The category of questions success_rate: The success rate error_patterns: Dictionary of error patterns and their frequencies failed_questions: Dictionary of failed questions Returns: List of recommendations """ recommendations = [] # General recommendations based on success rate if success_rate < 0.3: recommendations.append(f"Major improvements needed for {category} questions") elif success_rate < 0.7: recommendations.append(f"Moderate improvements needed for {category} questions") elif success_rate < 1.0: recommendations.append(f"Minor improvements needed for {category} questions") else: recommendations.append(f"All {category} questions are answered correctly") # Specific recommendations based on error patterns if error_patterns.get("timeout", 0) > 0: recommendations.append("Optimize search performance to prevent timeouts") if error_patterns.get("api_error", 0) > 0: recommendations.append("Improve error handling for API failures") if error_patterns.get("unhashable_type", 0) > 0: recommendations.append("Fix unhashable type errors in the agent implementation") if error_patterns.get("partial_match", 0) > 0: recommendations.append("Improve answer extraction precision") if error_patterns.get("completely_wrong_answer", 0) > 0: recommendations.append("Enhance search result relevance filtering") # Category-specific recommendations if category == "web_search": if success_rate < 1.0: recommendations.append("Implement specialized search for specific domains (Wikipedia, news, etc.)") recommendations.append("Improve result ranking based on query relevance") recommendations.append("Add better error handling for failed searches") elif category == "multimodal": if success_rate < 1.0: recommendations.append("Enhance video content extraction capabilities") recommendations.append("Improve audio transcription accuracy") recommendations.append("Add specialized image analysis for specific domains") elif category == "file_processing": if success_rate < 1.0: recommendations.append("Improve Excel file parsing and data extraction") recommendations.append("Enhance code execution sandbox for better isolation") elif category == "academic_paper": if success_rate < 1.0: recommendations.append("Improve arXiv paper content extraction") recommendations.append("Add support for more academic paper sources") elif category == "mathematical_reasoning": if success_rate < 1.0: recommendations.append("Enhance mathematical operation analysis") recommendations.append("Improve structured data parsing for tables and matrices") return recommendations def analyze_assessment_results( assessment_results: Dict[str, Dict[str, Any]], category: Optional[str] = None ) -> AnalysisResult: """ Analyze assessment results and generate insights. Args: assessment_results: Dictionary of assessment results category: Optional category to filter questions by Returns: AnalysisResult object with analysis results """ # Categorize questions categorized_questions = defaultdict(dict) for question_id, question_data in assessment_results.items(): question_category = categorize_question(question_id, question_data) categorized_questions[question_category][question_id] = question_data # If a specific category is requested, filter questions if category: questions_to_analyze = categorized_questions.get(category, {}) category_name = category else: questions_to_analyze = assessment_results category_name = "all" # Separate successful and failed questions successful_questions = {} failed_questions = {} for question_id, question_data in questions_to_analyze.items(): if question_data.get("correct", False): successful_questions[question_id] = question_data else: failed_questions[question_id] = question_data # Calculate success rate total_questions = len(questions_to_analyze) successful_count = len(successful_questions) success_rate = successful_count / total_questions if total_questions > 0 else 0.0 # Identify error patterns error_patterns = identify_error_patterns(failed_questions) # Generate recommendations recommendations = generate_recommendations( category_name, success_rate, error_patterns, failed_questions ) # Create and return analysis result return AnalysisResult( category=category_name, success_rate=success_rate, successful_questions=successful_questions, failed_questions=failed_questions, error_patterns=error_patterns, recommendations=recommendations )