Spaces:
Runtime error
Runtime error
File size: 6,270 Bytes
8e7062f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
import yaml
import json
import markdown
from fuzzywuzzy import fuzz, process
from typing import Dict, List, Any
import logging
def parse_yaml(yaml_text: str) -> Dict[str, Any]:
    """Load a YAML document with the safe loader.

    Args:
        yaml_text: Raw YAML source text.

    Returns:
        Whatever ``yaml.safe_load`` produces for the document. The annotation
        says dict, but nothing here enforces it.
        NOTE(review): an empty or scalar document presumably yields a
        non-dict value -- confirm callers cope.

    Raises:
        yaml.YAMLError: When the text is not valid YAML. The error is logged
            before being re-raised.
    """
    try:
        parsed = yaml.safe_load(yaml_text)
    except yaml.YAMLError as err:
        logging.error(f"YAML parsing error: {str(err)}")
        raise err
    return parsed
def fuzzy_search(query: str, data: Dict[str, Any], threshold: int = 60) -> List[Dict[str, Any]]:
    """Fuzzy-match ``query`` against the top-level keys and scalar values of ``data``.

    Only entries whose value is a ``str``, ``int`` or ``float`` are examined
    (``bool`` qualifies too, being an ``int`` subclass); nested lists and
    dicts are skipped, not searched recursively. Matching is case-insensitive
    and uses ``fuzz.partial_ratio``.

    Args:
        query: Text to look for.
        data: Mapping to search. Anything that is not a dict yields no matches.
        threshold: Minimum score a key or value must reach to be reported.

    Returns:
        A list of match records sorted by descending score. Each record has
        ``'type'`` (``'key'`` or ``'value'``), ``'field'`` (the original key),
        ``'value'`` (the value as a string) and ``'score'``. One entry can
        contribute two records when both its key and its value match.
    """
    matches: List[Dict[str, Any]] = []
    if not isinstance(data, dict):
        return matches

    # Loop-invariant: lower-case the query once instead of twice per entry.
    needle = query.lower()

    for key, value in data.items():
        if not isinstance(value, (str, int, float)):
            continue
        value_str = str(value)

        # Keys of parsed YAML/JSON-like data are not guaranteed to be strings
        # (e.g. the YAML mapping ``1: foo`` has an int key), so stringify the
        # key for matching. The original key is still reported in 'field'.
        key_score = fuzz.partial_ratio(needle, str(key).lower())
        if key_score >= threshold:
            matches.append({
                'type': 'key',
                'field': key,
                'value': value_str,
                'score': key_score,
            })

        value_score = fuzz.partial_ratio(needle, value_str.lower())
        if value_score >= threshold:
            matches.append({
                'type': 'value',
                'field': key,
                'value': value_str,
                'score': value_score,
            })

    # Best matches first.
    matches.sort(key=lambda match: match['score'], reverse=True)
    return matches
def render_markdown(text: str) -> str:
    """Render Markdown text to HTML and expand a small set of ``:name:`` emoji codes.

    The text is converted with the ``extra`` and ``codehilite`` Markdown
    extensions, after which every known emoji shortcode found in the
    resulting HTML is replaced by its Unicode emoji.

    Args:
        text: Markdown source.

    Returns:
        The rendered HTML. If rendering raises for any reason, the error is
        logged and the original ``text`` is returned unchanged.
        NOTE(review): that fallback is raw, unescaped input -- confirm callers
        do not inject it into a page as trusted HTML.
    """
    # Emoji are spelled as escape sequences so the literals stay intact no
    # matter how this file is stored or transcoded; the previous literals had
    # been corrupted into mojibake (one even split across two lines).
    emoji_map = {
        ':smile:': '\U0001F604',
        ':heart:': '\u2764\uFE0F',
        ':thumbsup:': '\U0001F44D',
        ':thumbsdown:': '\U0001F44E',
        ':fire:': '\U0001F525',
        ':rocket:': '\U0001F680',
        ':star:': '\u2B50',
        ':check:': '\u2705',
        ':x:': '\u274C',
        ':warning:': '\u26A0\uFE0F',
        ':info:': '\u2139\uFE0F',
        ':bulb:': '\U0001F4A1',
        ':tada:': '\U0001F389',
    }
    try:
        md = markdown.Markdown(extensions=['extra', 'codehilite'])
        html = md.convert(text)
        # Substitution runs on the finished HTML, so a shortcode appearing
        # inside rendered code spans/blocks is replaced as well.
        for code, emoji in emoji_map.items():
            html = html.replace(code, emoji)
        return html
    except Exception as e:
        # Deliberate best-effort: never fail the caller over a rendering error.
        logging.error(f"Markdown rendering error: {str(e)}")
        return text
def create_dynamic_table(table_name: str, schema: Dict[str, Any]) -> bool:
    """Placeholder for dynamic table creation.

    No table is created: the request is only logged. The original comments
    say records are kept in a generic DataRecord model with JSON storage and
    that this hook exists for a future real implementation.

    Args:
        table_name: Name of the table that would be created.
        schema: Schema description for the table.

    Returns:
        True when the request was logged, False if logging it raised.
    """
    try:
        logging.info(f"Creating dynamic table: {table_name} with schema: {schema}")
    except Exception as exc:
        logging.error(f"Error creating dynamic table: {str(exc)}")
        return False
    else:
        return True
