File size: 6,270 Bytes
8e7062f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import yaml
import json
import markdown
from fuzzywuzzy import fuzz, process
from typing import Dict, List, Any
import logging

def parse_yaml(yaml_text: str) -> Dict[str, Any]:
    """Load *yaml_text* with the safe YAML loader and return the result.

    A ``yaml.YAMLError`` raised by the loader is logged and then re-raised
    to the caller unchanged.
    """
    try:
        parsed = yaml.safe_load(yaml_text)
    except yaml.YAMLError as exc:
        logging.error(f"YAML parsing error: {exc!s}")
        raise exc
    return parsed

def fuzzy_search(query: str, data: Dict[str, Any], threshold: int = 60) -> List[Dict[str, Any]]:
    """Fuzzy-match *query* against the top-level keys and values of *data*.

    Only entries whose value is a ``str``, ``int`` or ``float`` are examined
    (``bool`` qualifies too, since it subclasses ``int``); nested containers
    and other value types are skipped. Comparison is case-insensitive and
    uses ``fuzz.partial_ratio``.

    Args:
        query: Text to look for.
        data: Mapping to search. Anything that is not a ``dict`` yields an
            empty result.
        threshold: Minimum ``partial_ratio`` score for a hit to be reported.

    Returns:
        A list of match dicts sorted by descending score. Each has the keys
        ``'type'`` (``'key'`` or ``'value'``, i.e. which side matched),
        ``'field'`` (the original key), ``'value'`` (the value as a string)
        and ``'score'``. An entry can appear twice when both its key and its
        value match.
    """
    matches: List[Dict[str, Any]] = []

    if not isinstance(data, dict):
        return matches

    # Loop-invariant: lower-case the query once.
    needle = query.lower()

    for key, value in data.items():
        if not isinstance(value, (str, int, float)):
            continue

        value_str = str(value)
        # Keys are not guaranteed to be strings (YAML-parsed data often has
        # int or bool keys), so compare against their string form rather
        # than calling .lower() on the key directly.
        candidates = (('key', str(key)), ('value', value_str))

        for match_type, haystack in candidates:
            score = fuzz.partial_ratio(needle, haystack.lower())
            if score >= threshold:
                matches.append({
                    'type': match_type,
                    'field': key,
                    'value': value_str,
                    'score': score
                })

    # Best matches first; the sort is stable, so ties keep discovery order.
    matches.sort(key=lambda match: match['score'], reverse=True)
    return matches

# Emoji shortcodes understood by render_markdown(), mapped to their characters.
# Written as Unicode escapes so the table cannot be corrupted by an editor or
# transfer step that mis-decodes the file's UTF-8 bytes.
_EMOJI_MAP = {
    ':smile:': '\U0001F60A',       # smiling face with smiling eyes
    ':heart:': '\u2764\uFE0F',     # red heart
    ':thumbsup:': '\U0001F44D',    # thumbs up
    ':thumbsdown:': '\U0001F44E',  # thumbs down
    ':fire:': '\U0001F525',        # fire
    ':rocket:': '\U0001F680',      # rocket
    ':star:': '\u2B50',            # star
    ':check:': '\u2705',           # check mark button
    ':x:': '\u274C',               # cross mark
    ':warning:': '\u26A0\uFE0F',   # warning sign
    ':info:': '\u2139\uFE0F',      # information
    ':bulb:': '\U0001F4A1',        # light bulb
    ':tada:': '\U0001F389',        # party popper
}


def render_markdown(text: str) -> str:
    """Render Markdown *text* to HTML and expand emoji shortcodes.

    The text is converted with the ``extra`` and ``codehilite`` Markdown
    extensions. Every shortcode listed in ``_EMOJI_MAP`` is then replaced in
    the resulting HTML by plain substring replacement, so shortcodes inside
    code spans or attributes are replaced too.

    Args:
        text: Markdown source.

    Returns:
        The rendered HTML. If rendering raises for any reason, the error is
        logged and the original *text* is returned unchanged.
    """
    try:
        md = markdown.Markdown(extensions=['extra', 'codehilite'])
        html = md.convert(text)

        for code, emoji in _EMOJI_MAP.items():
            html = html.replace(code, emoji)

        return html
    except Exception as e:
        # Deliberate best effort: showing the raw text beats failing the page.
        logging.error(f"Markdown rendering error: {str(e)}")
        return text

def create_dynamic_table(table_name: str, schema: Dict[str, Any]) -> bool:
    """Placeholder for dynamic table creation; currently only logs the request.

    No table is created here. The request is logged at INFO level and True
    is returned. If building or emitting that log line raises, the error is
    logged and False is returned instead.
    """
    # Hook for real DDL later; for now storage goes through the generic
    # DataRecord model with JSON storage.
    try:
        message = f"Creating dynamic table: {table_name} with schema: {schema}"
        logging.info(message)
    except Exception as exc:
        logging.error(f"Error creating dynamic table: {str(exc)}")
        return False
    return True

