from fastapi import APIRouter, HTTPException, Depends from sqlalchemy.orm import Session from typing import List, Dict, Any from .. import crud, database, schemas from ..services.schema_validator import schema_validator from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from ..routers.admin import verify_admin_token router = APIRouter() security = HTTPBearer() def get_db(): db = database.SessionLocal() try: yield db finally: db.close() def verify_admin_access(credentials: HTTPAuthorizationCredentials = Depends(security)): """Verify admin token for schema endpoints""" token = credentials.credentials if not verify_admin_token(token): raise HTTPException( status_code=401, detail="Invalid or expired admin token" ) return token @router.get("/schemas", response_model=List[Dict[str, Any]]) async def get_schemas( db: Session = Depends(get_db), token: str = Depends(verify_admin_access) ): """Get all JSON schemas (admin only)""" try: # Get schemas from database db_schemas = crud.get_all_schemas(db) schemas_list = [] for schema in db_schemas: schemas_list.append({ "schema_id": schema.schema_id, "title": schema.title, "version": schema.version, "created_at": schema.created_at.isoformat() if schema.created_at else None, "schema": schema.schema }) return schemas_list except Exception as e: raise HTTPException(500, f"Failed to get schemas: {str(e)}") @router.get("/schemas/{schema_id}", response_model=Dict[str, Any]) async def get_schema( schema_id: str, db: Session = Depends(get_db), token: str = Depends(verify_admin_access) ): """Get a specific JSON schema (admin only)""" try: schema = crud.get_schema(db, schema_id) if not schema: raise HTTPException(404, f"Schema {schema_id} not found") return { "schema_id": schema.schema_id, "title": schema.title, "version": schema.version, "created_at": schema.created_at.isoformat() if schema.created_at else None, "schema": schema.schema } except HTTPException: raise except Exception as e: raise HTTPException(500, f"Failed to get schema: {str(e)}") @router.post("/schemas/validate") async def validate_data_against_schema( data: Dict[str, Any], schema_id: str, db: Session = Depends(get_db), token: str = Depends(verify_admin_access) ): """Validate JSON data against a specific schema (admin only)""" try: # Get the schema from database schema = crud.get_schema(db, schema_id) if not schema: raise HTTPException(404, f"Schema {schema_id} not found") # Validate the data is_valid, error_msg = schema_validator.validate_against_schema( data, schema.schema, schema_id ) return { "is_valid": is_valid, "error_message": error_msg, "schema_id": schema_id, "data": data } except HTTPException: raise except Exception as e: raise HTTPException(500, f"Validation failed: {str(e)}") @router.post("/schemas/test-validation") async def test_schema_validation( image_type: str, data: Dict[str, Any], token: str = Depends(verify_admin_access) ): """Test data validation against image type schemas (admin only)""" try: # Validate data against the appropriate schema for the image type cleaned_data, is_valid, error_msg = schema_validator.clean_and_validate_data( data, image_type ) return { "is_valid": is_valid, "error_message": error_msg, "image_type": image_type, "original_data": data, "cleaned_data": cleaned_data if is_valid else None } except Exception as e: raise HTTPException(500, f"Test validation failed: {str(e)}") @router.get("/schemas/validation-stats") async def get_validation_stats( db: Session = Depends(get_db), token: str = Depends(verify_admin_access) ): """Get validation statistics (admin only)""" try: # Get recent images with validation info recent_images = crud.get_recent_images_with_validation(db, limit=100) stats = { "total_images": len(recent_images), "validation_passed": 0, "validation_failed": 0, "validation_errors": [], "by_image_type": { "crisis_map": {"total": 0, "passed": 0, "failed": 0}, "drone_image": {"total": 0, "passed": 0, "failed": 0} } } for img in recent_images: if hasattr(img, 'raw_json') and img.raw_json: image_type = img.image_type if image_type in stats["by_image_type"]: stats["by_image_type"][image_type]["total"] += 1 # Check if validation failed if img.raw_json.get("validation_failed"): stats["validation_failed"] += 1 if image_type in stats["by_image_type"]: stats["by_image_type"][image_type]["failed"] += 1 error = img.raw_json.get("validation_error", "Unknown error") stats["validation_errors"].append({ "image_id": str(img.image_id), "image_type": image_type, "error": error, "created_at": img.created_at.isoformat() if img.created_at else None }) else: stats["validation_passed"] += 1 if image_type in stats["by_image_type"]: stats["by_image_type"][image_type]["passed"] += 1 return stats except Exception as e: raise HTTPException(500, f"Failed to get validation stats: {str(e)}")