""" Table Data Processor Component This module provides capabilities for parsing, analyzing, and extracting information from tabular data in various formats (markdown tables, CSV, TSV, etc.) """ import re import logging import csv import io from typing import List, Dict, Any, Tuple, Optional, Set, Union import traceback import pandas as pd import numpy as np from collections import defaultdict logger = logging.getLogger("gaia_agent.components.table_processor") class TableProcessor: """ Handles parsing, analysis, and operations on tabular data structures. Provides capabilities for answering questions about table data. """ def __init__(self): """Initialize the table processor component.""" self.supported_formats = { 'markdown': self._parse_markdown_table, 'csv': self._parse_csv, 'tsv': self._parse_tsv, 'plain': self._parse_plain_table } logger.info("TableProcessor initialized") def process_table_data(self, table_text: str, format_hint: str = None) -> Dict[str, Any]: """ Process tabular data from a text representation. Args: table_text: Text containing the table data format_hint: Optional hint about the table format ('markdown', 'csv', 'tsv', 'plain') Returns: Dict containing parsed table information: - headers: List of column headers - rows: List of rows (each row is a list of values) - data_types: Dict mapping column names to data types - dimensions: Tuple of (rows, columns) - format: Detected format of the table - success: Boolean indicating successful parsing - error: Error message if parsing failed """ result = { 'headers': [], 'rows': [], 'data_types': {}, 'dimensions': (0, 0), 'format': None, 'success': False, 'error': None } try: # If format is provided, try that first if format_hint and format_hint in self.supported_formats: try: headers, rows = self.supported_formats[format_hint](table_text) result['format'] = format_hint result['success'] = True except Exception as e: logger.warning(f"Failed to parse as {format_hint}, trying auto-detection: {str(e)}") format_hint = None # If no format hint or the hinted format failed, auto-detect if not format_hint or not result['success']: detected_format = self._detect_table_format(table_text) if detected_format in self.supported_formats: headers, rows = self.supported_formats[detected_format](table_text) result['format'] = detected_format result['success'] = True else: # If format detection failed, try all formats for fmt, parser in self.supported_formats.items(): try: headers, rows = parser(table_text) result['format'] = fmt result['success'] = True break except Exception: continue # If parsing succeeded, populate the result if result['success']: result['headers'] = headers result['rows'] = rows result['dimensions'] = (len(rows), len(headers)) # Determine column data types result['data_types'] = self._determine_column_types(headers, rows) else: result['error'] = "Failed to parse table data in any supported format" except Exception as e: error_msg = f"Error processing table data: {str(e)}" logger.error(error_msg) logger.debug(traceback.format_exc()) result['error'] = error_msg def analyze_table(self, table_data: Dict[str, Any]) -> Dict[str, Any]: """ Analyze a parsed table to extract summary statistics and insights. 

    def analyze_table(self, table_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyze a parsed table to extract summary statistics and insights.

        Args:
            table_data: Parsed table data (from process_table_data)

        Returns:
            Dict containing analysis results:
                - summary: Text summary of the table
                - column_stats: Statistics for each numeric column
                - categorical_summaries: Summaries for categorical columns
                - correlations: Correlation matrix for numeric columns
                - dimensions: Table dimensions (rows, columns)
        """
        analysis = {
            'summary': None,
            'column_stats': {},
            'categorical_summaries': {},
            'correlations': None,
            'dimensions': table_data.get('dimensions', (0, 0)),
        }

        if not table_data.get('success', False) or not table_data.get('rows'):
            analysis['summary'] = "No valid table data to analyze"
            return analysis

        headers = table_data['headers']
        rows = table_data['rows']
        data_types = table_data['data_types']

        # Create a short textual summary
        row_count, col_count = analysis['dimensions']
        analysis['summary'] = f"Table with {row_count} rows and {col_count} columns."

        # Create a pandas DataFrame for easier analysis
        try:
            df = self._convert_to_dataframe(headers, rows)

            for col in headers:
                if data_types.get(col) in ('numeric', 'integer'):
                    # Calculate statistics for numeric columns
                    try:
                        analysis['column_stats'][col] = {
                            'min': float(df[col].min()),
                            'max': float(df[col].max()),
                            'mean': float(df[col].mean()),
                            'median': float(df[col].median()),
                            'std': float(df[col].std()),
                            'sum': float(df[col].sum()),
                            'count': int(df[col].count()),
                        }
                    except Exception as e:
                        logger.warning(f"Error calculating stats for column {col}: {str(e)}")
                elif data_types.get(col) == 'categorical':
                    # Summarize categorical columns
                    try:
                        value_counts = df[col].value_counts()
                        analysis['categorical_summaries'][col] = {
                            'unique_values': len(value_counts),
                            'most_common': value_counts.index[0] if not value_counts.empty else None,
                            'value_counts': value_counts.to_dict(),
                            'count': int(df[col].count()),
                        }
                    except Exception as e:
                        logger.warning(f"Error analyzing categorical column {col}: {str(e)}")

            # Calculate correlations between numeric columns
            numeric_cols = [col for col in headers if data_types.get(col) in ('numeric', 'integer')]
            if len(numeric_cols) > 1:
                try:
                    analysis['correlations'] = df[numeric_cols].corr().to_dict()
                except Exception as e:
                    logger.warning(f"Error calculating correlations: {str(e)}")
        except Exception as e:
            logger.error(f"Error during table analysis: {str(e)}")
            logger.debug(traceback.format_exc())
            analysis['error'] = str(e)

        return analysis
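
    # Illustrative sketch (assuming the two-column table sketched above):
    # the analysis dict would contain column_stats for "Score" (min 85.0,
    # max 90.0, mean 87.5, ...) and a categorical summary for "Name" with
    # two unique values. 'correlations' stays None because the correlation
    # matrix is only computed when there are at least two numeric columns.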

    def answer_table_question(self, question: str, table_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Answer questions about a table based on its content.

        Args:
            question: Question about the table
            table_data: Parsed table data (from process_table_data)

        Returns:
            Dict containing:
                - answer: The answer to the question
                - confidence: Confidence score (0-1)
                - explanation: Explanation of how the answer was derived
                - query_type: Type of query detected ('sum', 'average', 'max', etc.)
        """
        result = {
            'question': question,
            'answer': None,
            'confidence': 0.0,
            'explanation': None,
            'query_type': None,
        }

        if not table_data.get('success', False) or not table_data.get('rows'):
            result['answer'] = "Cannot answer the question as the table could not be properly parsed."
            result['confidence'] = 0.0
            return result

        headers = table_data['headers']
        rows = table_data['rows']
        data_types = table_data['data_types']

        # Convert to DataFrame for easier analysis
        df = self._convert_to_dataframe(headers, rows)

        # Lowercase the question for easier matching
        question_lower = question.lower()

        # Step 1: Identify the query type from indicator phrases
        query_types = {
            'count': ['how many', 'count', 'number of'],
            'sum': ['sum', 'total', 'add'],
            'average': ['average', 'mean', 'avg'],
            'median': ['median', 'middle'],
            'min': ['minimum', 'min', 'smallest', 'lowest'],
            'max': ['maximum', 'max', 'largest', 'highest'],
            'difference': ['difference', 'gap', 'delta'],
            'compare': ['compare', 'greater than', 'less than', 'larger', 'smaller'],
            'list': ['list', 'what are', 'show', 'display'],
            'unique': ['unique', 'distinct', 'different'],
        }

        detected_query_type = None
        for query_type, indicators in query_types.items():
            if any(indicator in question_lower for indicator in indicators):
                detected_query_type = query_type
                break
        result['query_type'] = detected_query_type or 'unknown'

        # Step 2: Identify the target column(s) by exact name match
        target_columns = [col for col in headers if col.lower() in question_lower]

        # If no exact matches, try partial matches on longer header words
        if not target_columns:
            for col in headers:
                col_parts = col.lower().split()
                if any(part in question_lower for part in col_parts if len(part) > 3):
                    target_columns.append(col)

        # If still no matches, fall back to analyzing all columns
        if not target_columns:
            target_columns = headers
            result['confidence'] = max(0.4, result['confidence'])  # Lower confidence
            result['explanation'] = "No specific column identified, analyzing all columns."
        else:
            result['confidence'] = 0.7  # Higher confidence when columns are identified
            target_columns_str = ", ".join(target_columns)
            result['explanation'] = f"Analyzing columns: {target_columns_str}"

        # Step 3: Execute the query based on the type and target columns
        try:
            if detected_query_type == 'count':
                if 'rows' in question_lower or 'entries' in question_lower:
                    # Count rows
                    answer = len(rows)
                    result['answer'] = str(answer)
                    result['confidence'] = 0.9
                    result['explanation'] = f"Counted {answer} rows in the table."
                elif 'columns' in question_lower:
                    # Count columns
                    answer = len(headers)
                    result['answer'] = str(answer)
                    result['confidence'] = 0.9
                    result['explanation'] = f"Counted {answer} columns in the table."
                else:
                    # Count values in specific column(s)
                    counts = {}
                    for col in target_columns:
                        if col in df.columns:
                            if data_types.get(col) == 'categorical':
                                value_counts = df[col].value_counts().to_dict()
                                counts[col] = sum(value_counts.values())
                            else:
                                counts[col] = df[col].count()
                    if counts:
                        if len(counts) == 1:
                            col = list(counts.keys())[0]
                            answer = counts[col]
                            result['answer'] = str(answer)
                            result['confidence'] = 0.8
                            result['explanation'] = f"Counted {answer} non-null values in column '{col}'."
                        else:
                            answer_parts = [f"{col}: {count}" for col, count in counts.items()]
                            result['answer'] = "; ".join(answer_parts)
                            result['confidence'] = 0.7
                            result['explanation'] = f"Counted values in multiple columns: {', '.join(counts.keys())}."
            elif detected_query_type == 'sum':
                sums = {}
                for col in target_columns:
                    if col in df.columns and data_types.get(col) in ('numeric', 'integer'):
                        sums[col] = df[col].sum()
                if sums:
                    if len(sums) == 1:
                        col = list(sums.keys())[0]
                        answer = sums[col]
                        result['answer'] = str(answer)
                        result['confidence'] = 0.85
                        result['explanation'] = f"Calculated sum of {answer} for column '{col}'."
                    else:
                        answer_parts = [f"{col}: {sum_val}" for col, sum_val in sums.items()]
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Calculated sums for multiple columns: {', '.join(sums.keys())}."
                else:
                    result['answer'] = "No numeric columns found to sum."
                    result['confidence'] = 0.5
            elif detected_query_type == 'average':
                averages = {}
                for col in target_columns:
                    if col in df.columns and data_types.get(col) in ('numeric', 'integer'):
                        averages[col] = df[col].mean()
                if averages:
                    if len(averages) == 1:
                        col = list(averages.keys())[0]
                        answer = averages[col]
                        result['answer'] = f"{answer:.2f}"
                        result['confidence'] = 0.85
                        result['explanation'] = f"Calculated average of {answer:.2f} for column '{col}'."
                    else:
                        answer_parts = [f"{col}: {avg:.2f}" for col, avg in averages.items()]
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Calculated averages for multiple columns: {', '.join(averages.keys())}."
                else:
                    result['answer'] = "No numeric columns found to average."
                    result['confidence'] = 0.5
            elif detected_query_type == 'median':
                medians = {}
                for col in target_columns:
                    if col in df.columns and data_types.get(col) in ('numeric', 'integer'):
                        medians[col] = df[col].median()
                if medians:
                    if len(medians) == 1:
                        col = list(medians.keys())[0]
                        answer = medians[col]
                        result['answer'] = f"{answer:.2f}"
                        result['confidence'] = 0.85
                        result['explanation'] = f"Calculated median of {answer:.2f} for column '{col}'."
                    else:
                        answer_parts = [f"{col}: {med:.2f}" for col, med in medians.items()]
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Calculated medians for multiple columns: {', '.join(medians.keys())}."
                else:
                    result['answer'] = "No numeric columns found to find a median."
                    result['confidence'] = 0.5
            elif detected_query_type == 'min':
                minimums = {}
                for col in target_columns:
                    if col in df.columns and data_types.get(col) in ('numeric', 'integer'):
                        minimums[col] = df[col].min()
                if minimums:
                    if len(minimums) == 1:
                        col = list(minimums.keys())[0]
                        answer = minimums[col]
                        result['answer'] = str(answer)
                        result['confidence'] = 0.85
                        result['explanation'] = f"Found minimum value of {answer} in column '{col}'."
                    else:
                        answer_parts = [f"{col}: {min_val}" for col, min_val in minimums.items()]
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Found minimum values for multiple columns: {', '.join(minimums.keys())}."
                else:
                    result['answer'] = "No numeric columns found to determine a minimum."
                    result['confidence'] = 0.5
            elif detected_query_type == 'max':
                maximums = {}
                for col in target_columns:
                    if col in df.columns and data_types.get(col) in ('numeric', 'integer'):
                        maximums[col] = df[col].max()
                if maximums:
                    if len(maximums) == 1:
                        col = list(maximums.keys())[0]
                        answer = maximums[col]
                        result['answer'] = str(answer)
                        result['confidence'] = 0.85
                        result['explanation'] = f"Found maximum value of {answer} in column '{col}'."
                    else:
                        answer_parts = [f"{col}: {max_val}" for col, max_val in maximums.items()]
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Found maximum values for multiple columns: {', '.join(maximums.keys())}."
                else:
                    result['answer'] = "No numeric columns found to determine a maximum."
                    result['confidence'] = 0.5
            elif detected_query_type == 'unique':
                uniques = {}
                for col in target_columns:
                    if col in df.columns:
                        unique_values = df[col].unique().tolist()
                        if len(unique_values) <= 10:  # Only list a reasonable number
                            uniques[col] = unique_values
                        else:
                            uniques[col] = f"{len(unique_values)} unique values"
                if uniques:
                    if len(uniques) == 1:
                        col = list(uniques.keys())[0]
                        unique_val = uniques[col]
                        if isinstance(unique_val, list):
                            result['answer'] = ", ".join(str(v) for v in unique_val)
                        else:
                            result['answer'] = unique_val
                        result['confidence'] = 0.85
                        result['explanation'] = f"Found unique values in column '{col}'."
                    else:
                        answer_parts = []
                        for col, vals in uniques.items():
                            if isinstance(vals, list):
                                answer_parts.append(f"{col}: {', '.join(str(v) for v in vals)}")
                            else:
                                answer_parts.append(f"{col}: {vals}")
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Found unique values for multiple columns: {', '.join(uniques.keys())}."
                else:
                    result['answer'] = "Could not determine unique values for the specified columns."
                    result['confidence'] = 0.5
            elif detected_query_type == 'compare':
                # A more complex query: compare the first two identified columns
                if len(target_columns) >= 2:
                    col1, col2 = target_columns[:2]
                    if data_types.get(col1) in ('numeric', 'integer') and data_types.get(col2) in ('numeric', 'integer'):
                        # Compare column averages
                        avg1 = df[col1].mean()
                        avg2 = df[col2].mean()
                        if avg1 > avg2:
                            result['answer'] = f"'{col1}' has a higher average ({avg1:.2f}) than '{col2}' ({avg2:.2f})."
                        elif avg2 > avg1:
                            result['answer'] = f"'{col2}' has a higher average ({avg2:.2f}) than '{col1}' ({avg1:.2f})."
                        else:
                            result['answer'] = f"'{col1}' and '{col2}' have the same average ({avg1:.2f})."
                        result['confidence'] = 0.8
                        result['explanation'] = f"Compared averages of columns '{col1}' and '{col2}'."
                    else:
                        result['answer'] = f"Cannot compare non-numeric columns '{col1}' and '{col2}'."
                        result['confidence'] = 0.7
                else:
                    result['answer'] = "Need at least two columns to compare."
                    result['confidence'] = 0.6
            else:
                # Default case, including detected types with no dedicated
                # handler (e.g. 'difference', 'list'): give a general summary
                row_count, col_count = table_data['dimensions']
                result['answer'] = f"The table has {row_count} rows and {col_count} columns."
                result['confidence'] = 0.5
                result['explanation'] = "Provided a general summary as the specific query type couldn't be determined."
        except Exception as e:
            logger.error(f"Error answering table question: {str(e)}")
            logger.debug(traceback.format_exc())
            result['answer'] = f"Error processing question: {str(e)}"
            result['confidence'] = 0.0
            result['explanation'] = "An error occurred during analysis."

        return result
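
    # Illustrative sketch (not executed here): for the sample table in the
    # __main__ block below, answer_table_question("What is the average
    # Score?", table) should detect query_type 'average', match the "Score"
    # column by name, and return "84.33" with confidence 0.85. Questions
    # matching none of the indicator phrases fall through to the general
    # summary branch.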

    def check_commutative_property(self, table_data: Dict[str, Any], operation: str) -> Dict[str, Any]:
        """
        Check if an operation (like addition or multiplication) is
        commutative across the table's numeric columns.

        Args:
            table_data: Parsed table data
            operation: Operation to check ('add', 'multiply', '+', '*')

        Returns:
            Dict with the results of the commutativity check
        """
        result = {
            'is_commutative': False,
            'explanation': None,
            'tested_pairs': [],
            'confidence': 0.0,
        }

        if not table_data.get('success', False) or not table_data.get('rows'):
            result['explanation'] = "Cannot perform check as the table could not be properly parsed."
            return result

        # Map the operation string to an actual operation
        operations = {
            'add': np.add,
            'multiply': np.multiply,
            '+': np.add,
            '*': np.multiply,
        }
        op_func = operations.get(operation)
        if not op_func:
            result['explanation'] = f"Unsupported operation: {operation}"
            return result

        headers = table_data['headers']
        rows = table_data['rows']
        data_types = table_data['data_types']

        # Find numeric columns for testing commutativity
        numeric_cols = [col for col in headers if data_types.get(col) in ('numeric', 'integer')]
        if len(numeric_cols) < 2:
            result['explanation'] = "Need at least two numeric columns to test commutativity."
            return result

        # Prepare data for testing
        df = self._convert_to_dataframe(headers, rows)

        # Test commutativity on every pair of numeric columns
        commutative_pairs = 0
        non_commutative_pairs = 0
        tested_pairs = []
        for i in range(len(numeric_cols)):
            for j in range(i + 1, len(numeric_cols)):
                col1, col2 = numeric_cols[i], numeric_cols[j]

                # Apply the operation in both directions
                result1 = op_func(df[col1], df[col2])
                result2 = op_func(df[col2], df[col1])

                # Check if results are equal (within floating-point precision)
                is_equal = np.allclose(result1, result2, rtol=1e-05, atol=1e-08, equal_nan=True)
                tested_pairs.append({
                    'col1': col1,
                    'col2': col2,
                    'is_commutative': is_equal,
                })
                if is_equal:
                    commutative_pairs += 1
                else:
                    non_commutative_pairs += 1

        # Determine overall commutativity
        total_pairs = commutative_pairs + non_commutative_pairs
        if total_pairs > 0:
            commutativity_ratio = commutative_pairs / total_pairs
            # Require (almost) all pairs to be commutative
            result['is_commutative'] = commutativity_ratio >= 0.99
            result['confidence'] = commutativity_ratio
            if result['is_commutative']:
                result['explanation'] = f"Operation '{operation}' is commutative across all tested column pairs."
            else:
                result['explanation'] = f"Operation '{operation}' is not commutative for some column pairs."
        else:
            result['explanation'] = "No column pairs were tested for commutativity."

        result['tested_pairs'] = tested_pairs
        return result

    def _detect_table_format(self, table_text: str) -> str:
        """
        Detect the format of a table based on its text representation.

        Args:
            table_text: Text containing the table

        Returns:
            Detected format ('markdown', 'csv', 'tsv', 'plain')
        """
        # Check for markdown table format (pipe/dash separators or lines
        # starting with a boundary pipe)
        if ('|' in table_text and '-+-' in table_text.replace(' ', '')) or (
            '|' in table_text and any(line.strip().startswith('|') for line in table_text.split('\n'))
        ):
            return 'markdown'

        # Check for CSV (comma-separated)
        if ',' in table_text and table_text.count(',') > table_text.count('\n'):
            return 'csv'

        # Check for TSV (tab-separated)
        if '\t' in table_text:
            return 'tsv'

        # Default to plain text
        return 'plain'
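
    # Illustrative sketch of the detection heuristics (assumed inputs):
    #   _detect_table_format("| a | b |\n| 1 | 2 |")  -> 'markdown'
    #   _detect_table_format("a,b\n1,2\n3,4")         -> 'csv'
    #   _detect_table_format("a\tb\n1\t2")            -> 'tsv'
    #   _detect_table_format("a b\n1 2")              -> 'plain'
    # The checks are ordered, so a pipe table that also contains commas is
    # still classified as markdown.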

    def _parse_markdown_table(self, table_text: str) -> Tuple[List[str], List[List[Any]]]:
        """
        Parse a markdown-formatted table.

        Args:
            table_text: Text containing a markdown table

        Returns:
            Tuple of (headers, rows)
        """
        lines = table_text.strip().split('\n')

        # Find the header row (first row with pipes)
        header_row = None
        for i, line in enumerate(lines):
            if '|' in line:
                header_row = i
                break
        if header_row is None:
            raise ValueError("No valid markdown table found")

        def split_row(line: str) -> List[str]:
            # Split on pipes, then drop only the empty leading/trailing cells
            # produced by boundary pipes, so that empty cells in the middle
            # of a row keep their column position
            cells = [cell.strip() for cell in line.split('|')]
            if cells and cells[0] == '':
                cells = cells[1:]
            if cells and cells[-1] == '':
                cells = cells[:-1]
            return cells

        # Extract headers
        headers = split_row(lines[header_row])

        # Find the separator row (e.g. |---|---|) and skip it if present
        separator_row = header_row + 1
        if separator_row < len(lines) and all(
            c in '-|:' for c in lines[separator_row] if not c.isspace()
        ):
            data_start = separator_row + 1
        else:
            data_start = header_row + 1

        # Extract data rows
        rows = []
        for i in range(data_start, len(lines)):
            line = lines[i].strip()
            if not line or '|' not in line:
                continue
            row_values = split_row(line)
            # Convert values to appropriate types
            converted_row = self._convert_values(row_values)
            if converted_row:  # Skip empty rows
                rows.append(converted_row)

        return headers, rows

    def _parse_csv(self, table_text: str) -> Tuple[List[str], List[List[Any]]]:
        """
        Parse a CSV-formatted table.

        Args:
            table_text: Text containing CSV data

        Returns:
            Tuple of (headers, rows)
        """
        return self._parse_delimited(table_text, delimiter=',')

    def _parse_tsv(self, table_text: str) -> Tuple[List[str], List[List[Any]]]:
        """
        Parse a TSV-formatted table.

        Args:
            table_text: Text containing TSV data

        Returns:
            Tuple of (headers, rows)
        """
        return self._parse_delimited(table_text, delimiter='\t')

    def _parse_delimited(self, table_text: str, delimiter: str) -> Tuple[List[str], List[List[Any]]]:
        """
        Shared parser for delimiter-separated data (CSV and TSV).

        Args:
            table_text: Text containing delimited data
            delimiter: Field delimiter character

        Returns:
            Tuple of (headers, rows)
        """
        reader = csv.reader(io.StringIO(table_text), delimiter=delimiter)
        all_rows = list(reader)
        if not all_rows:
            raise ValueError("No data found in delimited text")

        headers = all_rows[0]
        data_rows = []
        for row in all_rows[1:]:
            # Skip rows that don't match the header length
            if len(row) != len(headers):
                continue
            # Convert values to appropriate types
            data_rows.append(self._convert_values(row))

        return headers, data_rows
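
    # Illustrative sketch (assumed input): _parse_csv("a,b\n1,2\nx") returns
    # (['a', 'b'], [[1, 2]]): the trailing "x" row is dropped because its
    # length does not match the header, and "1"/"2" are converted to ints.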

    def _parse_plain_table(self, table_text: str) -> Tuple[List[str], List[List[Any]]]:
        """
        Parse a plain text table with whitespace delimiters.

        Args:
            table_text: Text containing plain table data

        Returns:
            Tuple of (headers, rows)
        """
        lines = table_text.strip().split('\n')
        if not lines:
            raise ValueError("No data found in plain text table")

        first_line = lines[0]

        def split_line(line: str) -> List[str]:
            if '  ' in first_line:
                # Columns appear to be aligned: treat runs of two or more
                # spaces as the delimiter so single spaces inside values survive
                parts = re.split(r'\s{2,}', line.strip())
            else:
                # No clear delimiter: fall back to splitting on any whitespace
                parts = line.split()
            return [p.strip().strip('"') for p in parts if p.strip()]

        headers = split_line(first_line)

        # Process the data rows
        rows = []
        for line in lines[1:]:
            line = line.strip()
            if not line:
                continue
            values = split_line(line)

            # Reconcile rows whose length differs from the header count
            if len(values) > len(headers):
                values = values[:len(headers)]  # Truncate extra values
            elif len(values) < len(headers):
                values.extend([''] * (len(headers) - len(values)))  # Pad with empties

            # Convert values to appropriate types
            rows.append(self._convert_values(values))

        return headers, rows

    def _convert_values(self, values: List[str]) -> List[Any]:
        """
        Convert string values to appropriate Python types.

        Args:
            values: List of string values

        Returns:
            List of values converted to appropriate types
        """
        converted = []
        for val in values:
            val = val.strip()

            # Try numeric conversions first
            try:
                converted_val: Any = int(val)  # Try integer first
            except ValueError:
                try:
                    converted_val = float(val)  # Then try float
                except ValueError:
                    converted_val = val  # Keep as string if not numeric

            # Handle boolean and null-like special values
            if val.lower() in ('true', 'yes', 'y'):
                converted_val = True
            elif val.lower() in ('false', 'no', 'n'):
                converted_val = False
            elif val.lower() in ('none', 'null', 'na', '-', ''):
                converted_val = None

            converted.append(converted_val)
        return converted
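
    # Illustrative sketch of the conversion rules (assumed inputs):
    #   _convert_values(['42', '3.14', 'yes', 'NA', 'text'])
    #   -> [42, 3.14, True, None, 'text']
    # Note that '-' is treated as a null marker, so a plain-text table using
    # '-' for "no data" yields None rather than a string.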

    def _determine_column_types(self, headers: List[str], rows: List[List[Any]]) -> Dict[str, str]:
        """
        Determine the data type of each column based on its values.

        Args:
            headers: List of column headers
            rows: List of data rows

        Returns:
            Dict mapping column names to data types
        """
        if not rows:
            return {h: 'unknown' for h in headers}

        col_count = len(headers)
        type_counts = {
            h: {'integer': 0, 'numeric': 0, 'boolean': 0, 'date': 0, 'categorical': 0}
            for h in headers
        }

        for row in rows:
            for i, val in enumerate(row[:col_count]):
                col_name = headers[i]
                if val is None:
                    continue  # Skip None values for type determination
                if isinstance(val, bool):  # Must come before int: bool is a subclass of int
                    type_counts[col_name]['boolean'] += 1
                elif isinstance(val, int):
                    type_counts[col_name]['integer'] += 1
                elif isinstance(val, float):
                    type_counts[col_name]['numeric'] += 1
                elif isinstance(val, str):
                    # Try to determine whether the string looks like a date
                    if re.match(r'\d{1,4}[-/]\d{1,2}[-/]\d{1,4}', val):
                        type_counts[col_name]['date'] += 1
                    else:
                        type_counts[col_name]['categorical'] += 1

        # Determine the predominant type for each column
        data_types = {}
        for col in headers:
            counts = type_counts[col]

            # Check for empty columns
            if sum(counts.values()) == 0:
                data_types[col] = 'unknown'
                continue

            # Pick the most common type
            most_common_type = max(counts.items(), key=lambda x: x[1])[0]

            # Special case: if most values are integers but some are floats,
            # treat the column as numeric
            if most_common_type == 'integer' and counts['numeric'] > 0:
                data_types[col] = 'numeric'
            else:
                data_types[col] = most_common_type

        return data_types

    def _convert_to_dataframe(self, headers: List[str], rows: List[List[Any]]) -> pd.DataFrame:
        """
        Convert headers and rows to a pandas DataFrame.

        Args:
            headers: List of column headers
            rows: List of data rows

        Returns:
            pandas DataFrame
        """
        df = pd.DataFrame(rows, columns=headers)

        # Convert object columns to numeric where possible
        for col in df.columns:
            if df[col].dtype == 'object':
                try:
                    # Try to convert to numeric, coercing errors to NaN
                    numeric_col = pd.to_numeric(df[col], errors='coerce')
                    # Only keep the conversion if it produced some real numbers
                    if not numeric_col.isna().all():
                        df[col] = numeric_col
                except Exception:
                    pass

        return df
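

if __name__ == "__main__":
    # Minimal smoke run (illustrative): parse a small markdown table, analyze
    # it, and ask a question about it. The sample data here is invented for
    # the demo; real callers would pass their own table text.
    logging.basicConfig(level=logging.INFO)

    sample = """\
| Name  | Score | Team |
|-------|-------|------|
| Alice | 90    | Red  |
| Bob   | 85    | Blue |
| Cara  | 78    | Red  |
"""

    processor = TableProcessor()
    table = processor.process_table_data(sample, format_hint="markdown")
    print("Headers:", table["headers"])
    print("Dimensions:", table["dimensions"])
    print("Data types:", table["data_types"])

    analysis = processor.analyze_table(table)
    print("Summary:", analysis["summary"])
    print("Score stats:", analysis["column_stats"].get("Score"))

    answer = processor.answer_table_question("What is the average Score?", table)
    print("Answer:", answer["answer"], f"(confidence: {answer['confidence']})")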