# Source: Final_Assignment_GAIAAgent / src/gaia/agent/assessment_analyzer.py
# Provenance (from Hugging Face file listing): uploaded by JoachimVC,
# commit c922f8b — "Upload GAIA agent implementation files for assessment".
"""
Assessment analyzer for the GAIA agent.
This module provides functionality to analyze assessment results,
categorize questions, calculate success rates, identify error patterns,
and generate recommendations for further improvements.
"""
import logging
from typing import Dict, Any, List, Optional, Set, Tuple
import re
from collections import Counter, defaultdict
logger = logging.getLogger("gaia_agent.assessment_analyzer")
class AnalysisResult:
    """Container for the outcome of analyzing one batch of assessment results.

    Bundles the analyzed category, its success rate, the per-question
    pass/fail buckets, the tallied error patterns, and the generated
    improvement recommendations into a single value object.
    """

    def __init__(
        self,
        category: str,
        success_rate: float,
        successful_questions: Dict[str, Dict[str, Any]],
        failed_questions: Dict[str, Dict[str, Any]],
        error_patterns: Dict[str, int],
        recommendations: List[str]
    ):
        """
        Initialize the analysis result.

        Args:
            category: The category of questions analyzed
            success_rate: The success rate (0.0 to 1.0)
            successful_questions: Dictionary of successfully answered questions
            failed_questions: Dictionary of failed questions
            error_patterns: Dictionary of error patterns and their frequencies
            recommendations: List of recommendations for improvement
        """
        # Plain attribute storage; all values are kept exactly as supplied.
        self.category = category
        self.success_rate = success_rate
        self.successful_questions = successful_questions
        self.failed_questions = failed_questions
        self.error_patterns = error_patterns
        self.recommendations = recommendations

    def __str__(self) -> str:
        """Return a multi-line human-readable summary of this analysis."""
        # Rank error patterns by frequency and keep only the three most common.
        ranked = sorted(self.error_patterns.items(), key=lambda item: item[1], reverse=True)
        top_patterns = dict(ranked[:3])
        summary_lines = [
            f"Analysis Result for {self.category} questions:",
            f"Success Rate: {self.success_rate:.2%}",
            f"Successful Questions: {len(self.successful_questions)}",
            f"Failed Questions: {len(self.failed_questions)}",
            f"Top Error Patterns: {top_patterns}",
            f"Recommendations: {self.recommendations}",
        ]
        return "\n".join(summary_lines)
def categorize_question(question_id: str, question_data: Dict[str, Any]) -> str:
    """
    Categorize a question based on its content and ID.

    Args:
        question_id: The ID of the question (currently unused; kept for
            interface compatibility with callers)
        question_data: The question data; the "question" text field is matched
            case-insensitively against category keyword lists

    Returns:
        The category of the question: one of "web_search", "multimodal",
        "file_processing", "academic_paper", "mathematical_reasoning",
        or "other" when no keyword matches.
    """
    text = question_data.get("question", "").lower()
    # Ordered (category, keywords) pairs — first matching category wins,
    # so the order here is significant.
    keyword_table = [
        ("web_search", (
            "mercedes sosa", "wikipedia", "featured article", "malko competition",
            "yankees", "olympics", "universe today", "polish tv actor",
        )),
        ("multimodal", (
            "video", "youtube", "watch?v=", "audio", "listen", "chess position", "image",
        )),
        ("file_processing", (
            "excel", "spreadsheet", "python code", "execute", "run this code",
        )),
        ("academic_paper", (
            "arxiv", "paper", "scientific", "research", "publication", "journal",
        )),
        ("mathematical_reasoning", (
            "math", "calculate", "commutativity", "table", "operation", "subset",
        )),
    ]
    for category, keywords in keyword_table:
        if any(keyword in text for keyword in keywords):
            return category
    # Nothing matched: fall back to the catch-all bucket.
    return "other"
def identify_error_patterns(failed_questions: Dict[str, Dict[str, Any]]) -> Dict[str, int]:
    """
    Identify common error patterns in failed questions.

    Args:
        failed_questions: Dictionary of failed questions; each entry may carry
            "error", "answer", and "expected_answer" fields

    Returns:
        Counter mapping error-pattern labels ("timeout", "api_error",
        "unhashable_type", "other_error", "partial_match",
        "completely_wrong_answer", "unknown_failure") to their frequencies.
    """
    patterns: Counter = Counter()
    for data in failed_questions.values():
        error_text = data.get("error", "")
        given = data.get("answer", "")
        expected = data.get("expected_answer", "")
        if error_text:
            # Explicit error string: classify by keyword, most specific first.
            lowered = error_text.lower()
            if "timeout" in lowered or "timed out" in lowered or "time limit" in lowered:
                patterns["timeout"] += 1
            elif "api" in lowered or "rate limit" in lowered or "quota" in lowered:
                patterns["api_error"] += 1
            elif "unhashable" in lowered:
                patterns["unhashable_type"] += 1
            else:
                patterns["other_error"] += 1
            continue
        if given and expected and given != expected:
            # Any word of the expected answer appearing in the given answer
            # counts as a partial match (loose heuristic by design).
            expected_words = expected.lower().split()
            lowered_answer = given.lower()
            if any(word in lowered_answer for word in expected_words):
                patterns["partial_match"] += 1
            else:
                patterns["completely_wrong_answer"] += 1
        else:
            # No error recorded and no comparable answer pair.
            patterns["unknown_failure"] += 1
    return patterns
