|
import speech_recognition as sr |
|
import numpy as np |
|
import io |
|
import config |
|
|
|
class SpeechTranscriber:
    """Buffer raw audio and transcribe it through ``Recognizer.recognize_google``.

    Audio is appended with :meth:`add_audio_chunk`; :meth:`get_transcript_chunk`
    submits everything buffered so far once a minimum amount has accumulated.
    Recognizer tuning and the sample rate are read from the ``config`` module.
    """

    # Bytes per sample handed to ``sr.AudioData`` and used to size the
    # minimum-duration check (2 bytes per sample).
    SAMPLE_WIDTH = 2

    def __init__(self):
        """Create the recognizer from ``config`` values and an empty buffer."""
        self.recognizer = sr.Recognizer()
        self.recognizer.energy_threshold = config.ENERGY_THRESHOLD
        self.recognizer.dynamic_energy_threshold = config.DYNAMIC_ENERGY_THRESHOLD
        self.recognizer.pause_threshold = config.PAUSE_THRESHOLD
        self.audio_buffer = bytearray()

    def add_audio_chunk(self, audio_chunk):
        """Append one chunk of audio to the pending buffer.

        Args:
            audio_chunk: Object exposing ``tobytes()``; its bytes are stored
                unmodified. NOTE(review): presumably 16-bit PCM sampled at
                ``config.SAMPLE_RATE`` -- confirm against the capture code.
        """
        self.audio_buffer.extend(audio_chunk.tobytes())

    def get_transcript_chunk(self):
        """Transcribe the buffered audio if enough has accumulated.

        Returns:
            The recognized text, or ``None`` when the buffer is still shorter
            than ``config.MIN_PROCESSING_DURATION`` seconds, when the audio
            was unintelligible, or when the recognition request failed.

        On success or unintelligible audio the submitted bytes are dropped
        from the buffer. On a request failure the buffer is left untouched,
        so the same audio is submitted again on the next call.
        """
        min_bytes = (
            config.SAMPLE_RATE * config.MIN_PROCESSING_DURATION * self.SAMPLE_WIDTH
        )
        if len(self.audio_buffer) < min_bytes:
            return None

        # Freeze what is submitted so that exactly these bytes -- and nothing
        # appended afterwards -- are removed once the request completes.
        snapshot = bytes(self.audio_buffer)
        audio_data = sr.AudioData(snapshot, config.SAMPLE_RATE, self.SAMPLE_WIDTH)

        try:
            text = self.recognizer.recognize_google(audio_data)
        except sr.UnknownValueError:
            # Nothing intelligible in this audio; retrying it would not help.
            self._discard_processed(len(snapshot))
            return None
        except sr.RequestError as e:
            print(f"Speech recognition error: {e}")
            return None

        self._discard_processed(len(snapshot))
        return text

    def _discard_processed(self, byte_count):
        """Remove the first ``byte_count`` bytes from the pending buffer.

        Trimming in place, instead of replacing the buffer with a new empty
        one, preserves audio that was appended while the blocking recognition
        request was in flight.
        """
        del self.audio_buffer[:byte_count]