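"""Real-time Google Meet notes generator.

Gradio app that streams microphone audio, transcribes it with Whisper, and
periodically condenses the running transcript into bullet-point notes with a
DistilBART summarizer.
"""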

import logging
import time

import gradio as gr
import numpy as np
import soundfile as sf
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
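
# Prefer GPU with half precision when available; otherwise run on CPU in float32.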
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
logger.info(f"Using device: {device}")
stt_model_id = "openai/whisper-tiny"
summarizer_model_id = "sshleifer/distilbart-cnn-6-6"

logger.info("Loading STT model...")
stt_model = AutoModelForSpeechSeq2Seq.from_pretrained(
    stt_model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
)
stt_model.to(device)
processor = AutoProcessor.from_pretrained(stt_model_id)
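# chunk_length_s=30 enables Whisper's chunked long-form inference;
# batch_size controls how many 30-second chunks are decoded in parallel.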
stt_pipeline = pipeline(
    "automatic-speech-recognition",
    model=stt_model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=128,
    chunk_length_s=30,
    batch_size=16,
    torch_dtype=torch_dtype,
    device=device,
)

logger.info("Loading Summarization pipeline...")
summarizer = pipeline(
    "summarization",
    model=summarizer_model_id,
    device=device,
)
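
# In-memory session state: running transcript, latest summary, and the time of
# the last summarization pass, keyed by meeting ID.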
meetings_data = {"google_meet_session": {"transcript": "", "summary": "", "last_summary_time": 0}}


def format_summary_as_bullets(summary_text):
    """Format a summary paragraph as a bullet list, one sentence per line."""
    if not summary_text:
        return "No summary generated yet."
    bullet_summary = "- " + summary_text.replace(". ", ".\n- ").strip()
    return "\n".join(line for line in bullet_summary.split("\n") if line.strip() not in ("-", ""))


def process_audio(audio, meeting_id="google_meet_session"):
    """Process audio chunks for transcription and summarization."""
    try:
        if audio is None:
            return "No audio input.", "No summary generated."
        if isinstance(audio, tuple) and len(audio) == 2:
            sample_rate, audio_data = audio
        else:
            try:
                audio_data, sample_rate = sf.read(audio)
            except Exception as e:
                logger.error(f"Error reading audio file: {e}")
                return "Error processing audio input.", meetings_data[meeting_id].get("summary", "No summary available.")
        if len(audio_data.shape) > 1 and audio_data.shape[1] > 1:
            audio_data = np.mean(audio_data, axis=1)

        # Whisper expects float32 samples in [-1, 1], but Gradio's microphone
        # stream delivers int16 PCM, so scale integer audio accordingly.
        if np.issubdtype(audio_data.dtype, np.integer):
            audio_chunk = audio_data.astype(np.float32) / np.iinfo(audio_data.dtype).max
        else:
            audio_chunk = audio_data.astype(np.float32)

        logger.info(f"Audio chunk shape: {audio_chunk.shape}, Sample rate: {sample_rate}")
        logger.info(f"Audio min/max values: {np.min(audio_chunk)}/{np.max(audio_chunk)}")
        if np.max(np.abs(audio_chunk)) < 0.01:
            logger.info("Audio chunk contains mostly silence, skipping transcription")
            return meetings_data[meeting_id].get("transcript", ""), meetings_data[meeting_id].get("summary", "No summary generated yet.")

        try:
            result = stt_pipeline({"sampling_rate": sample_rate, "raw": audio_chunk})
            new_text = result["text"].strip() if result["text"] else ""
            logger.info(f"Transcription: '{new_text}'")
        except Exception as e:
            logger.error(f"Error during transcription: {e}")
            return meetings_data[meeting_id].get("transcript", ""), meetings_data[meeting_id].get("summary", "Transcription failed.")
        meeting = meetings_data[meeting_id]
        if meeting["transcript"] and new_text:
            meeting["transcript"] += " " + new_text
        else:
            meeting["transcript"] = new_text
        current_time = time.time()
        if (len(meeting["transcript"]) > 50 and
                current_time - meeting["last_summary_time"] > 30):
            try:
                # truncation=True keeps transcripts longer than the model's
                # maximum input length from raising an error.
                summary_result = summarizer(meeting["transcript"], max_length=150, min_length=30, do_sample=False, truncation=True)
                if summary_result and isinstance(summary_result, list):
                    raw_summary = summary_result[0]["summary_text"]
                    meeting["summary"] = format_summary_as_bullets(raw_summary)
                    meeting["last_summary_time"] = current_time
                else:
                    logger.warning("Summary generation returned unexpected format")
            except Exception as e:
                logger.error(f"Error during summarization: {e}")

        return meeting["transcript"], meeting["summary"]

    except Exception as general_error:
        logger.error(f"Unexpected error in process_audio: {general_error}")
        return "Error processing audio. Please check logs.", meetings_data[meeting_id].get("summary", "No summary available.")


def open_google_meet(meet_link):
    """Generate HTML for opening a Google Meet link."""
    if not meet_link or "meet.google.com" not in meet_link:
        return "Please enter a valid Google Meet link (e.g., https://meet.google.com/xyz-abcd-123)."
    return (
        f'<a href="{meet_link}" target="_blank">Click here to join your Google Meet</a><br><br>'
        "<b>Instructions:</b> After clicking the link, open it in a new tab, join the meeting, "
        "and use the microphone below to capture audio for real-time notes.<br><br>"
        "<b>Note:</b> This app captures audio from your microphone, not directly from Google Meet. "
        "Position your microphone close to your speakers for best results."
    )


def clear_session(meeting_id="google_meet_session"):
    """Reset the meeting data."""
    meetings_data[meeting_id] = {"transcript": "", "summary": "", "last_summary_time": 0}
    return "Session cleared. Ready for a new meeting.", ""


def create_gradio_interface():
    with gr.Blocks() as demo:
        gr.Markdown("# Real-Time Google Meet Notes Generator")
        gr.Markdown("Enter your Google Meet link, join the meeting, and get real-time notes using your microphone.")

        with gr.Row():
            meet_link = gr.Textbox(label="Google Meet Link", placeholder="https://meet.google.com/xyz-abcd-123")
            join_button = gr.Button("Join Meeting")

        join_output = gr.HTML(label="Meeting Link Status")

        with gr.Row():
            audio_input = gr.Audio(
                sources=["microphone"],
                type="numpy",
                label="Live Microphone Input (Start speaking in your meeting)",
                streaming=True,
            )
            clear_button = gr.Button("Clear Session")

        with gr.Row():
            transcript_output = gr.Textbox(label="Real-Time Transcription", lines=10)
            summary_output = gr.Textbox(label="Bullet Point Notes (Updates every ~30s)", lines=10)

        join_button.click(
            fn=open_google_meet,
            inputs=[meet_link],
            outputs=[join_output],
        )

        clear_button.click(
            fn=clear_session,
            inputs=[],
            outputs=[transcript_output, summary_output],
        )
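
        # Stream microphone chunks through process_audio as they arrive,
        # updating the transcript and notes textboxes in place.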
        audio_input.stream(
            fn=process_audio,
            inputs=[audio_input],
            outputs=[transcript_output, summary_output],
        )

        gr.Markdown("""
## Instructions:
1. Enter your Google Meet link and click "Join Meeting"
2. In the new tab, join your meeting
3. Allow microphone access for this app (important!)
4. Position your microphone to clearly capture the meeting audio
5. The app will transcribe what it hears and generate notes automatically

## Troubleshooting:
- If no transcription appears, make sure your microphone is capturing the meeting audio
- Try positioning your device's microphone closer to your speakers
- If needed, click "Clear Session" to reset the transcript and summary
- For best results, play the meeting audio through speakers (not headphones) so the microphone can pick it up
""")

    return demo
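

# share=True serves the app locally and also creates a temporary public link.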
demo = create_gradio_interface()
demo.launch(share=True)