Spaces:
Running
Running
from PIL import Image | |
import io | |
from io import BytesIO | |
from .detectors.fft import run_fft | |
from .detectors.metadata import run_metadata | |
from .detectors.ela import run_ela | |
from .preprocess import preprocess_image | |
from fastapi import HTTPException,status,Depends | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
security=HTTPBearer() | |
import os | |
async def process_image_ela(image_bytes: bytes, quality: int=90):
    """Run the ELA detector on an encoded image.

    The bytes are decoded with Pillow, converted to RGB when the decoded
    image uses another mode, passed through ``preprocess_image`` and then
    through ``run_ela``; both helpers receive ``quality``.

    Args:
        image_bytes: Raw encoded image data.
        quality: Value forwarded to ``preprocess_image`` and ``run_ela``.

    Returns:
        A dict with the keys ``"is_edited"`` and ``"ela_score"``, both
        holding the value returned by ``run_ela``.
    """
    decoded = Image.open(io.BytesIO(image_bytes))
    rgb_image = decoded if decoded.mode == "RGB" else decoded.convert("RGB")
    recompressed = preprocess_image(rgb_image, quality)
    ela_output = run_ela(recompressed, quality)
    # NOTE(review): both keys expose the very same value -- confirm whether
    # run_ela is meant to yield a flag, a score, or both.
    return dict(is_edited=ela_output, ela_score=ela_output)
async def process_fft_image(image_bytes: bytes,threshold:float=0.95) -> dict:
    """Run the FFT detector on an encoded image.

    Args:
        image_bytes: Raw encoded image data; decoded with Pillow and
            converted to RGB before detection.
        threshold: Value forwarded unchanged to ``run_fft``.

    Returns:
        ``{"edited": <bool>}`` where the flag is the truthiness of the
        value returned by ``run_fft``.
    """
    stream = BytesIO(image_bytes)
    rgb_image = Image.open(stream).convert("RGB")
    fft_verdict = run_fft(rgb_image, threshold)
    edited_flag = bool(fft_verdict)
    return {"edited": edited_flag}
async def process_meta_image(image_bytes: bytes) -> dict:
    """Run the metadata detector on raw image bytes.

    Args:
        image_bytes: Raw encoded image data, handed to ``run_metadata``
            without any decoding.

    Returns:
        ``{"source": <result>}`` on success (e.g. "edited",
        "phone_capture", "unknown"), or ``{"error": <message>}`` when
        ``run_metadata`` raises.
    """
    try:
        source_label = run_metadata(image_bytes)
    except Exception as exc:
        # Deliberate best-effort: report the failure in the payload instead
        # of propagating it (an HTTPException could be raised here instead).
        return {"error": str(exc)}
    else:
        return {"source": source_label}
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token supplied with the request.

    The presented token is compared against the ``MY_SECRET_TOKEN``
    environment variable using a constant-time comparison, so response
    timing does not reveal how much of a guessed token was correct.

    Args:
        credentials: Bearer credentials resolved by the ``security``
            dependency.

    Returns:
        The presented token string when it matches the configured secret.

    Raises:
        HTTPException: 403 when the token does not match, or when
            ``MY_SECRET_TOKEN`` is unset or empty (fail closed).
    """
    import hmac  # local import so the block does not rely on new file-level imports

    token = credentials.credentials
    expected_token = os.getenv("MY_SECRET_TOKEN")

    # A missing/empty secret must never authenticate anyone; checking it first
    # also keeps None away from compare_digest, which would raise TypeError.
    # Comparing bytes avoids compare_digest's ASCII-only restriction on str.
    token_ok = bool(expected_token) and hmac.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    )
    if not token_ok:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or expired token"
        )
    return token