Spaces:
Sleeping
Sleeping
| """ | |
| Spreadsheet Formula Evaluator for GAIA Agent - Phase 4 | |
| Excel formula parsing, evaluation, and calculation engine | |
| Features: | |
| - Excel formula parsing and evaluation | |
| - Built-in function support (SUM, AVERAGE, COUNT, etc.) | |
| - Cell reference resolution | |
| - Conditional logic evaluation | |
| - Mathematical operations on ranges | |
| - Error handling for invalid formulas | |
| """ | |
| import logging | |
| import re | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional, Union, Tuple | |
| from decimal import Decimal, ROUND_HALF_UP | |
| import math | |
| logger = logging.getLogger(__name__) | |
| class FormulaEvaluator: | |
| """Excel formula evaluator for GAIA data analysis tasks.""" | |
| def __init__(self): | |
| """Initialize the formula evaluator.""" | |
| self.available = True | |
| self.functions = self._init_builtin_functions() | |
| self.cell_cache = {} | |
| def _init_builtin_functions(self) -> Dict[str, callable]: | |
| """Initialize built-in Excel functions.""" | |
| return { | |
| 'SUM': self._sum, | |
| 'AVERAGE': self._average, | |
| 'AVG': self._average, # Alias | |
| 'COUNT': self._count, | |
| 'COUNTA': self._counta, | |
| 'MIN': self._min, | |
| 'MAX': self._max, | |
| 'MEDIAN': self._median, | |
| 'STDEV': self._stdev, | |
| 'VAR': self._var, | |
| 'IF': self._if, | |
| 'AND': self._and, | |
| 'OR': self._or, | |
| 'NOT': self._not, | |
| 'ROUND': self._round, | |
| 'ABS': self._abs, | |
| 'SQRT': self._sqrt, | |
| 'POWER': self._power, | |
| 'MOD': self._mod, | |
| 'CONCATENATE': self._concatenate, | |
| 'LEFT': self._left, | |
| 'RIGHT': self._right, | |
| 'MID': self._mid, | |
| 'LEN': self._len, | |
| 'UPPER': self._upper, | |
| 'LOWER': self._lower, | |
| 'TRIM': self._trim, | |
| 'SUMIF': self._sumif, | |
| 'COUNTIF': self._countif, | |
| 'AVERAGEIF': self._averageif, | |
| } | |
| def evaluate_formula(self, formula: str, data: pd.DataFrame = None, | |
| cell_references: Dict[str, Any] = None) -> Union[float, str, bool, None]: | |
| """ | |
| Evaluate an Excel formula. | |
| Args: | |
| formula: Excel formula string (with or without leading =) | |
| data: DataFrame containing the data | |
| cell_references: Dictionary of cell references and their values | |
| Returns: | |
| Evaluated result of the formula | |
| """ | |
| try: | |
| # Clean the formula | |
| formula = formula.strip() | |
| if formula.startswith('='): | |
| formula = formula[1:] | |
| if not formula: | |
| return None | |
| # Store data and cell references for function access | |
| self.current_data = data | |
| self.current_cell_refs = cell_references or {} | |
| # Parse and evaluate the formula | |
| result = self._parse_and_evaluate(formula) | |
| return result | |
| except Exception as e: | |
| logger.error(f"❌ Formula evaluation failed for '{formula}': {e}") | |
| return f"#ERROR: {str(e)}" | |
| def evaluate_cell_range(self, range_expr: str, data: pd.DataFrame) -> List[Any]: | |
| """ | |
| Evaluate a cell range expression (e.g., A1:A10, B2:D5). | |
| Args: | |
| range_expr: Range expression string | |
| data: DataFrame containing the data | |
| Returns: | |
| List of values in the range | |
| """ | |
| try: | |
| # Parse range expression | |
| if ':' in range_expr: | |
| start_cell, end_cell = range_expr.split(':') | |
| start_row, start_col = self._parse_cell_reference(start_cell) | |
| end_row, end_col = self._parse_cell_reference(end_cell) | |
| values = [] | |
| for row in range(start_row, end_row + 1): | |
| for col in range(start_col, end_col + 1): | |
| if row < len(data) and col < len(data.columns): | |
| value = data.iloc[row, col] | |
| if pd.notna(value): | |
| values.append(value) | |
| return values | |
| else: | |
| # Single cell reference | |
| row, col = self._parse_cell_reference(range_expr) | |
| if row < len(data) and col < len(data.columns): | |
| value = data.iloc[row, col] | |
| return [value] if pd.notna(value) else [] | |
| return [] | |
| except Exception as e: | |
| logger.error(f"❌ Range evaluation failed for '{range_expr}': {e}") | |
| return [] | |
| def _parse_and_evaluate(self, formula: str) -> Any: | |
| """Parse and evaluate a formula expression.""" | |
| # Handle parentheses first | |
| while '(' in formula: | |
| # Find innermost parentheses | |
| start = -1 | |
| for i, char in enumerate(formula): | |
| if char == '(': | |
| start = i | |
| elif char == ')' and start != -1: | |
| # Evaluate expression inside parentheses | |
| inner_expr = formula[start + 1:i] | |
| inner_result = self._evaluate_expression(inner_expr) | |
| # Replace with result | |
| formula = formula[:start] + str(inner_result) + formula[i + 1:] | |
| break | |
| return self._evaluate_expression(formula) | |
| def _evaluate_expression(self, expr: str) -> Any: | |
| """Evaluate a simple expression without parentheses.""" | |
| expr = expr.strip() | |
| # Check if it's a function call | |
| func_match = re.match(r'([A-Z]+)\((.*)\)', expr, re.IGNORECASE) | |
| if func_match: | |
| func_name = func_match.group(1).upper() | |
| args_str = func_match.group(2) | |
| return self._evaluate_function(func_name, args_str) | |
| # Check if it's a cell reference | |
| if re.match(r'^[A-Z]+\d+$', expr, re.IGNORECASE): | |
| return self._get_cell_value(expr) | |
| # Check if it's a range reference | |
| if ':' in expr and re.match(r'^[A-Z]+\d+:[A-Z]+\d+$', expr, re.IGNORECASE): | |
| return self.evaluate_cell_range(expr, self.current_data) | |
| # Check for arithmetic operations | |
| for op in ['+', '-', '*', '/', '^', '=', '<>', '>', '<', '>=', '<=']: | |
| if op in expr: | |
| return self._evaluate_arithmetic(expr, op) | |
| # Try to convert to number | |
| try: | |
| if '.' in expr: | |
| return float(expr) | |
| else: | |
| return int(expr) | |
| except ValueError: | |
| pass | |
| # Return as string if nothing else works | |
| return expr.strip('"\'') | |
| def _evaluate_function(self, func_name: str, args_str: str) -> Any: | |
| """Evaluate a function call.""" | |
| if func_name not in self.functions: | |
| raise ValueError(f"Unknown function: {func_name}") | |
| # Parse arguments | |
| args = self._parse_function_args(args_str) | |
| # Evaluate each argument | |
| evaluated_args = [] | |
| for arg in args: | |
| if isinstance(arg, str): | |
| evaluated_args.append(self._evaluate_expression(arg)) | |
| else: | |
| evaluated_args.append(arg) | |
| # Call the function | |
| return self.functions[func_name](*evaluated_args) | |
| def _parse_function_args(self, args_str: str) -> List[str]: | |
| """Parse function arguments, handling nested functions and ranges.""" | |
| if not args_str.strip(): | |
| return [] | |
| args = [] | |
| current_arg = "" | |
| paren_depth = 0 | |
| in_quotes = False | |
| quote_char = None | |
| for char in args_str: | |
| if char in ['"', "'"] and not in_quotes: | |
| in_quotes = True | |
| quote_char = char | |
| current_arg += char | |
| elif char == quote_char and in_quotes: | |
| in_quotes = False | |
| quote_char = None | |
| current_arg += char | |
| elif char == '(' and not in_quotes: | |
| paren_depth += 1 | |
| current_arg += char | |
| elif char == ')' and not in_quotes: | |
| paren_depth -= 1 | |
| current_arg += char | |
| elif char == ',' and paren_depth == 0 and not in_quotes: | |
| args.append(current_arg.strip()) | |
| current_arg = "" | |
| else: | |
| current_arg += char | |
| if current_arg.strip(): | |
| args.append(current_arg.strip()) | |
| return args | |
| def _evaluate_arithmetic(self, expr: str, operator: str) -> Any: | |
| """Evaluate arithmetic expressions.""" | |
| parts = expr.split(operator, 1) | |
| if len(parts) != 2: | |
| raise ValueError(f"Invalid arithmetic expression: {expr}") | |
| left = self._evaluate_expression(parts[0].strip()) | |
| right = self._evaluate_expression(parts[1].strip()) | |
| # Convert to numbers if possible | |
| try: | |
| left_num = float(left) if not isinstance(left, (int, float)) else left | |
| right_num = float(right) if not isinstance(right, (int, float)) else right | |
| except (ValueError, TypeError): | |
| left_num, right_num = left, right | |
| # Perform operation | |
| if operator == '+': | |
| return left_num + right_num | |
| elif operator == '-': | |
| return left_num - right_num | |
| elif operator == '*': | |
| return left_num * right_num | |
| elif operator == '/': | |
| if right_num == 0: | |
| return "#DIV/0!" | |
| return left_num / right_num | |
| elif operator == '^': | |
| return left_num ** right_num | |
| elif operator == '=': | |
| return left == right | |
| elif operator == '<>': | |
| return left != right | |
| elif operator == '>': | |
| return left_num > right_num | |
| elif operator == '<': | |
| return left_num < right_num | |
| elif operator == '>=': | |
| return left_num >= right_num | |
| elif operator == '<=': | |
| return left_num <= right_num | |
| else: | |
| raise ValueError(f"Unknown operator: {operator}") | |
| def _get_cell_value(self, cell_ref: str) -> Any: | |
| """Get value from cell reference.""" | |
| if cell_ref in self.current_cell_refs: | |
| return self.current_cell_refs[cell_ref] | |
| if self.current_data is not None: | |
| try: | |
| row, col = self._parse_cell_reference(cell_ref) | |
| if row < len(self.current_data) and col < len(self.current_data.columns): | |
| return self.current_data.iloc[row, col] | |
| except Exception: | |
| pass | |
| return 0 # Default value for missing cells | |
| def _parse_cell_reference(self, cell_ref: str) -> Tuple[int, int]: | |
| """Parse cell reference (e.g., A1, B10) to row and column indices.""" | |
| match = re.match(r'^([A-Z]+)(\d+)$', cell_ref.upper()) | |
| if not match: | |
| raise ValueError(f"Invalid cell reference: {cell_ref}") | |
| col_letters = match.group(1) | |
| row_num = int(match.group(2)) | |
| # Convert column letters to index (A=0, B=1, ..., Z=25, AA=26, etc.) | |
| col_index = 0 | |
| for char in col_letters: | |
| col_index = col_index * 26 + (ord(char) - ord('A') + 1) | |
| col_index -= 1 # Convert to 0-based index | |
| row_index = row_num - 1 # Convert to 0-based index | |
| return row_index, col_index | |
| # Built-in function implementations | |
| def _sum(self, *args) -> float: | |
| """SUM function implementation.""" | |
| total = 0 | |
| for arg in args: | |
| if isinstance(arg, list): | |
| total += sum(self._to_number(x) for x in arg if self._is_number(x)) | |
| elif self._is_number(arg): | |
| total += self._to_number(arg) | |
| return total | |
| def _average(self, *args) -> float: | |
| """AVERAGE function implementation.""" | |
| values = [] | |
| for arg in args: | |
| if isinstance(arg, list): | |
| values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
| elif self._is_number(arg): | |
| values.append(self._to_number(arg)) | |
| return sum(values) / len(values) if values else 0 | |
| def _count(self, *args) -> int: | |
| """COUNT function implementation (counts numeric values).""" | |
| count = 0 | |
| for arg in args: | |
| if isinstance(arg, list): | |
| count += sum(1 for x in arg if self._is_number(x)) | |
| elif self._is_number(arg): | |
| count += 1 | |
| return count | |
| def _counta(self, *args) -> int: | |
| """COUNTA function implementation (counts non-empty values).""" | |
| count = 0 | |
| for arg in args: | |
| if isinstance(arg, list): | |
| count += sum(1 for x in arg if x is not None and str(x).strip() != '') | |
| elif arg is not None and str(arg).strip() != '': | |
| count += 1 | |
| return count | |
| def _min(self, *args) -> float: | |
| """MIN function implementation.""" | |
| values = [] | |
| for arg in args: | |
| if isinstance(arg, list): | |
| values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
| elif self._is_number(arg): | |
| values.append(self._to_number(arg)) | |
| return min(values) if values else 0 | |
| def _max(self, *args) -> float: | |
| """MAX function implementation.""" | |
| values = [] | |
| for arg in args: | |
| if isinstance(arg, list): | |
| values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
| elif self._is_number(arg): | |
| values.append(self._to_number(arg)) | |
| return max(values) if values else 0 | |
| def _median(self, *args) -> float: | |
| """MEDIAN function implementation.""" | |
| values = [] | |
| for arg in args: | |
| if isinstance(arg, list): | |
| values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
| elif self._is_number(arg): | |
| values.append(self._to_number(arg)) | |
| if not values: | |
| return 0 | |
| sorted_values = sorted(values) | |
| n = len(sorted_values) | |
| if n % 2 == 0: | |
| return (sorted_values[n//2 - 1] + sorted_values[n//2]) / 2 | |
| else: | |
| return sorted_values[n//2] | |
| def _stdev(self, *args) -> float: | |
| """STDEV function implementation.""" | |
| values = [] | |
| for arg in args: | |
| if isinstance(arg, list): | |
| values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
| elif self._is_number(arg): | |
| values.append(self._to_number(arg)) | |
| if len(values) < 2: | |
| return 0 | |
| mean = sum(values) / len(values) | |
| variance = sum((x - mean) ** 2 for x in values) / (len(values) - 1) | |
| return math.sqrt(variance) | |
| def _var(self, *args) -> float: | |
| """VAR function implementation.""" | |
| values = [] | |
| for arg in args: | |
| if isinstance(arg, list): | |
| values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
| elif self._is_number(arg): | |
| values.append(self._to_number(arg)) | |
| if len(values) < 2: | |
| return 0 | |
| mean = sum(values) / len(values) | |
| return sum((x - mean) ** 2 for x in values) / (len(values) - 1) | |
| def _if(self, condition, true_value, false_value) -> Any: | |
| """IF function implementation.""" | |
| if self._to_boolean(condition): | |
| return true_value | |
| else: | |
| return false_value | |
| def _and(self, *args) -> bool: | |
| """AND function implementation.""" | |
| return all(self._to_boolean(arg) for arg in args) | |
| def _or(self, *args) -> bool: | |
| """OR function implementation.""" | |
| return any(self._to_boolean(arg) for arg in args) | |
| def _not(self, value) -> bool: | |
| """NOT function implementation.""" | |
| return not self._to_boolean(value) | |
| def _round(self, number, digits=0) -> float: | |
| """ROUND function implementation.""" | |
| return round(self._to_number(number), int(digits)) | |
| def _abs(self, number) -> float: | |
| """ABS function implementation.""" | |
| return abs(self._to_number(number)) | |
| def _sqrt(self, number) -> float: | |
| """SQRT function implementation.""" | |
| num = self._to_number(number) | |
| if num < 0: | |
| return "#NUM!" | |
| return math.sqrt(num) | |
| def _power(self, number, power) -> float: | |
| """POWER function implementation.""" | |
| return self._to_number(number) ** self._to_number(power) | |
| def _mod(self, number, divisor) -> float: | |
| """MOD function implementation.""" | |
| return self._to_number(number) % self._to_number(divisor) | |
| def _concatenate(self, *args) -> str: | |
| """CONCATENATE function implementation.""" | |
| return ''.join(str(arg) for arg in args) | |
| def _left(self, text, num_chars) -> str: | |
| """LEFT function implementation.""" | |
| return str(text)[:int(num_chars)] | |
| def _right(self, text, num_chars) -> str: | |
| """RIGHT function implementation.""" | |
| return str(text)[-int(num_chars):] | |
| def _mid(self, text, start_num, num_chars) -> str: | |
| """MID function implementation.""" | |
| start = int(start_num) - 1 # Excel uses 1-based indexing | |
| return str(text)[start:start + int(num_chars)] | |
| def _len(self, text) -> int: | |
| """LEN function implementation.""" | |
| return len(str(text)) | |
| def _upper(self, text) -> str: | |
| """UPPER function implementation.""" | |
| return str(text).upper() | |
| def _lower(self, text) -> str: | |
| """LOWER function implementation.""" | |
| return str(text).lower() | |
| def _trim(self, text) -> str: | |
| """TRIM function implementation.""" | |
| return str(text).strip() | |
| def _sumif(self, range_arg, criteria, sum_range=None) -> float: | |
| """SUMIF function implementation.""" | |
| # This is a simplified implementation | |
| # In a full implementation, you'd need to handle the range and criteria properly | |
| if sum_range is None: | |
| sum_range = range_arg | |
| if isinstance(range_arg, list) and isinstance(sum_range, list): | |
| total = 0 | |
| for i, value in enumerate(range_arg): | |
| if i < len(sum_range) and self._meets_criteria(value, criteria): | |
| if self._is_number(sum_range[i]): | |
| total += self._to_number(sum_range[i]) | |
| return total | |
| return 0 | |
| def _countif(self, range_arg, criteria) -> int: | |
| """COUNTIF function implementation.""" | |
| if isinstance(range_arg, list): | |
| return sum(1 for value in range_arg if self._meets_criteria(value, criteria)) | |
| return 0 | |
| def _averageif(self, range_arg, criteria, average_range=None) -> float: | |
| """AVERAGEIF function implementation.""" | |
| if average_range is None: | |
| average_range = range_arg | |
| if isinstance(range_arg, list) and isinstance(average_range, list): | |
| values = [] | |
| for i, value in enumerate(range_arg): | |
| if i < len(average_range) and self._meets_criteria(value, criteria): | |
| if self._is_number(average_range[i]): | |
| values.append(self._to_number(average_range[i])) | |
| return sum(values) / len(values) if values else 0 | |
| return 0 | |
| def _meets_criteria(self, value, criteria) -> bool: | |
| """Check if value meets the given criteria.""" | |
| criteria_str = str(criteria) | |
| value_str = str(value) | |
| # Handle comparison operators | |
| if criteria_str.startswith('>='): | |
| return self._to_number(value) >= self._to_number(criteria_str[2:]) | |
| elif criteria_str.startswith('<='): | |
| return self._to_number(value) <= self._to_number(criteria_str[2:]) | |
| elif criteria_str.startswith('<>'): | |
| return value_str != criteria_str[2:] | |
| elif criteria_str.startswith('>'): | |
| return self._to_number(value) > self._to_number(criteria_str[1:]) | |
| elif criteria_str.startswith('<'): | |
| return self._to_number(value) < self._to_number(criteria_str[1:]) | |
| elif criteria_str.startswith('='): | |
| return value_str == criteria_str[1:] | |
| else: | |
| # Exact match or wildcard | |
| if '*' in criteria_str or '?' in criteria_str: | |
| # Simple wildcard matching | |
| pattern = criteria_str.replace('*', '.*').replace('?', '.') | |
| return re.match(pattern, value_str, re.IGNORECASE) is not None | |
| else: | |
| return value_str == criteria_str | |
| def _is_number(self, value) -> bool: | |
| """Check if value is a number.""" | |
| try: | |
| float(value) | |
| return True | |
| except (ValueError, TypeError): | |
| return False | |
| def _to_number(self, value) -> float: | |
| """Convert value to number.""" | |
| if isinstance(value, (int, float)): | |
| return float(value) | |
| try: | |
| return float(value) | |
| except (ValueError, TypeError): | |
| return 0 | |
| def _to_boolean(self, value) -> bool: | |
| """Convert value to boolean.""" | |
| if isinstance(value, bool): | |
| return value | |
| if isinstance(value, (int, float)): | |
| return value != 0 | |
| if isinstance(value, str): | |
| return value.lower() in ['true', '1', 'yes'] | |
| return bool(value) | |
| def get_formula_evaluator_tools() -> List[Any]: | |
| """Get formula evaluator tools for AGNO integration.""" | |
| from .base_tool import BaseTool | |
| class FormulaEvaluatorTool(BaseTool): | |
| """Formula evaluator tool for GAIA agent.""" | |
| def __init__(self): | |
| super().__init__( | |
| name="formula_evaluator", | |
| description="Evaluate Excel formulas and mathematical expressions" | |
| ) | |
| self.evaluator = FormulaEvaluator() | |
| def execute(self, formula: str, data: pd.DataFrame = None, | |
| cell_references: Dict[str, Any] = None) -> Dict[str, Any]: | |
| """Execute formula evaluation.""" | |
| try: | |
| result = self.evaluator.evaluate_formula(formula, data, cell_references) | |
| return { | |
| "formula": formula, | |
| "result": result, | |
| "success": True | |
| } | |
| except Exception as e: | |
| return { | |
| "formula": formula, | |
| "error": f"Formula evaluation failed: {str(e)}", | |
| "success": False | |
| } | |
| return [FormulaEvaluatorTool()] |