def validate_schema(schema: Dict[str, Any]) -> bool:
    """Check that a table schema has the expected shape.

    A valid schema is a dict with a ``'fields'`` entry that is a list, where
    every element is a dict containing both a ``'name'`` and a ``'type'`` key.
    An empty field list is accepted.

    Args:
        schema: Candidate schema object.

    Returns:
        True if the schema has that shape, False otherwise.
    """
    if not isinstance(schema, dict):
        return False

    field_specs = schema.get('fields')
    if not isinstance(field_specs, list):
        # Covers both a missing 'fields' key and a non-list value.
        return False

    return all(
        isinstance(spec, dict) and 'name' in spec and 'type' in spec
        for spec in field_specs
    )
def process_pipeline_data(pipeline_config: Dict[str, Any], source_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Run records through the transformations listed in a pipeline config.

    ``pipeline_config['transformations']`` is an ordered list of steps, each a
    dict with a ``'type'``:

    * ``'filter'`` -- keep records for which ``eval_condition(record,
      step['condition'])`` is true.
    * ``'map'`` -- rename fields via ``apply_mapping(record, step['mapping'])``.
    * ``'sort'`` -- sort by ``step['field']`` (records missing it sort as
      ``''``), descending when ``step['reverse']`` is true.

    Steps with any other type are ignored.

    Args:
        pipeline_config: Pipeline description; a missing ``'transformations'``
            key means no steps.
        source_data: Input records. Neither the list nor its dict records are
            modified.

    Returns:
        A new list with the processed records. If any step raises, the error
        is logged and ``source_data`` itself is returned, untouched.
    """
    # Shallow list copy first (as before) so a non-list argument fails the
    # same way it always did.
    processed_data = source_data.copy()
    try:
        # The 'map' step renames keys in place. Work on per-record copies so
        # the caller's dicts -- and the fallback value returned on error --
        # are never partially rewritten.
        processed_data = [
            item.copy() if isinstance(item, dict) else item
            for item in processed_data
        ]

        transformations = pipeline_config.get('transformations', [])
        for transformation in transformations:
            transform_type = transformation.get('type')
            if transform_type == 'filter':
                condition = transformation.get('condition')
                processed_data = [
                    item for item in processed_data
                    if eval_condition(item, condition)
                ]
            elif transform_type == 'map':
                mapping = transformation.get('mapping')
                for item in processed_data:
                    apply_mapping(item, mapping)
            elif transform_type == 'sort':
                field = transformation.get('field')
                reverse = transformation.get('reverse', False)
                # Values that cannot be compared with each other make sort()
                # raise, which lands in the fallback below.
                processed_data.sort(key=lambda x: x.get(field, ''), reverse=reverse)
        return processed_data
    except Exception as e:
        logging.error(f"Pipeline processing error: {str(e)}")
        return source_data
def eval_condition(data: Dict[str, Any], condition: Dict[str, Any]) -> bool:
    """Decide whether a record satisfies a filter condition.

    ``condition`` supplies ``'field'``, ``'operator'`` and ``'value'``.
    Supported operators:

    * ``'equals'`` -- the field's value ``==`` the condition value.
    * ``'contains'`` -- case-insensitive substring test on the string forms;
      False when either side is None.
    * ``'gt'`` / ``'lt'`` -- numeric comparison after ``float()`` conversion,
      with falsy operands treated as 0; False when conversion fails with
      ValueError or TypeError.

    The check is permissive: a missing field name or operator, an unknown
    operator, or any other unexpected error all count as a pass.

    Args:
        data: Record to test.
        condition: Condition description.

    Returns:
        True if the record passes (or the condition cannot be applied),
        False if it is rejected.
    """
    try:
        field_name = condition.get('field')
        op = condition.get('operator')
        expected = condition.get('value')

        if not (field_name and op):
            return True

        actual = data.get(field_name)

        if op == 'equals':
            return actual == expected

        if op == 'contains':
            if actual is None or expected is None:
                return False
            return str(expected).lower() in str(actual).lower()

        if op == 'gt' or op == 'lt':
            try:
                left = float(actual or 0)
                right = float(expected or 0)
            except (ValueError, TypeError):
                return False
            return left > right if op == 'gt' else left < right

        # Unknown operator: do not filter anything out.
        return True
    except Exception:
        return True
def apply_mapping(data: Dict[str, Any], mapping: Dict[str, str]) -> None:
    """Rename fields of ``data`` in place according to ``mapping``.

    Every ``old_field -> new_field`` pair whose ``old_field`` is present in
    ``data`` moves that value to ``new_field``. All renames are applied as one
    simultaneous step, so a mapping may swap or chain names (for example
    ``{'a': 'b', 'b': 'a'}``) without one rename clobbering the source value
    of another.

    Args:
        data: Record to modify in place.
        mapping: Old field name -> new field name.

    Returns:
        None. Errors (such as a ``mapping`` that is not a dict) are logged
        rather than raised.
    """
    try:
        # Phase 1: lift every source value out before writing anything, so a
        # target name that is also a source name cannot overwrite a value
        # that still has to be moved.
        moved = {
            new_field: data.pop(old_field)
            for old_field, new_field in mapping.items()
            if old_field in data
        }
        # Phase 2: write the values under their new names.
        data.update(moved)
    except Exception as e:
        logging.error(f"Mapping error: {str(e)}")
|