|
|
|
import sounddevice as sd |
|
import numpy as np |
|
import wave |
|
import os |
|
from datetime import datetime |
|
import config |
|
|
|
class AudioRecorder: |
|
def __init__(self): |
|
self.is_recording = False |
|
self.sample_rate = config.SAMPLE_RATE |
|
self.filename = f"meeting_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav" |
|
self.audio_buffer = [] |
|
self.stream = None |
|
|
|
def start_recording(self): |
|
self.is_recording = True |
|
self.audio_buffer = [] |
|
|
|
def callback(indata, frames, time, status): |
|
if status: |
|
print(f"Audio error: {status}") |
|
if self.is_recording: |
|
self.audio_buffer.append(indata.copy()) |
|
|
|
self.stream = sd.InputStream( |
|
samplerate=self.sample_rate, |
|
channels=config.CHANNELS, |
|
callback=callback, |
|
dtype='int16' |
|
) |
|
self.stream.start() |
|
return True |
|
|
|
def get_audio_chunk(self): |
|
|
|
|
|
if self.is_recording and self.audio_buffer: |
|
return self.audio_buffer[-1] |
|
return None |
|
|
|
def stop_recording(self): |
|
if self.is_recording: |
|
self.is_recording = False |
|
self.stream.stop() |
|
self.stream.close() |
|
self._save_recording() |
|
return os.path.join(config.RECORDINGS_DIR, self.filename) |
|
return None |
|
|
|
def _save_recording(self): |
|
os.makedirs(config.RECORDINGS_DIR, exist_ok=True) |
|
audio_data = np.concatenate(self.audio_buffer, axis=0) |
|
with wave.open(os.path.join(config.RECORDINGS_DIR, self.filename), 'wb') as wf: |
|
wf.setnchannels(config.CHANNELS) |
|
wf.setsampwidth(2) |
|
wf.setframerate(self.sample_rate) |
|
wf.writeframes(audio_data.tobytes()) |