import json
import logging
from typing import Any, Dict, List

import markdown
import yaml
from fuzzywuzzy import fuzz, process

# Shortcode -> emoji table used by render_markdown. Built once at import time
# instead of on every call.
_EMOJI_MAP = {
    ':smile:': '😊',
    ':heart:': '❤️',
    ':thumbsup:': '👍',
    ':thumbsdown:': '👎',
    ':fire:': '🔥',
    ':rocket:': '🚀',
    ':star:': '⭐',
    ':check:': '✅',
    ':x:': '❌',
    ':warning:': '⚠️',
    ':info:': 'ℹ️',
    ':bulb:': '💡',
    ':tada:': '🎉',
}


def parse_yaml(yaml_text: str) -> Dict[str, Any]:
    """Parse YAML text with ``yaml.safe_load`` and return the result.

    Args:
        yaml_text: The YAML document to parse.

    Returns:
        Whatever ``yaml.safe_load`` produces. The annotation says ``dict``,
        but the value is returned unchecked, so a document whose top level
        is not a mapping yields a non-dict result.

    Raises:
        yaml.YAMLError: If the text is not valid YAML. The error is logged
            before being re-raised.
    """
    try:
        return yaml.safe_load(yaml_text)
    except yaml.YAMLError as e:
        logging.error("YAML parsing error: %s", e)
        # Bare raise keeps the original traceback intact.
        raise


def fuzzy_search(query: str, data: Dict[str, Any], threshold: int = 60) -> List[Dict[str, Any]]:
    """Fuzzy-match *query* against the keys and scalar values of *data*.

    Only top-level entries whose value is a ``str``, ``int`` or ``float`` are
    considered; nested containers are skipped. Each such entry can contribute
    up to two matches: one for its key and one for its value.

    Args:
        query: Text to search for (compared case-insensitively).
        data: Mapping to search. Anything that is not a dict yields ``[]``.
        threshold: Minimum ``fuzz.partial_ratio`` score for a match.

    Returns:
        A list of match dicts with the keys ``type`` (``'key'`` or
        ``'value'``), ``field`` (the original key), ``value`` (the value as a
        string) and ``score``, sorted by score in descending order.
    """
    matches: List[Dict[str, Any]] = []
    if not isinstance(data, dict):
        return matches

    needle = query.lower()  # loop-invariant
    for key, value in data.items():
        if not isinstance(value, (str, int, float)):
            continue
        value_str = str(value)

        # Keys parsed from YAML are not guaranteed to be strings (e.g. ints),
        # so stringify before lowercasing.
        key_score = fuzz.partial_ratio(needle, str(key).lower())
        if key_score >= threshold:
            matches.append({
                'type': 'key',
                'field': key,
                'value': value_str,
                'score': key_score,
            })

        value_score = fuzz.partial_ratio(needle, value_str.lower())
        if value_score >= threshold:
            matches.append({
                'type': 'value',
                'field': key,
                'value': value_str,
                'score': value_score,
            })

    # Best matches first.
    matches.sort(key=lambda x: x['score'], reverse=True)
    return matches


def render_markdown(text: str) -> str:
    """Render markdown text to HTML and expand a fixed set of emoji codes.

    The text is converted with the ``extra`` and ``codehilite`` extensions,
    then every shortcode in ``_EMOJI_MAP`` (e.g. ``:rocket:``) is replaced in
    the resulting HTML. The replacement runs over the whole HTML string, so
    shortcodes inside rendered code are replaced as well.

    Args:
        text: Markdown source.

    Returns:
        The rendered HTML. If rendering fails, the error is logged and the
        original, unrendered *text* is returned instead.
    """
    try:
        md = markdown.Markdown(extensions=['extra', 'codehilite'])
        html = md.convert(text)
        for code, emoji in _EMOJI_MAP.items():
            html = html.replace(code, emoji)
        return html
    except Exception as e:
        logging.error("Markdown rendering error: %s", e)
        # NOTE(review): the fallback is the raw input, not HTML.
        return text


def create_dynamic_table(table_name: str, schema: Dict[str, Any]) -> bool:
    """Create a dynamic table based on schema (for future implementation).

    Currently this only logs the request; no table is created.

    Args:
        table_name: Name of the table to create.
        schema: Schema description; it is only logged.

    Returns:
        ``True`` when the request was logged, ``False`` if logging raised.
    """
    # This function can be expanded to create actual database tables.
    # For now, we use the generic DataRecord model with JSON storage.
    try:
        logging.info(f"Creating dynamic table: {table_name} with schema: {schema}")
        return True
    except Exception as e:
        logging.error(f"Error creating dynamic table: {str(e)}")
        return False


