"""
Table Data Processor Component

This module provides capabilities for parsing, analyzing, and extracting information
from tabular data in various formats (markdown tables, CSV, TSV, etc.).
"""

import re
import logging
import csv
import io
from typing import List, Dict, Any, Tuple, Optional, Set, Union
import traceback
import pandas as pd
import numpy as np
from collections import defaultdict

logger = logging.getLogger("gaia_agent.components.table_processor")


class TableProcessor:
    """
    Handles parsing, analysis, and operations on tabular data structures.
    Provides capabilities for answering questions about table data.
    """

    def __init__(self):
        """Initialize the table processor component."""
        self.supported_formats = {
            'markdown': self._parse_markdown_table,
            'csv': self._parse_csv,
            'tsv': self._parse_tsv,
            'plain': self._parse_plain_table
        }
        logger.info("TableProcessor initialized")

    def process_table_data(self, table_text: str, format_hint: Optional[str] = None) -> Dict[str, Any]:
        """
        Process tabular data from a text representation.

        Args:
            table_text: Text containing the table data
            format_hint: Optional hint about the table format ('markdown', 'csv', 'tsv', 'plain')

        Returns:
            Dict containing parsed table information:
            - headers: List of column headers
            - rows: List of rows (each row is a list of values)
            - data_types: Dict mapping column names to data types
            - dimensions: Tuple of (rows, columns)
            - format: Detected format of the table
            - success: Boolean indicating successful parsing
            - error: Error message if parsing failed
        """
        result = {
            'headers': [],
            'rows': [],
            'data_types': {},
            'dimensions': (0, 0),
            'format': None,
            'success': False,
            'error': None
        }

        try:
            # Try the caller-supplied format first, if any.
            if format_hint and format_hint in self.supported_formats:
                try:
                    headers, rows = self.supported_formats[format_hint](table_text)
                    result['format'] = format_hint
                    result['success'] = True
                except Exception as e:
                    logger.warning(f"Failed to parse as {format_hint}, trying auto-detection: {str(e)}")
                    format_hint = None

            # Fall back to auto-detection, then to trying every parser in turn.
            if not format_hint or not result['success']:
                detected_format = self._detect_table_format(table_text)
                if detected_format in self.supported_formats:
                    headers, rows = self.supported_formats[detected_format](table_text)
                    result['format'] = detected_format
                    result['success'] = True
                else:
                    for fmt, parser in self.supported_formats.items():
                        try:
                            headers, rows = parser(table_text)
                            result['format'] = fmt
                            result['success'] = True
                            break
                        except Exception:
                            continue

            if result['success']:
                result['headers'] = headers
                result['rows'] = rows
                result['dimensions'] = (len(rows), len(headers))
                result['data_types'] = self._determine_column_types(headers, rows)
            else:
                result['error'] = "Failed to parse table data in any supported format"

        except Exception as e:
            error_msg = f"Error processing table data: {str(e)}"
            logger.error(error_msg)
            logger.debug(traceback.format_exc())
            result['error'] = error_msg

        return result
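
    # Illustrative usage sketch (kept as a comment so the class body stays
    # importable; the sample table and expected values are assumptions based
    # on the parsing logic above, not part of the component's API):
    #
    #     processor = TableProcessor()
    #     parsed = processor.process_table_data(
    #         "| Name | Age |\n|------|-----|\n| Ann | 34 |\n| Bob | 29 |",
    #         format_hint='markdown',
    #     )
    #     # parsed['headers'] -> ['Name', 'Age']
    #     # parsed['dimensions'] -> (2, 2); parsed['data_types']['Age'] -> 'integer'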

    def analyze_table(self, table_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyze a parsed table to extract summary statistics and insights.

        Args:
            table_data: Parsed table data (from process_table_data)

        Returns:
            Dict containing analysis results:
            - summary: Text summary of the table
            - column_stats: Statistics for each numeric column
            - categorical_summaries: Summaries for categorical columns
            - correlations: Correlation matrix for numeric columns
            - dimensions: Table dimensions (rows, columns)
        """
        analysis = {
            'summary': None,
            'column_stats': {},
            'categorical_summaries': {},
            'correlations': None,
            'dimensions': table_data.get('dimensions', (0, 0))
        }

        if not table_data.get('success', False) or not table_data.get('rows'):
            analysis['summary'] = "No valid table data to analyze"
            return analysis

        headers = table_data['headers']
        rows = table_data['rows']
        data_types = table_data['data_types']

        row_count, col_count = analysis['dimensions']
        analysis['summary'] = f"Table with {row_count} rows and {col_count} columns."

        try:
            df = self._convert_to_dataframe(headers, rows)

            for col in headers:
                if data_types.get(col) in ['numeric', 'integer']:
                    try:
                        col_stats = {
                            'min': float(df[col].min()),
                            'max': float(df[col].max()),
                            'mean': float(df[col].mean()),
                            'median': float(df[col].median()),
                            'std': float(df[col].std()),
                            'sum': float(df[col].sum()),
                            'count': int(df[col].count())
                        }
                        analysis['column_stats'][col] = col_stats
                    except Exception as e:
                        logger.warning(f"Error calculating stats for column {col}: {str(e)}")

                elif data_types.get(col) == 'categorical':
                    try:
                        value_counts = df[col].value_counts()
                        cat_summary = {
                            'unique_values': len(value_counts),
                            'most_common': value_counts.index[0] if not value_counts.empty else None,
                            'value_counts': value_counts.to_dict(),
                            'count': int(df[col].count())
                        }
                        analysis['categorical_summaries'][col] = cat_summary
                    except Exception as e:
                        logger.warning(f"Error analyzing categorical column {col}: {str(e)}")

            # Correlations are only meaningful with at least two numeric columns.
            numeric_cols = [col for col in headers if data_types.get(col) in ['numeric', 'integer']]
            if len(numeric_cols) > 1:
                try:
                    corr_matrix = df[numeric_cols].corr().to_dict()
                    analysis['correlations'] = corr_matrix
                except Exception as e:
                    logger.warning(f"Error calculating correlations: {str(e)}")

        except Exception as e:
            logger.error(f"Error during table analysis: {str(e)}")
            logger.debug(traceback.format_exc())
            analysis['error'] = str(e)

        return analysis
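
    # Illustrative usage sketch, continuing the example above (kept as a
    # comment so the class body stays importable):
    #
    #     analysis = processor.analyze_table(parsed)
    #     # analysis['summary']      -> "Table with 2 rows and 2 columns."
    #     # analysis['column_stats'] -> min/max/mean/median/std/sum/count per numeric column
    #     # analysis['correlations'] -> populated only when there are two or more numeric columns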

    def answer_table_question(self, question: str, table_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Answer questions about a table based on its content.

        Args:
            question: Question about the table
            table_data: Parsed table data (from process_table_data)

        Returns:
            Dict containing:
            - answer: The answer to the question
            - confidence: Confidence score (0-1)
            - explanation: Explanation of how the answer was derived
            - query_type: Type of query detected (e.g., "sum", "mean", "max", etc.)
        """
        result = {
            'question': question,
            'answer': None,
            'confidence': 0.0,
            'explanation': None,
            'query_type': None
        }

        if not table_data.get('success', False) or not table_data.get('rows'):
            result['answer'] = "Cannot answer the question as the table could not be properly parsed."
            result['confidence'] = 0.0
            return result

        headers = table_data['headers']
        rows = table_data['rows']
        data_types = table_data['data_types']

        df = self._convert_to_dataframe(headers, rows)

        question_lower = question.lower()

        # Map query types to indicator phrases used for simple keyword detection.
        query_types = {
            'count': ['how many', 'count', 'number of'],
            'sum': ['sum', 'total', 'add'],
            'average': ['average', 'mean', 'avg'],
            'median': ['median', 'middle'],
            'min': ['minimum', 'min', 'smallest', 'lowest'],
            'max': ['maximum', 'max', 'largest', 'highest'],
            'difference': ['difference', 'gap', 'delta'],
            'compare': ['compare', 'greater than', 'less than', 'larger', 'smaller'],
            'list': ['list', 'what are', 'show', 'display'],
            'unique': ['unique', 'distinct', 'different']
        }

        detected_query_type = None
        for query_type, indicators in query_types.items():
            if any(indicator in question_lower for indicator in indicators):
                detected_query_type = query_type
                break

        result['query_type'] = detected_query_type or 'unknown'

        # Identify which columns the question refers to: first by full column
        # name, then by individual words from multi-word column names.
        target_columns = []
        for col in headers:
            if col.lower() in question_lower:
                target_columns.append(col)

        if not target_columns:
            for col in headers:
                col_parts = col.lower().split()
                if any(part in question_lower for part in col_parts if len(part) > 3):
                    target_columns.append(col)

        if not target_columns:
            target_columns = headers
            result['confidence'] = max(0.4, result['confidence'])
            result['explanation'] = "No specific column identified, analyzing all columns."
        else:
            result['confidence'] = 0.7
            target_columns_str = ", ".join(target_columns)
            result['explanation'] = f"Analyzing columns: {target_columns_str}"

        try:
            if detected_query_type == 'count':
                if 'rows' in question_lower or 'entries' in question_lower:
                    answer = len(rows)
                    result['answer'] = str(answer)
                    result['confidence'] = 0.9
                    result['explanation'] = f"Counted {answer} rows in the table."
                elif 'columns' in question_lower:
                    answer = len(headers)
                    result['answer'] = str(answer)
                    result['confidence'] = 0.9
                    result['explanation'] = f"Counted {answer} columns in the table."
                else:
                    counts = {}
                    for col in target_columns:
                        if col in df.columns:
                            if data_types.get(col) == 'categorical':
                                value_counts = df[col].value_counts().to_dict()
                                counts[col] = sum(value_counts.values())
                            else:
                                counts[col] = df[col].count()

                    if counts:
                        if len(counts) == 1:
                            col = list(counts.keys())[0]
                            answer = counts[col]
                            result['answer'] = str(answer)
                            result['confidence'] = 0.8
                            result['explanation'] = f"Counted {answer} non-null values in column '{col}'."
                        else:
                            answer_parts = [f"{col}: {count}" for col, count in counts.items()]
                            result['answer'] = "; ".join(answer_parts)
                            result['confidence'] = 0.7
                            result['explanation'] = f"Counted values in multiple columns: {', '.join(counts.keys())}."

            elif detected_query_type == 'sum':
                sums = {}
                for col in target_columns:
                    if col in df.columns and data_types.get(col) in ['numeric', 'integer']:
                        sums[col] = df[col].sum()

                if sums:
                    if len(sums) == 1:
                        col = list(sums.keys())[0]
                        answer = sums[col]
                        result['answer'] = str(answer)
                        result['confidence'] = 0.85
                        result['explanation'] = f"Calculated sum of {answer} for column '{col}'."
                    else:
                        answer_parts = [f"{col}: {sum_val}" for col, sum_val in sums.items()]
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Calculated sums for multiple columns: {', '.join(sums.keys())}."
                else:
                    result['answer'] = "No numeric columns found to sum."
                    result['confidence'] = 0.5

            elif detected_query_type == 'average':
                averages = {}
                for col in target_columns:
                    if col in df.columns and data_types.get(col) in ['numeric', 'integer']:
                        averages[col] = df[col].mean()

                if averages:
                    if len(averages) == 1:
                        col = list(averages.keys())[0]
                        answer = averages[col]
                        result['answer'] = f"{answer:.2f}"
                        result['confidence'] = 0.85
                        result['explanation'] = f"Calculated average of {answer:.2f} for column '{col}'."
                    else:
                        answer_parts = [f"{col}: {avg:.2f}" for col, avg in averages.items()]
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Calculated averages for multiple columns: {', '.join(averages.keys())}."
                else:
                    result['answer'] = "No numeric columns found to average."
                    result['confidence'] = 0.5

            elif detected_query_type == 'median':
                medians = {}
                for col in target_columns:
                    if col in df.columns and data_types.get(col) in ['numeric', 'integer']:
                        medians[col] = df[col].median()

                if medians:
                    if len(medians) == 1:
                        col = list(medians.keys())[0]
                        answer = medians[col]
                        result['answer'] = f"{answer:.2f}"
                        result['confidence'] = 0.85
                        result['explanation'] = f"Calculated median of {answer:.2f} for column '{col}'."
                    else:
                        answer_parts = [f"{col}: {med:.2f}" for col, med in medians.items()]
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Calculated medians for multiple columns: {', '.join(medians.keys())}."
                else:
                    result['answer'] = "No numeric columns found to take the median of."
                    result['confidence'] = 0.5

            elif detected_query_type == 'min':
                minimums = {}
                for col in target_columns:
                    if col in df.columns and data_types.get(col) in ['numeric', 'integer']:
                        minimums[col] = df[col].min()

                if minimums:
                    if len(minimums) == 1:
                        col = list(minimums.keys())[0]
                        answer = minimums[col]
                        result['answer'] = str(answer)
                        result['confidence'] = 0.85
                        result['explanation'] = f"Found minimum value of {answer} in column '{col}'."
                    else:
                        answer_parts = [f"{col}: {min_val}" for col, min_val in minimums.items()]
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Found minimum values for multiple columns: {', '.join(minimums.keys())}."
                else:
                    result['answer'] = "No numeric columns found to determine the minimum."
                    result['confidence'] = 0.5

            elif detected_query_type == 'max':
                maximums = {}
                for col in target_columns:
                    if col in df.columns and data_types.get(col) in ['numeric', 'integer']:
                        maximums[col] = df[col].max()

                if maximums:
                    if len(maximums) == 1:
                        col = list(maximums.keys())[0]
                        answer = maximums[col]
                        result['answer'] = str(answer)
                        result['confidence'] = 0.85
                        result['explanation'] = f"Found maximum value of {answer} in column '{col}'."
                    else:
                        answer_parts = [f"{col}: {max_val}" for col, max_val in maximums.items()]
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Found maximum values for multiple columns: {', '.join(maximums.keys())}."
                else:
                    result['answer'] = "No numeric columns found to determine the maximum."
                    result['confidence'] = 0.5

            elif detected_query_type == 'unique':
                uniques = {}
                for col in target_columns:
                    if col in df.columns:
                        unique_values = df[col].unique().tolist()
                        if len(unique_values) <= 10:
                            uniques[col] = unique_values
                        else:
                            uniques[col] = f"{len(unique_values)} unique values"

                if uniques:
                    if len(uniques) == 1:
                        col = list(uniques.keys())[0]
                        unique_val = uniques[col]
                        if isinstance(unique_val, list):
                            result['answer'] = ", ".join(str(v) for v in unique_val)
                        else:
                            result['answer'] = unique_val
                        result['confidence'] = 0.85
                        result['explanation'] = f"Found unique values in column '{col}'."
                    else:
                        answer_parts = []
                        for col, vals in uniques.items():
                            if isinstance(vals, list):
                                answer_parts.append(f"{col}: {', '.join(str(v) for v in vals)}")
                            else:
                                answer_parts.append(f"{col}: {vals}")
                        result['answer'] = "; ".join(answer_parts)
                        result['confidence'] = 0.75
                        result['explanation'] = f"Found unique values for multiple columns: {', '.join(uniques.keys())}."
                else:
                    result['answer'] = "Could not determine unique values for the specified columns."
                    result['confidence'] = 0.5

            elif detected_query_type == 'compare':
                if len(target_columns) >= 2:
                    col1, col2 = target_columns[:2]
                    if data_types.get(col1) in ['numeric', 'integer'] and data_types.get(col2) in ['numeric', 'integer']:
                        avg1 = df[col1].mean()
                        avg2 = df[col2].mean()

                        if avg1 > avg2:
                            result['answer'] = f"'{col1}' has a higher average ({avg1:.2f}) than '{col2}' ({avg2:.2f})."
                        elif avg2 > avg1:
                            result['answer'] = f"'{col2}' has a higher average ({avg2:.2f}) than '{col1}' ({avg1:.2f})."
                        else:
                            result['answer'] = f"'{col1}' and '{col2}' have the same average ({avg1:.2f})."

                        result['confidence'] = 0.8
                        result['explanation'] = f"Compared averages of columns '{col1}' and '{col2}'."
                    else:
                        result['answer'] = f"Cannot compare non-numeric columns '{col1}' and '{col2}'."
                        result['confidence'] = 0.7
                else:
                    result['answer'] = "Need at least two columns to compare."
                    result['confidence'] = 0.6

            else:
                # Fall back to a general summary when the query type is unknown
                # or has no dedicated handler.
                row_count, col_count = table_data['dimensions']
                result['answer'] = f"The table has {row_count} rows and {col_count} columns."
                result['confidence'] = 0.5
                result['explanation'] = "Provided a general summary as the specific query type couldn't be determined."

        except Exception as e:
            logger.error(f"Error answering table question: {str(e)}")
            logger.debug(traceback.format_exc())
            result['answer'] = f"Error processing question: {str(e)}"
            result['confidence'] = 0.0
            result['explanation'] = "An error occurred during analysis."

        return result
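
    # Illustrative usage sketch (kept as a comment so the class body stays
    # importable). The routing above is keyword-driven, so a question that
    # names a column and an aggregate keyword reaches the matching handler:
    #
    #     reply = processor.answer_table_question("What is the average Age?", parsed)
    #     # reply['query_type'] -> 'average'
    #     # reply['answer']     -> mean of the 'Age' column, formatted to two decimals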

    def check_commutative_property(self, table_data: Dict[str, Any], operation: str) -> Dict[str, Any]:
        """
        Check if an operation (like addition, multiplication) is commutative across table data.

        Args:
            table_data: Parsed table data
            operation: Operation to check ('add', 'multiply', '+', '*')

        Returns:
            Dict with results of commutativity check
        """
        result = {
            'is_commutative': False,
            'explanation': None,
            'tested_pairs': [],
            'confidence': 0.0
        }

        if not table_data.get('success', False) or not table_data.get('rows'):
            result['explanation'] = "Cannot perform check as the table could not be properly parsed."
            return result

        operations = {
            'add': np.add,
            'multiply': np.multiply,
            '+': np.add,
            '*': np.multiply
        }

        op_func = operations.get(operation)
        if not op_func:
            result['explanation'] = f"Unsupported operation: {operation}"
            return result

        headers = table_data['headers']
        rows = table_data['rows']
        data_types = table_data['data_types']

        numeric_cols = [col for col in headers if data_types.get(col) in ['numeric', 'integer']]

        if len(numeric_cols) < 2:
            result['explanation'] = "Need at least two numeric columns to test commutativity."
            return result

        df = self._convert_to_dataframe(headers, rows)

        commutative_pairs = 0
        non_commutative_pairs = 0
        tested_pairs = []

        for i in range(len(numeric_cols)):
            for j in range(i + 1, len(numeric_cols)):
                col1, col2 = numeric_cols[i], numeric_cols[j]

                # Apply the operation in both orders and compare elementwise.
                result1 = op_func(df[col1], df[col2])
                result2 = op_func(df[col2], df[col1])

                is_equal = np.allclose(result1, result2, rtol=1e-05, atol=1e-08, equal_nan=True)

                test_result = {
                    'col1': col1,
                    'col2': col2,
                    'is_commutative': is_equal
                }
                tested_pairs.append(test_result)

                if is_equal:
                    commutative_pairs += 1
                else:
                    non_commutative_pairs += 1

        total_pairs = commutative_pairs + non_commutative_pairs
        if total_pairs > 0:
            commutativity_ratio = commutative_pairs / total_pairs
            result['is_commutative'] = commutativity_ratio >= 0.99
            result['confidence'] = commutativity_ratio

            if result['is_commutative']:
                result['explanation'] = f"Operation '{operation}' is commutative across all tested column pairs."
            else:
                result['explanation'] = f"Operation '{operation}' is not commutative for some column pairs."
        else:
            result['explanation'] = "No column pairs were tested for commutativity."

        result['tested_pairs'] = tested_pairs
        return result
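
    # Illustrative usage sketch (kept as a comment so the class body stays
    # importable). The check needs at least two numeric columns; since addition
    # and multiplication are commutative, clean numeric data is expected to
    # report is_commutative == True:
    #
    #     check = processor.check_commutative_property(parsed_numeric_table, 'add')
    #     # 'parsed_numeric_table' is a hypothetical result of process_table_data
    #     # with two or more numeric columns; check['tested_pairs'] lists every
    #     # (col1, col2) pair that was compared.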

    def _detect_table_format(self, table_text: str) -> str:
        """
        Detect the format of a table based on its text representation.

        Args:
            table_text: Text containing the table

        Returns:
            Detected format ('markdown', 'csv', 'tsv', 'plain')
        """
        # Markdown/grid tables: pipe-delimited cells, lines starting with '|'
        # or a '+---+' style separator.
        has_pipes = '|' in table_text
        if has_pipes and ('-+-' in table_text.replace(' ', '')
                          or any(line.strip().startswith('|') for line in table_text.split('\n'))):
            return 'markdown'

        # CSV: commas dominate relative to the number of lines.
        if ',' in table_text and table_text.count(',') > table_text.count('\n'):
            return 'csv'

        # TSV: presence of tab characters.
        if '\t' in table_text:
            return 'tsv'

        # Fall back to whitespace-delimited plain text.
        return 'plain'

    def _parse_markdown_table(self, table_text: str) -> Tuple[List[str], List[List[Any]]]:
        """
        Parse a markdown-formatted table.

        Args:
            table_text: Text containing markdown table

        Returns:
            Tuple of (headers, rows)
        """
        lines = table_text.strip().split('\n')

        # Treat the first line containing pipes as the header row.
        header_row = None
        for i, line in enumerate(lines):
            if '|' in line:
                header_row = i
                break

        if header_row is None:
            raise ValueError("No valid markdown table found")

        header_line = lines[header_row]
        headers = [h.strip() for h in header_line.split('|')]
        headers = [h for h in headers if h]

        # Skip the separator row (e.g. |---|---|) if present.
        separator_row = header_row + 1
        if separator_row < len(lines) and all(c in '-|:' for c in lines[separator_row] if not c.isspace()):
            data_start = separator_row + 1
        else:
            data_start = header_row + 1

        rows = []
        for i in range(data_start, len(lines)):
            line = lines[i].strip()
            if not line or '|' not in line:
                continue

            row_values = [cell.strip() for cell in line.split('|')]
            row_values = [cell for cell in row_values if cell != '']

            converted_row = self._convert_values(row_values)

            if converted_row:
                rows.append(converted_row)

        return headers, rows

    def _parse_csv(self, table_text: str) -> Tuple[List[str], List[List[Any]]]:
        """
        Parse a CSV-formatted table.

        Args:
            table_text: Text containing CSV data

        Returns:
            Tuple of (headers, rows)
        """
        csv_file = io.StringIO(table_text)
        reader = csv.reader(csv_file)

        all_rows = list(reader)
        if not all_rows:
            raise ValueError("No data found in CSV text")

        headers = all_rows[0]
        data_rows = []

        for row in all_rows[1:]:
            # Skip rows whose cell count does not match the header.
            if len(row) != len(headers):
                continue

            converted_row = self._convert_values(row)
            data_rows.append(converted_row)

        return headers, data_rows

    def _parse_tsv(self, table_text: str) -> Tuple[List[str], List[List[Any]]]:
        """
        Parse a TSV-formatted table.

        Args:
            table_text: Text containing TSV data

        Returns:
            Tuple of (headers, rows)
        """
        tsv_file = io.StringIO(table_text)
        reader = csv.reader(tsv_file, delimiter='\t')

        all_rows = list(reader)
        if not all_rows:
            raise ValueError("No data found in TSV text")

        headers = all_rows[0]
        data_rows = []

        for row in all_rows[1:]:
            # Skip rows whose cell count does not match the header.
            if len(row) != len(headers):
                continue

            converted_row = self._convert_values(row)
            data_rows.append(converted_row)

        return headers, data_rows

    def _parse_plain_table(self, table_text: str) -> Tuple[List[str], List[List[Any]]]:
        """
        Parse a plain text table with space delimiters.

        Args:
            table_text: Text containing plain table data

        Returns:
            Tuple of (headers, rows)
        """
        lines = table_text.strip().split('\n')
        if not lines:
            raise ValueError("No data found in plain text table")

        first_line = lines[0]

        if '  ' in first_line:
            # Columns aligned with runs of two or more spaces.
            parts = re.split(r'\s{2,}', first_line.strip())
            headers = [p.strip().strip('"') for p in parts if p.strip()]
        else:
            # Single spaces as delimiters.
            headers = first_line.split()

        rows = []
        for i in range(1, len(lines)):
            line = lines[i].strip()
            if not line:
                continue

            if '  ' in first_line:
                parts = re.split(r'\s{2,}', line)
                values = [p.strip().strip('"') for p in parts if p.strip()]
            else:
                values = line.split()

            # Normalize the row length to match the header count.
            if len(values) > len(headers):
                values = values[:len(headers)]
            elif len(values) < len(headers):
                values.extend([''] * (len(headers) - len(values)))

            converted_row = self._convert_values(values)
            rows.append(converted_row)

        return headers, rows

    def _convert_values(self, values: List[str]) -> List[Any]:
        """
        Convert string values to appropriate Python types.

        Args:
            values: List of string values

        Returns:
            List of values converted to appropriate types
        """
        converted = []
        for val in values:
            val = val.strip()

            # Try numeric conversion first: int, then float, otherwise keep the string.
            try:
                converted_val = int(val)
            except ValueError:
                try:
                    converted_val = float(val)
                except ValueError:
                    converted_val = val

            # Recognize boolean and null-like tokens regardless of case.
            if val.lower() in ('true', 'yes', 'y'):
                converted_val = True
            elif val.lower() in ('false', 'no', 'n'):
                converted_val = False
            elif val.lower() in ('none', 'null', 'na', '-', ''):
                converted_val = None

            converted.append(converted_val)

        return converted

    def _determine_column_types(self, headers: List[str], rows: List[List[Any]]) -> Dict[str, str]:
        """
        Determine the data type of each column based on its values.

        Args:
            headers: List of column headers
            rows: List of data rows

        Returns:
            Dict mapping column names to data types
        """
        if not rows:
            return {h: 'unknown' for h in headers}

        col_count = len(headers)
        type_counts = {h: {'integer': 0, 'numeric': 0, 'boolean': 0, 'date': 0, 'categorical': 0} for h in headers}

        for row in rows:
            for i, val in enumerate(row[:col_count]):
                col_name = headers[i]

                if val is None:
                    continue

                # bool must be checked before int because bool is a subclass of int.
                if isinstance(val, bool):
                    type_counts[col_name]['boolean'] += 1
                elif isinstance(val, int):
                    type_counts[col_name]['integer'] += 1
                elif isinstance(val, float):
                    type_counts[col_name]['numeric'] += 1
                elif isinstance(val, str):
                    # Simple date heuristic, e.g. 2024-01-31 or 31/01/2024.
                    if re.match(r'\d{1,4}[-/]\d{1,2}[-/]\d{1,4}', val):
                        type_counts[col_name]['date'] += 1
                    else:
                        type_counts[col_name]['categorical'] += 1

        data_types = {}

        for col in headers:
            counts = type_counts[col]

            # Columns with no non-null values stay 'unknown'.
            if sum(counts.values()) == 0:
                data_types[col] = 'unknown'
                continue

            # Pick the most frequent type.
            max_type = max(counts.items(), key=lambda x: x[1])
            most_common_type = max_type[0]

            # A column mixing integers and floats is treated as numeric.
            if most_common_type == 'integer' and counts['numeric'] > 0:
                data_types[col] = 'numeric'
            else:
                data_types[col] = most_common_type

        return data_types

    def _convert_to_dataframe(self, headers: List[str], rows: List[List[Any]]) -> pd.DataFrame:
        """
        Convert headers and rows to a pandas DataFrame.

        Args:
            headers: List of column headers
            rows: List of data rows

        Returns:
            pandas DataFrame
        """
        df = pd.DataFrame(rows, columns=headers)

        # Coerce object columns to numeric where possible so that numeric
        # operations (sum, mean, correlations) work on parsed string data.
        for col in df.columns:
            if df[col].dtype == 'object':
                try:
                    numeric_col = pd.to_numeric(df[col], errors='coerce')

                    # Only keep the conversion if at least one value is numeric.
                    if not numeric_col.isna().all():
                        df[col] = numeric_col
                except Exception:
                    pass

        return df
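

# Minimal end-to-end sketch, run only when the module is executed directly.
# The sample table below is invented purely for illustration and is not part
# of the component itself.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    sample = (
        "| Product | Price | Quantity |\n"
        "|---------|-------|----------|\n"
        "| Apple   | 1.20  | 10       |\n"
        "| Banana  | 0.50  | 25       |\n"
        "| Cherry  | 3.75  | 5        |\n"
    )

    processor = TableProcessor()
    parsed = processor.process_table_data(sample, format_hint='markdown')
    print("Parsed dimensions:", parsed['dimensions'])

    analysis = processor.analyze_table(parsed)
    print("Summary:", analysis['summary'])

    reply = processor.answer_table_question("What is the total Quantity?", parsed)
    print("Answer:", reply['answer'], f"(confidence {reply['confidence']:.2f})")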