SpokenVision / app.py
tonyliu404's picture
Rename streamlit.py to app.py
de0cfb1 verified
raw
history blame
7.52 kB
import streamlit as st
import cv2
import tempfile
import numpy as np
import torch
import torch.nn.functional as F
import os
from PIL import Image
import time
import requests
import json
import base64
import threading
import pygame
import queue
# Base URL of the backend that captions frames. The default is a temporary
# ngrok tunnel, so the address can be overridden without editing the code by
# setting the SPOKENVISION_BACKEND_URL environment variable. A trailing slash
# is stripped because callers append "/process/" themselves.
backend_server_url = os.environ.get(
    "SPOKENVISION_BACKEND_URL",
    "https://0416-2600-1017-a410-36b8-2357-52be-1318-959b.ngrok-free.app",
).rstrip("/")

# Thread-safe hand-off of backend responses from upload threads to the UI loop.
response_queue = queue.Queue()

# Most recently started upload thread tracked by the UI loop (None until the
# first upload has been started).
send_thread = None
# Alternative playback through Streamlit's audio widget (currently disabled):
# def playAudio(audio_base64):
#     # Decode the base64 string into bytes
#     audio_bytes = base64.b64decode(audio_base64)
#     # Save to a file
#     audio_path = "output_audio.wav"
#     with open(audio_path, "wb") as f:
#         f.write(audio_bytes)
#     # Display audio player in Streamlit
#     st.audio(audio_bytes, format="audio/wav")
def playAudio(audio_base64):
    """Decode base64 audio and play it to completion with the pygame mixer.

    The decoded bytes are written to a temporary ``.wav`` file because the
    mixer is given a file path to load. The call blocks until playback has
    finished, and the temporary file is removed afterwards. Failures are
    printed rather than raised.

    Args:
        audio_base64: Base64-encoded audio data.
    """
    temp_audio_path = None
    music_loaded = False
    try:
        # Decode the base64 string into bytes
        audio_bytes = base64.b64decode(audio_base64)

        # Write bytes to a temporary WAV file. delete=False keeps the file on
        # disk after the handle is closed so the mixer can open it by path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
            temp_audio_path = temp_audio.name
            temp_audio.write(audio_bytes)

        # Initialize pygame mixer
        pygame.mixer.init()
        pygame.mixer.music.load(temp_audio_path)
        music_loaded = True
        pygame.mixer.music.play()

        # Wait for playback to finish, polling about ten times per second.
        clock = pygame.time.Clock()
        while pygame.mixer.music.get_busy():
            clock.tick(10)
    except Exception as e:
        print(f"Error playing audio: {e}")
    finally:
        if temp_audio_path is not None:
            if music_loaded:
                # Let go of the file before deleting it; unload() only exists
                # in newer pygame releases, hence the lookup.
                unload = getattr(pygame.mixer.music, "unload", None)
                if unload is not None:
                    try:
                        unload()
                    except Exception as e:
                        print(f"Error unloading audio: {e}")
            try:
                os.remove(temp_audio_path)
            except OSError as e:
                print(f"Could not remove temporary audio file: {e}")
def sendToBackend(frame, audio = None):
    """Upload one frame, plus the contents of ``input.mp3``, to the backend.

    The frame is JPEG-encoded in memory and POSTed as multipart form data to
    ``backend_server_url + "/process/"``. On HTTP 200 the decoded JSON body is
    put on ``response_queue``; any other outcome is reported with ``st.error``.

    Args:
        frame: Image array to upload. ``None`` is ignored, since there is
            nothing to encode.
        audio: Currently unused; the audio part is always read from
            ``input.mp3`` in the working directory.
    """
    if frame is None:
        return
    try:
        # Encode in memory instead of going through a shared "frame.jpg" file,
        # so concurrent uploads cannot overwrite each other's image.
        encoded_ok, jpeg_buffer = cv2.imencode(".jpg", frame)
        if not encoded_ok:
            raise ValueError("could not encode frame as JPEG")

        # Create an empty audio file (1 second of silence if needed)
        empty_audio_path = "input.mp3"
        if not os.path.exists(empty_audio_path):
            with open(empty_audio_path, "wb") as f:
                f.write(b"")

        with open(empty_audio_path, "rb") as audio_file:
            files = {
                "image": ("frame.jpg", jpeg_buffer.tobytes(), "image/jpeg"),
                "audio": ("input.mp3", audio_file, "audio/mpeg"),
            }
            # Send the request to the backend server. The (connect, read)
            # timeout keeps a stalled backend from blocking this thread forever.
            #response = requests.post("http://localhost:8000/process/", files=files)
            response = requests.post(
                backend_server_url + "/process/",
                files=files,
                timeout=(10, 120),
            )

        # NOTE(review): this function is started on background threads; confirm
        # that the st.* status calls below actually render from there.
        if response.status_code == 200: #If the request was successful
            st.success("Frame sent successfully!")
            response_queue.put(response.json())
        else:
            st.error(f"Failed: {response.status_code} - {response.text}")
    except Exception as e:
        st.error(f"Error sending frame: {e}")