def validate_schema(schema: Dict[str, Any]) -> bool:
    """Check that *schema* has the expected table-schema shape.

    A valid schema is a dict with a ``'fields'`` list in which every entry is
    a dict containing both a ``'name'`` and a ``'type'`` key.

    Args:
        schema: Candidate schema.

    Returns:
        ``True`` if the schema is well-formed, ``False`` otherwise.
    """
    if not isinstance(schema, dict):
        return False

    # A missing 'fields' key yields None, which fails the list check below.
    fields = schema.get('fields')
    if not isinstance(fields, list):
        return False

    return all(
        isinstance(field, dict) and 'name' in field and 'type' in field
        for field in fields
    )


def process_pipeline_data(pipeline_config: Dict[str, Any], source_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Run *source_data* through the transformations in *pipeline_config*.

    Supported transformation types (read from each entry's ``'type'`` key):

    * ``'filter'``: keep items for which ``eval_condition`` is true for the
      entry's ``'condition'``.
    * ``'map'``: rename fields according to the entry's ``'mapping'``.
    * ``'sort'``: sort by the entry's ``'field'`` (missing values sort as
      ``''``), descending when ``'reverse'`` is true.

    Entries with any other type are ignored.

    Args:
        pipeline_config: Dict with an optional ``'transformations'`` list.
        source_data: Records to process. They are never modified: the
            pipeline works on per-record copies.

    Returns:
        The transformed records. If any step raises, the error is logged and
        the untouched *source_data* is returned.
    """
    # Copy each record, not just the list: 'map' renames fields in place, and
    # a shallow list copy would leak those edits into the caller's records,
    # including the ones handed back by the error fallback below.
    processed_data = [
        item.copy() if isinstance(item, dict) else item
        for item in source_data
    ]

    try:
        transformations = pipeline_config.get('transformations', []) or []
        for transformation in transformations:
            transform_type = transformation.get('type')

            if transform_type == 'filter':
                condition = transformation.get('condition')
                processed_data = [
                    item for item in processed_data
                    if eval_condition(item, condition)
                ]
            elif transform_type == 'map':
                mapping = transformation.get('mapping')
                for item in processed_data:
                    apply_mapping(item, mapping)
            elif transform_type == 'sort':
                field = transformation.get('field')
                reverse = transformation.get('reverse', False)
                processed_data.sort(key=lambda x: x.get(field, ''), reverse=reverse)

        return processed_data
    except Exception as e:
        logging.error("Pipeline processing error: %s", e)
        return source_data


def eval_condition(data: Dict[str, Any], condition: Dict[str, Any]) -> bool:
    """Evaluate a filter condition against one record.

    The condition is a dict with ``'field'``, ``'operator'`` and ``'value'``.
    Supported operators:

    * ``'equals'``: ``data[field] == value``.
    * ``'contains'``: case-insensitive substring test on the string forms;
      false if either side is ``None``.
    * ``'gt'`` / ``'lt'``: numeric comparison after ``float()`` conversion;
      falsy operands count as ``0`` and unconvertible operands give false.

    The function is permissive: a missing condition, a missing field or
    operator, an unknown operator, or an unexpected error all evaluate to
    ``True``, so the record is kept.

    Args:
        data: The record to test.
        condition: The condition description, or ``None`` for "no condition".

    Returns:
        Whether the record satisfies the condition.
    """
    if condition is None:
        return True

    try:
        field = condition.get('field')
        operator = condition.get('operator')
        value = condition.get('value')

        if not field or not operator:
            return True

        data_value = data.get(field)

        if operator == 'equals':
            return data_value == value

        if operator == 'contains':
            if data_value is None or value is None:
                return False
            return str(value).lower() in str(data_value).lower()

        if operator in ('gt', 'lt'):
            try:
                left = float(data_value or 0)
                right = float(value or 0)
            except (ValueError, TypeError):
                return False
            return left > right if operator == 'gt' else left < right

        # Unknown operators do not filter anything out.
        return True
    except Exception as e:
        # Stay best-effort (keep the record), but leave a trace instead of
        # swallowing the problem silently.
        logging.warning("Condition evaluation error: %s", e)
        return True


def apply_mapping(data: Dict[str, Any], mapping: Dict[str, str]) -> None:
    """Rename fields of *data* in place according to *mapping*.

    All renames are computed from the record as it was on entry and then
    applied together, so swapped (``{'a': 'b', 'b': 'a'}``) or chained
    (``{'a': 'b', 'b': 'c'}``) mappings do not lose values and do not depend
    on the mapping's iteration order.

    Args:
        data: Record to modify in place.
        mapping: ``{old_field: new_field}`` pairs. Old fields that are absent
            from *data* are ignored.

    Returns:
        None. Errors (e.g. *mapping* is ``None``) are logged, not raised; in
        that case *data* is left unmodified.
    """
    try:
        # Snapshot the values first so that one rename cannot clobber the
        # source of another.
        renamed = {
            new_field: data[old_field]
            for old_field, new_field in mapping.items()
            if old_field in data
        }
        for old_field in mapping:
            data.pop(old_field, None)
        data.update(renamed)
    except Exception as e:
        logging.error("Mapping error: %s", e)