# Source: Hugging Face Space file by monikahung.
# Commit 618dc27: "Update app.py for two trash detection"
import gradio as gr
import cv2
import torch
import os
import datetime
import subprocess
from transformers import AutoProcessor, AutoModelForCausalLM
from PIL import Image
from huggingface_hub import HfApi, hf_hub_download
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import JSONResponse
import uvicorn
import shutil
import pandas as pd
# FastAPI application object; the route decorators further down register on it.
app = FastAPI()
# Load the GIT model (used later to caption individual video frames).
# This happens once at import time, so importing this module is slow.
print("Memuat model GIT...")
processor = AutoProcessor.from_pretrained("microsoft/git-base-coco")
model = AutoModelForCausalLM.from_pretrained("microsoft/git-base-coco")
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
print("Model GIT berhasil dimuat!")
# Hugging Face Hub configuration: target dataset repo and the access token,
# which is read from the HF_TOKEN_VIDEOS environment variable (None if unset).
DATASET_REPO_ID = "monikahung/videos_throw_garbage"
HF_TOKEN = os.getenv("HF_TOKEN_VIDEOS")
api = HfApi(token=HF_TOKEN)
# Temporary directory for uploaded files; created at import time if missing.
TEMP_DIR = "/tmp/uploads"
os.makedirs(TEMP_DIR, exist_ok=True)
def update_metadata_csv(file_name, mission_type, status):
    """Append one validation result to ``metadata.csv`` in the HF dataset repo.

    The current ``metadata.csv`` is downloaded from ``DATASET_REPO_ID`` into
    ``TEMP_DIR``, a row with the current local timestamp, ``file_name``,
    ``mission_type`` and ``status`` is appended, and the file is uploaded
    back to the repo.

    A fresh table is started only when the repo has no ``metadata.csv`` yet
    or the file is empty. Any other failure (network, authentication,
    malformed CSV) propagates, so a transient download error can no longer
    cause the existing history to be overwritten by a one-row file.

    Args:
        file_name: Name of the uploaded video file to record.
        mission_type: Mission label to record for this video.
        status: Validation outcome to record.

    Raises:
        Exception: Whatever ``hf_hub_download``, ``pandas.read_csv`` or
            ``api.upload_file`` raise, except for the "file does not exist
            yet" and "file is empty" cases handled below.
    """
    # Local import: huggingface_hub is already a dependency of this file.
    from huggingface_hub.utils import EntryNotFoundError, LocalEntryNotFoundError

    columns = ["timestamp", "file_name", "mission_type", "status"]
    local_metadata_path = os.path.join(TEMP_DIR, "metadata.csv")
    try:
        # Download the existing metadata.csv (if there is one).
        hf_hub_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            filename="metadata.csv",
            local_dir=TEMP_DIR,
            token=HF_TOKEN,
        )
        df = pd.read_csv(local_metadata_path)
    except LocalEntryNotFoundError:
        # Subclass of EntryNotFoundError raised when the Hub could not be
        # reached and no cached copy exists. That is a connectivity problem,
        # not proof that the file is absent, so do not start a new table.
        raise
    except (EntryNotFoundError, pd.errors.EmptyDataError):
        # No metadata.csv in the repo yet (or it is empty): start a new table.
        df = pd.DataFrame(columns=columns)

    # Append the new entry.
    new_entry = {
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "file_name": file_name,
        "mission_type": mission_type,
        "status": status,
    }
    df = pd.concat([df, pd.DataFrame([new_entry])], ignore_index=True)

    # Save locally, then push the updated metadata.csv back to the dataset.
    df.to_csv(local_metadata_path, index=False)
    api.upload_file(
        path_or_fileobj=local_metadata_path,
        path_in_repo="metadata.csv",
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
    )
    print(f"Metadata diperbarui: {new_entry}")
