import gradio as gr import cv2 import torch import os import datetime import subprocess from transformers import AutoProcessor, AutoModelForCausalLM from PIL import Image from huggingface_hub import HfApi, hf_hub_download from fastapi import FastAPI, File, UploadFile, Form from fastapi.responses import JSONResponse import uvicorn import shutil import pandas as pd app = FastAPI() # Muat model GIT print("Memuat model GIT...") processor = AutoProcessor.from_pretrained("microsoft/git-base-coco") model = AutoModelForCausalLM.from_pretrained("microsoft/git-base-coco") device = "cuda" if torch.cuda.is_available() else "cpu" model.to(device) print("Model GIT berhasil dimuat!") # Konfigurasi Hugging Face Hub DATASET_REPO_ID = "monikahung/videos_throw_garbage" HF_TOKEN = os.getenv("HF_TOKEN_VIDEOS") api = HfApi(token=HF_TOKEN) # Direktori sementara untuk menyimpan file yang diunggah TEMP_DIR = "/tmp/uploads" os.makedirs(TEMP_DIR, exist_ok=True) def update_metadata_csv(file_name, mission_type, status): """ Update metadata.csv di dataset HF dengan hasil validasi. """ local_metadata_path = os.path.join(TEMP_DIR, "metadata.csv") try: # Download metadata.csv yang sudah ada (jika ada) hf_hub_download( repo_id=DATASET_REPO_ID, repo_type="dataset", filename="metadata.csv", local_dir=TEMP_DIR, token=HF_TOKEN, ) df = pd.read_csv(local_metadata_path) except Exception: # Jika belum ada metadata.csv, buat DataFrame baru df = pd.DataFrame(columns=["timestamp", "file_name", "mission_type", "status"]) # Tambahkan entri baru timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") new_entry = { "timestamp": timestamp, "file_name": file_name, "mission_type": mission_type, "status": status, } df = pd.concat([df, pd.DataFrame([new_entry])], ignore_index=True) # Simpan kembali ke lokal df.to_csv(local_metadata_path, index=False) # Upload metadata.csv yang sudah diperbarui ke dataset api.upload_file( path_or_fileobj=local_metadata_path, path_in_repo="metadata.csv", repo_id=DATASET_REPO_ID, repo_type="dataset", ) print(f"Metadata diperbarui: {new_entry}") def upload_video_to_dataset(video_path, folder_name, mission_type=None, status=None): """ Mengunggah file video ke Hugging Face Dataset di folder yang ditentukan lalu update metadata.csv. """ if not HF_TOKEN: print("Peringatan: Token Hugging Face tidak ditemukan. Tidak dapat mengunggah ke dataset.") return try: timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") file_name = f"{timestamp}_{os.path.basename(video_path)}" path_in_repo = f"{folder_name}/{file_name}" api.upload_file( path_or_fileobj=video_path, path_in_repo=path_in_repo, repo_id=DATASET_REPO_ID, repo_type="dataset", ) print(f"File {file_name} berhasil diunggah ke folder '{folder_name}'.") # Update metadata.csv update_metadata_csv(file_name, mission_type or folder_name, status or "uploaded") except Exception as e: print(f"Gagal mengunggah file ke dataset: {e}") def process_and_slow_video(video_path, slow_factor=2): """ Memperlambat video menggunakan FFmpeg. """ slowed_video_path = f"{os.path.splitext(video_path)[0]}_slowed.mp4" command = [ 'ffmpeg', '-i', video_path, '-filter:v', f'setpts={slow_factor}*PTS', '-c:a', 'aac', '-y', slowed_video_path ] try: subprocess.run(command, check=True, capture_output=True, text=True) print(f"Video berhasil diperlambat. Disimpan di: {slowed_video_path}") return slowed_video_path except subprocess.CalledProcessError as e: print(f"FFmpeg gagal. Error: {e.stderr}") return None except FileNotFoundError: print("FFmpeg tidak ditemukan. Pastikan sudah terinstal dan ada di PATH.") return None def validate_video_with_git(video_path, mission_type): """ Validasi video menggunakan model GIT dan mengunggahnya ke dataset. """ slowed_video_path = process_and_slow_video(video_path) if not slowed_video_path: upload_video_to_dataset(video_path, "neither", mission_type, "gagal") return "Gagal. Gagal memproses video." if mission_type == 'paper': required_keywords = ['person', 'paper', 'yellow trash can'] elif mission_type == 'leaf': required_keywords = ['person', 'leaves', 'green trash can'] elif mission_type == 'plastic_bottle': required_keywords = ['person', 'plastic bottle', 'yellow trash can'] elif mission_type == 'drink_cans': required_keywords = ['person', 'drink cans', 'yellow trash can'] else: upload_video_to_dataset(video_path, "neither", mission_type, "gagal") return "Gagal. Tipe misi tidak valid." cap = cv2.VideoCapture(slowed_video_path) validation_status = "gagal" valid_frames_count = 0 frame_count = 0 frame_interval = 2 try: while cap.isOpened(): ret, frame_bgr = cap.read() if not ret: break if frame_count % frame_interval == 0: frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(frame_rgb) pixel_values = processor(images=pil_image, return_tensors="pt").pixel_values.to(device) generated_ids = model.generate(pixel_values=pixel_values, max_length=50) caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip().lower() print(f"Processing frame {frame_count}: '{caption}'") found_keywords = [kw for kw in required_keywords if kw in caption] if len(found_keywords) >= 2: valid_frames_count += 1 else: valid_frames_count = 0 if valid_frames_count >= 3: validation_status = "valid" break frame_count += 1 finally: # Pastikan semua resource ditutup cap.release() if os.path.exists(slowed_video_path): os.remove(slowed_video_path) # Upload video dan update metadata if validation_status == "valid": upload_video_to_dataset(video_path, mission_type, mission_type, "valid") else: upload_video_to_dataset(video_path, "neither", mission_type, "gagal") return validation_status # Endpoint untuk validasi video @app.post("/validate") async def validate_video(file: UploadFile = File(...), mission_type: str = Form(...)): temp_file_path = os.path.join(TEMP_DIR, file.filename) try: with open(temp_file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) result = validate_video_with_git(temp_file_path, mission_type) return JSONResponse({"status": result}) finally: # Hapus file sementara dengan aman if os.path.exists(temp_file_path): os.remove(temp_file_path) @app.get("/") async def root(): return {"status": "Video validation API is running."} if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)