Spaces:
Runtime error
Runtime error
import yaml | |
import json | |
import markdown | |
from fuzzywuzzy import fuzz, process | |
from typing import Dict, List, Any | |
import logging | |
def parse_yaml(yaml_text: str) -> Dict[str, Any]:
    """Load a YAML document using PyYAML's safe loader.

    Args:
        yaml_text: The YAML source text.

    Returns:
        Whatever ``yaml.safe_load`` produces for ``yaml_text``.
        NOTE(review): the annotation promises a dict, but nothing here
        enforces it -- confirm how callers handle other top-level values.

    Raises:
        yaml.YAMLError: Re-raised unchanged after the failure is logged.
    """
    try:
        parsed = yaml.safe_load(yaml_text)
    except yaml.YAMLError as exc:
        logging.error(f"YAML parsing error: {str(exc)}")
        raise
    else:
        return parsed
def fuzzy_search(query: str, data: Dict[str, Any], threshold: int = 60) -> List[Dict[str, Any]]:
    """Fuzzy-match ``query`` against the top-level keys and scalar values of ``data``.

    Only entries whose value is a ``str``, ``int`` or ``float`` are examined;
    nested containers are skipped. Matching is case-insensitive and uses
    ``fuzz.partial_ratio``.

    Args:
        query: Text to look for.
        data: Mapping to search. Anything that is not a dict yields no matches.
        threshold: Minimum score an entry needs to be reported.

    Returns:
        A list of match dicts sorted by descending ``'score'``. Each has:
        ``'type'`` (``'key'`` or ``'value'``, i.e. which side matched),
        ``'field'`` (the original key), ``'value'`` (the value as a string)
        and ``'score'``. An entry can appear twice, once per side.
    """
    matches: List[Dict[str, Any]] = []
    if not isinstance(data, dict):
        return matches

    for key, value in data.items():
        if not isinstance(value, (str, int, float)):
            continue

        needle = query.lower()
        value_str = str(value)

        # Keys are not guaranteed to be strings (YAML happily produces int,
        # bool or None keys), so compare against their text form instead of
        # calling .lower() on the key directly.
        key_score = fuzz.partial_ratio(needle, str(key).lower())
        if key_score >= threshold:
            matches.append({
                'type': 'key',
                'field': key,
                'value': value_str,
                'score': key_score
            })

        value_score = fuzz.partial_ratio(needle, value_str.lower())
        if value_score >= threshold:
            matches.append({
                'type': 'value',
                'field': key,
                'value': value_str,
                'score': value_score
            })

    # Best matches first; list.sort is stable, so ties keep discovery order.
    matches.sort(key=lambda match: match['score'], reverse=True)
    return matches
# Shortcode -> emoji lookup used by render_markdown. The characters are
# written as Unicode escapes so they survive re-encoding of this file.
_EMOJI_SHORTCODES = {
    ':smile:': '\U0001F604',       # grinning face with smiling eyes
    ':heart:': '\u2764\uFE0F',     # red heart
    ':thumbsup:': '\U0001F44D',    # thumbs up
    ':thumbsdown:': '\U0001F44E',  # thumbs down
    ':fire:': '\U0001F525',        # fire
    ':rocket:': '\U0001F680',      # rocket
    ':star:': '\u2B50',            # star
    ':check:': '\u2705',           # check mark button
    ':x:': '\u274C',               # cross mark
    ':warning:': '\u26A0\uFE0F',   # warning sign
    ':info:': '\u2139\uFE0F',      # information
    ':bulb:': '\U0001F4A1',        # light bulb
    ':tada:': '\U0001F389',        # party popper
}


def render_markdown(text: str) -> str:
    """Render markdown text to HTML and expand a fixed set of emoji shortcodes.

    The text is converted with the ``extra`` and ``codehilite`` extensions,
    then every shortcode in ``_EMOJI_SHORTCODES`` (for example ``:rocket:``)
    is replaced in the resulting HTML. Because the replacement runs on the
    rendered HTML string, shortcodes inside code spans are expanded as well.

    Args:
        text: Markdown source.

    Returns:
        The rendered HTML. If rendering raises, the error is logged and the
        original ``text`` is returned unchanged (not rendered, not escaped).
    """
    try:
        md = markdown.Markdown(extensions=['extra', 'codehilite'])
        html = md.convert(text)
        for code, emoji in _EMOJI_SHORTCODES.items():
            html = html.replace(code, emoji)
        return html
    except Exception as e:
        # Deliberate best effort: fall back to the raw input, not a crash.
        logging.error(f"Markdown rendering error: {str(e)}")
        return text
def create_dynamic_table(table_name: str, schema: Dict[str, Any]) -> bool:
    """Placeholder for dynamic table creation; currently only logs the request.

    No table is created here. Per the original note, data is stored through
    the generic DataRecord model with JSON storage for now, and this function
    is the hook where real table creation could be added later.

    Args:
        table_name: Name of the table that would be created.
        schema: Schema description, included in the log message.

    Returns:
        True once the request has been logged, False if building or emitting
        the log message raised.
    """
    try:
        logging.info(f"Creating dynamic table: {table_name} with schema: {schema}")
    except Exception as exc:
        logging.error(f"Error creating dynamic table: {str(exc)}")
        return False
    return True
