# gaia-enhanced-agent/tools/data_analysis_engine.py
"""
Data Analysis Engine for GAIA Agent - Phase 4
Advanced data analysis capabilities for Excel and structured data
Features:
- Statistical analysis of Excel data
- Data aggregation and summarization
- Financial calculations and reporting
- Category-based filtering (food vs drinks)
- Currency formatting and precision handling
- Data validation and quality checks
"""
import logging
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional, Union, Tuple
from decimal import Decimal, ROUND_HALF_UP
import re
from datetime import datetime, date
logger = logging.getLogger(__name__)
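# Illustrative usage sketch (the DataFrame columns and values below are
# hypothetical, not tied to any specific GAIA task file):
#
#   engine = DataAnalysisEngine()
#   df = pd.DataFrame({"Item": ["Burger", "Soda"], "Sales": [8.99, 2.50]})
#   report = engine.analyze_financial_data(df)
#   totals = engine.calculate_category_totals(df, category_column="Item",
#                                             sales_column="Sales")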
class DataAnalysisEngine:
"""Advanced data analysis engine for GAIA evaluation tasks."""
def __init__(self):
"""Initialize the data analysis engine."""
self.available = True
self.analysis_cache = {}
def analyze_financial_data(self, data: Union[pd.DataFrame, List[Dict]],
sales_columns: Optional[List[str]] = None,
category_columns: Optional[List[str]] = None,
filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Perform comprehensive financial data analysis.
Args:
data: DataFrame or list of dictionaries containing the data
sales_columns: Columns containing sales/financial data
category_columns: Columns containing category information
filters: Dictionary of filters to apply
Returns:
Comprehensive financial analysis results
"""
try:
# Convert to DataFrame if needed
if isinstance(data, list):
df = pd.DataFrame(data)
else:
df = data.copy()
if df.empty:
return {"error": "No data provided for analysis"}
# Auto-detect columns if not provided
if sales_columns is None:
sales_columns = self._detect_sales_columns(df)
if category_columns is None:
category_columns = self._detect_category_columns(df)
# Apply filters
filtered_df = self._apply_filters(df, filters) if filters else df
# Perform analysis
analysis_results = {
"total_records": len(df),
"filtered_records": len(filtered_df),
"sales_analysis": self._analyze_sales_data(filtered_df, sales_columns),
"category_analysis": self._analyze_categories(filtered_df, category_columns, sales_columns),
"statistical_summary": self._generate_statistical_summary(filtered_df, sales_columns),
"data_quality": self._assess_data_quality(filtered_df),
"filters_applied": filters or {},
"columns_analyzed": {
"sales_columns": sales_columns,
"category_columns": category_columns
}
}
return analysis_results
except Exception as e:
logger.error(f"❌ Financial data analysis failed: {e}")
return {"error": f"Analysis failed: {str(e)}"}
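# Example (sketch): with column auto-detection left on, a call such as
#
#   report = engine.analyze_financial_data(df, filters={"Location": "Downtown"})
#
# returns a dict with the keys "total_records", "filtered_records",
# "sales_analysis", "category_analysis", "statistical_summary", "data_quality",
# "filters_applied" and "columns_analyzed". The "Location" column and
# "Downtown" value are hypothetical filter inputs, not part of this module.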
def calculate_category_totals(self, data: Union[pd.DataFrame, List[Dict]],
category_column: str,
sales_column: str,
include_categories: Optional[List[str]] = None,
exclude_categories: Optional[List[str]] = None) -> Dict[str, Any]:
"""
Calculate totals by category with inclusion/exclusion filters.
Args:
data: DataFrame or list of dictionaries
category_column: Column containing categories
sales_column: Column containing sales amounts
include_categories: Categories to include
exclude_categories: Categories to exclude
Returns:
Category totals and analysis
"""
try:
# Convert to DataFrame if needed
if isinstance(data, list):
df = pd.DataFrame(data)
else:
df = data.copy()
if df.empty or category_column not in df.columns or sales_column not in df.columns:
return {"error": "Required columns not found in data"}
# Clean and prepare data
df[category_column] = df[category_column].astype(str).str.strip()
df[sales_column] = pd.to_numeric(df[sales_column], errors='coerce')
# Remove rows with invalid sales data
df = df.dropna(subset=[sales_column])
# Apply category filters
if include_categories:
mask = df[category_column].str.lower().isin([cat.lower() for cat in include_categories])
df = df[mask]
if exclude_categories:
mask = ~df[category_column].str.lower().isin([cat.lower() for cat in exclude_categories])
df = df[mask]
# Calculate totals by category
category_totals = df.groupby(category_column)[sales_column].agg([
'sum', 'count', 'mean', 'min', 'max'
]).round(2)
# Calculate overall total
overall_total = df[sales_column].sum()
# Prepare results
results = {
"overall_total": float(overall_total),
"formatted_total": self._format_currency(overall_total),
"category_breakdown": {},
"summary": {
"total_categories": len(category_totals),
"total_items": len(df),
"average_per_item": float(df[sales_column].mean()) if len(df) > 0 else 0
},
"filters_applied": {
"include_categories": include_categories,
"exclude_categories": exclude_categories
}
}
# Add category breakdown
for category, stats in category_totals.iterrows():
results["category_breakdown"][category] = {
"total": float(stats['sum']),
"formatted_total": self._format_currency(stats['sum']),
"count": int(stats['count']),
"average": float(stats['mean']),
"min": float(stats['min']),
"max": float(stats['max']),
"percentage_of_total": float((stats['sum'] / overall_total * 100)) if overall_total > 0 else 0
}
return results
except Exception as e:
logger.error(f"❌ Category totals calculation failed: {e}")
return {"error": f"Calculation failed: {str(e)}"}
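# Example (sketch): food-only revenue from a hypothetical menu sheet.
#
#   totals = engine.calculate_category_totals(
#       df, category_column="Category", sales_column="Sales",
#       exclude_categories=["Soda", "Coffee"])
#   totals["formatted_total"]                                # e.g. "$1234.56"
#   totals["category_breakdown"]["Burger"]["percentage_of_total"]
#
# Column names and category labels here are assumptions for illustration;
# matching against include/exclude lists is case-insensitive.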
def detect_food_vs_drinks(self, data: Union[pd.DataFrame, List[Dict]],
category_columns: Optional[List[str]] = None) -> Dict[str, Any]:
"""
Detect and categorize items as food vs drinks.
Args:
data: DataFrame or list of dictionaries
category_columns: Columns to analyze for food/drink classification
Returns:
Classification results with food and drink items
"""
try:
# Convert to DataFrame if needed
if isinstance(data, list):
df = pd.DataFrame(data)
else:
df = data.copy()
if df.empty:
return {"error": "No data provided"}
# Auto-detect category columns if not provided
if category_columns is None:
category_columns = self._detect_category_columns(df)
# Food and drink keywords
food_keywords = [
'burger', 'sandwich', 'pizza', 'salad', 'fries', 'chicken', 'beef', 'pork',
'fish', 'pasta', 'rice', 'bread', 'soup', 'steak', 'wings', 'nuggets',
'taco', 'burrito', 'wrap', 'hot dog', 'sub', 'panini', 'quesadilla',
'breakfast', 'lunch', 'dinner', 'appetizer', 'dessert', 'cake', 'pie',
'food', 'meal', 'dish', 'entree', 'side'
]
drink_keywords = [
'drink', 'beverage', 'soda', 'cola', 'pepsi', 'coke', 'sprite', 'fanta',
'coffee', 'tea', 'latte', 'cappuccino', 'espresso', 'mocha',
'juice', 'water', 'milk', 'shake', 'smoothie', 'beer', 'wine',
'cocktail', 'martini', 'whiskey', 'vodka', 'rum', 'gin',
'lemonade', 'iced tea', 'hot chocolate', 'energy drink'
]
classification_results = {
"food_items": [],
"drink_items": [],
"unclassified_items": [],
"classification_summary": {}
}
# Analyze each category column
for col in category_columns:
if col not in df.columns:
continue
unique_items = df[col].dropna().unique()
for item in unique_items:
item_str = str(item).lower()
# Check for food keywords
is_food = any(keyword in item_str for keyword in food_keywords)
# Check for drink keywords
is_drink = any(keyword in item_str for keyword in drink_keywords)
if is_food and not is_drink:
classification_results["food_items"].append(str(item))
elif is_drink and not is_food:
classification_results["drink_items"].append(str(item))
else:
classification_results["unclassified_items"].append(str(item))
# Remove duplicates
classification_results["food_items"] = list(set(classification_results["food_items"]))
classification_results["drink_items"] = list(set(classification_results["drink_items"]))
classification_results["unclassified_items"] = list(set(classification_results["unclassified_items"]))
# Generate summary
classification_results["classification_summary"] = {
"total_items": len(classification_results["food_items"]) +
len(classification_results["drink_items"]) +
len(classification_results["unclassified_items"]),
"food_count": len(classification_results["food_items"]),
"drink_count": len(classification_results["drink_items"]),
"unclassified_count": len(classification_results["unclassified_items"]),
"classification_confidence": (
(len(classification_results["food_items"]) + len(classification_results["drink_items"])) /
max(1, len(classification_results["food_items"]) +
len(classification_results["drink_items"]) +
len(classification_results["unclassified_items"]))
) * 100
}
return classification_results
except Exception as e:
logger.error(f"❌ Food vs drinks detection failed: {e}")
return {"error": f"Detection failed: {str(e)}"}
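# Example (sketch): keyword-based split of the unique values in a hypothetical
# "Item" column.
#
#   split = engine.detect_food_vs_drinks(df, category_columns=["Item"])
#   split["food_items"]          # e.g. ["Burger", "Caesar Salad"]
#   split["drink_items"]         # e.g. ["Iced Tea"]
#   split["classification_summary"]["classification_confidence"]   # 0-100
#
# Matching is a plain substring check against the keyword lists above, so an
# item that hits both lists (e.g. "Soup & Soda Combo") is left unclassified.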
def _detect_sales_columns(self, df: pd.DataFrame) -> List[str]:
"""Detect columns that likely contain sales/financial data."""
sales_keywords = [
'sales', 'amount', 'total', 'price', 'cost', 'revenue', 'value',
'sum', 'subtotal', 'grand total', 'net', 'gross'
]
sales_columns = []
for col in df.columns:
col_lower = str(col).lower()
# Check for sales keywords in column name
if any(keyword in col_lower for keyword in sales_keywords):
if pd.api.types.is_numeric_dtype(df[col]):
sales_columns.append(col)
continue
# Check if column contains numeric data that looks like currency
if pd.api.types.is_numeric_dtype(df[col]):
values = df[col].dropna()
if len(values) > 0:
# Check if values are positive and in reasonable range for currency
if values.min() >= 0 and values.max() < 1000000:
# Check if values have decimal places (common for currency)
decimal_count = sum(1 for v in values if v != int(v))
if decimal_count > len(values) * 0.1:  # more than 10% of values have a fractional part
sales_columns.append(col)
return sales_columns
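# Heuristic note (sketch): a numeric column whose header contains a keyword
# (e.g. a hypothetical "Total Sales" column) is accepted directly; an unlabeled
# numeric column only qualifies when its values are non-negative, below
# 1,000,000, and more than 10% of them carry a fractional part, a rough proxy
# for currency amounts.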
def _detect_category_columns(self, df: pd.DataFrame) -> List[str]:
"""Detect columns that likely contain category/classification data."""
category_keywords = [
'category', 'type', 'item', 'product', 'name', 'description',
'class', 'group', 'kind', 'menu', 'food', 'drink'
]
category_columns = []
for col in df.columns:
col_lower = str(col).lower()
# Check for category keywords
if any(keyword in col_lower for keyword in category_keywords):
if df[col].dtype == 'object': # Text column
category_columns.append(col)
continue
# Check if column contains text with reasonable variety
if df[col].dtype == 'object':
unique_count = df[col].nunique()
total_count = len(df[col].dropna())
# Good category column has some variety but not too much
if total_count > 0 and 2 <= unique_count <= total_count * 0.5:
category_columns.append(col)
return category_columns
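# Heuristic note (sketch): a text column qualifies by header keyword (e.g. a
# hypothetical "Menu Item" column) or by variety, i.e. at least 2 distinct
# values but no more than half as many distinct values as non-null rows, which
# screens out free-text and ID-like columns.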
def _apply_filters(self, df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame:
"""Apply filters to the dataframe."""
filtered_df = df.copy()
try:
for column, filter_value in filters.items():
if column not in df.columns:
continue
if isinstance(filter_value, dict):
# Range filter
if 'min' in filter_value:
filtered_df = filtered_df[filtered_df[column] >= filter_value['min']]
if 'max' in filter_value:
filtered_df = filtered_df[filtered_df[column] <= filter_value['max']]
elif isinstance(filter_value, list):
# Include filter
filtered_df = filtered_df[filtered_df[column].isin(filter_value)]
else:
# Exact match filter
filtered_df = filtered_df[filtered_df[column] == filter_value]
return filtered_df
except Exception as e:
logger.error(f"❌ Failed to apply filters: {e}")
return df
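# Filter specification accepted above (sketch; column names are hypothetical):
#
#   filters = {
#       "Sales": {"min": 5.0, "max": 50.0},    # dict  -> inclusive range filter
#       "Category": ["Food", "Dessert"],       # list  -> isin() include filter
#       "Location": "Downtown",                # other -> exact-match filter
#   }
#
# Unknown column names are silently skipped, and any filtering error falls back
# to returning the unfiltered frame.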
def _analyze_sales_data(self, df: pd.DataFrame, sales_columns: List[str]) -> Dict[str, Any]:
"""Analyze sales data columns."""
sales_analysis = {}
for col in sales_columns:
if col not in df.columns:
continue
values = df[col].dropna()
if len(values) == 0:
continue
sales_analysis[col] = {
"total": float(values.sum()),
"formatted_total": self._format_currency(values.sum()),
"count": len(values),
"average": float(values.mean()),
"median": float(values.median()),
"min": float(values.min()),
"max": float(values.max()),
"std_dev": float(values.std()) if len(values) > 1 else 0
}
# Calculate overall totals if multiple sales columns
if len(sales_analysis) > 1:
overall_total = sum(analysis["total"] for analysis in sales_analysis.values())
sales_analysis["overall"] = {
"total": overall_total,
"formatted_total": self._format_currency(overall_total)
}
return sales_analysis
def _analyze_categories(self, df: pd.DataFrame, category_columns: List[str],
sales_columns: List[str]) -> Dict[str, Any]:
"""Analyze category distributions and their sales performance."""
category_analysis = {}
for cat_col in category_columns:
if cat_col not in df.columns:
continue
category_stats = {
"unique_categories": df[cat_col].nunique(),
"category_distribution": df[cat_col].value_counts().to_dict(),
"sales_by_category": {}
}
# Analyze sales by category
for sales_col in sales_columns:
if sales_col not in df.columns:
continue
sales_by_cat = df.groupby(cat_col)[sales_col].agg([
'sum', 'count', 'mean'
]).round(2)
category_stats["sales_by_category"][sales_col] = {}
for category, stats in sales_by_cat.iterrows():
category_stats["sales_by_category"][sales_col][category] = {
"total": float(stats['sum']),
"formatted_total": self._format_currency(stats['sum']),
"count": int(stats['count']),
"average": float(stats['mean'])
}
category_analysis[cat_col] = category_stats
return category_analysis
def _generate_statistical_summary(self, df: pd.DataFrame, sales_columns: List[str]) -> Dict[str, Any]:
"""Generate comprehensive statistical summary."""
summary = {
"data_shape": df.shape,
"missing_values": df.isnull().sum().to_dict(),
"data_types": df.dtypes.astype(str).to_dict(),
"numeric_summary": {}
}
# Detailed analysis for sales columns
for col in sales_columns:
if col in df.columns and pd.api.types.is_numeric_dtype(df[col]):
values = df[col].dropna()
if len(values) > 0:
summary["numeric_summary"][col] = {
"count": len(values),
"mean": float(values.mean()),
"std": float(values.std()) if len(values) > 1 else 0,
"min": float(values.min()),
"25%": float(values.quantile(0.25)),
"50%": float(values.quantile(0.50)),
"75%": float(values.quantile(0.75)),
"max": float(values.max()),
"sum": float(values.sum())
}
return summary
def _assess_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]:
"""Assess data quality and identify potential issues."""
quality_assessment = {
"completeness": {},
"consistency": {},
"validity": {},
"overall_score": 0
}
# Completeness check
total_cells = df.shape[0] * df.shape[1]
missing_cells = df.isnull().sum().sum()
completeness_score = ((total_cells - missing_cells) / total_cells) * 100 if total_cells > 0 else 0
quality_assessment["completeness"] = {
"score": completeness_score,
"missing_percentage": (missing_cells / total_cells) * 100 if total_cells > 0 else 0,
"columns_with_missing": df.columns[df.isnull().any()].tolist()
}
# Consistency check (for numeric columns)
numeric_columns = df.select_dtypes(include=[np.number]).columns
consistency_issues = []
for col in numeric_columns:
values = df[col].dropna()
if len(values) > 0:
# Check for negative values in sales data
if 'sales' in col.lower() or 'amount' in col.lower():
if (values < 0).any():
consistency_issues.append(f"{col}: Contains negative values")
# Check for extreme outliers
q1, q3 = values.quantile([0.25, 0.75])
iqr = q3 - q1
outliers = values[(values < q1 - 3*iqr) | (values > q3 + 3*iqr)]
if len(outliers) > 0:
consistency_issues.append(f"{col}: Contains {len(outliers)} extreme outliers")
quality_assessment["consistency"] = {
"issues": consistency_issues,
"score": max(0, 100 - len(consistency_issues) * 10)
}
# Overall quality score
quality_assessment["overall_score"] = (
completeness_score * 0.6 +
quality_assessment["consistency"]["score"] * 0.4
)
return quality_assessment
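# Worked example (sketch): a sheet with 5% missing cells scores
# completeness = 95.0; two consistency issues (negative amounts, outliers)
# score 100 - 2 * 10 = 80, so overall_score = 95.0 * 0.6 + 80 * 0.4 = 89.0.
# The figures are illustrative, not taken from real GAIA data.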
def _format_currency(self, amount: float, currency: str = "USD", decimal_places: int = 2) -> str:
"""Format amount as currency with specified decimal places."""
try:
# Round to specified decimal places
rounded_amount = Decimal(str(amount)).quantize(
Decimal('0.' + '0' * decimal_places),
rounding=ROUND_HALF_UP
)
if currency.upper() == "USD":
return f"${rounded_amount:.{decimal_places}f}"
else:
return f"{rounded_amount:.{decimal_places}f} {currency}"
except Exception as e:
logger.error(f"❌ Failed to format currency: {e}")
return f"{amount:.{decimal_places}f}"
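# Rounding note (sketch): quantizing the Decimal with ROUND_HALF_UP keeps .5
# ties away from zero, so _format_currency(2.675) returns "$2.68" and
# _format_currency(2.675, currency="EUR") returns "2.68 EUR", whereas the
# built-in round(2.675, 2) yields 2.67 because of binary float representation.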
def get_data_analysis_engine_tools() -> List[Any]:
"""Get data analysis engine tools for AGNO integration."""
from .base_tool import BaseTool
class DataAnalysisEngineTool(BaseTool):
"""Data analysis engine tool for GAIA agent."""
def __init__(self):
super().__init__(
name="data_analysis_engine",
description="Advanced data analysis for financial and categorical data"
)
self.engine = DataAnalysisEngine()
def execute(self, data: Union[pd.DataFrame, List[Dict]],
analysis_type: str = "financial",
**kwargs) -> Dict[str, Any]:
"""Execute data analysis."""
try:
if analysis_type == "financial":
return self.engine.analyze_financial_data(data, **kwargs)
elif analysis_type == "category_totals":
return self.engine.calculate_category_totals(data, **kwargs)
elif analysis_type == "food_vs_drinks":
return self.engine.detect_food_vs_drinks(data, **kwargs)
else:
return {"error": f"Unknown analysis type: {analysis_type}"}
except Exception as e:
return {"error": f"Data analysis failed: {str(e)}"}
return [DataAnalysisEngineTool()]
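# Minimal self-check sketch: the sample rows and column names below are invented
# for illustration and are not part of the GAIA datasets. The AGNO tool factory
# above is not exercised here because it relies on the package-relative
# BaseTool import; only the engine class is used directly.
if __name__ == "__main__":
    sample = pd.DataFrame({
        "Item": ["Burger", "Fries", "Soda", "Coffee", "Salad"],
        "Category": ["Food", "Food", "Drink", "Drink", "Food"],
        "Sales": [8.99, 3.49, 2.50, 3.25, 7.75],
    })
    engine = DataAnalysisEngine()
    # Food-only total: rows whose Category matches "Drink" are excluded.
    food_totals = engine.calculate_category_totals(
        sample, category_column="Category", sales_column="Sales",
        exclude_categories=["Drink"])
    print(food_totals["formatted_total"])  # "$20.23" for the rows above
    # Keyword-based food/drink split of the "Item" column.
    split = engine.detect_food_vs_drinks(sample, category_columns=["Item"])
    print(split["classification_summary"])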