import os
import cv2
import math
import numpy as np
import av
import streamlit as st
import pandas as pd
import altair as alt
import time
from streamlit_webrtc import (
    webrtc_streamer,
    VideoProcessorBase,
    WebRtcMode,
    VideoHTMLAttributes,
)
from streamlit_autorefresh import st_autorefresh
from twilio.rest import Client
from line_detector import (
    LineDetector,
    HoughLinesP,
    AdaptiveHoughLinesP,
    RansacLine,
    RotatedRect,
    FitEllipse,
)
from pid_controller import PIDController

# Set page configuration
st.set_page_config(
    page_title="Line Follower PID",
    page_icon="🚁",
    layout="wide",
    initial_sidebar_state="expanded",
)


def get_ice_servers():
    """
    Get ICE servers configuration.

    For Streamlit Cloud deployment, a TURN server is required in addition to STUN.
    This function tries to use Twilio's TURN server service if credentials are
    available; otherwise it falls back to a free STUN server from Google.
    """
    try:
        # Try to get Twilio credentials from environment variables
        account_sid = os.environ.get("TWILIO_ACCOUNT_SID")
        auth_token = os.environ.get("TWILIO_AUTH_TOKEN")

        if account_sid and auth_token:
            client = Client(account_sid, auth_token)
            token = client.tokens.create()
            return token.ice_servers
        else:
            st.warning(
                "Twilio credentials not found. Using free STUN server only, "
                "which may not work reliably."
            )
    except Exception as e:
        st.error(f"Error setting up Twilio TURN servers: {e}")

    # Fallback to Google's free STUN server
    return [{"urls": ["stun:stun.l.google.com:19302"]}]


# Apply custom CSS for a modern minimalist design
st.markdown(
    """
    """,
    unsafe_allow_html=True,
)

# App header with minimalist design
st.title("🚁 Drone Line Follower")
st.markdown("Vision-based line tracking with real-time PID control")

# Add project description
st.markdown(
    """
    This application simulates a drone's line-following behavior using visual
    information. It processes an image feed, applies a color filter (HSV) to
    isolate the line, approximates the line's position and angle using selectable
    methods (like HoughLinesP, RANSAC, etc.), and uses PID controllers to adjust
    the simulated drone's angle (yaw) and lateral position (roll) to stay centered
    on the line.
    """
)

# Add a neat divider
st.markdown("<hr>", unsafe_allow_html=True)

", unsafe_allow_html=True) # Setup the sidebar with camera parameters with st.sidebar: st.markdown("## Control Panel") # Method selection with modern look st.markdown("### Detection Method") method_name = st.selectbox( "Select algorithm", [ "HoughLinesP", "AdaptiveHoughLinesP", "RansacLine", "FitEllipse", "RotatedRect", ], ) # Create tabs for different setting categories settings_tab, tuning_tab = st.tabs(["Camera Settings", "PID Tuning"]) with settings_tab: # HSV Filter with modern sliders st.markdown("#### HSV Filter") # Create two columns for min/max values col_min, col_max = st.columns(2) with col_min: st.markdown("##### Min") h_min = st.slider("H min", 0, 179, 0) s_min = st.slider("S min", 0, 255, 0) v_min = st.slider("V min", 0, 255, 0) with col_max: st.markdown("##### Max") h_max = st.slider("H max", 0, 179, 179) s_max = st.slider("S max", 0, 255, 255) v_max = st.slider("V max", 0, 255, 255) # ROI settings st.markdown("#### Region of Interest") roi_width = st.slider("Width", 50, 640, 320, step=10) roi_height = st.slider("Height", 50, 480, 240, step=10) with tuning_tab: # PID Controller Settings with better organization st.markdown("#### Angle Control (Yaw)") # PID Parameters for Angle Control angle_kp = st.slider( "Kp", 0.0, 5.0, 1.0, 0.1, help="Proportional gain for angle control" ) angle_ki = st.slider( "Ki", 0.0, 1.0, 0.0, 0.01, help="Integral gain for angle control" ) angle_kd = st.slider( "Kd", 0.0, 5.0, 0.5, 0.1, help="Derivative gain for angle control" ) angle_setpoint = st.slider( "Setpoint", -90.0, 90.0, 0.0, 1.0, help="Desired angle in degrees (0° = vertical)", ) st.markdown("#### Position Control (Roll)") # PID Parameters for Position Control pos_kp = st.slider( "Kp", 0.0, 2.0, 0.5, 0.05, help="Proportional gain for position control", ) pos_ki = st.slider( "Ki", 0.0, 1.0, 0.0, 0.01, help="Integral gain for position control", ) pos_kd = st.slider( "Kd", 0.0, 5.0, 0.2, 0.05, help="Derivative gain for position control", ) pos_setpoint = st.slider( "Setpoint", -100, 100, 0, 1, help="Desired position (0 = center of frame)", ) # Reset PID Controllers Button - outside tabs for easy access if st.button("Reset PID Controllers", type="primary"): st.session_state.reset_pid = True else: st.session_state.reset_pid = False # Add instructions at the bottom of sidebar with st.expander("How to use", expanded=False): st.markdown( """ ### Quick Guide 1. **Start camera** stream 2. **Adjust HSV filters** to isolate the line 3. **Set region of interest** for detection 4. **Choose detection algorithm** 5. 
# Map method names to actual methods
method_map = {
    "HoughLinesP": HoughLinesP,
    "AdaptiveHoughLinesP": AdaptiveHoughLinesP,
    "RansacLine": RansacLine,
    "FitEllipse": FitEllipse,
    "RotatedRect": RotatedRect,
}

# Initialize session state for HSV values, method, and ROI settings
if "hsv_lower" not in st.session_state:
    st.session_state.hsv_lower = [h_min, s_min, v_min]
    st.session_state.hsv_upper = [h_max, s_max, v_max]
    st.session_state.method = method_name
    st.session_state.roi_width = roi_width
    st.session_state.roi_height = roi_height

# Update session state with current values
st.session_state.hsv_lower = [h_min, s_min, v_min]
st.session_state.hsv_upper = [h_max, s_max, v_max]
st.session_state.method = method_name
st.session_state.roi_width = roi_width
st.session_state.roi_height = roi_height

# Initialize session state for PID outputs
if "yaw_output" not in st.session_state:
    st.session_state.yaw_output = 0.0
    st.session_state.roll_output = 0.0
    st.session_state.p_term_angle = 0.0
    st.session_state.i_term_angle = 0.0
    st.session_state.d_term_angle = 0.0
    st.session_state.p_term_pos = 0.0
    st.session_state.i_term_pos = 0.0
    st.session_state.d_term_pos = 0.0


class VideoTransformer(VideoProcessorBase):
    def __init__(self):
        self.detector = LineDetector(estimation_method=HoughLinesP)
        self.hsv_lower = np.array([0, 0, 0], dtype=np.uint8)
        self.hsv_upper = np.array([179, 255, 255], dtype=np.uint8)
        self.method = HoughLinesP
        self.roi_size = (320, 240)

        # Initialize PID Controllers
        self.angle_pid = PIDController(
            kp=1.0, ki=0.0, kd=0.5, setpoint=0.0, min_output=-100, max_output=100
        )
        self.position_pid = PIDController(
            kp=0.5, ki=0.0, kd=0.2, setpoint=0.0, min_output=-100, max_output=100
        )

        # Frame counter for smoother updates
        self.frame_count = 0

        # Initialize instance variables for PID outputs
        self.yaw_output = 0.0
        self.roll_output = 0.0
        self.p_term_angle = 0.0
        self.i_term_angle = 0.0
        self.d_term_angle = 0.0
        self.p_term_pos = 0.0
        self.i_term_pos = 0.0
        self.d_term_pos = 0.0

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        img = frame.to_ndarray(format="bgr24")

        # Update detector with latest settings
        self.detector.color_detector.hsv_color = np.vstack(
            [self.hsv_lower, self.hsv_upper]
        )
        self.detector.estimation_method = self.method

        # Run detection
        output, roi_mask, cx, ang, conf = self.detector.detect_line(
            img, region=self.roi_size, draw=False
        )

        # Reset PID controllers if requested
        if "reset_pid" in st.session_state and st.session_state.reset_pid:
            self.angle_pid.reset()
            self.position_pid.reset()
            st.session_state.reset_pid = False

        # Update PID controllers with latest settings
        self.angle_pid.kp = st.session_state.get("angle_kp", 1.0)
        self.angle_pid.ki = st.session_state.get("angle_ki", 0.0)
        self.angle_pid.kd = st.session_state.get("angle_kd", 0.5)
        self.angle_pid.setpoint = st.session_state.get("angle_setpoint", 0.0)

        self.position_pid.kp = st.session_state.get("pos_kp", 0.5)
        self.position_pid.ki = st.session_state.get("pos_ki", 0.0)
        self.position_pid.kd = st.session_state.get("pos_kd", 0.2)
        self.position_pid.setpoint = st.session_state.get("pos_setpoint", 0.0)

        # Compute PID outputs based on detected values
        yaw_output = roll_output = 0.0
        if not math.isnan(ang) and not math.isnan(cx):
            # Get image dimensions
            h, w = img.shape[:2]

            # Normalize center position to be relative to center of frame
            # cx is already relative to ROI
            normalized_cx = cx - (w / 2)
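            # PIDController.compute() is assumed to return a 4-tuple
            # (output, p_term, i_term, d_term); the individual terms are kept on
            # the processor so they can be mirrored into st.session_state.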
            # Calculate PID outputs
            yaw_output, p_angle, i_angle, d_angle = self.angle_pid.compute(ang)
            roll_output, p_pos, i_pos, d_pos = self.position_pid.compute(normalized_cx)

            self.yaw_output = yaw_output
            self.roll_output = roll_output
            self.p_term_angle = p_angle
            self.i_term_angle = i_angle
            self.d_term_angle = d_angle
            self.p_term_pos = p_pos
            self.i_term_pos = i_pos
            self.d_term_pos = d_pos

            self.frame_count += 1
        else:
            self.yaw_output = 0.0
            self.roll_output = 0.0

        # Draw diagnostics with modern minimalist style
        h, w = img.shape[:2]

        # Modern color scheme for all UI elements
        roi_color = (41, 128, 185)  # Blue
        text_bg_color = (52, 73, 94, 200)  # Dark slate with higher opacity
        text_color = (255, 255, 255)  # Pure white for better contrast

        # Create a clean, non-obtrusive design
        # Draw ROI rectangle with modern blue color and thinner line
        cx_mask, cy_mask = w // 2, h // 2
        w_roi, h_roi = self.roi_size
        off_x, off_y = cx_mask - w_roi // 2, cy_mask - h_roi // 2
        top_left = (off_x, off_y)
        bottom_right = (off_x + w_roi, off_y + h_roi)

        # Draw more professional ROI border - thinner and with rounded corners effect
        cv2.rectangle(output, top_left, bottom_right, roi_color, 2)

        # Draw dots at corners for rounded look
        corner_radius = 3
        for corner in [
            top_left,
            (bottom_right[0], top_left[1]),
            (top_left[0], bottom_right[1]),
            bottom_right,
        ]:
            cv2.circle(output, corner, corner_radius, roi_color, -1)

        # Create a cleaner info overlay
        # Bottom right position for less interference with the line
        overlay_height = 90
        overlay_width = 200
        overlay_margin = 15
        overlay_position = (
            w - overlay_width - overlay_margin,
            h - overlay_height - overlay_margin,
        )

        # Create semi-transparent overlay
        overlay = output.copy()
        cv2.rectangle(
            overlay,
            overlay_position,
            (overlay_position[0] + overlay_width, overlay_position[1] + overlay_height),
            text_bg_color[:3],  # OpenCV doesn't support alpha in rectangle
            -1,
        )

        # Apply transparency
        alpha = 0.75
        cv2.addWeighted(overlay, alpha, output, 1 - alpha, 0, output)

        # Add a subtle border
        cv2.rectangle(
            output,
            overlay_position,
            (overlay_position[0] + overlay_width, overlay_position[1] + overlay_height),
            (255, 255, 255, 128),  # White border
            1,
        )

        # Modern font
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.55
        font_thickness = 1
        line_height = 20

        # Start position for text
        text_start_x = overlay_position[0] + 10
        text_start_y = overlay_position[1] + 20

        # Function to draw text with subtle shadow for better readability
        def draw_text_with_shadow(text, pos_y, color=text_color):
            # Shadow effect (subtle)
            cv2.putText(
                output,
                text,
                (text_start_x + 1, pos_y + 1),
                font,
                font_scale,
                (0, 0, 0, 150),
                font_thickness,
            )
            # Main text
            cv2.putText(
                output,
                text,
                (text_start_x, pos_y),
                font,
                font_scale,
                color,
                font_thickness,
            )

        # Draw sensor values with more modern, clean formatting
        draw_text_with_shadow("Line Detection", text_start_y - 5)

        # Add a subtle underline
        cv2.line(
            output,
            (text_start_x, text_start_y + 2),
            (text_start_x + 100, text_start_y + 2),
            (255, 255, 255, 150),
            1,
        )

        if not math.isnan(ang):
            draw_text_with_shadow(f"Angle: {ang:.1f}", text_start_y + line_height)
        else:
            draw_text_with_shadow("Angle: --", text_start_y + line_height)

        if not math.isnan(cx):
            draw_text_with_shadow(f"Position: {cx:.1f}", text_start_y + 2 * line_height)
        else:
            draw_text_with_shadow("Position: --", text_start_y + 2 * line_height)

        # Draw PID outputs with color indication
        yaw_color = (130, 220, 255) if abs(yaw_output) < 50 else (130, 130, 255)
        draw_text_with_shadow(
            f"Control: {yaw_output:.1f}, {roll_output:.1f}",
            text_start_y + 3 * line_height,
            yaw_color,
        )
        # Fetch the intermediate results for preview
        filtered = self.detector.color_detector.result

        # Prepare filtered preview - more compact
        pw, ph = w // 6, h // 6  # Smaller preview size
        filtered_preview = cv2.resize(filtered, (pw, ph))

        # Add a cleaner border to the preview
        filtered_preview = cv2.copyMakeBorder(
            filtered_preview, 1, 1, 1, 1, cv2.BORDER_CONSTANT, value=(255, 255, 255)
        )

        # Position the filter preview in top-right corner more elegantly
        preview_padding = 10
        output[
            preview_padding : preview_padding + ph + 2,
            w - pw - preview_padding - 2 : w - preview_padding,
        ] = filtered_preview

        # Add a small "Filter" label above the preview for clarity
        small_font_scale = 0.4
        cv2.putText(
            output,
            "Filter",
            (w - pw - preview_padding, preview_padding - 4),
            font,
            small_font_scale,
            (255, 255, 255),
            1,
        )

        return av.VideoFrame.from_ndarray(output, format="bgr24")


# Create a simplified layout with two main rows instead of tabs
col1, col2 = st.columns([3, 1], gap="large")

with col1:
    # Video stream (expanded from the tab layout)
    st.markdown("### Line Following Camera")

    # Wrap WebRTC in a div for styling (optional, if needed)
    st.markdown('<div>', unsafe_allow_html=True)

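    # recv() runs on a worker thread managed by streamlit-webrtc
    # (async_processing=True), so the script communicates with it through
    # attribute assignment and st.session_state rather than through widgets.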
    # Create the webrtc component
    webrtc_ctx = webrtc_streamer(
        key="line-detection",
        mode=WebRtcMode.SENDRECV,
        rtc_configuration={"iceServers": get_ice_servers()},
        video_processor_factory=VideoTransformer,
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
        video_html_attrs=VideoHTMLAttributes(
            autoPlay=True,
            controls=False,
            style={
                "width": "1280px",
                "height": "720px",
                "border-radius": "8px",
                "margin": "0 auto",
                "display": "block",
                "border": "2px solid #AAAAAA",  # Light grey border
            },
        ),
    )

    # Pass the settings to the video transformer
    if webrtc_ctx.video_processor:
        webrtc_ctx.video_processor.hsv_lower = np.array(
            st.session_state.hsv_lower, dtype=np.uint8
        )
        webrtc_ctx.video_processor.hsv_upper = np.array(
            st.session_state.hsv_upper, dtype=np.uint8
        )
        webrtc_ctx.video_processor.method = method_map[st.session_state.method]
        webrtc_ctx.video_processor.roi_size = (
            st.session_state.roi_width,
            st.session_state.roi_height,
        )

    # Get the latest PID outputs from the video processor
    if webrtc_ctx.state.playing:
        st.session_state.yaw_output = webrtc_ctx.video_processor.yaw_output
        st.session_state.roll_output = webrtc_ctx.video_processor.roll_output
        st.session_state.p_term_angle = webrtc_ctx.video_processor.p_term_angle
        st.session_state.i_term_angle = webrtc_ctx.video_processor.i_term_angle
        st.session_state.d_term_angle = webrtc_ctx.video_processor.d_term_angle
        st.session_state.p_term_pos = webrtc_ctx.video_processor.p_term_pos
        st.session_state.i_term_pos = webrtc_ctx.video_processor.i_term_pos
        st.session_state.d_term_pos = webrtc_ctx.video_processor.d_term_pos

with col2:
    # Simplified metrics section that shows only essential values
    st.markdown("### Control Values")

    # Display the most important metrics in a clean format
    # Use vertical layout for metrics instead of columns
    st.metric("Angle Control (Yaw)", f"{st.session_state.get('yaw_output', 0):.1f}")
    st.metric(
        "Position Control (Roll)", f"{st.session_state.get('roll_output', 0):.1f}"
    )

    # Add vertical space before button
    st.markdown("<br>", unsafe_allow_html=True)

", unsafe_allow_html=True) # Add a reset button for the PID controllers if st.button("Reset PID Controllers", use_container_width=True, type="primary"): st.session_state.reset_pid = True # Create a dedicated row for the PID control graph st.markdown("
", unsafe_allow_html=True) # Add divider before graph st.markdown("### PID Controller Output") chart_placeholder = st.empty() # Initialize start_time and pid_df exactly once if "start_time" not in st.session_state: st.session_state.start_time = time.time() if "pid_df" not in st.session_state: # start with a single zero row so the chart axes are set st.session_state.pid_df = pd.DataFrame([{"time_rel": 0.0, "yaw": 0.0, "roll": 0.0}]) # auto‐refresh every 100 ms st_autorefresh(interval=500, limit=None, key="pid_refresh") # On each rerun, if the camera is playing, append the newest PID outputs if webrtc_ctx.state.playing: t = time.time() - st.session_state.start_time new_row = pd.DataFrame( [ { "time_rel": t, "yaw": st.session_state.yaw_output, "roll": st.session_state.roll_output, } ] ) st.session_state.pid_df = pd.concat( [st.session_state.pid_df, new_row], ignore_index=True ) # keep only last 100 points if len(st.session_state.pid_df) > 100: st.session_state.pid_df = st.session_state.pid_df.iloc[-100:].reset_index( drop=True ) # Build an Altair ā€œfoldedā€ chart so you can see both yaw and roll # 1) grab the wide‐form DataFrame df = st.session_state.pid_df # 2) melt it into long‐form df_long = df.melt( id_vars=["time_rel"], value_vars=["yaw", "roll"], var_name="Signal", value_name="Value", ) # 3) build your Altair chart off of df_long chart = ( alt.Chart(df_long) .mark_line(point=False) # Use point=False for cleaner lines .encode( x=alt.X("time_rel:Q", title="Time (s)"), y=alt.Y("Value:Q", title="Controller Output Value"), color=alt.Color( "Signal:N", title="Control Signal", scale=alt.Scale(domain=["yaw", "roll"], range=["#007bff", "#ff7f0e"]), ), # Custom colors tooltip=[ alt.Tooltip("time_rel", title="Time (s)", format=".2f"), alt.Tooltip("Signal", title="Control Signal"), alt.Tooltip("Value", title="Output Value", format=".2f"), ], ) .properties(height=350) # Slightly taller chart .interactive() # Enable zooming and panning ) # 4) draw it inside a container for styling with st.container(): st.markdown('
    st.markdown('<div>', unsafe_allow_html=True)
    chart_placeholder.altair_chart(chart, use_container_width=True)
    st.markdown("</div>", unsafe_allow_html=True)

", unsafe_allow_html=True) # Hide Streamlit footer/menu st.markdown( """ """, unsafe_allow_html=True, )