| """ | |
| Enhanced Excel Processing Tool for GAIA Agent - Phase 4 | |
| Advanced Excel file reading, processing, and data analysis capabilities | |
| Features: | |
| - Multi-sheet Excel processing with openpyxl and pandas | |
| - Formula evaluation and calculation | |
| - Data type detection and conversion | |
| - Cell range analysis and aggregation | |
| - Conditional data filtering and grouping | |
| - Financial calculations with currency formatting | |
| """ | |
| import os | |
| import logging | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional, Union, Tuple | |
| from pathlib import Path | |
| import re | |
| from decimal import Decimal, ROUND_HALF_UP | |
| try: | |
| import openpyxl | |
| from openpyxl import load_workbook | |
| from openpyxl.utils import get_column_letter, column_index_from_string | |
| OPENPYXL_AVAILABLE = True | |
| except ImportError: | |
| OPENPYXL_AVAILABLE = False | |
| try: | |
| import xlrd | |
| XLRD_AVAILABLE = True | |
| except ImportError: | |
| XLRD_AVAILABLE = False | |
| logger = logging.getLogger(__name__) | |


class ExcelProcessor:
    """Enhanced Excel processor for GAIA data analysis tasks."""

    def __init__(self):
        """Initialize the Excel processor."""
        self.available = OPENPYXL_AVAILABLE
        self.workbook = None
        self.sheets_data = {}
        self.sheet_names = []
        if not self.available:
            logger.warning("⚠️ openpyxl not available - Excel processing limited")

    def load_excel_file(self, file_path: str) -> Dict[str, Any]:
        """
        Load an Excel (or CSV) file and return a comprehensive data structure.

        Args:
            file_path: Path to the Excel file

        Returns:
            Dictionary containing sheets data and metadata
        """
        try:
            file_path = Path(file_path)
            if not file_path.exists():
                raise FileNotFoundError(f"Excel file not found: {file_path}")
            # Determine file type and load accordingly
            suffix = file_path.suffix.lower()
            if suffix == '.csv':
                return self._load_csv_file(file_path)
            elif suffix in ['.xlsx', '.xlsm']:
                return self._load_xlsx_file(file_path)
            elif suffix == '.xls' and XLRD_AVAILABLE:
                return self._load_xls_file(file_path)
            else:
                # Try pandas as a fallback
                return self._load_with_pandas(file_path)
        except Exception as e:
            logger.error(f"❌ Failed to load Excel file {file_path}: {e}")
            return {"error": str(e), "sheets": {}, "metadata": {}}

    def _load_xlsx_file(self, file_path: Path) -> Dict[str, Any]:
        """Load a .xlsx/.xlsm file using openpyxl for advanced features."""
        if not OPENPYXL_AVAILABLE:
            return self._load_with_pandas(file_path)
        try:
            # Load the workbook twice: once for formula text, once for values.
            # Note: data_only=True returns the values cached by the last save;
            # formulas never calculated and saved in Excel come back as None.
            self.workbook = load_workbook(file_path, data_only=False)
            workbook_data_only = load_workbook(file_path, data_only=True)
            sheets_data = {}
            metadata = {
                "file_path": str(file_path),
                "file_size": file_path.stat().st_size,
                "sheet_count": len(self.workbook.sheetnames),
                "sheet_names": self.workbook.sheetnames
            }
            for sheet_name in self.workbook.sheetnames:
                sheet_data = self._process_worksheet(
                    self.workbook[sheet_name],
                    workbook_data_only[sheet_name],
                    sheet_name
                )
                sheets_data[sheet_name] = sheet_data
            self.sheets_data = sheets_data
            self.sheet_names = self.workbook.sheetnames
            return {
                "sheets": sheets_data,
                "metadata": metadata,
                "success": True
            }
        except Exception as e:
            logger.error(f"❌ Failed to load XLSX file: {e}")
            return {"error": str(e), "sheets": {}, "metadata": {}}

    def _load_xls_file(self, file_path: Path) -> Dict[str, Any]:
        """Load a legacy .xls file (pandas delegates to xlrd under the hood)."""
        try:
            return self._load_with_pandas(file_path)
        except Exception as e:
            logger.error(f"❌ Failed to load XLS file: {e}")
            return {"error": str(e), "sheets": {}, "metadata": {}}

    def _load_csv_file(self, file_path: Path) -> Dict[str, Any]:
        """Load a CSV file as a single sheet."""
        try:
            df = pd.read_csv(file_path)
            # Process the dataframe
            processed_data = self._process_dataframe(df, "Sheet1")
            metadata = {
                "file_path": str(file_path),
                "file_size": file_path.stat().st_size,
                "sheet_count": 1,
                "sheet_names": ["Sheet1"]
            }
            return {
                "sheets": {"Sheet1": processed_data},
                "metadata": metadata,
                "success": True
            }
        except Exception as e:
            logger.error(f"❌ Failed to load CSV file: {e}")
            return {"error": str(e), "sheets": {}, "metadata": {}}

    def _load_with_pandas(self, file_path: Path) -> Dict[str, Any]:
        """Load an Excel file using pandas as a fallback."""
        try:
            # Read all sheets (sheet_name=None returns a dict of DataFrames)
            if file_path.suffix.lower() == '.csv':
                sheets_dict = {"Sheet1": pd.read_csv(file_path)}
            else:
                sheets_dict = pd.read_excel(file_path, sheet_name=None)
            sheets_data = {}
            for sheet_name, df in sheets_dict.items():
                sheets_data[sheet_name] = self._process_dataframe(df, sheet_name)
            metadata = {
                "file_path": str(file_path),
                "file_size": file_path.stat().st_size,
                "sheet_count": len(sheets_dict),
                "sheet_names": list(sheets_dict.keys())
            }
            return {
                "sheets": sheets_data,
                "metadata": metadata,
                "success": True
            }
        except Exception as e:
            logger.error(f"❌ Failed to load with pandas: {e}")
            return {"error": str(e), "sheets": {}, "metadata": {}}

    def _process_worksheet(self, worksheet, worksheet_data_only, sheet_name: str) -> Dict[str, Any]:
        """Process an individual worksheet with openpyxl."""
        try:
            # Get dimensions
            max_row = worksheet.max_row
            max_col = worksheet.max_column
            # Extract formulas and calculated values in parallel
            data_with_formulas = []
            data_values_only = []
            for row in range(1, max_row + 1):
                row_formulas = []
                row_values = []
                for col in range(1, max_col + 1):
                    # Cell from the formula view and from the cached-value view
                    cell_formula = worksheet.cell(row=row, column=col)
                    cell_value = worksheet_data_only.cell(row=row, column=col)
                    is_formula = isinstance(cell_formula.value, str) and cell_formula.value.startswith('=')
                    row_formulas.append({
                        'value': cell_formula.value,
                        'formula': cell_formula.value if is_formula else None,
                        'data_type': type(cell_formula.value).__name__
                    })
                    row_values.append(cell_value.value)
                data_with_formulas.append(row_formulas)
                data_values_only.append(row_values)
            # Convert to a DataFrame for easier analysis
            df = pd.DataFrame(data_values_only)
            processed_data = self._process_dataframe(df, sheet_name)
            # Attach formula information and raw dimensions
            processed_data['formulas'] = data_with_formulas
            processed_data['dimensions'] = {'rows': max_row, 'columns': max_col}
            return processed_data
        except Exception as e:
            logger.error(f"❌ Failed to process worksheet {sheet_name}: {e}")
            return {"error": str(e), "data": [], "columns": []}

    def _process_dataframe(self, df: pd.DataFrame, sheet_name: str) -> Dict[str, Any]:
        """Process a pandas DataFrame and extract metadata."""
        try:
            df_clean = df.copy()
            # Detect the header row; -1 means no header-like row was found
            header_row = self._detect_header_row(df_clean)
            # Promote the detected row to column headers. pandas readers usually
            # promote row 0 on their own, so only do it here when the header sits
            # lower down, or when the frame still has default integer columns
            # (the raw openpyxl path).
            if header_row > 0 or (header_row == 0 and isinstance(df_clean.columns, pd.RangeIndex)):
                df_clean.columns = df_clean.iloc[header_row]
                df_clean = df_clean.iloc[header_row + 1:].reset_index(drop=True)
            # Clean column names
            df_clean.columns = [str(col).strip() if pd.notna(col) else f"Column_{i}"
                                for i, col in enumerate(df_clean.columns)]
            # Detect and convert data types
            df_clean = self._detect_and_convert_types(df_clean)
            # Generate summary statistics
            summary_stats = self._generate_summary_stats(df_clean)
            # Detect categories (for food vs. drinks analysis)
            categories = self._detect_categories(df_clean)
            return {
                "data": df_clean.to_dict('records'),
                "dataframe": df_clean,
                "columns": list(df_clean.columns),
                "shape": df_clean.shape,
                "dtypes": df_clean.dtypes.to_dict(),
                "summary_stats": summary_stats,
                "categories": categories,
                "header_row": header_row,
                "sheet_name": sheet_name
            }
        except Exception as e:
            logger.error(f"❌ Failed to process dataframe for {sheet_name}: {e}")
            return {"error": str(e), "data": [], "columns": []}

    def _detect_header_row(self, df: pd.DataFrame) -> int:
        """Return the index of the first header-like row, or -1 if none is found."""
        for i in range(min(5, len(df))):  # Check the first 5 rows
            row = df.iloc[i]
            # A row made up mostly of non-empty strings is likely a header
            string_count = sum(1 for val in row if isinstance(val, str) and val.strip())
            if string_count > len(row) * 0.6:  # more than 60% strings
                return i
        return -1
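
    # Heuristic sketch: given rows
    #   0: [None, None, None]
    #   1: ["Item", "Price", "Qty"]   -> 3/3 non-empty strings (> 60%) -> returns 1
    # a leading blank row is skipped and row 1 is promoted to headers.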

    def _detect_and_convert_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Detect and convert appropriate data types."""
        df_converted = df.copy()
        for col in df_converted.columns:
            try:
                if df_converted[col].dtype == 'object':
                    # Strip currency symbols and thousands separators, then keep
                    # only digit, dot, and minus characters before parsing
                    cleaned_series = df_converted[col].astype(str).str.replace(r'[$,€£¥]', '', regex=True)
                    cleaned_series = cleaned_series.str.replace(r'[^\d.-]', '', regex=True)
                    numeric_series = pd.to_numeric(cleaned_series, errors='coerce')
                    # Convert only when most values parse as numbers; mixed
                    # text-plus-digit columns (e.g. "Item 1") could otherwise
                    # be mangled by the digit-only cleanup above
                    if numeric_series.notna().sum() > len(numeric_series) * 0.7:
                        df_converted[col] = numeric_series
            except Exception:
                pass  # Keep the original type
        return df_converted
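
    # Conversion sketch: "$1,234.50" -> "1,234.50" -> "1234.50" -> 1234.5,
    # while "N/A" cleans down to "" and coerces to NaN.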

    def _generate_summary_stats(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Generate summary statistics for the dataframe."""
        try:
            stats = {
                "row_count": len(df),
                "column_count": len(df.columns),
                "numeric_columns": [],
                "text_columns": [],
                "missing_values": df.isnull().sum().to_dict()
            }
            for col in df.columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    stats["numeric_columns"].append({
                        "name": col,
                        "min": float(df[col].min()) if pd.notna(df[col].min()) else None,
                        "max": float(df[col].max()) if pd.notna(df[col].max()) else None,
                        "mean": float(df[col].mean()) if pd.notna(df[col].mean()) else None,
                        "sum": float(df[col].sum()) if pd.notna(df[col].sum()) else None
                    })
                else:
                    stats["text_columns"].append({
                        "name": col,
                        "unique_values": int(df[col].nunique()),
                        "most_common": str(df[col].mode().iloc[0]) if len(df[col].mode()) > 0 else None
                    })
            return stats
        except Exception as e:
            logger.error(f"❌ Failed to generate summary stats: {e}")
            return {}

    def _detect_categories(self, df: pd.DataFrame) -> Dict[str, Dict[str, List[str]]]:
        """Detect potential categories in the data (e.g., food vs. drinks)."""
        categories = {}
        food_keywords = ['food', 'burger', 'sandwich', 'pizza', 'salad', 'fries', 'chicken', 'beef']
        drink_keywords = ['drink', 'soda', 'coffee', 'tea', 'juice', 'water', 'beer', 'wine']
        try:
            # Look for text columns that might contain categories
            for col in df.columns:
                if df[col].dtype == 'object':
                    unique_values = df[col].dropna().unique()
                    food_items = []
                    drink_items = []
                    for value in unique_values:
                        value_str = str(value).lower()
                        if any(keyword in value_str for keyword in food_keywords):
                            food_items.append(str(value))
                        elif any(keyword in value_str for keyword in drink_keywords):
                            drink_items.append(str(value))
                    if food_items or drink_items:
                        categories[col] = {
                            "food": food_items,
                            "drinks": drink_items,
                            "other": [str(v) for v in unique_values if str(v) not in food_items + drink_items]
                        }
            return categories
        except Exception as e:
            logger.error(f"❌ Failed to detect categories: {e}")
            return {}
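
    # Classification sketch: a text column with values
    #   ["Cheeseburger", "Iced Tea", "Side Salad", "Cola"]
    # yields {"food": ["Cheeseburger", "Side Salad"], "drinks": ["Iced Tea"],
    #         "other": ["Cola"]} -- matching is keyword-based, so values like
    # "Cola" that contain no known keyword fall into "other".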

    def analyze_sales_data(self, category_filter: Optional[str] = None,
                           exclude_categories: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Analyze sales data with category filtering.

        Args:
            category_filter: Category to include (e.g., 'food')
            exclude_categories: Categories to exclude (e.g., ['drinks'])

        Returns:
            Analysis results with totals and breakdowns
        """
        try:
            if not self.sheets_data:
                return {"error": "No data loaded"}
            results = {}
            total_sales = 0
            for sheet_name, sheet_data in self.sheets_data.items():
                if "error" in sheet_data:
                    continue
                df = sheet_data.get("dataframe")
                if df is None or df.empty:
                    continue
                # Find sales/amount columns and category columns
                sales_columns = self._find_sales_columns(df)
                category_columns = self._find_category_columns(df)
                sheet_total = 0
                filtered_data = df.copy()
                # Apply category filtering
                if category_filter or exclude_categories:
                    filtered_data = self._apply_category_filter(
                        df, category_columns, category_filter, exclude_categories
                    )
                # Sum every detected sales column; note that if several columns
                # qualify (e.g. both "Price" and "Total"), each contributes
                for sales_col in sales_columns:
                    if sales_col in filtered_data.columns:
                        col_total = filtered_data[sales_col].sum()
                        if pd.notna(col_total):
                            sheet_total += col_total
                results[sheet_name] = {
                    "total": sheet_total,
                    "sales_columns": sales_columns,
                    "category_columns": category_columns,
                    "filtered_rows": len(filtered_data),
                    "original_rows": len(df)
                }
                total_sales += sheet_total
            # Format the final result
            formatted_total = self._format_currency(total_sales)
            return {
                "total_sales": total_sales,
                "formatted_total": formatted_total,
                "sheet_results": results,
                "success": True
            }
        except Exception as e:
            logger.error(f"❌ Failed to analyze sales data: {e}")
            return {"error": str(e)}

    def _find_sales_columns(self, df: pd.DataFrame) -> List[str]:
        """Find columns that likely contain sales/amount data."""
        sales_keywords = ['sales', 'amount', 'total', 'price', 'cost', 'revenue', 'value']
        sales_columns = []
        for col in df.columns:
            col_lower = str(col).lower()
            if any(keyword in col_lower for keyword in sales_keywords):
                # Only accept columns that actually hold numeric data
                if pd.api.types.is_numeric_dtype(df[col]):
                    sales_columns.append(col)
        # If no obvious sales columns, fall back to numeric columns whose values
        # look like currency (non-negative, within a plausible range)
        if not sales_columns:
            for col in df.columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    values = df[col].dropna()
                    if len(values) > 0 and values.min() >= 0 and values.max() < 1000000:
                        sales_columns.append(col)
        return sales_columns

    def _find_category_columns(self, df: pd.DataFrame) -> List[str]:
        """Find columns that likely contain category data."""
        category_keywords = ['category', 'type', 'item', 'product', 'name', 'description']
        category_columns = []
        for col in df.columns:
            col_lower = str(col).lower()
            if any(keyword in col_lower for keyword in category_keywords):
                if df[col].dtype == 'object':  # Text column
                    category_columns.append(col)
        return category_columns

    def _apply_category_filter(self, df: pd.DataFrame, category_columns: List[str],
                               include_category: Optional[str] = None,
                               exclude_categories: Optional[List[str]] = None) -> pd.DataFrame:
        """Apply category filtering to the dataframe."""
        try:
            # Build one boolean mask on the frame's own index so alignment stays
            # correct even without a default RangeIndex (the original per-column
            # mask used positional indices and could misalign after filtering)
            mask = pd.Series(True, index=df.index)
            for col in category_columns:
                if col not in df.columns:
                    continue
                values = df[col].astype(str).str.lower()
                # Apply include filter (plain substring match, not regex)
                if include_category:
                    mask &= values.str.contains(include_category.lower(), na=False, regex=False)
                # Apply exclude filters
                if exclude_categories:
                    for exclude_cat in exclude_categories:
                        mask &= ~values.str.contains(exclude_cat.lower(), na=False, regex=False)
            return df[mask]
        except Exception as e:
            logger.error(f"❌ Failed to apply category filter: {e}")
            return df
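
    # Filter-semantics sketch: include_category="food" keeps rows whose cell
    # text contains the substring "food" (e.g. "Fast Food Combo"), not rows
    # that _detect_categories classified as food -- "Burger" would not match.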

    def _format_currency(self, amount: float, currency: str = "USD", decimal_places: int = 2) -> str:
        """Format an amount as currency with the specified decimal places."""
        try:
            # Round half-up to the requested number of decimal places
            rounded_amount = Decimal(str(amount)).quantize(
                Decimal('0.' + '0' * decimal_places),
                rounding=ROUND_HALF_UP
            )
            if currency.upper() == "USD":
                return f"${rounded_amount:.{decimal_places}f}"
            else:
                return f"{rounded_amount:.{decimal_places}f} {currency}"
        except Exception as e:
            logger.error(f"❌ Failed to format currency: {e}")
            return f"{amount:.{decimal_places}f}"

    def get_sheet_summary(self) -> Dict[str, Any]:
        """Get a summary of all loaded sheets."""
        if not self.sheets_data:
            return {"error": "No data loaded"}
        summary = {
            "total_sheets": len(self.sheets_data),
            "sheet_names": list(self.sheets_data.keys()),
            "sheets": {}
        }
        for sheet_name, sheet_data in self.sheets_data.items():
            if "error" not in sheet_data:
                summary["sheets"][sheet_name] = {
                    "rows": sheet_data.get("shape", [0, 0])[0],
                    "columns": sheet_data.get("shape", [0, 0])[1],
                    "column_names": sheet_data.get("columns", []),
                    "has_numeric_data": len(sheet_data.get("summary_stats", {}).get("numeric_columns", [])) > 0
                }
        return summary


def get_excel_processor_tools() -> List[Any]:
    """Get Excel processor tools for AGNO integration."""
    from .base_tool import BaseTool

    class ExcelProcessorTool(BaseTool):
        """Excel processing tool for the GAIA agent."""

        def __init__(self):
            super().__init__(
                name="excel_processor",
                description="Process and analyze Excel files for data analysis tasks"
            )
            self.processor = ExcelProcessor()

        def execute(self, file_path: str, analysis_type: str = "sales",
                    category_filter: Optional[str] = None,
                    exclude_categories: Optional[List[str]] = None) -> Dict[str, Any]:
            """Execute Excel processing and analysis."""
            try:
                # Load the Excel file
                result = self.processor.load_excel_file(file_path)
                if not result.get("success"):
                    return {"error": f"Failed to load Excel file: {result.get('error', 'Unknown error')}"}
                # Perform the requested analysis
                if analysis_type == "sales":
                    return self.processor.analyze_sales_data(
                        category_filter=category_filter,
                        exclude_categories=exclude_categories
                    )
                elif analysis_type == "summary":
                    return self.processor.get_sheet_summary()
                else:
                    return {"error": f"Unknown analysis type: {analysis_type}"}
            except Exception as e:
                return {"error": f"Excel processing failed: {str(e)}"}

    return [ExcelProcessorTool()]
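

# Minimal smoke-test sketch. "sales.xlsx" is a placeholder path, not a file
# shipped with this module; point it at any local workbook to try the tool.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    processor = ExcelProcessor()
    loaded = processor.load_excel_file("sales.xlsx")
    if loaded.get("success"):
        print(processor.get_sheet_summary())
        # e.g. total food sales with drink items excluded
        print(processor.analyze_sales_data(category_filter="food",
                                           exclude_categories=["drinks"]))
    else:
        print(f"Load failed: {loaded.get('error')}")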