Spaces:
Running
Running
File size: 7,522 Bytes
ba5edb0 76f5d42 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
from typing import List, Dict, Any
from .. import crud, database, schemas
from ..services.schema_validator import schema_validator
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from ..routers.admin import verify_admin_token
router = APIRouter()
security = HTTPBearer()
def get_db():
db = database.SessionLocal()
try:
yield db
finally:
db.close()
def verify_admin_access(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify admin token for schema endpoints"""
token = credentials.credentials
if not verify_admin_token(token):
raise HTTPException(
status_code=401,
detail="Invalid or expired admin token"
)
return token
@router.get("/schemas", response_model=List[Dict[str, Any]])
async def get_schemas(
db: Session = Depends(get_db),
token: str = Depends(verify_admin_access)
):
"""Get all JSON schemas (admin only)"""
try:
# Get schemas from database
db_schemas = crud.get_all_schemas(db)
schemas_list = []
for schema in db_schemas:
schemas_list.append({
"schema_id": schema.schema_id,
"title": schema.title,
"version": schema.version,
"created_at": schema.created_at.isoformat() if schema.created_at else None,
"schema": schema.schema
})
return schemas_list
except Exception as e:
raise HTTPException(500, f"Failed to get schemas: {str(e)}")
@router.get("/schemas/{schema_id}", response_model=Dict[str, Any])
async def get_schema(
schema_id: str,
db: Session = Depends(get_db),
token: str = Depends(verify_admin_access)
):
"""Get a specific JSON schema (admin only)"""
try:
schema = crud.get_schema(db, schema_id)
if not schema:
raise HTTPException(404, f"Schema {schema_id} not found")
return {
"schema_id": schema.schema_id,
"title": schema.title,
"version": schema.version,
"created_at": schema.created_at.isoformat() if schema.created_at else None,
"schema": schema.schema
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, f"Failed to get schema: {str(e)}")
@router.post("/schemas/validate")
async def validate_data_against_schema(
data: Dict[str, Any],
schema_id: str,
db: Session = Depends(get_db),
token: str = Depends(verify_admin_access)
):
"""Validate JSON data against a specific schema (admin only)"""
try:
# Get the schema from database
schema = crud.get_schema(db, schema_id)
if not schema:
raise HTTPException(404, f"Schema {schema_id} not found")
# Validate the data
is_valid, error_msg = schema_validator.validate_against_schema(
data, schema.schema, schema_id
)
return {
"is_valid": is_valid,
"error_message": error_msg,
"schema_id": schema_id,
"data": data
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, f"Validation failed: {str(e)}")
@router.post("/schemas/test-validation")
async def test_schema_validation(
image_type: str,
data: Dict[str, Any],
token: str = Depends(verify_admin_access)
):
"""Test data validation against image type schemas (admin only)"""
try:
# Validate data against the appropriate schema for the image type
cleaned_data, is_valid, error_msg = schema_validator.clean_and_validate_data(
data, image_type
)
return {
"is_valid": is_valid,
"error_message": error_msg,
"image_type": image_type,
"original_data": data,
"cleaned_data": cleaned_data if is_valid else None
}
except Exception as e:
raise HTTPException(500, f"Test validation failed: {str(e)}")
@router.get("/schemas/validation-stats")
async def get_validation_stats(
db: Session = Depends(get_db),
token: str = Depends(verify_admin_access)
):
"""Get validation statistics (admin only)"""
try:
# Get recent images with validation info
recent_images = crud.get_recent_images_with_validation(db, limit=100)
stats = {
"total_images": len(recent_images),
"validation_passed": 0,
"validation_failed": 0,
"validation_errors": [],
"by_image_type": {
"crisis_map": {"total": 0, "passed": 0, "failed": 0},
"drone_image": {"total": 0, "passed": 0, "failed": 0}
}
}
for img in recent_images:
if hasattr(img, 'raw_json') and img.raw_json:
image_type = img.image_type
if image_type in stats["by_image_type"]:
stats["by_image_type"][image_type]["total"] += 1
# Check if validation failed
if img.raw_json.get("validation_failed"):
stats["validation_failed"] += 1
if image_type in stats["by_image_type"]:
stats["by_image_type"][image_type]["failed"] += 1
error = img.raw_json.get("validation_error", "Unknown error")
stats["validation_errors"].append({
"image_id": str(img.image_id),
"image_type": image_type,
"error": error,
"created_at": img.created_at.isoformat() if img.created_at else None
})
else:
stats["validation_passed"] += 1
if image_type in stats["by_image_type"]:
stats["by_image_type"][image_type]["passed"] += 1
return stats
except Exception as e:
raise HTTPException(500, f"Failed to get validation stats: {str(e)}")
@router.put("/schemas/{schema_id}")
async def update_schema(
schema_id: str,
schema_data: Dict[str, Any],
db: Session = Depends(get_db),
token: str = Depends(verify_admin_access)
):
"""Update a JSON schema (admin only)"""
try:
# Get the existing schema
existing_schema = crud.get_schema(db, schema_id)
if not existing_schema:
raise HTTPException(404, f"Schema {schema_id} not found")
# Update only the schema content
existing_schema.schema = schema_data.get("schema", existing_schema.schema)
db.commit()
db.refresh(existing_schema)
# Clear schema cache to ensure fresh data on next validation
schema_validator.clear_schema_cache(schema_id)
return {
"schema_id": existing_schema.schema_id,
"title": existing_schema.title,
"version": existing_schema.version,
"created_at": existing_schema.created_at.isoformat() if existing_schema.created_at else None,
"schema": existing_schema.schema
}
except HTTPException:
raise
except Exception as e:
db.rollback()
raise HTTPException(500, f"Failed to update schema: {str(e)}") |