def validate_schema(schema: Dict[str, Any]) -> bool:
    """Check that *schema* has the expected table-schema shape.

    A schema is valid when it is a dict whose ``'fields'`` entry is a list,
    and every element of that list is a dict containing both a ``'name'``
    and a ``'type'`` key. An empty field list is valid.
    """
    if not isinstance(schema, dict):
        return False

    # A missing 'fields' key yields None, which fails the list check below.
    fields = schema.get('fields')
    if not isinstance(fields, list):
        return False

    required_keys = {'name', 'type'}
    return all(
        isinstance(entry, dict) and required_keys <= entry.keys()
        for entry in fields
    )

def process_pipeline_data(pipeline_config: Dict[str, Any], source_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Run *source_data* through the transformations in *pipeline_config*.

    ``pipeline_config['transformations']`` is a list of step dicts applied in
    order. The step's ``'type'`` selects the behaviour:

    * ``'filter'`` keeps records for which ``eval_condition(record,
      step['condition'])`` is truthy.
    * ``'map'`` renames record keys via ``apply_mapping(record,
      step['mapping'])``.
    * ``'sort'`` orders records by ``step['field']`` (missing values sort as
      ``''``), descending when ``step['reverse']`` is truthy.

    Steps with any other type are ignored.

    Args:
        pipeline_config: Pipeline definition. A missing or null
            ``'transformations'`` entry means "no steps".
        source_data: Input records. Neither the list nor its dict records
            are modified.

    Returns:
        A new list of processed records. If any step raises, the error is
        logged and the original *source_data* is returned instead.
    """
    # Copy each record, not just the list: the 'map' step renames keys in
    # place, and the error path below hands back source_data on the promise
    # that it is untouched.
    processed_data = [
        item.copy() if isinstance(item, dict) else item
        for item in source_data
    ]

    try:
        # 'or []' also covers a config where the key is present but null.
        transformations = pipeline_config.get('transformations') or []

        for transformation in transformations:
            transform_type = transformation.get('type')

            if transform_type == 'filter':
                condition = transformation.get('condition')
                processed_data = [item for item in processed_data if eval_condition(item, condition)]

            elif transform_type == 'map':
                mapping = transformation.get('mapping')
                for item in processed_data:
                    apply_mapping(item, mapping)

            elif transform_type == 'sort':
                field = transformation.get('field')
                reverse = transformation.get('reverse', False)
                processed_data.sort(key=lambda x: x.get(field, ''), reverse=reverse)

        return processed_data

    except Exception as e:
        # Best effort: a broken pipeline falls back to the unprocessed input.
        logging.error(f"Pipeline processing error: {str(e)}")
        return source_data

def eval_condition(data: Dict[str, Any], condition: Dict[str, Any]) -> bool:
    """Decide whether the record *data* satisfies *condition*.

    *condition* supplies ``'field'``, ``'operator'`` and ``'value'``.
    Supported operators:

    * ``'equals'``   - the field's value compares equal to ``value``.
    * ``'contains'`` - case-insensitive substring test on the string forms;
      False when either side is None.
    * ``'gt'`` / ``'lt'`` - numeric comparison after ``float()`` conversion,
      with falsy operands treated as 0; False when conversion fails with
      ``ValueError`` or ``TypeError``.

    The check is permissive: a missing field name or operator, an unknown
    operator, or any other unexpected error makes the condition pass (True).
    """
    try:
        field, operator, value = (
            condition.get(part) for part in ('field', 'operator', 'value')
        )

        # An incomplete condition places no constraint on the record.
        if not (field and operator):
            return True

        actual = data.get(field)

        if operator == 'equals':
            return actual == value

        if operator == 'contains':
            if actual is None or value is None:
                return False
            return str(value).lower() in str(actual).lower()

        if operator in ('gt', 'lt'):
            try:
                left = float(actual or 0)
                right = float(value or 0)
            except (ValueError, TypeError):
                return False
            return left > right if operator == 'gt' else left < right

        # Unrecognised operators never reject a record.
        return True
    except Exception:
        return True

def apply_mapping(data: Dict[str, Any], mapping: Dict[str, str]) -> None:
    """Rename keys of *data* in place according to *mapping*.

    Each ``old -> new`` pair in *mapping* moves the value stored under
    ``old`` to ``new``; pairs whose ``old`` key is absent from *data* are
    skipped. All renames are applied simultaneously, based on the keys
    present when the call starts, so overlapping mappings such as a swap
    (``{'a': 'b', 'b': 'a'}``) keep every value and do not depend on the
    mapping's iteration order.

    Args:
        data: Record to modify in place.
        mapping: Old-key to new-key pairs.

    Returns:
        None. Any error (for example a *mapping* that is not a dict) is
        logged rather than raised.
    """
    try:
        # Phase 1: detach every value that is being renamed before writing
        # anything, so one rename cannot overwrite another's source.
        moved = [
            (new_field, data.pop(old_field))
            for old_field, new_field in mapping.items()
            if old_field in data
        ]
        # Phase 2: store the detached values under their new names.
        for new_field, value in moved:
            data[new_field] = value
    except Exception as e:
        logging.error(f"Mapping error: {str(e)}")