def thread_sendToBackend(frame=None):
    """Start the first background upload, unless paused or one was already started.

    Does nothing when the session is paused, when ``send_thread`` has already
    been set, or when there is no frame to send.

    Args:
        frame: Frame to upload. Defaults to ``st.session_state.last_frame``.
    """
    global send_thread
    if st.session_state.paused or send_thread is not None:
        return
    if frame is None:
        # No frame exists at module scope, so fall back to the last captured one.
        frame = st.session_state.get("last_frame", None)
    if frame is None:
        return
    send_thread = threading.Thread(target=sendToBackend, args=(frame,), daemon=True)
    send_thread.start()
# Setup: seed the per-session values the rest of the page reads.
for _state_key, _state_default in (("last_frame", None), ("paused", False)):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# if not st.session_state.paused:
#     threading.Thread(target=sendToBackend, args=(st.session_state.last_frame,), daemon=True).start()

st.title("SpokenVision - Real-time Object Detection and Segmentation")


def toggle_pause():
    """Button callback: flip the session's ``paused`` flag."""
    was_paused = st.session_state.paused
    st.session_state.paused = not was_paused


col1, col2 = st.columns(2)
with col1:
    # The label names the action the click performs, not the current state.
    if st.session_state.paused:
        pause_button_label = "Start"
    else:
        pause_button_label = "Pause"
    st.button(pause_button_label, on_click=toggle_pause)
def main():
    """Render the camera page and run the capture / caption loop.

    Lets the user pick a camera source, opens it with OpenCV, and then loops:
    each pass renders any queued backend response (caption text, audio played
    on a background thread), grabs a new frame unless paused, displays it, and
    starts the first backend upload. Later uploads are started when a response
    arrives.

    The capture device is released and removed from ``st.session_state`` when
    the function exits, so the next script run opens a fresh capture.
    """
    global send_thread

    # Camera selection options
    camera_options = {
        "Built-in Camera": 0,
        "External Camera": 1,
        "Mobile Phone Camera (requires IP Webcam app)": "http://YOUR_PHONE_IP:8080/video",
    }
    camera_choice = st.selectbox(
        "Choose Camera Source",
        list(camera_options.keys()),
        index=0,
    )

    # Instructions for phone camera
    if "Mobile" in camera_choice:
        st.info("""
To use your phone as a camera:
1. Install the 'IP Webcam' app from Play Store (Android) or similar app for iOS
2. Open the app and click 'Start server'
3. Replace 'YOUR_PHONE_IP' in the code with your phone's IP address shown in the app
4. Make sure your phone and computer are on the same network
""")

    # Camera stream capture
    stframe = st.empty()
    camera_source = camera_options[camera_choice]

    try:
        # Reuse a still-open capture from the session; otherwise open a new
        # one. It is stored right away so the cleanup below always releases it.
        cap = st.session_state.get("cap", None)
        if cap is None or not cap.isOpened():
            cap = cv2.VideoCapture(camera_source)
            st.session_state.cap = cap
            if cap.isOpened():
                st.session_state.streaming = True

        if not cap.isOpened():
            st.error(f"Could not open camera {camera_source}")
            return

        # The paused flag is deliberately not reset here: a pause click has to
        # survive the camera being reopened on the following run.
        if "paused" not in st.session_state:
            st.session_state.paused = False

        # Start camera feed
        while True:
            # Always check and render server response
            if not response_queue.empty():
                response = response_queue.get()
                st.markdown("### Server Response")
                if "caption" in response:
                    st.write("Caption:", response["caption"])
                if "audio_base64" in response:
                    threading.Thread(
                        target=playAudio,
                        args=(response["audio_base64"],),
                        daemon=True,
                    ).start()
                    if not st.session_state.paused:
                        # Auto send the next frame after receiving audio.
                        # NOTE(review): a failed upload yields no response, so
                        # this chain is not restarted until the script reruns.
                        threading.Thread(
                            target=sendToBackend,
                            args=(st.session_state.last_frame,),
                            daemon=True,
                        ).start()

            if not st.session_state.paused:
                ret, frame = cap.read()
                if not ret:
                    st.error("Failed to grab frame from camera")
                    break
                st.session_state.last_frame = frame  # Save last good frame
            else:
                # While paused, keep showing the last captured frame.
                frame = st.session_state.get('last_frame', None)
                if frame is None:
                    time.sleep(0.05)
                    continue

            # Convert BGR to RGB
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Display frame in Streamlit
            stframe.image(rgb_frame, channels="RGB", use_container_width=True)

            # Kick off the first upload with a real frame; send_thread stays
            # set afterwards, so this branch runs once per script run.
            if not st.session_state.paused and send_thread is None:
                send_thread = threading.Thread(target=sendToBackend, args=(frame,), daemon=True)
                send_thread.start()

            # Small delay to prevent high CPU usage
            time.sleep(0.05)
    except Exception as e:
        st.error(f"Error: {e}")
    finally:
        # Release resources on exit, and forget the handle so a later run does
        # not pick up a capture that has already been released.
        if 'cap' in st.session_state:
            st.session_state.cap.release()
            del st.session_state['cap']
        st.session_state.running = False
        # st.stop() is intentionally not called here: raising from a finally
        # block would replace whatever exception is already propagating, and
        # the script ends once main() returns anyway.
# Script entry point: build the page and start the capture loop.
if __name__ == "__main__":
    main()