|
import gradio as gr |
|
import cv2 |
|
import torch |
|
import os |
|
import datetime |
|
import subprocess |
|
from transformers import AutoProcessor, AutoModelForCausalLM |
|
from PIL import Image |
|
from huggingface_hub import HfApi, hf_hub_download |
|
from fastapi import FastAPI, File, UploadFile, Form |
|
from fastapi.responses import JSONResponse |
|
import uvicorn |
|
import shutil |
|
import pandas as pd |
|
|
|
app = FastAPI() |
|
|
|
|
|
print("Memuat model GIT...") |
|
processor = AutoProcessor.from_pretrained("microsoft/git-base-coco") |
|
model = AutoModelForCausalLM.from_pretrained("microsoft/git-base-coco") |
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
model.to(device) |
|
print("Model GIT berhasil dimuat!") |
|
|
|
|
|
DATASET_REPO_ID = "monikahung/videos_throw_garbage" |
|
HF_TOKEN = os.getenv("HF_TOKEN_VIDEOS") |
|
api = HfApi(token=HF_TOKEN) |
|
|
|
|
|
TEMP_DIR = "/tmp/uploads" |
|
os.makedirs(TEMP_DIR, exist_ok=True) |
|
|
|
|
|
def update_metadata_csv(file_name, mission_type, status): |
|
""" |
|
Update metadata.csv di dataset HF dengan hasil validasi. |
|
""" |
|
local_metadata_path = os.path.join(TEMP_DIR, "metadata.csv") |
|
try: |
|
|
|
hf_hub_download( |
|
repo_id=DATASET_REPO_ID, |
|
repo_type="dataset", |
|
filename="metadata.csv", |
|
local_dir=TEMP_DIR, |
|
token=HF_TOKEN, |
|
) |
|
df = pd.read_csv(local_metadata_path) |
|
except Exception: |
|
|
|
df = pd.DataFrame(columns=["timestamp", "file_name", "mission_type", "status"]) |
|
|
|
|
|
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
new_entry = { |
|
"timestamp": timestamp, |
|
"file_name": file_name, |
|
"mission_type": mission_type, |
|
"status": status, |
|
} |
|
df = pd.concat([df, pd.DataFrame([new_entry])], ignore_index=True) |
|
|
|
|
|
df.to_csv(local_metadata_path, index=False) |
|
|
|
|
|
api.upload_file( |
|
path_or_fileobj=local_metadata_path, |
|
path_in_repo="metadata.csv", |
|
repo_id=DATASET_REPO_ID, |
|
repo_type="dataset", |
|
) |
|
print(f"Metadata diperbarui: {new_entry}") |
|
|
|
|
|
def upload_video_to_dataset(video_path, folder_name, mission_type=None, status=None): |
|
""" |
|
Mengunggah file video ke Hugging Face Dataset di folder yang ditentukan |
|
lalu update metadata.csv. |
|
""" |
|
if not HF_TOKEN: |
|
print("Peringatan: Token Hugging Face tidak ditemukan. Tidak dapat mengunggah ke dataset.") |
|
return |
|
|
|
try: |
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
|
file_name = f"{timestamp}_{os.path.basename(video_path)}" |
|
path_in_repo = f"{folder_name}/{file_name}" |
|
|
|
api.upload_file( |
|
path_or_fileobj=video_path, |
|
path_in_repo=path_in_repo, |
|
repo_id=DATASET_REPO_ID, |
|
repo_type="dataset", |
|
) |
|
print(f"File {file_name} berhasil diunggah ke folder '{folder_name}'.") |
|
|
|
|
|
update_metadata_csv(file_name, mission_type or folder_name, status or "uploaded") |
|
except Exception as e: |
|
print(f"Gagal mengunggah file ke dataset: {e}") |
|
|
|
|
|
def process_and_slow_video(video_path, slow_factor=2): |
|
""" |
|
Memperlambat video menggunakan FFmpeg. |
|
""" |
|
slowed_video_path = f"{os.path.splitext(video_path)[0]}_slowed.mp4" |
|
|
|
command = [ |
|
'ffmpeg', |
|
'-i', video_path, |
|
'-filter:v', f'setpts={slow_factor}*PTS', |
|
'-c:a', 'aac', |
|
'-y', slowed_video_path |
|
] |
|
|
|
try: |
|
subprocess.run(command, check=True, capture_output=True, text=True) |
|
print(f"Video berhasil diperlambat. Disimpan di: {slowed_video_path}") |
|
return slowed_video_path |
|
except subprocess.CalledProcessError as e: |
|
print(f"FFmpeg gagal. Error: {e.stderr}") |
|
return None |
|
except FileNotFoundError: |
|
print("FFmpeg tidak ditemukan. Pastikan sudah terinstal dan ada di PATH.") |
|
return None |
|
|
|
|
|
def validate_video_with_git(video_path, mission_type): |
|
""" |
|
Validasi video menggunakan model GIT dan mengunggahnya ke dataset. |
|
""" |
|
slowed_video_path = process_and_slow_video(video_path) |
|
if not slowed_video_path: |
|
upload_video_to_dataset(video_path, "neither", mission_type, "gagal") |
|
return "Gagal. Gagal memproses video." |
|
|
|
if mission_type == 'paper': |
|
required_keywords = ['person', 'paper', 'yellow trash can'] |
|
elif mission_type == 'leaf': |
|
required_keywords = ['person', 'leaves', 'green trash can'] |
|
elif mission_type == 'plastic_bottle': |
|
required_keywords = ['person', 'plastic bottle', 'yellow trash can'] |
|
elif mission_type == 'drink_cans': |
|
required_keywords = ['person', 'drink cans', 'yellow trash can'] |
|
else: |
|
upload_video_to_dataset(video_path, "neither", mission_type, "gagal") |
|
return "Gagal. Tipe misi tidak valid." |
|
|
|
cap = cv2.VideoCapture(slowed_video_path) |
|
validation_status = "gagal" |
|
valid_frames_count = 0 |
|
frame_count = 0 |
|
frame_interval = 2 |
|
|
|
try: |
|
while cap.isOpened(): |
|
ret, frame_bgr = cap.read() |
|
if not ret: |
|
break |
|
|
|
if frame_count % frame_interval == 0: |
|
frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) |
|
pil_image = Image.fromarray(frame_rgb) |
|
|
|
pixel_values = processor(images=pil_image, return_tensors="pt").pixel_values.to(device) |
|
generated_ids = model.generate(pixel_values=pixel_values, max_length=50) |
|
caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip().lower() |
|
|
|
print(f"Processing frame {frame_count}: '{caption}'") |
|
|
|
found_keywords = [kw for kw in required_keywords if kw in caption] |
|
if len(found_keywords) >= 2: |
|
valid_frames_count += 1 |
|
else: |
|
valid_frames_count = 0 |
|
|
|
if valid_frames_count >= 3: |
|
validation_status = "valid" |
|
break |
|
|
|
frame_count += 1 |
|
finally: |
|
|
|
cap.release() |
|
if os.path.exists(slowed_video_path): |
|
os.remove(slowed_video_path) |
|
|
|
|
|
if validation_status == "valid": |
|
upload_video_to_dataset(video_path, mission_type, mission_type, "valid") |
|
else: |
|
upload_video_to_dataset(video_path, "neither", mission_type, "gagal") |
|
|
|
return validation_status |
|
|
|
|
|
|
|
@app.post("/validate") |
|
async def validate_video(file: UploadFile = File(...), mission_type: str = Form(...)): |
|
temp_file_path = os.path.join(TEMP_DIR, file.filename) |
|
try: |
|
with open(temp_file_path, "wb") as buffer: |
|
shutil.copyfileobj(file.file, buffer) |
|
|
|
result = validate_video_with_git(temp_file_path, mission_type) |
|
return JSONResponse({"status": result}) |
|
finally: |
|
|
|
if os.path.exists(temp_file_path): |
|
os.remove(temp_file_path) |
|
|
|
@app.get("/") |
|
async def root(): |
|
return {"status": "Video validation API is running."} |
|
|
|
|
|
if __name__ == "__main__": |
|
uvicorn.run(app, host="0.0.0.0", port=7860) |