SCGR's picture
dynamic json schema
76f5d42
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
from typing import List, Dict, Any
from .. import crud, database, schemas
from ..services.schema_validator import schema_validator
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from ..routers.admin import verify_admin_token
# Router that collects the schema endpoints declared in this module.
router = APIRouter()
# Bearer-token extractor; used as the dependency of verify_admin_access below.
security = HTTPBearer()
def get_db():
    """Yield a database session and close it once the consumer is done.

    A session is created from ``database.SessionLocal`` and handed to the
    caller. The ``finally`` clause closes it whether the consumer finished
    normally or raised.
    """
    session = database.SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, including on error paths.
        session.close()
def verify_admin_access(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify the admin bearer token for schema endpoints.

    Args:
        credentials: Bearer credentials supplied by the ``security``
            (``HTTPBearer``) dependency.

    Returns:
        The raw token string when ``verify_admin_token`` accepts it.

    Raises:
        HTTPException: 401 when ``verify_admin_token`` rejects the token.
    """
    token = credentials.credentials
    if not verify_admin_token(token):
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired admin token",
            # A 401 response should carry a challenge naming the auth scheme
            # the client is expected to use.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return token
@router.get("/schemas", response_model=List[Dict[str, Any]])
async def get_schemas(
    db: Session = Depends(get_db),
    token: str = Depends(verify_admin_access)
):
    """List all JSON schemas (admin only).

    Args:
        db: Database session provided by ``get_db``.
        token: Verified admin token; required only to enforce authentication.

    Returns:
        One dict per record returned by ``crud.get_all_schemas``, holding
        ``schema_id``, ``title``, ``version``, ``created_at`` (the result of
        ``created_at.isoformat()``, or ``None`` when unset) and ``schema``.

    Raises:
        HTTPException: 500 wrapping any error raised while loading or
            serializing the records.
    """
    try:
        # Serialize each database record into a plain dict for the response.
        return [
            {
                "schema_id": record.schema_id,
                "title": record.title,
                "version": record.version,
                "created_at": record.created_at.isoformat() if record.created_at else None,
                "schema": record.schema
            }
            for record in crud.get_all_schemas(db)
        ]
    except Exception as exc:
        raise HTTPException(500, f"Failed to get schemas: {str(exc)}")
# NOTE: this static route must be declared BEFORE "/schemas/{schema_id}".
# Routes are matched in declaration order, so if the parameterised route came
# first, a GET to "/schemas/validation-stats" would be handled by get_schema
# with schema_id="validation-stats" and this endpoint would be unreachable.
@router.get("/schemas/validation-stats")
async def get_validation_stats(
    db: Session = Depends(get_db),
    token: str = Depends(verify_admin_access)
):
    """Get validation statistics (admin only).

    Aggregates the images returned by
    ``crud.get_recent_images_with_validation(db, limit=100)``. Images without
    a truthy ``raw_json`` count towards ``total_images`` only. Per-type
    counters are kept for ``crisis_map`` and ``drone_image``; other image
    types contribute only to the overall totals.

    Args:
        db: Database session provided by ``get_db``.
        token: Verified admin token; required only to enforce authentication.

    Returns:
        A dict with ``total_images``, ``validation_passed``,
        ``validation_failed``, ``validation_errors`` (one entry per failed
        image) and ``by_image_type`` (per-type total/passed/failed counters).

    Raises:
        HTTPException: 500 wrapping any error raised while collecting stats.
    """
    try:
        # Get recent images with validation info
        recent_images = crud.get_recent_images_with_validation(db, limit=100)

        stats = {
            "total_images": len(recent_images),
            "validation_passed": 0,
            "validation_failed": 0,
            "validation_errors": [],
            "by_image_type": {
                "crisis_map": {"total": 0, "passed": 0, "failed": 0},
                "drone_image": {"total": 0, "passed": 0, "failed": 0}
            }
        }

        for img in recent_images:
            raw_json = getattr(img, "raw_json", None)
            if not raw_json:
                # Nothing to inspect for this image.
                continue

            image_type = img.image_type
            # None for image types that are not tracked individually.
            type_stats = stats["by_image_type"].get(image_type)
            if type_stats is not None:
                type_stats["total"] += 1

            # Check if validation failed
            if raw_json.get("validation_failed"):
                stats["validation_failed"] += 1
                if type_stats is not None:
                    type_stats["failed"] += 1
                stats["validation_errors"].append({
                    "image_id": str(img.image_id),
                    "image_type": image_type,
                    "error": raw_json.get("validation_error", "Unknown error"),
                    "created_at": img.created_at.isoformat() if img.created_at else None
                })
            else:
                stats["validation_passed"] += 1
                if type_stats is not None:
                    type_stats["passed"] += 1

        return stats
    except Exception as e:
        raise HTTPException(500, f"Failed to get validation stats: {str(e)}")


@router.get("/schemas/{schema_id}", response_model=Dict[str, Any])
async def get_schema(
    schema_id: str,
    db: Session = Depends(get_db),
    token: str = Depends(verify_admin_access)
):
    """Get a specific JSON schema (admin only).

    Args:
        schema_id: Schema identifier taken from the URL path.
        db: Database session provided by ``get_db``.
        token: Verified admin token; required only to enforce authentication.

    Returns:
        A dict with ``schema_id``, ``title``, ``version``, ``created_at``
        (the result of ``created_at.isoformat()``, or ``None`` when unset)
        and ``schema``.

    Raises:
        HTTPException: 404 when ``crud.get_schema`` returns a falsy value;
            500 wrapping any other error.
    """
    try:
        schema = crud.get_schema(db, schema_id)
        if not schema:
            raise HTTPException(404, f"Schema {schema_id} not found")

        return {
            "schema_id": schema.schema_id,
            "title": schema.title,
            "version": schema.version,
            "created_at": schema.created_at.isoformat() if schema.created_at else None,
            "schema": schema.schema
        }
    except HTTPException:
        # Let the deliberate 404 through instead of turning it into a 500.
        raise
    except Exception as e:
        raise HTTPException(500, f"Failed to get schema: {str(e)}")


@router.post("/schemas/validate")
async def validate_data_against_schema(
    data: Dict[str, Any],
    schema_id: str,
    db: Session = Depends(get_db),
    token: str = Depends(verify_admin_access)
):
    """Validate JSON data against a specific schema (admin only).

    Args:
        data: JSON object to validate.
        schema_id: Identifier of the stored schema to validate against.
        db: Database session provided by ``get_db``.
        token: Verified admin token; required only to enforce authentication.

    Returns:
        A dict with ``is_valid`` and ``error_message`` as reported by
        ``schema_validator.validate_against_schema``, plus the ``schema_id``
        and ``data`` that were submitted.

    Raises:
        HTTPException: 404 when ``crud.get_schema`` returns a falsy value;
            500 wrapping any other error.
    """
    try:
        # Get the schema from database
        schema = crud.get_schema(db, schema_id)
        if not schema:
            raise HTTPException(404, f"Schema {schema_id} not found")

        # Validate the data
        is_valid, error_msg = schema_validator.validate_against_schema(
            data, schema.schema, schema_id
        )

        return {
            "is_valid": is_valid,
            "error_message": error_msg,
            "schema_id": schema_id,
            "data": data
        }
    except HTTPException:
        # Let the deliberate 404 through instead of turning it into a 500.
        raise
    except Exception as e:
        raise HTTPException(500, f"Validation failed: {str(e)}")


@router.post("/schemas/test-validation")
async def test_schema_validation(
    image_type: str,
    data: Dict[str, Any],
    token: str = Depends(verify_admin_access)
):
    """Test data validation against image type schemas (admin only).

    Args:
        image_type: Image type passed to the validator to select a schema.
        data: JSON object to clean and validate.
        token: Verified admin token; required only to enforce authentication.

    Returns:
        A dict with ``is_valid`` and ``error_message`` as reported by
        ``schema_validator.clean_and_validate_data``, the submitted
        ``image_type`` and ``original_data``, and ``cleaned_data`` (``None``
        unless validation succeeded).

    Raises:
        HTTPException: 500 wrapping any error raised during validation.
    """
    try:
        # Validate data against the appropriate schema for the image type
        cleaned_data, is_valid, error_msg = schema_validator.clean_and_validate_data(
            data, image_type
        )

        return {
            "is_valid": is_valid,
            "error_message": error_msg,
            "image_type": image_type,
            "original_data": data,
            # Only expose cleaned output when validation passed.
            "cleaned_data": cleaned_data if is_valid else None
        }
    except Exception as e:
        raise HTTPException(500, f"Test validation failed: {str(e)}")
@router.put("/schemas/{schema_id}")
async def update_schema(
    schema_id: str,
    schema_data: Dict[str, Any],
    db: Session = Depends(get_db),
    token: str = Depends(verify_admin_access)
):
    """Replace the content of a stored JSON schema (admin only).

    Only the ``schema`` field is written. When the request body has no
    ``"schema"`` key, the record's current content is assigned back to it.
    After the commit, the validator's cache entry for this schema is cleared.

    Args:
        schema_id: Schema identifier taken from the URL path.
        schema_data: Request body; its ``"schema"`` entry is the new content.
        db: Database session provided by ``get_db``.
        token: Verified admin token; required only to enforce authentication.

    Returns:
        A dict with ``schema_id``, ``title``, ``version``, ``created_at``
        (the result of ``created_at.isoformat()``, or ``None`` when unset)
        and ``schema``, read from the refreshed record.

    Raises:
        HTTPException: 404 when ``crud.get_schema`` returns a falsy value;
            500 (after a rollback) wrapping any other error.
    """
    try:
        record = crud.get_schema(db, schema_id)
        if not record:
            raise HTTPException(404, f"Schema {schema_id} not found")

        # Fall back to the current content when no replacement was supplied.
        new_content = schema_data.get("schema", record.schema)
        record.schema = new_content
        db.commit()
        db.refresh(record)

        # Drop the cached copy so the next validation uses the stored schema.
        schema_validator.clear_schema_cache(schema_id)

        created = record.created_at.isoformat() if record.created_at else None
        payload = {
            "schema_id": record.schema_id,
            "title": record.title,
            "version": record.version,
            "created_at": created,
            "schema": record.schema
        }
        return payload
    except HTTPException:
        # Let the deliberate 404 through without rolling back.
        raise
    except Exception as exc:
        db.rollback()
        raise HTTPException(500, f"Failed to update schema: {str(exc)}")