Spaces:
Sleeping
Sleeping
| """ | |
| Data Analysis Engine for GAIA Agent - Phase 4 | |
| Advanced data analysis capabilities for Excel and structured data | |
| Features: | |
| - Statistical analysis of Excel data | |
| - Data aggregation and summarization | |
| - Financial calculations and reporting | |
| - Category-based filtering (food vs drinks) | |
| - Currency formatting and precision handling | |
| - Data validation and quality checks | |
| """ | |
| import logging | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional, Union, Tuple | |
| from decimal import Decimal, ROUND_HALF_UP | |
| import re | |
| from datetime import datetime, date | |
| logger = logging.getLogger(__name__) | |
class DataAnalysisEngine:
    """Advanced data analysis engine for GAIA evaluation tasks.

    Accepts tabular data as a ``pandas.DataFrame`` or a list of row
    dictionaries and provides: financial/statistical analysis, category
    aggregation with include/exclude filtering, keyword-based
    food-vs-drink classification, column auto-detection heuristics,
    data-quality assessment, and currency formatting.
    """

    def __init__(self):
        """Initialize the data analysis engine."""
        # Flag consumers can check before using the engine.
        self.available = True
        # Reserved for memoizing repeated analyses (currently unused).
        self.analysis_cache = {}

    def analyze_financial_data(self, data: Union[pd.DataFrame, List[Dict]],
                               sales_columns: List[str] = None,
                               category_columns: List[str] = None,
                               filters: Dict[str, Any] = None) -> Dict[str, Any]:
        """Perform comprehensive financial data analysis.

        Args:
            data: DataFrame or list of dictionaries containing the data.
            sales_columns: Columns containing sales/financial data
                (auto-detected when ``None``).
            category_columns: Columns containing category information
                (auto-detected when ``None``).
            filters: Optional mapping of column -> exact value, list of
                allowed values, or ``{"min": ..., "max": ...}`` range.

        Returns:
            Dict with record counts, per-column sales analysis, category
            analysis, statistical summary, data-quality assessment, and
            the filters/columns used; ``{"error": ...}`` on failure.
        """
        try:
            df = pd.DataFrame(data) if isinstance(data, list) else data.copy()
            if df.empty:
                return {"error": "No data provided for analysis"}

            # Auto-detect columns if not provided.
            if sales_columns is None:
                sales_columns = self._detect_sales_columns(df)
            if category_columns is None:
                category_columns = self._detect_category_columns(df)

            # Filters narrow the analyzed rows but total_records reports
            # the pre-filter count for context.
            filtered_df = self._apply_filters(df, filters) if filters else df

            return {
                "total_records": len(df),
                "filtered_records": len(filtered_df),
                "sales_analysis": self._analyze_sales_data(filtered_df, sales_columns),
                "category_analysis": self._analyze_categories(filtered_df, category_columns, sales_columns),
                "statistical_summary": self._generate_statistical_summary(filtered_df, sales_columns),
                "data_quality": self._assess_data_quality(filtered_df),
                "filters_applied": filters or {},
                "columns_analyzed": {
                    "sales_columns": sales_columns,
                    "category_columns": category_columns
                }
            }
        except Exception as e:
            logger.error(f"❌ Financial data analysis failed: {e}")
            return {"error": f"Analysis failed: {str(e)}"}

    def calculate_category_totals(self, data: Union[pd.DataFrame, List[Dict]],
                                  category_column: str,
                                  sales_column: str,
                                  include_categories: List[str] = None,
                                  exclude_categories: List[str] = None) -> Dict[str, Any]:
        """Calculate totals by category with inclusion/exclusion filters.

        Args:
            data: DataFrame or list of dictionaries.
            category_column: Column containing categories.
            sales_column: Column containing sales amounts.
            include_categories: Categories to include (case-insensitive).
            exclude_categories: Categories to exclude (case-insensitive).

        Returns:
            Dict with the overall total, a per-category breakdown
            (total/count/mean/min/max/percentage), a summary, and the
            filters applied; ``{"error": ...}`` on failure.
        """
        try:
            df = pd.DataFrame(data) if isinstance(data, list) else data.copy()
            if df.empty or category_column not in df.columns or sales_column not in df.columns:
                return {"error": "Required columns not found in data"}

            # Drop missing categories BEFORE the string cast: astype(str)
            # would otherwise turn NaN into a literal "nan" category.
            df = df.dropna(subset=[category_column])
            df[category_column] = df[category_column].astype(str).str.strip()

            # Coerce sales to numeric and drop rows that fail to parse.
            df[sales_column] = pd.to_numeric(df[sales_column], errors='coerce')
            df = df.dropna(subset=[sales_column])

            # Case-insensitive include/exclude filtering.
            if include_categories:
                keep = {cat.lower() for cat in include_categories}
                df = df[df[category_column].str.lower().isin(keep)]
            if exclude_categories:
                drop = {cat.lower() for cat in exclude_categories}
                df = df[~df[category_column].str.lower().isin(drop)]

            category_totals = df.groupby(category_column)[sales_column].agg(
                ['sum', 'count', 'mean', 'min', 'max']
            ).round(2)
            overall_total = df[sales_column].sum()

            results = {
                "overall_total": float(overall_total),
                "formatted_total": self._format_currency(overall_total),
                "category_breakdown": {},
                "summary": {
                    "total_categories": len(category_totals),
                    "total_items": len(df),
                    "average_per_item": float(df[sales_column].mean()) if len(df) > 0 else 0
                },
                "filters_applied": {
                    "include_categories": include_categories,
                    "exclude_categories": exclude_categories
                }
            }

            for category, stats in category_totals.iterrows():
                results["category_breakdown"][category] = {
                    "total": float(stats['sum']),
                    "formatted_total": self._format_currency(stats['sum']),
                    "count": int(stats['count']),
                    "average": float(stats['mean']),
                    "min": float(stats['min']),
                    "max": float(stats['max']),
                    "percentage_of_total": float(stats['sum'] / overall_total * 100) if overall_total > 0 else 0
                }
            return results
        except Exception as e:
            logger.error(f"❌ Category totals calculation failed: {e}")
            return {"error": f"Calculation failed: {str(e)}"}

    def detect_food_vs_drinks(self, data: Union[pd.DataFrame, List[Dict]],
                              category_columns: List[str] = None) -> Dict[str, Any]:
        """Detect and categorize items as food vs drinks.

        Classification is keyword-based: an item matching only food
        keywords is food, only drink keywords is drink; items matching
        both or neither are left unclassified.

        Args:
            data: DataFrame or list of dictionaries.
            category_columns: Columns to analyze for food/drink
                classification (auto-detected when ``None``).

        Returns:
            Dict with sorted ``food_items``, ``drink_items``,
            ``unclassified_items`` lists and a ``classification_summary``;
            ``{"error": ...}`` on failure.
        """
        try:
            df = pd.DataFrame(data) if isinstance(data, list) else data.copy()
            if df.empty:
                return {"error": "No data provided"}

            if category_columns is None:
                category_columns = self._detect_category_columns(df)

            food_keywords = [
                'burger', 'sandwich', 'pizza', 'salad', 'fries', 'chicken', 'beef', 'pork',
                'fish', 'pasta', 'rice', 'bread', 'soup', 'steak', 'wings', 'nuggets',
                'taco', 'burrito', 'wrap', 'hot dog', 'sub', 'panini', 'quesadilla',
                'breakfast', 'lunch', 'dinner', 'appetizer', 'dessert', 'cake', 'pie',
                'food', 'meal', 'dish', 'entree', 'side'
            ]
            drink_keywords = [
                'drink', 'beverage', 'soda', 'cola', 'pepsi', 'coke', 'sprite', 'fanta',
                'coffee', 'tea', 'latte', 'cappuccino', 'espresso', 'mocha',
                'juice', 'water', 'milk', 'shake', 'smoothie', 'beer', 'wine',
                'cocktail', 'martini', 'whiskey', 'vodka', 'rum', 'gin',
                'lemonade', 'iced tea', 'hot chocolate', 'energy drink'
            ]

            classification_results = {
                "food_items": [],
                "drink_items": [],
                "unclassified_items": [],
                "classification_summary": {}
            }

            # Classify the unique values of each candidate column.
            for col in category_columns:
                if col not in df.columns:
                    continue
                for item in df[col].dropna().unique():
                    item_str = str(item).lower()
                    is_food = any(keyword in item_str for keyword in food_keywords)
                    is_drink = any(keyword in item_str for keyword in drink_keywords)
                    if is_food and not is_drink:
                        classification_results["food_items"].append(str(item))
                    elif is_drink and not is_food:
                        classification_results["drink_items"].append(str(item))
                    else:
                        # Ambiguous (both) or no keyword match (neither).
                        classification_results["unclassified_items"].append(str(item))

            # Deduplicate; sorted() keeps output deterministic across runs
            # (plain list(set(...)) order varies with hash randomization).
            for key in ("food_items", "drink_items", "unclassified_items"):
                classification_results[key] = sorted(set(classification_results[key]))

            food_count = len(classification_results["food_items"])
            drink_count = len(classification_results["drink_items"])
            unclassified_count = len(classification_results["unclassified_items"])
            total_items = food_count + drink_count + unclassified_count

            classification_results["classification_summary"] = {
                "total_items": total_items,
                "food_count": food_count,
                "drink_count": drink_count,
                "unclassified_count": unclassified_count,
                # Share of items confidently classified, as a percentage.
                "classification_confidence": ((food_count + drink_count) / max(1, total_items)) * 100
            }
            return classification_results
        except Exception as e:
            logger.error(f"❌ Food vs drinks detection failed: {e}")
            return {"error": f"Detection failed: {str(e)}"}

    def _detect_sales_columns(self, df: pd.DataFrame) -> List[str]:
        """Detect columns that likely contain sales/financial data.

        A column qualifies if its name contains a sales keyword and it is
        numeric, or if its numeric values look like currency (non-negative,
        below 1,000,000, with >10% fractional values).
        """
        sales_keywords = [
            'sales', 'amount', 'total', 'price', 'cost', 'revenue', 'value',
            'sum', 'subtotal', 'grand total', 'net', 'gross'
        ]
        sales_columns = []
        for col in df.columns:
            col_lower = str(col).lower()
            # Name-based detection takes precedence.
            if any(keyword in col_lower for keyword in sales_keywords):
                if pd.api.types.is_numeric_dtype(df[col]):
                    sales_columns.append(col)
                continue
            # Heuristic: numeric values in a plausible currency range.
            if pd.api.types.is_numeric_dtype(df[col]):
                values = df[col].dropna()
                if len(values) > 0 and values.min() >= 0 and values.max() < 1000000:
                    # Vectorized fractional-part test (avoids a Python-level
                    # loop and int() overflow on non-finite values).
                    decimal_count = int(((values % 1) != 0).sum())
                    if decimal_count > len(values) * 0.1:  # >10% have decimals
                        sales_columns.append(col)
        return sales_columns

    def _detect_category_columns(self, df: pd.DataFrame) -> List[str]:
        """Detect columns that likely contain category/classification data.

        A column qualifies if its name contains a category keyword and it
        is a text column, or if it is a text column with moderate variety
        (between 2 distinct values and 50% of the non-null count).
        """
        category_keywords = [
            'category', 'type', 'item', 'product', 'name', 'description',
            'class', 'group', 'kind', 'menu', 'food', 'drink'
        ]
        category_columns = []
        for col in df.columns:
            col_lower = str(col).lower()
            if any(keyword in col_lower for keyword in category_keywords):
                if df[col].dtype == 'object':  # Text column
                    category_columns.append(col)
                continue
            if df[col].dtype == 'object':
                unique_count = df[col].nunique()
                total_count = len(df[col].dropna())
                # Good category column has some variety but not too much.
                if total_count > 0 and 2 <= unique_count <= total_count * 0.5:
                    category_columns.append(col)
        return category_columns

    def _apply_filters(self, df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame:
        """Apply column filters to the dataframe.

        Filter value semantics: dict -> min/max range, list -> membership,
        anything else -> exact equality. Unknown columns are skipped.
        Returns the original ``df`` unchanged if filtering raises.
        """
        filtered_df = df.copy()
        try:
            for column, filter_value in filters.items():
                if column not in df.columns:
                    continue
                if isinstance(filter_value, dict):
                    # Range filter (either bound optional).
                    if 'min' in filter_value:
                        filtered_df = filtered_df[filtered_df[column] >= filter_value['min']]
                    if 'max' in filter_value:
                        filtered_df = filtered_df[filtered_df[column] <= filter_value['max']]
                elif isinstance(filter_value, list):
                    # Include filter.
                    filtered_df = filtered_df[filtered_df[column].isin(filter_value)]
                else:
                    # Exact match filter.
                    filtered_df = filtered_df[filtered_df[column] == filter_value]
            return filtered_df
        except Exception as e:
            logger.error(f"❌ Failed to apply filters: {e}")
            return df

    def _analyze_sales_data(self, df: pd.DataFrame, sales_columns: List[str]) -> Dict[str, Any]:
        """Summarize each sales column (total/count/mean/median/min/max/std).

        Adds an ``"overall"`` entry combining totals when more than one
        sales column was analyzed.
        """
        sales_analysis = {}
        for col in sales_columns:
            if col not in df.columns:
                continue
            values = df[col].dropna()
            if len(values) == 0:
                continue
            sales_analysis[col] = {
                "total": float(values.sum()),
                "formatted_total": self._format_currency(values.sum()),
                "count": len(values),
                "average": float(values.mean()),
                "median": float(values.median()),
                "min": float(values.min()),
                "max": float(values.max()),
                # std of a single value is NaN; report 0 instead.
                "std_dev": float(values.std()) if len(values) > 1 else 0
            }
        if len(sales_analysis) > 1:
            overall_total = sum(analysis["total"] for analysis in sales_analysis.values())
            sales_analysis["overall"] = {
                "total": overall_total,
                "formatted_total": self._format_currency(overall_total)
            }
        return sales_analysis

    def _analyze_categories(self, df: pd.DataFrame, category_columns: List[str],
                            sales_columns: List[str]) -> Dict[str, Any]:
        """Analyze category distributions and their sales performance.

        For each category column: unique count, value distribution, and
        per-category sum/count/mean for every sales column present.
        """
        category_analysis = {}
        for cat_col in category_columns:
            if cat_col not in df.columns:
                continue
            category_stats = {
                "unique_categories": df[cat_col].nunique(),
                "category_distribution": df[cat_col].value_counts().to_dict(),
                "sales_by_category": {}
            }
            for sales_col in sales_columns:
                if sales_col not in df.columns:
                    continue
                sales_by_cat = df.groupby(cat_col)[sales_col].agg(
                    ['sum', 'count', 'mean']
                ).round(2)
                per_category = {}
                for category, stats in sales_by_cat.iterrows():
                    per_category[category] = {
                        "total": float(stats['sum']),
                        "formatted_total": self._format_currency(stats['sum']),
                        "count": int(stats['count']),
                        "average": float(stats['mean'])
                    }
                category_stats["sales_by_category"][sales_col] = per_category
            category_analysis[cat_col] = category_stats
        return category_analysis

    def _generate_statistical_summary(self, df: pd.DataFrame, sales_columns: List[str]) -> Dict[str, Any]:
        """Generate shape/missing/dtype info plus quantiles for sales columns."""
        summary = {
            "data_shape": df.shape,
            "missing_values": df.isnull().sum().to_dict(),
            "data_types": df.dtypes.astype(str).to_dict(),
            "numeric_summary": {}
        }
        for col in sales_columns:
            if col in df.columns and pd.api.types.is_numeric_dtype(df[col]):
                values = df[col].dropna()
                if len(values) > 0:
                    summary["numeric_summary"][col] = {
                        "count": len(values),
                        "mean": float(values.mean()),
                        "std": float(values.std()) if len(values) > 1 else 0,
                        "min": float(values.min()),
                        "25%": float(values.quantile(0.25)),
                        "50%": float(values.quantile(0.50)),
                        "75%": float(values.quantile(0.75)),
                        "max": float(values.max()),
                        "sum": float(values.sum())
                    }
        return summary

    def _assess_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Assess data quality: completeness, consistency, overall score.

        Note: ``"validity"`` is a placeholder (always empty) kept for
        result-shape compatibility. Overall score = 60% completeness +
        40% consistency.
        """
        quality_assessment = {
            "completeness": {},
            "consistency": {},
            "validity": {},
            "overall_score": 0
        }

        # Completeness: fraction of non-null cells.
        total_cells = df.shape[0] * df.shape[1]
        missing_cells = df.isnull().sum().sum()
        completeness_score = ((total_cells - missing_cells) / total_cells) * 100 if total_cells > 0 else 0
        quality_assessment["completeness"] = {
            "score": completeness_score,
            "missing_percentage": (missing_cells / total_cells) * 100 if total_cells > 0 else 0,
            "columns_with_missing": df.columns[df.isnull().any()].tolist()
        }

        # Consistency: negative sales values and extreme (3*IQR) outliers.
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        consistency_issues = []
        for col in numeric_columns:
            values = df[col].dropna()
            if len(values) > 0:
                if 'sales' in col.lower() or 'amount' in col.lower():
                    if (values < 0).any():
                        consistency_issues.append(f"{col}: Contains negative values")
                q1, q3 = values.quantile([0.25, 0.75])
                iqr = q3 - q1
                outliers = values[(values < q1 - 3*iqr) | (values > q3 + 3*iqr)]
                if len(outliers) > 0:
                    consistency_issues.append(f"{col}: Contains {len(outliers)} extreme outliers")
        quality_assessment["consistency"] = {
            "issues": consistency_issues,
            # Each issue costs 10 points, floored at 0.
            "score": max(0, 100 - len(consistency_issues) * 10)
        }

        quality_assessment["overall_score"] = (
            completeness_score * 0.6 +
            quality_assessment["consistency"]["score"] * 0.4
        )
        return quality_assessment

    def _format_currency(self, amount: float, currency: str = "USD", decimal_places: int = 2) -> str:
        """Format ``amount`` as currency with the given precision.

        Uses Decimal with ROUND_HALF_UP for predictable rounding. Negative
        USD amounts render conventionally as "-$5.00" (not "$-5.00").
        Falls back to a plain numeric string if formatting fails.
        """
        try:
            rounded_amount = Decimal(str(amount)).quantize(
                Decimal('0.' + '0' * decimal_places),
                rounding=ROUND_HALF_UP
            )
            sign = "-" if rounded_amount < 0 else ""
            magnitude = abs(rounded_amount)
            if currency.upper() == "USD":
                return f"{sign}${magnitude:.{decimal_places}f}"
            return f"{sign}{magnitude:.{decimal_places}f} {currency}"
        except Exception as e:
            logger.error(f"❌ Failed to format currency: {e}")
            return f"{amount:.{decimal_places}f}"
def get_data_analysis_engine_tools() -> List[Any]:
    """Get data analysis engine tools for AGNO integration."""
    from .base_tool import BaseTool

    class DataAnalysisEngineTool(BaseTool):
        """Data analysis engine tool for GAIA agent."""

        def __init__(self):
            super().__init__(
                name="data_analysis_engine",
                description="Advanced data analysis for financial and categorical data"
            )
            self.engine = DataAnalysisEngine()

        def execute(self, data: Union[pd.DataFrame, List[Dict]],
                    analysis_type: str = "financial",
                    **kwargs) -> Dict[str, Any]:
            """Run the engine method matching ``analysis_type`` on ``data``."""
            # Dispatch table instead of an if/elif chain; unknown types
            # produce the same error payload as before.
            handlers = {
                "financial": self.engine.analyze_financial_data,
                "category_totals": self.engine.calculate_category_totals,
                "food_vs_drinks": self.engine.detect_food_vs_drinks,
            }
            handler = handlers.get(analysis_type)
            if handler is None:
                return {"error": f"Unknown analysis type: {analysis_type}"}
            try:
                return handler(data, **kwargs)
            except Exception as e:
                return {"error": f"Data analysis failed: {str(e)}"}

    return [DataAnalysisEngineTool()]