"""Streamlit front end for Moonshine ASR transcription and LLM summarization.

The page has three tabs: pick a podcast episode, fetch/upload an audio file,
and transcribe + summarize the selected audio. Streamlit re-executes this
script top to bottom on every interaction, so anything that must survive a
rerun is kept in ``st.session_state``.
"""

import base64
import html as html_lib  # Required for text escaping in transcripts
import json  # Used to pass utterance data to the player's JavaScript
import time
from datetime import datetime
from pathlib import Path

import streamlit as st

from asr import transcribe_file
from summarization import summarize_transcript
from podcast import (
    search_podcast_series,
    fetch_episodes,
    download_podcast_audio,
    fetch_audio,
)
from utils import model_names, available_gguf_llms

# Markup for the player component. Kept as a plain (non f-) string so the
# CSS/JS braces need no escaping; the two __PLACEHOLDER__ tokens are filled in
# by create_interactive_player().
_PLAYER_TEMPLATE = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Interactive Player</title>
<style>
  body { font-family: sans-serif; margin: 0; padding: 8px; }
  audio { width: 100%; margin-bottom: 8px; }
  .utterance { padding: 4px 8px; margin: 2px 0; border-radius: 4px; cursor: pointer; }
  .utterance:hover { background: #eef2f7; }
  .utterance.active { background: #ffe9a8; }
  .timestamp { color: #666; font-size: 0.8em; margin-right: 8px; }
</style>
</head>
<body>
<audio id="player" controls src="data:__AUDIO_MIME__;base64,__AUDIO_BASE64__"></audio>
<div id="transcript"></div>
<script>
  const utterances = __UTTERANCES_JSON__;
  const player = document.getElementById("player");
  const transcript = document.getElementById("transcript");

  function formatTime(seconds) {
    const total = Math.max(0, Math.floor(seconds));
    const mins = Math.floor(total / 60);
    const secs = total % 60;
    return mins + ":" + (secs < 10 ? "0" : "") + secs;
  }

  // Build one clickable row per utterance: [start, end, text].
  const rows = utterances.map(function (utt) {
    const row = document.createElement("div");
    row.className = "utterance";
    const stamp = document.createElement("span");
    stamp.className = "timestamp";
    stamp.textContent = formatTime(utt[0]);
    row.appendChild(stamp);
    // Text nodes (not innerHTML) so transcript text is never parsed as markup.
    row.appendChild(document.createTextNode(utt[2]));
    row.addEventListener("click", function () {
      player.currentTime = utt[0];
      player.play();
    });
    transcript.appendChild(row);
    return row;
  });

  // Highlight the utterance that contains the current playback position.
  let active = -1;
  player.addEventListener("timeupdate", function () {
    const now = player.currentTime;
    let current = -1;
    for (let i = 0; i < utterances.length; i++) {
      if (now >= utterances[i][0] && now < utterances[i][1]) {
        current = i;
        break;
      }
    }
    if (current === active) {
      return;
    }
    if (active >= 0) {
      rows[active].classList.remove("active");
    }
    if (current >= 0) {
      rows[current].classList.add("active");
      rows[current].scrollIntoView({ block: "nearest", behavior: "smooth" });
    }
    active = current;
  });
</script>
</body>
</html>
"""


def create_interactive_player(audio_base64, utterances):
    """Build a self-contained HTML document with an audio player and transcript.

    The player and the transcript live in the same document, so highlighting
    the current utterance and click-to-seek are handled entirely in the
    browser, without Streamlit reruns or cross-iframe messaging.

    Args:
        audio_base64: Base64-encoded audio file contents.
        utterances: Iterable of ``(start, end, text)`` triples, the same shape
            the transcription loop stores in ``st.session_state.utterances``.
            ``start``/``end`` are used as playback positions in seconds.

    Returns:
        The HTML document as a string, suitable for ``st.components.v1.html``.
    """
    # Coerce to plain JSON types so non-builtin numeric scalars serialize too.
    rows = [[float(start), float(end), str(text)] for start, end, text in utterances]
    # Escaping "<" keeps transcript text from closing the <script> element;
    # "<" can only occur inside JSON strings, where \u003c is equivalent.
    utterances_json = json.dumps(rows).replace("<", "\\u003c")

    # "UklGR" is the base64 form of the RIFF magic that starts a WAV file;
    # everything else is served as MPEG audio, the other accepted upload type.
    mime = "audio/wav" if audio_base64.startswith("UklGR") else "audio/mpeg"

    # The JSON goes in last so placeholder-like text inside a transcript is
    # never substituted.
    return (
        _PLAYER_TEMPLATE
        .replace("__AUDIO_MIME__", mime)
        .replace("__AUDIO_BASE64__", audio_base64)
        .replace("__UTTERANCES_JSON__", utterances_json)
    )


# Session state init. The dict is rebuilt on every script run, so the list
# default is a fresh object each time.
_SESSION_DEFAULTS = {
    "transcript": "",
    "summary": "",
    "status": "Ready",
    "audio_path": None,
    "utterances": [],
    "audio_base64": None,
    "prev_audio_path": None,
    "transcribing": False,
    "upload_signature": None,  # (name, size) of the upload already saved to disk
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default

st.set_page_config(page_title="🎙️ Moonshine ASR + LLM", layout="wide")
st.title("🎙️ Speech Summarization with Moonshine ASR & LLM")

with st.sidebar:
    st.header("⚙️ Settings")
    vad_threshold = st.slider("VAD Threshold", 0.1, 0.9, 0.5)
    model_name = st.selectbox("Moonshine Model", list(model_names.keys()))
    llm_model = st.selectbox("LLM for Summarization", list(available_gguf_llms.keys()))
    prompt_input = st.text_area("Custom Prompt", value="Summarize the transcript below.")

tab1, tab2, tab3 = st.tabs(["📻 Podcast", "🎵 Audio Input", "📄 Results"])

with tab1:
    st.subheader("Search Podcast")
    query = st.text_input("Enter podcast name")
    if st.button("Search Series"):
        st.session_state.series_list = search_podcast_series(query)

    if "series_list" in st.session_state:
        series_titles = [
            f"{s['title']} by {s['artist']}" for s in st.session_state.series_list
        ]
        selected_title = st.selectbox("Select Series", series_titles)
        series = next(
            (
                s
                for s in st.session_state.series_list
                if f"{s['title']} by {s['artist']}" == selected_title
            ),
            None,
        )
        if series:
            st.image(series["thumbnail"], width=150)
            st.text_area(
                "Series Info",
                value=f"Title: {series['title']}\nArtist: {series['artist']}\nEpisodes: {series['episode_count']}",
                disabled=True,
            )
            if st.button("Load Episodes"):
                st.session_state.episodes = fetch_episodes(series["feed_url"])

    if "episodes" in st.session_state:
        episode_titles = [e["title"] for e in st.session_state.episodes]
        selected_episode = st.selectbox("Select Episode", episode_titles)
        episode = next(
            (e for e in st.session_state.episodes if e["title"] == selected_episode),
            None,
        )
        if episode:
            st.text_area(
                "Episode Info",
                value=f"Title: {episode['title']}\nPublished: {episode['published']}\nDuration: {episode['duration']}",
                disabled=True,
            )
            if st.button("Download Episode"):
                audio_path, status = download_podcast_audio(
                    episode["audio_url"], episode["title"], st.session_state.status
                )
                st.session_state.audio_path = audio_path
                # Drop the cached encoding, as the other audio sources do.
                st.session_state.audio_base64 = None
                st.session_state.status = status

with tab2:
    st.subheader("Upload or Fetch Audio")
    youtube_url = st.text_input("YouTube URL")
    if st.button("Fetch from YouTube"):
        audio_path, status = fetch_audio(youtube_url, st.session_state.status)
        st.session_state.audio_path = audio_path
        st.session_state.audio_base64 = None  # Clear cached base64
        st.session_state.status = status

    uploaded_file = st.file_uploader("Upload Audio", type=["mp3", "wav"])
    if uploaded_file is not None:
        # The uploader keeps returning the same file on every rerun. Only act
        # when the upload actually changes; otherwise it would overwrite audio
        # chosen from another source and force a re-encode on each rerun.
        signature = (uploaded_file.name, uploaded_file.size)
        if st.session_state.upload_signature != signature:
            # Keep the real extension so a WAV upload is not saved as .mp3.
            suffix = Path(uploaded_file.name).suffix.lower() or ".mp3"
            # Written under /tmp rather than the current directory.
            temp_audio_path = f"/tmp/temp_audio{suffix}"
            with open(temp_audio_path, "wb") as f:
                f.write(uploaded_file.getbuffer())
            st.session_state.audio_path = temp_audio_path
            st.session_state.audio_base64 = None  # Clear cached base64
            st.session_state.upload_signature = signature
    else:
        # Forget the last upload so re-uploading the same file is picked up.
        st.session_state.upload_signature = None

with tab3:
    # A different audio path invalidates the cached base64 encoding.
    if (
        st.session_state.audio_path
        and st.session_state.get("prev_audio_path") != st.session_state.audio_path
    ):
        st.session_state.audio_base64 = None
        st.session_state.prev_audio_path = st.session_state.audio_path

    st.subheader("🎤 Transcription & Summary")
    st.markdown("---")

    status_placeholder = st.empty()
    summary_container = st.container()

    # ===== Audio Player and Transcript Logic =====
    # Encode the audio once per file; the result is cached in session state.
    if st.session_state.audio_path and not st.session_state.audio_base64:
        try:
            with open(st.session_state.audio_path, "rb") as f:
                audio_bytes = f.read()
            st.session_state.audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")
        except Exception as e:
            st.error(f"Audio loading error: {e}")

    # Placeholder for transcript display (streaming text or interactive player)
    transcript_display = st.empty()

    # ===== Transcription Process =====
    if st.button("🎙️ Transcribe Audio", key="transcribe_button_tab3"):
        if st.session_state.audio_path:
            status_placeholder.info("🔊 Transcribing audio... Please wait.")

            # Reset previous results
            st.session_state.utterances = []
            st.session_state.transcript = ""
            st.session_state.transcribing = True

            # Set up live streaming display
            with transcript_display.container():
                st.markdown("### 📝 Live Transcript (Streaming)")
                live_placeholder = st.empty()

            try:
                transcription_gen = transcribe_file(
                    st.session_state.audio_path,
                    vad_threshold,
                    model_names[model_name],
                )
                for _, all_utts in transcription_gen:
                    st.session_state.utterances = list(all_utts) if all_utts else []
                    st.session_state.transcript = "\n".join(
                        f"{text}" for _start, _end, text in st.session_state.utterances
                    )
                    live_placeholder.markdown(st.session_state.transcript)

                st.session_state.transcribing = False
                status_placeholder.success(
                    "✅ Transcription completed! The interactive player is now active."
                )
                # Rerun so the block below swaps the live text for the player.
                st.rerun()
            except Exception as e:
                status_placeholder.error(f"Transcription error: {e}")
                st.session_state.transcribing = False
        else:
            status_placeholder.warning("⚠️ No audio file available")

    # ===== Summarization Process =====
    if st.button("📝 Generate Summary", key="summarize_button_tab3"):
        if st.session_state.transcript:
            status_placeholder.info("🧠 Generating summary...")
            st.session_state.summary = ""

            live_summary_area = st.empty()
            with live_summary_area.container():
                st.markdown("### 📝 Live Summary (In Progress)")
                progress_placeholder = st.empty()

            try:
                summary_gen = summarize_transcript(
                    st.session_state.transcript, llm_model, prompt_input
                )
                # Each yielded value is stored as the whole summary so far.
                for accumulated_summary in summary_gen:
                    st.session_state.summary = accumulated_summary
                    progress_placeholder.markdown(accumulated_summary)
                status_placeholder.success("✅ Summary generated!")
            except Exception as e:
                # Mirrors the transcription handler: report, don't crash the page.
                status_placeholder.error(f"Summarization error: {e}")
            finally:
                # The final summary is rendered below from session state.
                live_summary_area.empty()
        else:
            status_placeholder.warning("⚠️ No transcript available")

    # Display the interactive player if transcription is complete
    if (
        st.session_state.get("audio_base64")
        and st.session_state.get("utterances")
        and not st.session_state.transcribing
    ):
        component_html = create_interactive_player(
            st.session_state.audio_base64, st.session_state.utterances
        )
        # Grow with the transcript, clamped to the 200-600 px range.
        estimated_height = min(600, max(200, len(st.session_state.utterances) * 50 + 100))
        with transcript_display.container():
            st.components.v1.html(component_html, height=estimated_height, scrolling=True)
    elif not st.session_state.utterances and not st.session_state.transcribing:
        with transcript_display.container():
            st.info("No transcript available. Click 'Transcribe Audio' to generate one.")

    # Display the final summary if one exists in session state
    if st.session_state.summary:
        with summary_container:
            st.markdown("### 📝 Final Summary")
            st.markdown(st.session_state.summary)