def upload_video_to_dataset(video_path, folder_name, mission_type=None, status=None):
    """Upload a video into a folder of the HF dataset and log it in metadata.csv.

    The file is stored as ``<folder_name>/<YYYYmmdd_HHMMSS>_<basename>``.
    This is best-effort: when no token is configured the upload is skipped
    with a warning, and any exception raised while uploading or updating the
    metadata is printed rather than propagated.

    Args:
        video_path: Local path of the video to upload.
        folder_name: Destination folder inside the dataset repo.
        mission_type: Mission label for the metadata row; falls back to
            ``folder_name`` when falsy.
        status: Status for the metadata row; falls back to ``"uploaded"``
            when falsy.

    Returns:
        None.
    """
    if not HF_TOKEN:
        print("Peringatan: Token Hugging Face tidak ditemukan. Tidak dapat mengunggah ke dataset.")
        return None

    try:
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        original_name = os.path.basename(video_path)
        remote_name = f"{stamp}_{original_name}"
        api.upload_file(
            path_or_fileobj=video_path,
            path_in_repo=f"{folder_name}/{remote_name}",
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
        )
        print(f"File {remote_name} berhasil diunggah ke folder '{folder_name}'.")

        # Record the upload in metadata.csv, defaulting the optional labels.
        recorded_mission = mission_type if mission_type else folder_name
        recorded_status = status if status else "uploaded"
        update_metadata_csv(remote_name, recorded_mission, recorded_status)
    except Exception as upload_error:
        print(f"Gagal mengunggah file ke dataset: {upload_error}")
    return None
def process_and_slow_video(video_path, slow_factor=2):
    """Slow a video down with FFmpeg and return the path of the result.

    The video stream's presentation timestamps are multiplied by
    ``slow_factor`` and the output is written next to the input as
    ``<input-without-extension>_slowed.mp4`` (overwriting any existing file).

    Args:
        video_path: Path of the input video.
        slow_factor: Multiplier applied to the video timestamps.

    Returns:
        The path of the slowed video, or ``None`` when FFmpeg exits with an
        error or the ``ffmpeg`` executable cannot be found.
    """
    stem, _extension = os.path.splitext(video_path)
    output_path = f"{stem}_slowed.mp4"
    ffmpeg_args = ['ffmpeg', '-i', video_path]
    ffmpeg_args += ['-filter:v', f'setpts={slow_factor}*PTS']
    ffmpeg_args += ['-c:a', 'aac']
    ffmpeg_args += ['-y', output_path]

    try:
        subprocess.run(ffmpeg_args, check=True, capture_output=True, text=True)
    except FileNotFoundError:
        # The ffmpeg binary itself is missing.
        print("FFmpeg tidak ditemukan. Pastikan sudah terinstal dan ada di PATH.")
        return None
    except subprocess.CalledProcessError as ffmpeg_error:
        # FFmpeg ran but returned a non-zero exit status.
        print(f"FFmpeg gagal. Error: {ffmpeg_error.stderr}")
        return None

    print(f"Video berhasil diperlambat. Disimpan di: {output_path}")
    return output_path
# Keywords a frame caption is checked against, per supported mission type.
_MISSION_KEYWORDS = {
    'paper': ['person', 'paper', 'yellow trash can'],
    'leaf': ['person', 'leaves', 'green trash can'],
    'plastic_bottle': ['person', 'plastic bottle', 'yellow trash can'],
    'drink_cans': ['person', 'drink cans', 'yellow trash can'],
}


