Spaces:
Running
Running
import json | |
from typing import Dict, Any, Optional, Tuple | |
from jsonschema import validate, ValidationError | |
from jsonschema.validators import Draft7Validator | |
import logging | |
# Module-level logger named after this module; used by SchemaValidator to
# report validation outcomes and processing errors.
logger = logging.getLogger(__name__)
class SchemaValidator:
    """Service for validating JSON data against stored schemas.

    The class bundles two built-in Draft 7 schemas (crisis maps and drone
    imagery), a generic validation entry point, and helpers that unwrap a raw
    model response into a payload dict and normalize it after validation.
    """

    # Schema for crisis-map payloads: every metadata field is a required,
    # non-null value.
    _CRISIS_MAP_SCHEMA = {
        "type": "object",
        "properties": {
            "description": {"type": "string"},
            "analysis": {"type": "string"},
            "recommended_actions": {"type": "string"},
            "metadata": {
                "type": "object",
                "properties": {
                    "title": {"type": "string"},
                    "source": {"type": "string"},
                    "type": {"type": "string"},
                    "countries": {"type": "array", "items": {"type": "string"}},
                    "epsg": {"type": "string"},
                },
                "required": ["title", "source", "type", "countries", "epsg"],
            },
        },
        "required": ["description", "analysis", "recommended_actions", "metadata"],
    }

    # Schema for drone payloads: every metadata field is optional and nullable;
    # numeric fields carry range constraints that apply when a number is given.
    _DRONE_SCHEMA = {
        "type": "object",
        "properties": {
            "description": {"type": "string"},
            "analysis": {"type": "string"},
            "recommended_actions": {"type": "string"},
            "metadata": {
                "type": "object",
                "properties": {
                    "title": {"type": ["string", "null"]},
                    "source": {"type": ["string", "null"]},
                    "type": {"type": ["string", "null"]},
                    "countries": {"type": ["array", "null"], "items": {"type": "string"}},
                    "epsg": {"type": ["string", "null"]},
                    "center_lat": {"type": ["number", "null"], "minimum": -90, "maximum": 90},
                    "center_lon": {"type": ["number", "null"], "minimum": -180, "maximum": 180},
                    "amsl_m": {"type": ["number", "null"]},
                    "agl_m": {"type": ["number", "null"]},
                    "heading_deg": {"type": ["number", "null"], "minimum": 0, "maximum": 360},
                    "yaw_deg": {"type": ["number", "null"], "minimum": -180, "maximum": 180},
                    "pitch_deg": {"type": ["number", "null"], "minimum": -90, "maximum": 90},
                    "roll_deg": {"type": ["number", "null"], "minimum": -180, "maximum": 180},
                    "rtk_fix": {"type": ["boolean", "null"]},
                    "std_h_m": {"type": ["number", "null"], "minimum": 0},
                    "std_v_m": {"type": ["number", "null"], "minimum": 0},
                },
            },
        },
        "required": ["description", "analysis", "recommended_actions", "metadata"],
    }

    # Top-level keys that identify an already well-formed payload.
    _PAYLOAD_KEYS = ("description", "analysis", "recommended_actions", "metadata")

    # Metadata keys copied into cleaned drone payloads (missing keys become None).
    _DRONE_METADATA_KEYS = (
        "title", "source", "type", "countries", "epsg",
        "center_lat", "center_lon", "amsl_m", "agl_m",
        "heading_deg", "yaw_deg", "pitch_deg", "roll_deg",
        "rtk_fix", "std_h_m", "std_v_m",
    )

    def __init__(self):
        # Kept for compatibility with code that may read this attribute;
        # no method of this class reads or writes it.
        self.validators = {}

    def validate_against_schema(self, data: Dict[str, Any], schema: Dict[str, Any], schema_id: str) -> Tuple[bool, Optional[str]]:
        """
        Validate JSON data against a schema

        Args:
            data: The JSON data to validate
            schema: The JSON schema to validate against
            schema_id: Identifier for the schema (used only in log and error text)

        Returns:
            Tuple of (is_valid, error_message); error_message is None on
            success. Any exception raised while validating is caught, logged
            and reported as a failure rather than propagated.
        """
        try:
            # Draft7Validator.iter_errors reports every violation, not just
            # the first one, which gives better error messages.
            validator = Draft7Validator(schema)
            errors = list(validator.iter_errors(data))
        except Exception as e:
            error_msg = f"Schema validation error for {schema_id}: {str(e)}"
            logger.exception("%s", error_msg)
            return False, error_msg

        if not errors:
            logger.info("Schema validation passed for %s", schema_id)
            return True, None

        error_messages = []
        for error in errors:
            # An empty path means the violation is on the top-level value.
            path = " -> ".join(str(p) for p in error.path) if error.path else "root"
            error_messages.append(f"{path}: {error.message}")
        error_msg = f"Schema validation failed for {schema_id}: {'; '.join(error_messages)}"
        logger.warning("%s", error_msg)
        return False, error_msg

    def validate_crisis_map_data(self, data: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """
        Validate crisis map data against the default schema

        Returns:
            Tuple of (is_valid, error_message)
        """
        return self.validate_against_schema(data, self._CRISIS_MAP_SCHEMA, "crisis_map")

    def validate_drone_data(self, data: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """
        Validate drone data against the drone schema

        Returns:
            Tuple of (is_valid, error_message)
        """
        return self.validate_against_schema(data, self._DRONE_SCHEMA, "drone")

    def validate_data_by_type(self, data: Dict[str, Any], image_type: str) -> Tuple[bool, Optional[str]]:
        """
        Validate data based on image type

        Args:
            data: The JSON data to validate
            image_type: Either 'crisis_map' or 'drone_image'

        Returns:
            Tuple of (is_valid, error_message); any other image_type yields
            (False, "Unknown image type: ...") without running a schema.
        """
        if image_type == 'drone_image':
            return self.validate_drone_data(data)
        if image_type == 'crisis_map':
            return self.validate_crisis_map_data(data)
        return False, f"Unknown image type: {image_type}"

    @staticmethod
    def _parse_content(content: Any) -> Any:
        """Decode string content as JSON; wrap non-JSON text as an analysis-only payload.

        Non-string content is returned unchanged.
        """
        if not isinstance(content, str):
            return content
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            return {"description": "", "analysis": content, "recommended_actions": "", "metadata": {}}

    def _extract_payload(self, raw_data: Dict[str, Any]) -> Any:
        """Unwrap the payload from the envelope shapes handled by clean_and_validate_data."""
        if "raw_response" in raw_data:
            ai_data = raw_data["raw_response"]
            if isinstance(ai_data, str):
                # `in` on a str is a substring test, so a string must be
                # decoded before the key checks below can be meaningful.
                ai_data = self._parse_content(ai_data)
                if not isinstance(ai_data, dict):
                    # Decoded to a list/number/null: let schema validation reject it.
                    return ai_data
            if "response" in ai_data:
                return self._parse_content(ai_data["response"])
            if all(key in ai_data for key in self._PAYLOAD_KEYS):
                return ai_data
            if "analysis" in ai_data and "metadata" in ai_data:
                # Backward compatibility for old format
                return {
                    "description": "",
                    "analysis": ai_data["analysis"],
                    "recommended_actions": "",
                    "metadata": ai_data["metadata"],
                }
            return ai_data
        if "content" in raw_data:
            return self._parse_content(raw_data["content"])
        return raw_data

    def clean_and_validate_data(self, raw_data: Dict[str, Any], image_type: str) -> Tuple[Dict[str, Any], bool, Optional[str]]:
        """
        Clean and validate data, returning cleaned data, validation status, and any errors

        The payload is taken from raw_data["raw_response"] (optionally nested
        under "response"), else from raw_data["content"], else raw_data itself.
        String payloads are decoded as JSON; text that is not JSON is kept as
        the "analysis" of an otherwise empty payload.

        Args:
            raw_data: Raw data from VLM
            image_type: Type of image being processed

        Returns:
            Tuple of (cleaned_data, is_valid, error_message). On validation
            failure the extracted but uncleaned payload is returned; on an
            unexpected exception raw_data is returned unchanged.
        """
        try:
            data = self._extract_payload(raw_data)
            is_valid, error_msg = self.validate_data_by_type(data, image_type)
            if not is_valid:
                return data, False, error_msg
            return self._clean_data(data, image_type), True, None
        except Exception as e:
            error_msg = f"Data processing error: {str(e)}"
            logger.exception("%s", error_msg)
            return raw_data, False, error_msg

    def _clean_data(self, data: Dict[str, Any], image_type: str) -> Dict[str, Any]:
        """
        Clean and normalize the data structure

        Only the known top-level and metadata keys are kept. Crisis-map
        metadata falls back to "" / "OTHER" / [] for missing keys, drone
        metadata falls back to None, and any other image_type gets empty
        metadata.
        """
        cleaned = {
            "description": data.get("description", ""),
            "analysis": data.get("analysis", ""),
            "recommended_actions": data.get("recommended_actions", ""),
            "metadata": {},
        }
        # `or {}` also covers an explicit "metadata": None, which a plain
        # .get default would pass through and then fail on.
        metadata = data.get("metadata") or {}

        # Clean metadata based on image type
        if image_type == 'crisis_map':
            cleaned["metadata"] = {
                "title": metadata.get("title", ""),
                "source": metadata.get("source", "OTHER"),
                "type": metadata.get("type", "OTHER"),
                "countries": metadata.get("countries", []),
                "epsg": metadata.get("epsg", "OTHER"),
            }
        elif image_type == 'drone_image':
            cleaned["metadata"] = {
                key: metadata.get(key) for key in self._DRONE_METADATA_KEYS
            }
        return cleaned
# Global instance: a module-level SchemaValidator created once at import time.
schema_validator = SchemaValidator()