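"""Gradio demo app for Glad8tr video analysis.

Runs the ObjectDetector from object_detection.py over an uploaded video,
draws object boxes, pose keypoints, and scene/cognitive-analysis overlays
on processed frames, and returns the annotated video. Processing is capped
at the first 100 frames for demo purposes.
"""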
import gradio as gr
import cv2
from object_detection import ObjectDetector
import tempfile
import os
import time
# Initialize the detector once at startup so failures surface immediately
try:
    detector = ObjectDetector()
    print("Detector initialized successfully")
except Exception as e:
    print(f"Error initializing detector: {str(e)}")
    detector = None
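
# Expected shape of detector.detect(frame), inferred from how process_video
# consumes it below (the actual ObjectDetector may return additional fields):
#
#   {
#       "detections": [{"bbox": (x1, y1, x2, y2), "class": str, "confidence": float}, ...],
#       "pose_detections": [{"keypoints": [(x, y, conf), ...]}, ...],
#       "stats": {"total_objects": int},
#       "analysis": {
#           "scene_context": {"scene_type": str},
#           "cognitive": {"group_activity": str},
#       },
#   }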
def process_video(video_path):
    if detector is None:
        return None, "Error: Detector initialization failed"
    try:
        start_time = time.time()

        # Open the video
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return None, "Error: Could not open video file"

        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30  # fall back to a sane default when the container reports no FPS
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Limit processing to the first 100 frames for demo purposes
        # (frame count can be 0 for some streams, so guard against that)
        max_frames = min(100, total_frames) if total_frames > 0 else 100

        # Write the output to a directory that outlives this function;
        # a TemporaryDirectory context manager would delete the file before
        # Gradio could serve it back to the client
        output_path = os.path.join(tempfile.mkdtemp(), "output.mp4")
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        frame_count = 0
        processed_frames = 0
        while cap.isOpened() and frame_count < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1

            # Run detection only on every 5th frame to speed up processing;
            # the skip must happen *before* detection (the original checked
            # after processing, so nothing was ever skipped). Skipped frames
            # are written through unannotated to keep the output timing intact.
            if frame_count % 5 != 0:
                out.write(frame)
                continue

            try:
                results = detector.detect(frame)

                # Draw object detections
                for det in results['detections']:
                    x1, y1, x2, y2 = det['bbox']
                    cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                    cv2.putText(frame, f"{det['class']} {det['confidence']:.2f}",
                                (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

                # Draw pose keypoints above a confidence threshold
                for pose in results['pose_detections']:
                    for kp in pose['keypoints']:
                        x, y, conf = kp
                        if conf > 0.5:
                            cv2.circle(frame, (int(x), int(y)), 4, (0, 0, 255), -1)

                # Draw the analysis overlay
                y_offset = 30
                cv2.putText(frame, f"Total Objects: {results['stats']['total_objects']}",
                            (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                y_offset += 30

                # Scene context
                cv2.putText(frame, "Scene Context:", (10, y_offset),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
                y_offset += 30
                cv2.putText(frame, f"Scene Type: {results['analysis']['scene_context']['scene_type']}",
                            (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
                y_offset += 30

                # Cognitive analysis
                cv2.putText(frame, "Cognitive Analysis:", (10, y_offset),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
                y_offset += 30
                cv2.putText(frame, f"Group Activity: {results['analysis']['cognitive']['group_activity']}",
                            (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

                # Write the annotated frame
                out.write(frame)
                processed_frames += 1
            except Exception as e:
                print(f"Error processing frame {frame_count}: {str(e)}")

        # Release resources
        cap.release()
        out.release()

        # Return the processed video with a detailed status message
        processing_time = time.time() - start_time
        status = f"Processing complete!\nProcessed {processed_frames} frames in {processing_time:.2f} seconds"
        return output_path, status
    except Exception as e:
        return None, f"Error processing video: {str(e)}"
# Create the Gradio interface
iface = gr.Interface(
    fn=process_video,
    inputs=gr.Video(),
    outputs=[
        gr.Video(label="Processed Video"),
        gr.Textbox(label="Status")
    ],
    title="Glad8tr Video Analysis",
    description="Upload a video to analyze objects, poses, and cognitive states. "
                "Note: processing is limited to the first 100 frames for demo purposes.",
    examples=[
        ["teensonstreet.mp4"]
    ],
    allow_flagging="never"
)
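
# Note: share=True below asks Gradio to create a temporary public URL in
# addition to the local server; omit it to keep the demo local-only.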
# Launch the interface
if __name__ == "__main__":
    iface.launch(share=True)