|
|
|
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
from typing import List, Dict, Tuple, Optional |
|
|
from ultralytics import YOLO |
|
|
|
|
|
def perform_detection( |
|
|
yolo_model: YOLO, |
|
|
frame: np.ndarray, |
|
|
conf_threshold: float=0.5 |
|
|
) -> Optional[List[Dict]]: |
|
|
""" |
|
|
Runs the YOLO model inference on a single frame. |
|
|
""" |
|
|
if frame is None: |
|
|
print("Error: Input frame is None in perform_detection.") |
|
|
return None |
|
|
try: |
|
|
|
|
|
results = yolo_model.predict(source=frame, conf=conf_threshold, verbose=False) |
|
|
return results |
|
|
except Exception as e: |
|
|
print(f"Error during model prediction: {e}") |
|
|
return None |
|
|
|
|
|
def create_motion_mask(frame, threshold=25): |
|
|
""" |
|
|
Creates a simple motion mask from an image. |
|
|
For the demo, we'll use a basic thresholding approach. |
|
|
""" |
|
|
|
|
|
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) |
|
|
|
|
|
|
|
|
blurred = cv2.GaussianBlur(gray, (5, 5), 0) |
|
|
|
|
|
|
|
|
thresh = cv2.adaptiveThreshold( |
|
|
blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, |
|
|
cv2.THRESH_BINARY_INV, 11, threshold |
|
|
) |
|
|
|
|
|
|
|
|
kernel = np.ones((3, 3), np.uint8) |
|
|
mask = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=1) |
|
|
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=2) |
|
|
|
|
|
return mask |
|
|
|
|
|
def postprocess_results( |
|
|
results: Optional[List[Dict]], |
|
|
model_class_names: Dict[int, str], |
|
|
mask: Optional[np.ndarray] = None |
|
|
) -> List[Dict]: |
|
|
""" |
|
|
Extracts information from detection results. |
|
|
If a mask is provided, only keeps detections that overlap with the mask. |
|
|
""" |
|
|
detections_list = [] |
|
|
if results is None or not results: |
|
|
return detections_list |
|
|
|
|
|
try: |
|
|
boxes = results[0].boxes |
|
|
except (IndexError, AttributeError) as e: |
|
|
print(f"Warning: Could not access boxes in results: {e}") |
|
|
return detections_list |
|
|
|
|
|
for box in boxes: |
|
|
try: |
|
|
|
|
|
xyxy = box.xyxy[0].cpu().numpy().astype(int) |
|
|
x1, y1, x2, y2 = xyxy |
|
|
|
|
|
|
|
|
if mask is not None: |
|
|
|
|
|
center_x = (x1 + x2) // 2 |
|
|
center_y = (y1 + y2) // 2 |
|
|
|
|
|
|
|
|
|
|
|
y1_safe = max(0, min(y1, mask.shape[0]-1)) |
|
|
y2_safe = max(0, min(y2, mask.shape[0]-1)) |
|
|
x1_safe = max(0, min(x1, mask.shape[1]-1)) |
|
|
x2_safe = max(0, min(x2, mask.shape[1]-1)) |
|
|
|
|
|
|
|
|
box_region = mask[y1_safe:y2_safe, x1_safe:x2_safe] |
|
|
|
|
|
|
|
|
if box_region.size > 0: |
|
|
mask_coverage = np.sum(box_region > 0) / box_region.size |
|
|
else: |
|
|
mask_coverage = 0 |
|
|
|
|
|
|
|
|
if not (0 <= center_y < mask.shape[0] and 0 <= center_x < mask.shape[1] and |
|
|
(mask[center_y, center_x] > 0 or mask_coverage > 0.5)): |
|
|
continue |
|
|
|
|
|
|
|
|
conf = float(box.conf[0].cpu().numpy()) |
|
|
|
|
|
|
|
|
cls_id = int(box.cls[0].cpu().numpy()) |
|
|
class_name = model_class_names.get(cls_id, f"Unknown Class {cls_id}") |
|
|
|
|
|
|
|
|
detections_list.append({ |
|
|
'class_name': class_name, |
|
|
'confidence': conf, |
|
|
'bbox_xyxy': [x1, y1, x2, y2] |
|
|
}) |
|
|
except Exception as e: |
|
|
print(f"Error processing a detection box: {e}") |
|
|
continue |
|
|
return detections_list |
|
|
|
|
|
def draw_detections( |
|
|
frame: np.ndarray, |
|
|
detections: List[Dict], |
|
|
mask: Optional[np.ndarray] = None |
|
|
) -> np.ndarray: |
|
|
""" |
|
|
Draws bounding boxes and labels on the frame. |
|
|
If mask is provided, overlays it on the frame. |
|
|
""" |
|
|
output_frame = frame.copy() |
|
|
|
|
|
|
|
|
if mask is not None and mask.shape[0] > 0 and mask.shape[1] > 0: |
|
|
|
|
|
mask_overlay = np.zeros_like(output_frame) |
|
|
mask_overlay[mask > 0] = [0, 100, 0] |
|
|
|
|
|
|
|
|
output_frame = cv2.addWeighted(output_frame, 0.7, mask_overlay, 0.3, 0) |
|
|
|
|
|
|
|
|
color = (0, 255, 0) |
|
|
font_scale = 1.2 |
|
|
font = cv2.FONT_HERSHEY_SIMPLEX |
|
|
for detection in detections: |
|
|
try: |
|
|
x1, y1, x2, y2 = detection['bbox_xyxy'] |
|
|
class_name = detection['class_name'] |
|
|
conf = detection['confidence'] |
|
|
|
|
|
|
|
|
cv2.rectangle(output_frame, (x1, y1), (x2, y2), color, 5) |
|
|
|
|
|
|
|
|
label = f"{class_name}: {conf:.2f}" |
|
|
|
|
|
|
|
|
(label_width, label_height), baseline = cv2.getTextSize(label, font, font_scale, 3) |
|
|
label_ymin = max(y1, label_height + 10) |
|
|
|
|
|
|
|
|
cv2.rectangle(output_frame, |
|
|
(x1, label_ymin - label_height - 10), |
|
|
(x1 + label_width, label_ymin - baseline), |
|
|
color, |
|
|
cv2.FILLED) |
|
|
|
|
|
|
|
|
cv2.putText(output_frame, |
|
|
label, |
|
|
(x1, label_ymin - 5), |
|
|
font, |
|
|
font_scale, |
|
|
(255, 255, 255), |
|
|
3) |
|
|
except Exception as e: |
|
|
continue |
|
|
return output_frame |
|
|
|
|
|
def load_yolo_model(model_path): |
|
|
""" |
|
|
Loads the YOLO model from the specified path. |
|
|
""" |
|
|
print("Loading the YOLO model...") |
|
|
try: |
|
|
model = YOLO(model_path) |
|
|
class_names = model.names |
|
|
print(f"Model loaded with {len(class_names)} classes!") |
|
|
return model, class_names |
|
|
except Exception as e: |
|
|
print(f"Error loading model: {e}") |
|
|
return None, None |
|
|
|
|
|
def load_image(image_path): |
|
|
""" |
|
|
Loads an image from the specified path. |
|
|
""" |
|
|
print(f"Opening image: {image_path}") |
|
|
image = cv2.imread(image_path) |
|
|
if image is None: |
|
|
print(f"Error: Could not read image file '{image_path}'.") |
|
|
return image |
|
|
|
|
|
def load_or_create_mask(image, mask_path=None): |
|
|
""" |
|
|
Either loads a mask from disk or creates a new one from the image. |
|
|
""" |
|
|
if mask_path and os.path.exists(mask_path): |
|
|
print(f"Loading mask: {mask_path}") |
|
|
mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE) |
|
|
else: |
|
|
print("Creating mask from image...") |
|
|
mask = create_motion_mask(image) |
|
|
|
|
|
return mask |
|
|
|
|
|
def display_results(output_frame, detections, mask=None): |
|
|
""" |
|
|
Displays detection results and saves the output image. |
|
|
""" |
|
|
|
|
|
print("\n--- Insects Detected ---") |
|
|
if detections: |
|
|
for i, obj in enumerate(detections, 1): |
|
|
print(f"{i}. {obj['class_name']} (confidence: {obj['confidence']:.2f})") |
|
|
else: |
|
|
print("No insects detected.") |
|
|
|
|
|
|
|
|
if mask is not None: |
|
|
cv2.imwrite("motion_mask.png", mask) |
|
|
print("Motion mask saved to: motion_mask.png") |
|
|
|
|
|
|
|
|
output_rgb = cv2.cvtColor(output_frame, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
|
|
|
plt.figure(figsize=(10, 8)) |
|
|
plt.imshow(output_rgb) |
|
|
plt.title("Insect Detection Results") |
|
|
plt.axis('off') |
|
|
plt.show() |
|
|
|
|
|
|
|
|
result_path = "detection_result.jpg" |
|
|
cv2.imwrite(result_path, output_frame) |
|
|
print(f"Result saved to: {result_path}") |
|
|
|
|
|
import os |