def generate_recommendations(
    category: str,
    success_rate: float,
    error_patterns: Dict[str, int],
    failed_questions: Dict[str, Dict[str, Any]]
) -> List[str]:
    """
    Generate recommendations for improvement based on analysis.

    Args:
        category: The category of questions
        success_rate: The success rate (0.0 to 1.0)
        error_patterns: Dictionary of error patterns and their frequencies
        failed_questions: Dictionary of failed questions (currently unused;
            kept for interface compatibility with callers)

    Returns:
        Ordered list of recommendations: an overall assessment first, then
        error-pattern advice, then category-specific advice when the
        category is not yet perfect.
    """
    recs: List[str] = []
    # Overall assessment, banded by success rate.
    if success_rate >= 1.0:
        recs.append(f"All {category} questions are answered correctly")
    elif success_rate >= 0.7:
        recs.append(f"Minor improvements needed for {category} questions")
    elif success_rate >= 0.3:
        recs.append(f"Moderate improvements needed for {category} questions")
    else:
        recs.append(f"Major improvements needed for {category} questions")
    # One piece of advice per observed error pattern, in a fixed order.
    pattern_advice = (
        ("timeout", "Optimize search performance to prevent timeouts"),
        ("api_error", "Improve error handling for API failures"),
        ("unhashable_type", "Fix unhashable type errors in the agent implementation"),
        ("partial_match", "Improve answer extraction precision"),
        ("completely_wrong_answer", "Enhance search result relevance filtering"),
    )
    for pattern, advice in pattern_advice:
        if error_patterns.get(pattern, 0) > 0:
            recs.append(advice)
    # Category-specific advice applies only while the category is imperfect.
    category_advice = {
        "web_search": [
            "Implement specialized search for specific domains (Wikipedia, news, etc.)",
            "Improve result ranking based on query relevance",
            "Add better error handling for failed searches",
        ],
        "multimodal": [
            "Enhance video content extraction capabilities",
            "Improve audio transcription accuracy",
            "Add specialized image analysis for specific domains",
        ],
        "file_processing": [
            "Improve Excel file parsing and data extraction",
            "Enhance code execution sandbox for better isolation",
        ],
        "academic_paper": [
            "Improve arXiv paper content extraction",
            "Add support for more academic paper sources",
        ],
        "mathematical_reasoning": [
            "Enhance mathematical operation analysis",
            "Improve structured data parsing for tables and matrices",
        ],
    }
    if success_rate < 1.0:
        recs.extend(category_advice.get(category, []))
    return recs
def analyze_assessment_results(
    assessment_results: Dict[str, Dict[str, Any]],
    category: Optional[str] = None
) -> AnalysisResult:
    """
    Analyze assessment results and generate insights.

    Args:
        assessment_results: Dictionary of assessment results keyed by
            question ID; each entry should carry a boolean "correct" field
        category: Optional category to filter questions by; when omitted,
            all questions are analyzed under the label "all"

    Returns:
        AnalysisResult with the success rate, pass/fail buckets, error
        patterns, and recommendations for the selected questions.
    """
    # Bucket every question by its detected category.
    by_category: Dict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict)
    for qid, qdata in assessment_results.items():
        by_category[categorize_question(qid, qdata)][qid] = qdata
    # Select the requested slice (or everything under the "all" label).
    if category:
        selected = by_category.get(category, {})
        label = category
    else:
        selected = assessment_results
        label = "all"
    # Split into pass/fail buckets; missing "correct" counts as a failure.
    passed = {qid: qd for qid, qd in selected.items() if qd.get("correct", False)}
    failed = {qid: qd for qid, qd in selected.items() if not qd.get("correct", False)}
    # Success rate over the selected slice; empty selection yields 0.0.
    rate = len(passed) / len(selected) if selected else 0.0
    patterns = identify_error_patterns(failed)
    recommendations = generate_recommendations(label, rate, patterns, failed)
    return AnalysisResult(
        category=label,
        success_rate=rate,
        successful_questions=passed,
        failed_questions=failed,
        error_patterns=patterns,
        recommendations=recommendations,
    )