"""
Data Analysis Engine for GAIA Agent - Phase 4
Advanced data analysis capabilities for Excel and structured data

Features:
- Statistical analysis of Excel data
- Data aggregation and summarization
- Financial calculations and reporting
- Category-based filtering (food vs drinks)
- Currency formatting and precision handling
- Data validation and quality checks
"""
import logging
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional, Union, Tuple
from decimal import Decimal, ROUND_HALF_UP
import re
from datetime import datetime, date

logger = logging.getLogger(__name__)
class DataAnalysisEngine:
    """Advanced data analysis engine for GAIA evaluation tasks.

    Operates on tabular data supplied either as a ``pandas.DataFrame`` or as
    a list of record dictionaries. Provides financial/statistical analysis,
    category aggregation, keyword-based food-vs-drink classification,
    heuristic column auto-detection, data-quality scoring, and currency
    formatting. All public methods return a dict and report failures as
    ``{"error": ...}`` instead of raising.
    """

    def __init__(self):
        """Initialize the data analysis engine."""
        self.available = True      # readiness flag checked by callers
        self.analysis_cache = {}   # reserved for caching analysis results

    def analyze_financial_data(self, data: Union[pd.DataFrame, List[Dict]],
                               sales_columns: List[str] = None,
                               category_columns: List[str] = None,
                               filters: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Perform comprehensive financial data analysis.

        Args:
            data: DataFrame or list of dictionaries containing the data
            sales_columns: Columns containing sales/financial data
                (auto-detected via ``_detect_sales_columns`` when None)
            category_columns: Columns containing category information
                (auto-detected via ``_detect_category_columns`` when None)
            filters: Dictionary of filters to apply (see ``_apply_filters``)

        Returns:
            Comprehensive financial analysis results, or ``{"error": ...}``
            on failure.
        """
        try:
            # Normalize input to a DataFrame; copy so we never mutate the caller's frame.
            df = pd.DataFrame(data) if isinstance(data, list) else data.copy()
            if df.empty:
                return {"error": "No data provided for analysis"}

            # Auto-detect columns if not provided
            if sales_columns is None:
                sales_columns = self._detect_sales_columns(df)
            if category_columns is None:
                category_columns = self._detect_category_columns(df)

            # Apply filters (no-op when no filters were requested)
            filtered_df = self._apply_filters(df, filters) if filters else df

            return {
                "total_records": len(df),
                "filtered_records": len(filtered_df),
                "sales_analysis": self._analyze_sales_data(filtered_df, sales_columns),
                "category_analysis": self._analyze_categories(filtered_df, category_columns, sales_columns),
                "statistical_summary": self._generate_statistical_summary(filtered_df, sales_columns),
                "data_quality": self._assess_data_quality(filtered_df),
                "filters_applied": filters or {},
                "columns_analyzed": {
                    "sales_columns": sales_columns,
                    "category_columns": category_columns
                }
            }
        except Exception as e:
            logger.error(f"❌ Financial data analysis failed: {e}")
            return {"error": f"Analysis failed: {str(e)}"}

    def calculate_category_totals(self, data: Union[pd.DataFrame, List[Dict]],
                                  category_column: str,
                                  sales_column: str,
                                  include_categories: List[str] = None,
                                  exclude_categories: List[str] = None) -> Dict[str, Any]:
        """
        Calculate totals by category with inclusion/exclusion filters.

        Args:
            data: DataFrame or list of dictionaries
            category_column: Column containing categories
            sales_column: Column containing sales amounts
            include_categories: Categories to include (case-insensitive)
            exclude_categories: Categories to exclude (case-insensitive)

        Returns:
            Category totals and analysis, or ``{"error": ...}`` on failure.
        """
        try:
            df = pd.DataFrame(data) if isinstance(data, list) else data.copy()
            if df.empty or category_column not in df.columns or sales_column not in df.columns:
                return {"error": "Required columns not found in data"}

            # Bug fix: drop missing categories BEFORE astype(str), otherwise
            # NaN would be counted as the literal category "nan".
            df = df.dropna(subset=[category_column])
            df[category_column] = df[category_column].astype(str).str.strip()
            df[sales_column] = pd.to_numeric(df[sales_column], errors='coerce')
            # Remove rows whose sales value could not be parsed as a number
            df = df.dropna(subset=[sales_column])

            # Apply category filters, matched case-insensitively
            if include_categories:
                wanted = {cat.lower() for cat in include_categories}
                df = df[df[category_column].str.lower().isin(wanted)]
            if exclude_categories:
                unwanted = {cat.lower() for cat in exclude_categories}
                df = df[~df[category_column].str.lower().isin(unwanted)]

            # Per-category aggregates, rounded to cents
            category_totals = df.groupby(category_column)[sales_column].agg([
                'sum', 'count', 'mean', 'min', 'max'
            ]).round(2)
            overall_total = df[sales_column].sum()

            results = {
                "overall_total": float(overall_total),
                "formatted_total": self._format_currency(overall_total),
                "category_breakdown": {},
                "summary": {
                    "total_categories": len(category_totals),
                    "total_items": len(df),
                    "average_per_item": float(df[sales_column].mean()) if len(df) > 0 else 0
                },
                "filters_applied": {
                    "include_categories": include_categories,
                    "exclude_categories": exclude_categories
                }
            }

            for category, stats in category_totals.iterrows():
                results["category_breakdown"][category] = {
                    "total": float(stats['sum']),
                    "formatted_total": self._format_currency(stats['sum']),
                    "count": int(stats['count']),
                    "average": float(stats['mean']),
                    "min": float(stats['min']),
                    "max": float(stats['max']),
                    # Guard against division by zero when the grand total is 0
                    "percentage_of_total": float((stats['sum'] / overall_total * 100)) if overall_total > 0 else 0
                }
            return results
        except Exception as e:
            logger.error(f"❌ Category totals calculation failed: {e}")
            return {"error": f"Calculation failed: {str(e)}"}

    def detect_food_vs_drinks(self, data: Union[pd.DataFrame, List[Dict]],
                              category_columns: List[str] = None) -> Dict[str, Any]:
        """
        Detect and categorize items as food vs drinks.

        Classification is keyword/substring based: an item matching only food
        keywords is food, only drink keywords is drink; items matching both
        or neither are reported as unclassified.

        Args:
            data: DataFrame or list of dictionaries
            category_columns: Columns to analyze for food/drink classification
                (auto-detected when None)

        Returns:
            Classification results with food and drink items, or
            ``{"error": ...}`` on failure.
        """
        try:
            df = pd.DataFrame(data) if isinstance(data, list) else data.copy()
            if df.empty:
                return {"error": "No data provided"}

            if category_columns is None:
                category_columns = self._detect_category_columns(df)

            # Keyword vocabularies for substring matching (lowercase)
            food_keywords = [
                'burger', 'sandwich', 'pizza', 'salad', 'fries', 'chicken', 'beef', 'pork',
                'fish', 'pasta', 'rice', 'bread', 'soup', 'steak', 'wings', 'nuggets',
                'taco', 'burrito', 'wrap', 'hot dog', 'sub', 'panini', 'quesadilla',
                'breakfast', 'lunch', 'dinner', 'appetizer', 'dessert', 'cake', 'pie',
                'food', 'meal', 'dish', 'entree', 'side'
            ]
            drink_keywords = [
                'drink', 'beverage', 'soda', 'cola', 'pepsi', 'coke', 'sprite', 'fanta',
                'coffee', 'tea', 'latte', 'cappuccino', 'espresso', 'mocha',
                'juice', 'water', 'milk', 'shake', 'smoothie', 'beer', 'wine',
                'cocktail', 'martini', 'whiskey', 'vodka', 'rum', 'gin',
                'lemonade', 'iced tea', 'hot chocolate', 'energy drink'
            ]

            classification_results = {
                "food_items": [],
                "drink_items": [],
                "unclassified_items": [],
                "classification_summary": {}
            }

            # Classify each unique item found in each candidate column
            for col in category_columns:
                if col not in df.columns:
                    continue
                for item in df[col].dropna().unique():
                    item_str = str(item).lower()
                    is_food = any(keyword in item_str for keyword in food_keywords)
                    is_drink = any(keyword in item_str for keyword in drink_keywords)
                    if is_food and not is_drink:
                        classification_results["food_items"].append(str(item))
                    elif is_drink and not is_food:
                        classification_results["drink_items"].append(str(item))
                    else:
                        # Ambiguous (both) or unknown (neither)
                        classification_results["unclassified_items"].append(str(item))

            # Bug fix: deduplicate with sorted() instead of list(set()) so the
            # output order is deterministic across runs.
            for key in ("food_items", "drink_items", "unclassified_items"):
                classification_results[key] = sorted(set(classification_results[key]))

            food_count = len(classification_results["food_items"])
            drink_count = len(classification_results["drink_items"])
            unclassified_count = len(classification_results["unclassified_items"])
            total_items = food_count + drink_count + unclassified_count

            classification_results["classification_summary"] = {
                "total_items": total_items,
                "food_count": food_count,
                "drink_count": drink_count,
                "unclassified_count": unclassified_count,
                # Share of items confidently classified; max(1, ...) avoids /0
                "classification_confidence": (
                    (food_count + drink_count) / max(1, total_items)
                ) * 100
            }
            return classification_results
        except Exception as e:
            logger.error(f"❌ Food vs drinks detection failed: {e}")
            return {"error": f"Detection failed: {str(e)}"}

    def _detect_sales_columns(self, df: pd.DataFrame) -> List[str]:
        """Detect columns that likely contain sales/financial data.

        A column qualifies when its name contains a sales keyword and it is
        numeric, or when its values look like currency (non-negative, below
        one million, and >10% of them carry decimal places).
        """
        sales_keywords = [
            'sales', 'amount', 'total', 'price', 'cost', 'revenue', 'value',
            'sum', 'subtotal', 'grand total', 'net', 'gross'
        ]
        sales_columns = []
        for col in df.columns:
            col_lower = str(col).lower()
            # Name-based match first
            if any(keyword in col_lower for keyword in sales_keywords):
                if pd.api.types.is_numeric_dtype(df[col]):
                    sales_columns.append(col)
                continue
            # Fallback: value-based currency heuristic
            if pd.api.types.is_numeric_dtype(df[col]):
                values = df[col].dropna()
                if len(values) > 0 and values.min() >= 0 and values.max() < 1000000:
                    decimal_count = sum(1 for v in values if v != int(v))
                    if decimal_count > len(values) * 0.1:  # >10% have decimals
                        sales_columns.append(col)
        return sales_columns

    def _detect_category_columns(self, df: pd.DataFrame) -> List[str]:
        """Detect columns that likely contain category/classification data.

        A column qualifies when its name contains a category keyword and it
        is a text column, or when its text values show moderate variety
        (between 2 distinct values and half the row count).
        """
        category_keywords = [
            'category', 'type', 'item', 'product', 'name', 'description',
            'class', 'group', 'kind', 'menu', 'food', 'drink'
        ]
        category_columns = []
        for col in df.columns:
            col_lower = str(col).lower()
            # Name-based match first
            if any(keyword in col_lower for keyword in category_keywords):
                if df[col].dtype == 'object':  # text column
                    category_columns.append(col)
                continue
            # Fallback: variety heuristic on text columns
            if df[col].dtype == 'object':
                unique_count = df[col].nunique()
                total_count = len(df[col].dropna())
                if total_count > 0 and 2 <= unique_count <= total_count * 0.5:
                    category_columns.append(col)
        return category_columns

    def _apply_filters(self, df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame:
        """Apply filters to the dataframe.

        Filter semantics per column: a dict with 'min'/'max' is a range
        filter, a list is an inclusion filter, anything else is an exact
        match. Unknown columns are ignored; on error the unfiltered frame
        is returned (best-effort behavior).
        """
        filtered_df = df.copy()
        try:
            for column, filter_value in filters.items():
                if column not in df.columns:
                    continue
                if isinstance(filter_value, dict):
                    # Range filter
                    if 'min' in filter_value:
                        filtered_df = filtered_df[filtered_df[column] >= filter_value['min']]
                    if 'max' in filter_value:
                        filtered_df = filtered_df[filtered_df[column] <= filter_value['max']]
                elif isinstance(filter_value, list):
                    # Include filter
                    filtered_df = filtered_df[filtered_df[column].isin(filter_value)]
                else:
                    # Exact match filter
                    filtered_df = filtered_df[filtered_df[column] == filter_value]
            return filtered_df
        except Exception as e:
            logger.error(f"❌ Failed to apply filters: {e}")
            return df

    def _analyze_sales_data(self, df: pd.DataFrame, sales_columns: List[str]) -> Dict[str, Any]:
        """Compute per-column descriptive statistics for sales columns.

        Returns a dict keyed by column name; when several sales columns are
        present an extra "overall" entry holds the combined total.
        """
        sales_analysis = {}
        for col in sales_columns:
            if col not in df.columns:
                continue
            values = df[col].dropna()
            if len(values) == 0:
                continue
            sales_analysis[col] = {
                "total": float(values.sum()),
                "formatted_total": self._format_currency(values.sum()),
                "count": len(values),
                "average": float(values.mean()),
                "median": float(values.median()),
                "min": float(values.min()),
                "max": float(values.max()),
                # std is undefined for a single sample
                "std_dev": float(values.std()) if len(values) > 1 else 0
            }
        if len(sales_analysis) > 1:
            overall_total = sum(analysis["total"] for analysis in sales_analysis.values())
            sales_analysis["overall"] = {
                "total": overall_total,
                "formatted_total": self._format_currency(overall_total)
            }
        return sales_analysis

    def _analyze_categories(self, df: pd.DataFrame, category_columns: List[str],
                            sales_columns: List[str]) -> Dict[str, Any]:
        """Analyze category distributions and their sales performance.

        For every category column: distinct-value count, value distribution,
        and per-category sum/count/mean for each sales column.
        """
        category_analysis = {}
        for cat_col in category_columns:
            if cat_col not in df.columns:
                continue
            category_stats = {
                "unique_categories": df[cat_col].nunique(),
                "category_distribution": df[cat_col].value_counts().to_dict(),
                "sales_by_category": {}
            }
            for sales_col in sales_columns:
                if sales_col not in df.columns:
                    continue
                sales_by_cat = df.groupby(cat_col)[sales_col].agg([
                    'sum', 'count', 'mean'
                ]).round(2)
                per_category = {}
                for category, stats in sales_by_cat.iterrows():
                    per_category[category] = {
                        "total": float(stats['sum']),
                        "formatted_total": self._format_currency(stats['sum']),
                        "count": int(stats['count']),
                        "average": float(stats['mean'])
                    }
                category_stats["sales_by_category"][sales_col] = per_category
            category_analysis[cat_col] = category_stats
        return category_analysis

    def _generate_statistical_summary(self, df: pd.DataFrame, sales_columns: List[str]) -> Dict[str, Any]:
        """Generate comprehensive statistical summary.

        Includes frame shape, missing-value counts, dtypes, and quartile
        statistics for each numeric sales column.
        """
        summary = {
            "data_shape": df.shape,
            "missing_values": df.isnull().sum().to_dict(),
            "data_types": df.dtypes.astype(str).to_dict(),
            "numeric_summary": {}
        }
        for col in sales_columns:
            if col in df.columns and pd.api.types.is_numeric_dtype(df[col]):
                values = df[col].dropna()
                if len(values) > 0:
                    summary["numeric_summary"][col] = {
                        "count": len(values),
                        "mean": float(values.mean()),
                        "std": float(values.std()) if len(values) > 1 else 0,
                        "min": float(values.min()),
                        "25%": float(values.quantile(0.25)),
                        "50%": float(values.quantile(0.50)),
                        "75%": float(values.quantile(0.75)),
                        "max": float(values.max()),
                        "sum": float(values.sum())
                    }
        return summary

    def _assess_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Assess data quality and identify potential issues.

        Scores completeness (non-missing cell ratio) and consistency
        (negative sales values, extreme outliers beyond 3*IQR), then blends
        them 60/40 into an overall score.
        """
        quality_assessment = {
            "completeness": {},
            "consistency": {},
            # NOTE(review): "validity" is a placeholder — no validity checks
            # are implemented yet; kept for output-schema compatibility.
            "validity": {},
            "overall_score": 0
        }

        # Completeness: share of non-missing cells
        total_cells = df.shape[0] * df.shape[1]
        missing_cells = df.isnull().sum().sum()
        completeness_score = ((total_cells - missing_cells) / total_cells) * 100 if total_cells > 0 else 0
        quality_assessment["completeness"] = {
            "score": completeness_score,
            "missing_percentage": (missing_cells / total_cells) * 100 if total_cells > 0 else 0,
            "columns_with_missing": df.columns[df.isnull().any()].tolist()
        }

        # Consistency: scan numeric columns for suspicious values
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        consistency_issues = []
        for col in numeric_columns:
            values = df[col].dropna()
            if len(values) > 0:
                # Sales-like columns should not be negative
                if 'sales' in col.lower() or 'amount' in col.lower():
                    if (values < 0).any():
                        consistency_issues.append(f"{col}: Contains negative values")
                # Extreme outliers: beyond 3x the interquartile range
                q1, q3 = values.quantile([0.25, 0.75])
                iqr = q3 - q1
                outliers = values[(values < q1 - 3*iqr) | (values > q3 + 3*iqr)]
                if len(outliers) > 0:
                    consistency_issues.append(f"{col}: Contains {len(outliers)} extreme outliers")
        quality_assessment["consistency"] = {
            "issues": consistency_issues,
            # Each issue costs 10 points, floored at 0
            "score": max(0, 100 - len(consistency_issues) * 10)
        }

        # Weighted blend of the two sub-scores
        quality_assessment["overall_score"] = (
            completeness_score * 0.6 +
            quality_assessment["consistency"]["score"] * 0.4
        )
        return quality_assessment

    def _format_currency(self, amount: float, currency: str = "USD", decimal_places: int = 2) -> str:
        """Format amount as currency with specified decimal places.

        Uses Decimal with ROUND_HALF_UP for exact commercial rounding.
        Bug fix: negative USD amounts render as "-$5.00" (sign before the
        symbol) instead of "$-5.00".
        """
        try:
            rounded_amount = Decimal(str(amount)).quantize(
                Decimal('0.' + '0' * decimal_places),
                rounding=ROUND_HALF_UP
            )
            if currency.upper() == "USD":
                sign = "-" if rounded_amount < 0 else ""
                return f"{sign}${abs(rounded_amount):.{decimal_places}f}"
            else:
                return f"{rounded_amount:.{decimal_places}f} {currency}"
        except Exception as e:
            logger.error(f"❌ Failed to format currency: {e}")
            return f"{amount:.{decimal_places}f}"
def get_data_analysis_engine_tools() -> List[Any]:
    """Get data analysis engine tools for AGNO integration.

    Returns:
        A single-element list containing a ``DataAnalysisEngineTool``
        instance wrapping a fresh ``DataAnalysisEngine``.
    """
    # Local import keeps the module importable without the tool framework
    # and avoids a circular dependency with the package's base_tool module.
    from .base_tool import BaseTool

    class DataAnalysisEngineTool(BaseTool):
        """Data analysis engine tool for GAIA agent."""

        def __init__(self):
            super().__init__(
                name="data_analysis_engine",
                description="Advanced data analysis for financial and categorical data"
            )
            self.engine = DataAnalysisEngine()

        def execute(self, data: Union[pd.DataFrame, List[Dict]],
                    analysis_type: str = "financial",
                    **kwargs) -> Dict[str, Any]:
            """Execute data analysis.

            Args:
                data: DataFrame or list of record dicts.
                analysis_type: One of "financial", "category_totals",
                    or "food_vs_drinks".
                **kwargs: Forwarded to the selected engine method.

            Returns:
                The engine's result dict, or ``{"error": ...}`` for an
                unknown analysis type or an unexpected failure.
            """
            try:
                # Dispatch table instead of an if/elif chain; unknown types
                # fall through to the error branch below.
                dispatch = {
                    "financial": self.engine.analyze_financial_data,
                    "category_totals": self.engine.calculate_category_totals,
                    "food_vs_drinks": self.engine.detect_food_vs_drinks,
                }
                handler = dispatch.get(analysis_type)
                if handler is None:
                    return {"error": f"Unknown analysis type: {analysis_type}"}
                return handler(data, **kwargs)
            except Exception as e:
                return {"error": f"Data analysis failed: {str(e)}"}

    return [DataAnalysisEngineTool()]