File size: 5,417 Bytes
5fb87ee |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
import streamlit as st
import cv2
from PIL import Image
import numpy as np
from ultralytics import YOLO
import tempfile
import os
import supervision as sv
from streamlit_webrtc import webrtc_streamer, WebRtcMode
import functools
# --- Page Configuration ---
st.set_page_config(
page_title="Traffic Lane and Object Detection",
page_icon=":camera_video:",
layout="wide",
initial_sidebar_state="expanded",
)
# --- Sidebar ---
st.sidebar.header("Traffic Lane and Object Detection Options")
source_type = st.sidebar.radio("Select Input Source:", ("Image", "Video", "Live Camera Feed")) # Added Live Camera Feed
confidence_threshold = st.sidebar.slider(
"Confidence Threshold", min_value=0.0, max_value=1.0, value=0.25, step=0.05
)
iou_threshold = st.sidebar.slider(
"IoU Threshold", min_value=0.0, max_value=1.0, value=0.45, step=0.05, help="Intersection over Union threshold for NMS"
)
# --- Load YOLO Model ---
@st.cache_resource
def load_model():
model = YOLO("yolov8n.pt") # Load a pretrained YOLOv8n model
return model
model = load_model()
# --- Functions ---
def detect_lanes_and_objects_image(image, model, confidence_threshold, iou_threshold):
"""Runs YOLO on a single image with Supervision."""
img_np = np.array(image)
results = model(img_np, conf=confidence_threshold, iou=iou_threshold)
detections = sv.Detections.from_ultralytics(results[0])
annotator = sv.BoxAnnotator(thickness=2, text_thickness=1, text_scale=0.5)
annotated_frame = annotator.annotate(scene=img_np, detections=detections)
return annotated_frame
def detect_lanes_and_objects_video(video_path, model, confidence_threshold, iou_threshold):
"""Runs YOLO on a video file with Supervision."""
video = cv2.VideoCapture(video_path)
frame_width = int(video.get(3))
frame_height = int(video.get(4))
fps = int(video.get(cv2.CAP_PROP_FPS))
codec = cv2.VideoWriter_fourcc(*"mp4v")
output_path = "output.mp4"
out = cv2.VideoWriter(output_path, codec, fps, (frame_width, frame_height))
stframe = st.empty()
while video.isOpened():
ret, frame = video.read()
if not ret:
break
results = model(frame, conf=confidence_threshold, iou=iou_threshold)
detections = sv.Detections.from_ultralytics(results[0])
annotator = sv.BoxAnnotator(thickness=2, text_thickness=1, text_scale=0.5)
annotated_frame = annotator.annotate(scene=frame, detections=detections)
out.write(annotated_frame)
stframe.image(annotated_frame, channels="BGR", use_column_width=True)
video.release()
out.release()
cv2.destroyAllWindows()
return output_path
def process_frame(frame, model, confidence_threshold, iou_threshold):
"""Process each frame from the webcam with Supervision."""
img = frame.to_ndarray(format="bgr24")
results = model(img, conf=confidence_threshold, iou=iou_threshold)
detections = sv.Detections.from_ultralytics(results[0])
annotator = sv.BoxAnnotator(thickness=2, text_thickness=1, text_scale=0.5)
annotated_frame = annotator.annotate(scene=img, detections=detections)
return annotated_frame
# --- Main Application ---
st.title("Traffic Lane and Object Detection")
if source_type == "Image":
uploaded_file = st.file_uploader("Upload an image", type=["jpg", "jpeg", "png"])
if uploaded_file is not None:
image = Image.open(uploaded_file)
st.image(image, caption="Uploaded Image", use_column_width=True)
if st.button("Run Detection"):
with st.spinner("Running YOLOv8..."):
detected_image = detect_lanes_and_objects_image(
image, model, confidence_threshold, iou_threshold
)
st.image(detected_image, caption="Detected Image", use_column_width=True)
elif source_type == "Video":
uploaded_video = st.file_uploader("Upload a video", type=["mp4", "avi", "mov"])
if uploaded_video is not None:
tfile = tempfile.NamedTemporaryFile(delete=False)
tfile.write(uploaded_video.read())
video_path = tfile.name
if st.button("Run Detection"):
with st.spinner("Running YOLOv8 on video..."):
output_video_path = detect_lanes_and_objects_video(
video_path, model, confidence_threshold, iou_threshold
)
st.video(output_video_path)
tfile.close()
os.unlink(video_path)
os.remove(output_video_path)
elif source_type == "Live Camera Feed":
st.subheader("Live Camera Feed")
if model is None:
st.error("YOLO model is not loaded. Cannot run live feed.")
st.stop()
custom_process_frame = functools.partial(
process_frame,
model=model,
confidence_threshold=confidence_threshold,
iou_threshold=iou_threshold,
)
webrtc_streamer(
key="live-feed",
video_frame_callback=custom_process_frame,
mode=WebRtcMode.SENDRECV,
media_stream_constraints={"video": True, "audio": False},
)
st.markdown("---")
st.markdown(
"""
**Note:** This example uses YOLOv8 for object detection. Lane detection is a more complex task and requires additional image processing techniques. This is a simplified demo and will likely not perform well on complex or noisy video.
"""
) |