def validate_schema(schema: Dict[str, Any]) -> bool:
    """Check that ``schema`` has the expected table-schema shape.

    A valid schema is a dict with a ``'fields'`` entry that is a list, where
    every element is itself a dict containing both ``'name'`` and ``'type'``.
    An empty ``'fields'`` list is accepted.

    Args:
        schema: Candidate schema object.

    Returns:
        True when the shape is valid, False otherwise.
    """
    if not isinstance(schema, dict) or 'fields' not in schema:
        return False

    field_specs = schema['fields']
    if not isinstance(field_specs, list):
        return False

    return all(
        isinstance(spec, dict) and 'name' in spec and 'type' in spec
        for spec in field_specs
    )
def _pipeline_sort_key(item: Dict[str, Any], field: Any) -> tuple:
    """Build a sort key that orders records without a usable ``field`` first."""
    value = item.get(field, '')
    if value is None or value == '':
        # Missing, None and '' all share one key. Tagging them with 0 keeps
        # them ahead of real values without ever comparing '' to a number.
        return (0, '')
    return (1, value)


def process_pipeline_data(pipeline_config: Dict[str, Any], source_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Run ``source_data`` through the transformations in ``pipeline_config``.

    ``pipeline_config['transformations']`` is a list of dicts applied in
    order. The ``'type'`` of each selects the step:

    * ``'filter'`` -- keep records for which ``eval_condition`` accepts the
      step's ``'condition'``.
    * ``'map'`` -- rename fields of every record via ``apply_mapping`` using
      the step's ``'mapping'``.
    * ``'sort'`` -- sort by the step's ``'field'``; ``'reverse'`` (default
      False) flips the order. Records where the field is missing, None or
      ``''`` sort before all others (after them when reversed).

    Steps with any other type are ignored.

    Args:
        pipeline_config: Pipeline description as outlined above.
        source_data: Input records. They are never modified; the pipeline
            works on per-record shallow copies.

    Returns:
        The transformed list of records. If any step raises, the error is
        logged and the untouched ``source_data`` is returned instead.
    """
    # Copy each record, not just the list: the 'map' step renames keys in
    # place, and the error path below must hand back pristine input.
    processed_data = [
        item.copy() if isinstance(item, dict) else item
        for item in source_data
    ]
    try:
        # ``or []`` also covers an explicit ``transformations: null``.
        transformations = pipeline_config.get('transformations') or []
        for transformation in transformations:
            transform_type = transformation.get('type')
            if transform_type == 'filter':
                condition = transformation.get('condition')
                processed_data = [item for item in processed_data if eval_condition(item, condition)]
            elif transform_type == 'map':
                mapping = transformation.get('mapping')
                for item in processed_data:
                    apply_mapping(item, mapping)
            elif transform_type == 'sort':
                field = transformation.get('field')
                reverse = transformation.get('reverse', False)
                processed_data.sort(
                    key=lambda record: _pipeline_sort_key(record, field),
                    reverse=reverse,
                )
        return processed_data
    except Exception as e:
        # Best effort: a broken step yields the original data, not a crash.
        logging.error(f"Pipeline processing error: {str(e)}")
        return source_data
def eval_condition(data: Dict[str, Any], condition: Dict[str, Any]) -> bool:
    """Decide whether the record ``data`` satisfies ``condition``.

    ``condition`` supplies ``'field'``, ``'operator'`` and ``'value'``.
    Supported operators:

    * ``'equals'`` -- ``data[field] == value``.
    * ``'contains'`` -- case-insensitive substring test on the string forms;
      False when either side is None.
    * ``'gt'`` / ``'lt'`` -- numeric comparison after ``float()`` conversion,
      with falsy operands treated as 0; False when conversion fails with
      ``ValueError`` or ``TypeError``.

    The check is fail-open: a missing field name or operator, an unknown
    operator, or any other exception results in True.

    Args:
        data: Record to test.
        condition: Condition description.

    Returns:
        True when the record passes (or the condition cannot be evaluated),
        False when a supported operator rejects it.
    """
    try:
        field = condition.get('field')
        operator = condition.get('operator')
        value = condition.get('value')

        if not (field and operator):
            return True

        data_value = data.get(field)

        if operator == 'equals':
            return data_value == value

        if operator == 'contains':
            if data_value is None or value is None:
                return False
            needle = str(value).lower()
            haystack = str(data_value).lower()
            return needle in haystack

        if operator in ('gt', 'lt'):
            try:
                left = float(data_value or 0)
                right = float(value or 0)
            except (ValueError, TypeError):
                return False
            return left > right if operator == 'gt' else left < right

        return True
    except Exception:
        return True
def apply_mapping(data: Dict[str, Any], mapping: Dict[str, str]) -> None:
    """Rename fields of ``data`` in place according to ``mapping``.

    All renames are resolved against the record as it was on entry and then
    applied together, so the result does not depend on the order of
    ``mapping``. Swapping two fields (``{'a': 'b', 'b': 'a'}``) or renaming
    into a name that is itself renamed (``{'a': 'b', 'b': 'c'}``) keeps every
    value. Source fields absent from ``data`` are skipped.

    Args:
        data: Record to modify in place.
        mapping: ``{old_field: new_field}`` pairs.

    Returns:
        None. Any failure is logged and ``data`` is left as it was.
    """
    try:
        renames = [
            (old_field, new_field)
            for old_field, new_field in mapping.items()
            if old_field in data
        ]
        # Stage the new entries before touching ``data`` so that a bad target
        # name (for example an unhashable one) cannot leave a half-renamed record.
        staged = {new_field: data[old_field] for old_field, new_field in renames}
        for old_field, _ in renames:
            del data[old_field]
        data.update(staged)
    except Exception as e:
        logging.error(f"Mapping error: {str(e)}")