def validate_video_with_git(video_path, mission_type):
    """Validate a video with the GIT captioning model and archive it.

    The video is slowed down, every second frame is captioned, and the video
    counts as valid once three consecutive sampled frames each mention at
    least two of the mission's keywords. The original video is then uploaded
    to the dataset: into the ``mission_type`` folder when valid, otherwise
    into ``"neither"``.

    The mission type is checked before the video is slowed down, so an
    unknown mission neither runs FFmpeg nor leaves a ``*_slowed.mp4`` file
    behind.

    Args:
        video_path: Local path of the uploaded video.
        mission_type: One of ``'paper'``, ``'leaf'``, ``'plastic_bottle'``,
            ``'drink_cans'``.

    Returns:
        ``"valid"`` or ``"gagal"`` after analysis,
        ``"Gagal. Tipe misi tidak valid."`` for an unknown mission type, or
        ``"Gagal. Gagal memproses video."`` when the video could not be
        slowed down.
    """
    required_keywords = _MISSION_KEYWORDS.get(mission_type)
    if required_keywords is None:
        upload_video_to_dataset(video_path, "neither", mission_type, "gagal")
        return "Gagal. Tipe misi tidak valid."

    slowed_video_path = process_and_slow_video(video_path)
    if not slowed_video_path:
        upload_video_to_dataset(video_path, "neither", mission_type, "gagal")
        return "Gagal. Gagal memproses video."

    validation_status = "gagal"
    valid_frames_count = 0  # consecutive sampled frames that matched
    frame_count = 0
    frame_interval = 2  # caption every 2nd frame only
    cap = None
    try:
        # Opened inside the try so the slowed file is removed even if
        # opening the capture fails.
        cap = cv2.VideoCapture(slowed_video_path)
        while cap.isOpened():
            ret, frame_bgr = cap.read()
            if not ret:
                break
            if frame_count % frame_interval == 0:
                # OpenCV yields BGR; the image processor is given RGB.
                frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
                pil_image = Image.fromarray(frame_rgb)
                pixel_values = processor(images=pil_image, return_tensors="pt").pixel_values.to(device)
                generated_ids = model.generate(pixel_values=pixel_values, max_length=50)
                caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip().lower()
                print(f"Processing frame {frame_count}: '{caption}'")

                found_keywords = [kw for kw in required_keywords if kw in caption]
                if len(found_keywords) >= 2:
                    valid_frames_count += 1
                else:
                    # A miss resets the streak: matches must be consecutive.
                    valid_frames_count = 0
                if valid_frames_count >= 3:
                    validation_status = "valid"
                    break
            frame_count += 1
    finally:
        # Make sure every resource is released.
        if cap is not None:
            cap.release()
        if os.path.exists(slowed_video_path):
            os.remove(slowed_video_path)

    # Upload the original video and update the metadata.
    if validation_status == "valid":
        upload_video_to_dataset(video_path, mission_type, mission_type, "valid")
    else:
        upload_video_to_dataset(video_path, "neither", mission_type, "gagal")
    return validation_status
# Endpoint for video validation
@app.post("/validate")
async def validate_video(file: UploadFile = File(...), mission_type: str = Form(...)):
    """Store an uploaded video temporarily, validate it, and report the result.

    The client-supplied filename is untrusted: it is reduced to its final
    path component so that names such as ``../../x`` or ``/etc/x`` cannot
    escape the temporary directory. Each request gets its own scratch
    directory under ``TEMP_DIR``, so concurrent uploads with the same
    filename do not overwrite or delete each other's files.

    Args:
        file: The uploaded video.
        mission_type: Mission label passed on to ``validate_video_with_git``.

    Returns:
        A JSON response of the form ``{"status": <validation result>}``.
    """
    import tempfile  # local import: only this endpoint needs it

    # Normalise Windows separators, then keep only the last path component.
    raw_name = (file.filename or "").replace("\\", "/")
    safe_name = os.path.basename(raw_name)
    if safe_name in ("", ".", ".."):
        safe_name = "upload.mp4"

    request_dir = tempfile.mkdtemp(dir=TEMP_DIR)
    temp_file_path = os.path.join(request_dir, safe_name)
    try:
        with open(temp_file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        result = validate_video_with_git(temp_file_path, mission_type)
        return JSONResponse({"status": result})
    finally:
        # Remove the scratch directory and everything left inside it.
        shutil.rmtree(request_dir, ignore_errors=True)
@app.get("/")
async def root():
    """Return a static payload showing that the API process is up."""
    health_payload = {"status": "Video validation API is running."}
    return health_payload
# Start the ASGI server only when this file is executed directly, not on import.
if __name__ == "__main__":
    # Listen on all network interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)