import gradio as gr
import cv2
import tempfile
import os
import time

from object_detection import ObjectDetector


# Initialize the detector once at module load; fall back to None so the UI
# can report a friendly error instead of crashing at import time.
try:
    detector = ObjectDetector()
    print("Detector initialized successfully")
except Exception as e:
    print(f"Error initializing detector: {str(e)}")
    detector = None
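
# The drawing code below assumes ObjectDetector.detect(frame) returns a dict
# shaped roughly like this (inferred from the lookups in process_video; the
# authoritative schema lives in object_detection.py):
#
#   {
#       "detections":      [{"bbox": (x1, y1, x2, y2), "class": str, "confidence": float}, ...],
#       "pose_detections": [{"keypoints": [(x, y, conf), ...]}, ...],
#       "stats":           {"total_objects": int},
#       "analysis":        {"scene_context": {"scene_type": str},
#                           "cognitive":     {"group_activity": str}},
#   }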


def process_video(video_path):
    """Annotate up to the first 100 frames of a video; return (output_path, status)."""
    if detector is None:
        return None, "Error: Detector initialization failed"

    try:
        start_time = time.time()

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return None, "Error: Could not open video file"

        # Source geometry; some containers report fps as 0, so fall back to 30
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Keep the demo fast: process at most the first 100 frames
        # (CAP_PROP_FRAME_COUNT can be unreported, i.e. 0, for some streams)
        max_frames = min(100, total_frames) if total_frames > 0 else 100

        # Write the result somewhere that survives this function's return; a
        # TemporaryDirectory context manager would delete the file on exit,
        # before Gradio could read and serve it
        output_path = os.path.join(tempfile.mkdtemp(), "output.mp4")
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
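        # Note (environment assumption): 'mp4v' in an .mp4 container may not
        # play inline in every browser; if the preview is blank, switching the
        # fourcc to 'avc1' (H.264) often helps when the OpenCV build supports it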

        frame_count = 0
        processed_frames = 0
        while cap.isOpened() and frame_count < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1

            try:
                results = detector.detect(frame)

                # Draw each detection's bounding box and class/confidence label
                for det in results['detections']:
                    x1, y1, x2, y2 = det['bbox']
                    cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                    cv2.putText(frame, f"{det['class']} {det['confidence']:.2f}",
                                (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

                # Draw pose keypoints that clear a 0.5-confidence threshold
                for pose in results['pose_detections']:
                    for kp in pose['keypoints']:
                        x, y, conf = kp
                        if conf > 0.5:
                            cv2.circle(frame, (int(x), int(y)), 4, (0, 0, 255), -1)

                # Overlay summary text, stepping down 30 px per line
                y_offset = 30
                cv2.putText(frame, f"Total Objects: {results['stats']['total_objects']}",
                            (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                y_offset += 30

                cv2.putText(frame, "Scene Context:", (10, y_offset),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
                y_offset += 30
                cv2.putText(frame, f"Scene Type: {results['analysis']['scene_context']['scene_type']}",
                            (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
                y_offset += 30

                cv2.putText(frame, "Cognitive Analysis:", (10, y_offset),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
                y_offset += 30
                cv2.putText(frame, f"Group Activity: {results['analysis']['cognitive']['group_activity']}",
                            (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

                out.write(frame)
                processed_frames += 1
            except Exception as e:
                # Log and skip a failed frame; keep processing the rest
                print(f"Error processing frame {frame_count}: {str(e)}")

        cap.release()
        out.release()

        processing_time = time.time() - start_time
        status = f"Processing complete!\nProcessed {processed_frames} frames in {processing_time:.2f} seconds"
        return output_path, status
    except Exception as e:
        return None, f"Error processing video: {str(e)}"
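

# Quick local sanity check, bypassing the UI (uses the sample clip already
# referenced in the Gradio examples below; path is relative to this file):
#
#   out_path, status = process_video("teensonstreet.mp4")
#   print(status, "->", out_path)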

# Gradio UI: video in, annotated video plus status text out
iface = gr.Interface(
    fn=process_video,
    inputs=gr.Video(),
    outputs=[
        gr.Video(label="Processed Video"),
        gr.Textbox(label="Status")
    ],
    title="Glad8tr Video Analysis",
    description="Upload a video to analyze objects, poses, and cognitive states. Note: processing is limited to the first 100 frames for demo purposes.",
    examples=[
        ["teensonstreet.mp4"]
    ],
    allow_flagging="never"
)
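

# share=True also requests a temporary public link from Gradio, in addition
# to the local server; drop it for local-only runs.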
if __name__ == "__main__":
    iface.launch(share=True)