Spaces:
Running
Running
"""
Enhanced Excel Processing Tool for GAIA Agent - Phase 4
Advanced Excel file reading, processing, and data analysis capabilities
Features:
- Multi-sheet Excel processing with openpyxl and pandas
- Formula evaluation and calculation
- Data type detection and conversion
- Cell range analysis and aggregation
- Conditional data filtering and grouping
- Financial calculations with currency formatting
"""
import os | |
import logging | |
import pandas as pd | |
import numpy as np | |
from typing import Dict, Any, List, Optional, Union, Tuple | |
from pathlib import Path | |
import re | |
from decimal import Decimal, ROUND_HALF_UP | |
try: | |
import openpyxl | |
from openpyxl import load_workbook | |
from openpyxl.utils import get_column_letter, column_index_from_string | |
OPENPYXL_AVAILABLE = True | |
except ImportError: | |
OPENPYXL_AVAILABLE = False | |
try: | |
import xlrd | |
XLRD_AVAILABLE = True | |
except ImportError: | |
XLRD_AVAILABLE = False | |
logger = logging.getLogger(__name__) | |
class ExcelProcessor:
    """Enhanced Excel processor for GAIA data analysis tasks.

    Loads CSV/XLS/XLSX files into per-sheet dictionaries (records, DataFrame,
    column names, dtypes, summary statistics, detected food/drink categories)
    and offers a keyword-driven sales total with optional category filtering.

    Attributes:
        available: True when openpyxl could be imported.
        workbook: The openpyxl workbook (formulas preserved) of the last
            successful .xlsx/.xlsm load, otherwise None.
        sheets_data: Mapping of sheet name -> processed sheet dictionary for
            the last successful load (any file type).
        sheet_names: Sheet names of the last successful load.
    """

    # Characters removed before numeric parsing: "$", ",", euro, pound, yen
    # and any whitespace. Written with escapes so the source stays ASCII-safe.
    _NUMERIC_NOISE = "[$,\u20ac\u00a3\u00a5\\s]"
    # Accounting-style negatives such as "(12.50)".
    _PAREN_NEGATIVE = r"^\((.+)\)$"

    _FOOD_KEYWORDS = ('food', 'burger', 'sandwich', 'pizza', 'salad', 'fries', 'chicken', 'beef')
    _DRINK_KEYWORDS = ('drink', 'soda', 'coffee', 'tea', 'juice', 'water', 'beer', 'wine')
    _SALES_KEYWORDS = ('sales', 'amount', 'total', 'price', 'cost', 'revenue', 'value')
    _CATEGORY_KEYWORDS = ('category', 'type', 'item', 'product', 'name', 'description')

    def __init__(self):
        """Initialize the Excel processor with no file loaded."""
        self.available = OPENPYXL_AVAILABLE
        self.workbook = None
        self.sheets_data = {}
        self.sheet_names = []
        if not self.available:
            logger.warning("⚠️ openpyxl not available - Excel processing limited")

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def load_excel_file(self, file_path: str) -> Dict[str, Any]:
        """
        Load a spreadsheet file and return a comprehensive data structure.

        The loader is chosen from the file suffix: ``.csv`` -> pandas CSV,
        ``.xlsx``/``.xlsm`` -> openpyxl, ``.xls`` -> pandas (when xlrd is
        importable); anything else falls back to ``pandas.read_excel``.

        On success the processed sheets are also stored on the instance so
        that ``analyze_sales_data`` and ``get_sheet_summary`` work for every
        file type, not only for .xlsx files.

        Args:
            file_path: Path to the file.
        Returns:
            ``{"sheets", "metadata", "success": True}`` on success, otherwise
            ``{"error", "sheets": {}, "metadata": {}}``.
        """
        # Forget the previous file first: a failed load must never leave
        # stale sheets behind to be analysed by mistake.
        self.workbook = None
        self.sheets_data = {}
        self.sheet_names = []
        try:
            file_path = Path(file_path)
            if not file_path.exists():
                raise FileNotFoundError(f"Excel file not found: {file_path}")
            suffix = file_path.suffix.lower()
            if suffix == '.csv':
                result = self._load_csv_file(file_path)
            elif suffix in ('.xlsx', '.xlsm'):
                result = self._load_xlsx_file(file_path)
            elif suffix == '.xls' and XLRD_AVAILABLE:
                result = self._load_xls_file(file_path)
            else:
                # Try pandas as fallback
                result = self._load_with_pandas(file_path)
        except Exception as e:
            logger.error(f"❌ Failed to load Excel file {file_path}: {e}")
            return {"error": str(e), "sheets": {}, "metadata": {}}

        if result.get("success"):
            self.sheets_data = result["sheets"]
            self.sheet_names = list(result["metadata"].get("sheet_names", []))
        return result

    def _load_xlsx_file(self, file_path: Path) -> Dict[str, Any]:
        """Load an .xlsx/.xlsm file with openpyxl (pandas if openpyxl is missing).

        The file is opened twice: once keeping formulas and once with
        ``data_only=True`` to obtain the cached calculated values.
        """
        if not OPENPYXL_AVAILABLE:
            return self._load_with_pandas(file_path)
        try:
            self.workbook = load_workbook(file_path, data_only=False)
            workbook_data_only = load_workbook(file_path, data_only=True)
            sheet_names = list(self.workbook.sheetnames)
            sheets_data = {
                name: self._process_worksheet(
                    self.workbook[name], workbook_data_only[name], name
                )
                for name in sheet_names
            }
            # Only the extracted values are needed from the second workbook.
            workbook_data_only.close()
            metadata = {
                "file_path": str(file_path),
                "file_size": file_path.stat().st_size,
                "sheet_count": len(sheet_names),
                "sheet_names": sheet_names
            }
            self.sheets_data = sheets_data
            self.sheet_names = sheet_names
            return {
                "sheets": sheets_data,
                "metadata": metadata,
                "success": True
            }
        except Exception as e:
            logger.error(f"❌ Failed to load XLSX file: {e}")
            return {"error": str(e), "sheets": {}, "metadata": {}}

    def _load_xls_file(self, file_path: Path) -> Dict[str, Any]:
        """Load a legacy .xls file (delegates to the pandas loader)."""
        try:
            return self._load_with_pandas(file_path)
        except Exception as e:
            logger.error(f"❌ Failed to load XLS file: {e}")
            return {"error": str(e), "sheets": {}, "metadata": {}}

    def _load_csv_file(self, file_path: Path) -> Dict[str, Any]:
        """Load a CSV file as a single sheet named "Sheet1"."""
        try:
            df = pd.read_csv(file_path)
            processed_data = self._process_dataframe(df, "Sheet1")
            metadata = {
                "file_path": str(file_path),
                "file_size": file_path.stat().st_size,
                "sheet_count": 1,
                "sheet_names": ["Sheet1"]
            }
            return {
                "sheets": {"Sheet1": processed_data},
                "metadata": metadata,
                "success": True
            }
        except Exception as e:
            logger.error(f"❌ Failed to load CSV file: {e}")
            return {"error": str(e), "sheets": {}, "metadata": {}}

    def _load_with_pandas(self, file_path: Path) -> Dict[str, Any]:
        """Load every sheet of a file with pandas (fallback loader)."""
        try:
            if file_path.suffix.lower() == '.csv':
                sheets_dict = {"Sheet1": pd.read_csv(file_path)}
            else:
                # sheet_name=None asks pandas for a dict of all sheets.
                sheets_dict = pd.read_excel(file_path, sheet_name=None)
            sheets_data = {
                sheet_name: self._process_dataframe(df, sheet_name)
                for sheet_name, df in sheets_dict.items()
            }
            metadata = {
                "file_path": str(file_path),
                "file_size": file_path.stat().st_size,
                "sheet_count": len(sheets_dict),
                "sheet_names": list(sheets_dict.keys())
            }
            return {
                "sheets": sheets_data,
                "metadata": metadata,
                "success": True
            }
        except Exception as e:
            logger.error(f"❌ Failed to load with pandas: {e}")
            return {"error": str(e), "sheets": {}, "metadata": {}}

    # ------------------------------------------------------------------
    # Sheet processing
    # ------------------------------------------------------------------
    @staticmethod
    def _describe_cell(value: Any) -> Dict[str, Any]:
        """Return the value / formula / type-name record for one cell value."""
        is_formula = isinstance(value, str) and value.startswith('=')
        return {
            'value': value,
            'formula': value if is_formula else None,
            'data_type': str(type(value).__name__)
        }

    def _process_worksheet(self, worksheet, worksheet_data_only, sheet_name: str) -> Dict[str, Any]:
        """Process one openpyxl worksheet.

        Args:
            worksheet: Worksheet loaded with formulas preserved.
            worksheet_data_only: The same sheet loaded with ``data_only=True``.
            sheet_name: Name used in the result and in log messages.
        Returns:
            The ``_process_dataframe`` result for the calculated values, plus
            ``formulas`` (per-cell records) and ``dimensions``.
        """
        try:
            # Both sheets are walked over the formula sheet's dimensions so
            # the two grids always have the same shape.
            max_row = worksheet.max_row
            max_col = worksheet.max_column
            bounds = {"min_row": 1, "max_row": max_row, "min_col": 1, "max_col": max_col}
            formula_rows = worksheet.iter_rows(**bounds)
            value_rows = worksheet_data_only.iter_rows(values_only=True, **bounds)

            data_with_formulas = []
            data_values_only = []
            for formula_cells, values in zip(formula_rows, value_rows):
                data_with_formulas.append(
                    [self._describe_cell(cell.value) for cell in formula_cells]
                )
                data_values_only.append(list(values))

            # Raw grid (integer column labels); headers are detected later.
            df = pd.DataFrame(data_values_only)
            processed_data = self._process_dataframe(df, sheet_name)
            processed_data['formulas'] = data_with_formulas
            processed_data['dimensions'] = {'rows': max_row, 'columns': max_col}
            return processed_data
        except Exception as e:
            logger.error(f"❌ Failed to process worksheet {sheet_name}: {e}")
            return {"error": str(e), "data": [], "columns": []}

    @staticmethod
    def _clean_column_names(columns) -> List[str]:
        """Stringify column labels, name blanks "Column_<i>", de-duplicate.

        Repeated names get a numeric suffix ("Total", "Total_2", ...) because
        duplicate labels make ``df[col]`` return a DataFrame instead of a
        Series, which breaks the per-column analysis.
        """
        seen: Dict[str, int] = {}
        names = []
        for i, col in enumerate(columns):
            name = str(col).strip() if pd.notna(col) else f"Column_{i}"
            seen[name] = seen.get(name, 0) + 1
            names.append(name if seen[name] == 1 else f"{name}_{seen[name]}")
        return names

    def _process_dataframe(self, df: pd.DataFrame, sheet_name: str) -> Dict[str, Any]:
        """Clean a DataFrame and extract metadata.

        Args:
            df: Sheet contents. A frame whose columns are a default
                ``RangeIndex`` is treated as a raw cell grid that still
                contains its header row.
            sheet_name: Name recorded in the result and in log messages.
        Returns:
            Dict with ``data``, ``dataframe``, ``columns``, ``shape``,
            ``dtypes``, ``summary_stats``, ``categories``, ``header_row`` and
            ``sheet_name``; or an error dict if processing fails.
        """
        try:
            df_clean = df.copy()
            detected = self._find_header_row(df_clean)
            header_row = 0 if detected is None else detected
            # A raw grid has no real labels yet, so a header found in row 0
            # must be promoted too. Frames that already carry labels keep the
            # previous rule (promote only when the header sits below row 0).
            raw_grid = isinstance(df_clean.columns, pd.RangeIndex)
            if (raw_grid and detected is not None) or header_row > 0:
                df_clean.columns = df_clean.iloc[header_row]
                df_clean = df_clean.iloc[header_row + 1:].reset_index(drop=True)
            df_clean.columns = self._clean_column_names(df_clean.columns)

            df_clean = self._detect_and_convert_types(df_clean)
            summary_stats = self._generate_summary_stats(df_clean)
            # Detect categories (for food vs drinks analysis)
            categories = self._detect_categories(df_clean)
            return {
                "data": df_clean.to_dict('records'),
                "dataframe": df_clean,
                "columns": list(df_clean.columns),
                "shape": df_clean.shape,
                "dtypes": df_clean.dtypes.to_dict(),
                "summary_stats": summary_stats,
                "categories": categories,
                "header_row": header_row,
                "sheet_name": sheet_name
            }
        except Exception as e:
            logger.error(f"❌ Failed to process dataframe for {sheet_name}: {e}")
            return {"error": str(e), "data": [], "columns": []}

    def _find_header_row(self, df: pd.DataFrame) -> Optional[int]:
        """Return the index of the first of the top 5 rows in which more than
        60% of the cells are non-blank strings, or None if there is none."""
        for i in range(min(5, len(df))):
            row = df.iloc[i]
            string_count = sum(1 for val in row if isinstance(val, str) and val.strip())
            if string_count > len(row) * 0.6:
                return i
        return None

    def _detect_header_row(self, df: pd.DataFrame) -> int:
        """Detect which row contains the headers (0 when none is found)."""
        found = self._find_header_row(df)
        return 0 if found is None else found

    @staticmethod
    def _is_text_column(series: Any) -> bool:
        """True for a Series holding object or string data."""
        if not isinstance(series, pd.Series):
            return False
        return bool(
            pd.api.types.is_object_dtype(series)
            or pd.api.types.is_string_dtype(series)
        )

    def _detect_and_convert_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Convert text columns that really hold numbers to a numeric dtype.

        Only currency symbols, thousands separators and whitespace are
        stripped, and "(1.50)" is read as -1.50. Other characters are kept so
        that labels such as "Item 1" are not mistaken for the number 1. A
        column is converted when more than 70% of its non-missing cells
        parse; cells that do not parse become NaN in a converted column.
        """
        df_converted = df.copy()
        for col in df_converted.columns:
            series = df_converted[col]
            if not self._is_text_column(series):
                continue
            present = int(series.notna().sum())
            if present == 0:
                continue
            try:
                text = series.astype(str)
                text = text.str.replace(self._NUMERIC_NOISE, '', regex=True)
                text = text.str.replace(self._PAREN_NEGATIVE, r'-\1', regex=True)
                numeric_series = pd.to_numeric(text, errors='coerce')
            except (TypeError, ValueError):
                continue  # Keep original type
            if numeric_series.notna().sum() > present * 0.7:
                df_converted[col] = numeric_series
        return df_converted

    @staticmethod
    def _as_float(value: Any) -> Optional[float]:
        """Convert an aggregate to float, mapping NaN/None to None."""
        return float(value) if pd.notna(value) else None

    def _generate_summary_stats(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Generate summary statistics for the dataframe.

        Returns row/column counts, per-column missing-value counts,
        min/max/mean/sum for numeric columns and unique-count / most common
        value for all other columns. Returns ``{}`` on failure.
        """
        try:
            stats = {
                "row_count": len(df),
                "column_count": len(df.columns),
                "numeric_columns": [],
                "text_columns": [],
                "missing_values": df.isnull().sum().to_dict()
            }
            for col in df.columns:
                series = df[col]
                if pd.api.types.is_numeric_dtype(series):
                    stats["numeric_columns"].append({
                        "name": col,
                        "min": self._as_float(series.min()),
                        "max": self._as_float(series.max()),
                        "mean": self._as_float(series.mean()),
                        "sum": self._as_float(series.sum())
                    })
                else:
                    modes = series.mode()
                    stats["text_columns"].append({
                        "name": col,
                        "unique_values": int(series.nunique()),
                        "most_common": str(modes.iloc[0]) if len(modes) > 0 else None
                    })
            return stats
        except Exception as e:
            logger.error(f"❌ Failed to generate summary stats: {e}")
            return {}

    def _detect_categories(self, df: pd.DataFrame) -> Dict[str, Dict[str, List[str]]]:
        """Detect potential categories in the data (e.g., food vs drinks).

        For every text column, unique values are matched (case-insensitive
        substring) against the food keywords first, then the drink keywords.
        Columns with at least one match are reported as
        ``{"food": [...], "drinks": [...], "other": [...]}``.
        """
        categories = {}
        try:
            for col in df.columns:
                if not self._is_text_column(df[col]):
                    continue
                unique_values = df[col].dropna().unique()
                food_items = []
                drink_items = []
                for value in unique_values:
                    value_str = str(value).lower()
                    if any(keyword in value_str for keyword in self._FOOD_KEYWORDS):
                        food_items.append(str(value))
                    elif any(keyword in value_str for keyword in self._DRINK_KEYWORDS):
                        drink_items.append(str(value))
                if food_items or drink_items:
                    classified = set(food_items) | set(drink_items)
                    categories[col] = {
                        "food": food_items,
                        "drinks": drink_items,
                        "other": [str(v) for v in unique_values if str(v) not in classified]
                    }
            return categories
        except Exception as e:
            logger.error(f"❌ Failed to detect categories: {e}")
            return {}

    # ------------------------------------------------------------------
    # Analysis
    # ------------------------------------------------------------------
    def analyze_sales_data(self, category_filter: Optional[str] = None,
                           exclude_categories: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Analyze sales data with category filtering.

        For every loaded sheet, the detected sales columns are summed over
        the rows that survive the category filter. Every detected sales
        column contributes to the total.

        Args:
            category_filter: Category to include (e.g., 'food')
            exclude_categories: Categories to exclude (e.g., ['drinks'])
        Returns:
            ``{"total_sales", "formatted_total", "sheet_results", "success"}``
            or ``{"error": ...}`` when nothing is loaded or analysis fails.
        """
        try:
            if not self.sheets_data:
                return {"error": "No data loaded"}
            results = {}
            total_sales = 0.0
            for sheet_name, sheet_data in self.sheets_data.items():
                if "error" in sheet_data:
                    continue
                df = sheet_data.get("dataframe")
                if df is None or df.empty:
                    continue
                sales_columns = self._find_sales_columns(df)
                category_columns = self._find_category_columns(df)
                if category_filter or exclude_categories:
                    filtered_data = self._apply_category_filter(
                        df, category_columns, category_filter, exclude_categories
                    )
                else:
                    filtered_data = df
                sheet_total = 0.0
                for sales_col in sales_columns:
                    if sales_col in filtered_data.columns:
                        col_total = filtered_data[sales_col].sum()
                        if pd.notna(col_total):
                            # Plain float keeps the result JSON-friendly.
                            sheet_total += float(col_total)
                results[sheet_name] = {
                    "total": sheet_total,
                    "sales_columns": sales_columns,
                    "category_columns": category_columns,
                    "filtered_rows": len(filtered_data),
                    "original_rows": len(df)
                }
                total_sales += sheet_total
            return {
                "total_sales": total_sales,
                "formatted_total": self._format_currency(total_sales),
                "sheet_results": results,
                "success": True
            }
        except Exception as e:
            logger.error(f"❌ Failed to analyze sales data: {e}")
            return {"error": str(e)}

    def _find_sales_columns(self, df: pd.DataFrame) -> List[str]:
        """Find columns that likely contain sales/amount data.

        Numeric columns whose name contains a sales keyword win. If there are
        none, every numeric column whose values are all >= 0 and < 1,000,000
        is returned instead.
        """
        numeric_columns = [
            col for col in df.columns if pd.api.types.is_numeric_dtype(df[col])
        ]
        named = [
            col for col in numeric_columns
            if any(keyword in str(col).lower() for keyword in self._SALES_KEYWORDS)
        ]
        if named:
            return named
        fallback = []
        for col in numeric_columns:
            values = df[col].dropna()
            if len(values) > 0 and values.min() >= 0 and values.max() < 1000000:
                fallback.append(col)
        return fallback

    def _find_category_columns(self, df: pd.DataFrame) -> List[str]:
        """Find text columns whose name contains a category keyword."""
        return [
            col for col in df.columns
            if any(keyword in str(col).lower() for keyword in self._CATEGORY_KEYWORDS)
            and self._is_text_column(df[col])
        ]

    def _apply_category_filter(self, df: pd.DataFrame, category_columns: List[str],
                               include_category: Optional[str] = None,
                               exclude_categories: Optional[List[str]] = None) -> pd.DataFrame:
        """Apply category filtering to a dataframe.

        A row is kept when the include term appears in at least one category
        column (if an include term is given) and no exclude term appears in
        any category column. Matching is a case-insensitive literal substring
        test, so terms such as "C++" are safe.

        Returns:
            A filtered copy of ``df``; ``df`` itself if filtering fails.
        """
        try:
            columns = [col for col in category_columns if col in df.columns]
            if not columns or not (include_category or exclude_categories):
                return df.copy()
            lowered = [df[col].astype(str).str.lower() for col in columns]
            # Masks share df's index so non-default indexes align correctly.
            keep = pd.Series(True, index=df.index)
            if include_category:
                needle = include_category.lower()
                include_mask = pd.Series(False, index=df.index)
                for text in lowered:
                    include_mask = include_mask | text.str.contains(needle, regex=False, na=False)
                keep = keep & include_mask
            for exclude_cat in exclude_categories or []:
                needle = exclude_cat.lower()
                for text in lowered:
                    keep = keep & ~text.str.contains(needle, regex=False, na=False)
            return df[keep].copy()
        except Exception as e:
            logger.error(f"❌ Failed to apply category filter: {e}")
            return df

    def _format_currency(self, amount: float, currency: str = "USD", decimal_places: int = 2) -> str:
        """Format amount as currency with specified decimal places.

        Rounds half-up via ``Decimal``. USD is rendered as "$<amount>", any
        other currency as "<amount> <currency>". If rounding fails (e.g. NaN)
        the plainly formatted amount is returned.
        """
        # A negative precision would raise in the fallback format below.
        decimal_places = max(int(decimal_places), 0)
        try:
            rounded_amount = Decimal(str(amount)).quantize(
                Decimal('0.' + '0' * decimal_places),
                rounding=ROUND_HALF_UP
            )
            if currency.upper() == "USD":
                return f"${rounded_amount:.{decimal_places}f}"
            return f"{rounded_amount:.{decimal_places}f} {currency}"
        except Exception as e:
            logger.error(f"❌ Failed to format currency: {e}")
            return f"{amount:.{decimal_places}f}"

    def get_sheet_summary(self) -> Dict[str, Any]:
        """Get summary of all loaded sheets.

        Returns:
            Sheet count, sheet names and, for each sheet processed without
            error, its row/column counts, column names and whether it has any
            numeric column; ``{"error": "No data loaded"}`` if nothing is
            loaded.
        """
        if not self.sheets_data:
            return {"error": "No data loaded"}
        summary = {
            "total_sheets": len(self.sheets_data),
            "sheet_names": list(self.sheets_data.keys()),
            "sheets": {}
        }
        for sheet_name, sheet_data in self.sheets_data.items():
            if "error" in sheet_data:
                continue
            shape = sheet_data.get("shape", [0, 0])
            numeric_columns = sheet_data.get("summary_stats", {}).get("numeric_columns", [])
            summary["sheets"][sheet_name] = {
                "rows": shape[0],
                "columns": shape[1],
                "column_names": sheet_data.get("columns", []),
                "has_numeric_data": len(numeric_columns) > 0
            }
        return summary
def get_excel_processor_tools() -> List[Any]:
    """Build the Excel processor tool list for AGNO integration.

    ``BaseTool`` is imported inside the function, and the tool class is
    defined here as well, so the import only happens when this is called.

    Returns:
        A one-element list holding a fresh ``ExcelProcessorTool`` instance.
    """
    from .base_tool import BaseTool

    class ExcelProcessorTool(BaseTool):
        """Excel processing tool for GAIA agent."""

        def __init__(self):
            super().__init__(
                name="excel_processor",
                description="Process and analyze Excel files for data analysis tasks",
            )
            self.processor = ExcelProcessor()

        def execute(self, file_path: str, analysis_type: str = "sales",
                    category_filter: str = None, exclude_categories: List[str] = None) -> Dict[str, Any]:
            """Load ``file_path`` and run the requested analysis.

            ``analysis_type`` is "sales" (filtered sales totals) or "summary"
            (per-sheet overview). Every failure is reported as an
            ``{"error": ...}`` dictionary rather than raised.
            """
            try:
                loaded = self.processor.load_excel_file(file_path)
                if not loaded.get("success"):
                    reason = loaded.get('error', 'Unknown error')
                    return {"error": f"Failed to load Excel file: {reason}"}

                if analysis_type == "summary":
                    return self.processor.get_sheet_summary()
                if analysis_type != "sales":
                    return {"error": f"Unknown analysis type: {analysis_type}"}
                return self.processor.analyze_sales_data(
                    category_filter=category_filter,
                    exclude_categories=exclude_categories,
                )
            except Exception as exc:
                return {"error": f"Excel processing failed: {str(exc)}"}

    tool = ExcelProcessorTool()
    return [tool]