# app.py (Simplified & Robust Meeting Summarizer) import gradio as gr import threading import queue import time import wave import os from datetime import datetime from analyzer import MeetingAnalyzer from integrations import Notifier import config import speech_recognition as sr class MeetingProcessor: def __init__(self): self.analyzer = MeetingAnalyzer() self.notifier = Notifier() self.running = False self.start_time = None self.transcript_history = [] self.summary = "" self.action_items = [] self.urgent_alerts = [] self.transcript_queue = queue.Queue() self.recording_file = None self.recognizer = sr.Recognizer() def start_processing(self): if self.running: return "Already running!" self.running = True self.start_time = time.time() self.transcript_history = [] self.summary = "" self.action_items = [] self.urgent_alerts = [] # Create a unique filename for this recording timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.recording_file = f"meeting_{timestamp}.wav" # Start processing threads threading.Thread(target=self._audio_capture_thread, daemon=True).start() threading.Thread(target=self._transcription_thread, daemon=True).start() return "Meeting processing started! 🎤" def _audio_capture_thread(self): """Capture audio from microphone and save to file""" try: with sr.Microphone() as source: print("Adjusting for ambient noise...") self.recognizer.adjust_for_ambient_noise(source, duration=1) print("Microphone ready! Recording meeting...") # Start recording audio = self.recognizer.listen(source, timeout=None, phrase_time_limit=300) # Save audio to file with open(self.recording_file, "wb") as f: f.write(audio.get_wav_data()) # Add to queue for processing self.transcript_queue.put(self.recording_file) except Exception as e: print(f"Audio capture error: {str(e)}") def _transcription_thread(self): """Transcribe audio files as they become available""" while self.running: try: audio_file = self.transcript_queue.get(timeout=1.0) if audio_file: # Transcribe the audio file with sr.AudioFile(audio_file) as source: audio = self.recognizer.record(source) try: text = self.recognizer.recognize_google(audio) self.transcript_history.append(text) self.analyzer.process_chunk(text) # Generate interim summary if len(self.transcript_history) % 3 == 0: self.summary = self.analyzer.generate_summary() self.action_items = self.analyzer.extract_action_items() # Check for urgent items urgent_items = self.analyzer.detect_urgent_action_items() if urgent_items: self.urgent_alerts.extend(urgent_items) self.notifier.send_urgent_alert(urgent_items) except sr.UnknownValueError: print("Speech recognition could not understand audio") except sr.RequestError as e: print(f"Speech recognition error: {str(e)}") except queue.Empty: continue def stop_processing(self): if not self.running: return "Not running!" self.running = False # Generate final analysis self.summary = self.analyzer.generate_summary() self.action_items = self.analyzer.extract_action_items() # Send final report self.notifier.send_comprehensive_report( summary=self.summary, action_items=self.action_items, decisions=self.analyzer.extract_decisions(), transcript="\n".join(self.transcript_history), recipients=config.NOTIFICATION_RECIPIENTS ) return "Meeting processing stopped! Report sent. ✅" def get_current_status(self): if not self.running: return { "status": "Stopped", "duration": "00:00", "transcript": "", "summary": self.summary, "action_items": self.action_items, "alerts": self.urgent_alerts } elapsed = time.time() - self.start_time mins, secs = divmod(int(elapsed), 60) # Only show last 3 transcript entries recent_transcript = "\n".join(self.transcript_history[-3:]) return { "status": "Recording", "duration": f"{mins:02d}:{secs:02d}", "transcript": recent_transcript, "summary": self.summary if self.summary else "Summary will appear during meeting", "action_items": self.action_items, "alerts": self.urgent_alerts } # Initialize processor processor = MeetingProcessor() # Create Gradio interface with gr.Blocks(title="Meeting Notes Generator", theme="soft") as app: gr.Markdown("# 📝 Meeting Notes Generator") gr.Markdown("Start this during any meeting to automatically capture notes and action items") with gr.Row(): start_btn = gr.Button("Start Meeting", variant="primary") stop_btn = gr.Button("Stop Meeting", variant="stop") status_text = gr.Textbox(label="Status", interactive=False, value="Ready to start meeting") with gr.Row(): with gr.Column(): duration_display = gr.Textbox(label="Duration", interactive=False) transcript_box = gr.Textbox(label="Live Transcript", lines=6, interactive=False) with gr.Column(): summary_box = gr.Textbox(label="Meeting Summary", lines=6, interactive=False) with gr.Row(): action_items_box = gr.Textbox(label="Action Items", lines=4, interactive=False) alerts_box = gr.Textbox(label="Urgent Alerts", lines=4, interactive=False) # Update function for components def update_components(): current_status = processor.get_current_status() return [ current_status["duration"], current_status["transcript"], "\n".join( f"• {item['task']} (Owner: {item['owner']}, Deadline: {item['deadline']})" for item in current_status["action_items"] ), "\n".join( f"🚨 {item['task']} (Owner: {item['owner']}, Deadline: {item['deadline']})" for item in current_status["alerts"] ) if current_status["alerts"] else "No urgent alerts", current_status["summary"], current_status["status"] ] # Button actions start_btn.click( fn=processor.start_processing, inputs=[], outputs=[status_text] ) stop_btn.click( fn=processor.stop_processing, inputs=[], outputs=[status_text] ) # Manual refresh button refresh_btn = gr.Button("Refresh View", variant="secondary") refresh_btn.click( update_components, inputs=[], outputs=[ duration_display, transcript_box, action_items_box, alerts_box, summary_box, status_text ] ) # Instructions gr.Markdown("### How to use:") gr.Markdown("1. Click **Start Meeting** when your meeting begins") gr.Markdown("2. Continue with your meeting as normal") gr.Markdown("3. Click **Stop Meeting** when finished") gr.Markdown("4. View your meeting summary and action items below") gr.Markdown("5. Click **Refresh View** to update the display") # Initialize with current status app.load( update_components, inputs=[], outputs=[ duration_display, transcript_box, action_items_box, alerts_box, summary_box, status_text ] ) if __name__ == "__main__": app.launch()