File size: 1,654 Bytes
0b8f50d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from PIL import Image
import io
from io import BytesIO
from .detectors.fft import run_fft
from .detectors.metadata import run_metadata
from .detectors.ela import run_ela
from .preprocess import preprocess_image
from fastapi import HTTPException,status,Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security=HTTPBearer()
import os
async def process_image_ela(image_bytes: bytes, quality: int=90):
    image = Image.open(io.BytesIO(image_bytes))

    if image.mode != "RGB":
        image = image.convert("RGB")

    compressed_image = preprocess_image(image, quality)
    ela_result = run_ela(compressed_image, quality)

    return {
        "is_edited": ela_result,
        "ela_score": ela_result
    }

async def process_fft_image(image_bytes: bytes,threshold:float=0.95) -> dict:
    image = Image.open(BytesIO(image_bytes)).convert("RGB")
    result = run_fft(image,threshold)
    return {"edited": bool(result)}


async def process_meta_image(image_bytes: bytes) -> dict:
    try:
        result = run_metadata(image_bytes)
        return {"source": result}  # e.g. "edited", "phone_capture", "unknown"
    except Exception as e:
        # Handle errors gracefully, return useful message or raise HTTPException if preferred
        return {"error": str(e)}


async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    expected_token = os.getenv("MY_SECRET_TOKEN")
    if token != expected_token:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or expired token"
        )
    return token