from PIL import Image import io from io import BytesIO from .detectors.fft import run_fft from .detectors.metadata import run_metadata from .detectors.ela import run_ela from .preprocess import preprocess_image from fastapi import HTTPException,status,Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials security=HTTPBearer() import os async def process_image_ela(image_bytes: bytes, quality: int=90): image = Image.open(io.BytesIO(image_bytes)) if image.mode != "RGB": image = image.convert("RGB") compressed_image = preprocess_image(image, quality) ela_result = run_ela(compressed_image, quality) return { "is_edited": ela_result, "ela_score": ela_result } async def process_fft_image(image_bytes: bytes,threshold:float=0.95) -> dict: image = Image.open(BytesIO(image_bytes)).convert("RGB") result = run_fft(image,threshold) return {"edited": bool(result)} async def process_meta_image(image_bytes: bytes) -> dict: try: result = run_metadata(image_bytes) return {"source": result} # e.g. "edited", "phone_capture", "unknown" except Exception as e: # Handle errors gracefully, return useful message or raise HTTPException if preferred return {"error": str(e)} async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): token = credentials.credentials expected_token = os.getenv("MY_SECRET_TOKEN") if token != expected_token: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Invalid or expired token" ) return token