# asr.py
import numpy as np
import soundfile as sf
from scipy.signal import resample_poly
from silero_vad import load_silero_vad, VADIterator
from moonshine_onnx import MoonshineOnnxModel, load_tokenizer
from utils import s2tw_converter  # Simplified-to-Traditional Chinese (Taiwan) converter from the local utils module
import re

SAMPLING_RATE = 16000  # Both Silero VAD and Moonshine expect 16 kHz audio
CHUNK_SIZE = 512       # Frame size required by Silero VAD at 16 kHz

tokenizer = load_tokenizer()  # Moonshine tokenizer, loaded once and reused

def clean_transcript(text):
    # Strip Unicode replacement characters (U+FFFD) left by decoding errors.
    text = re.sub(r'\uFFFD', '', text)
    # Collapse a CJK character repeated three or more times in a row to one.
    text = re.sub(r'([\u4e00-\u9fa5])\1{2,}', r'\1', text)
    # Drop spaces inserted between CJK characters.
    text = re.sub(r'([\u4e00-\u9fa5]) ([ \u4e00-\u9fa5])', r'\1\2', text)
    return text

def transcribe_file(audio_path, vad_threshold, model_name):
    vad_model = load_silero_vad(onnx=True)
    vad_iterator = VADIterator(model=vad_model, sampling_rate=SAMPLING_RATE, threshold=vad_threshold)
    model = MoonshineOnnxModel(model_name=f"moonshine/{model_name}")

    wav, orig_sr = sf.read(audio_path)
    if wav.ndim > 1:
        # Downmix multichannel audio to mono before resampling.
        wav = wav.mean(axis=1)
    if orig_sr != SAMPLING_RATE:
        # Polyphase resampling to 16 kHz using the reduced up/down ratio.
        gcd = np.gcd(int(orig_sr), SAMPLING_RATE)
        up = SAMPLING_RATE // gcd
        down = int(orig_sr) // gcd
        wav = resample_poly(wav, up, down)
    wav = wav.astype(np.float32)  # soundfile returns float64 by default

    utterances = []  # Accumulated utterances as (start, end, text)
    speech_buffer = np.array([], dtype=np.float32)
    segment_start = 0.0  # Start time (seconds) of the current segment

    i = 0
    while i < len(wav):
        chunk = wav[i:i + CHUNK_SIZE]
        if len(chunk) < CHUNK_SIZE:
            # Silero VAD expects fixed 512-sample frames, so zero-pad the tail.
            chunk = np.pad(chunk, (0, CHUNK_SIZE - len(chunk)), mode='constant')
        i += CHUNK_SIZE

        speech_dict = vad_iterator(chunk)
        speech_buffer = np.concatenate([speech_buffer, chunk])

        if speech_dict:
            if "start" in speech_dict:
                # VAD reports the sample index where speech begins.
                segment_start = speech_dict["start"] / SAMPLING_RATE
            if "end" in speech_dict:
                # Speech segment finished: transcribe the buffered audio.
                segment_end = i / SAMPLING_RATE
                tokens = model.generate(speech_buffer[np.newaxis, :].astype(np.float32))
                text = tokenizer.decode_batch(tokens)[0].strip()
                if text:
                    cleaned_text = clean_transcript(s2tw_converter.convert(text))
                    utterances.append((segment_start, segment_end, cleaned_text))
                    # Yield the current utterance plus all accumulated utterances.
                    yield utterances[-1], utterances.copy()
                # Reset for the next segment.
                speech_buffer = np.array([], dtype=np.float32)
                segment_start = i / SAMPLING_RATE  # Fallback until the next "start" event
                vad_iterator.reset_states()

    # Transcribe any trailing speech (at least 0.5 s) left in the buffer.
    if len(speech_buffer) > SAMPLING_RATE * 0.5:
        segment_end = len(wav) / SAMPLING_RATE
        tokens = model.generate(speech_buffer[np.newaxis, :].astype(np.float32))
        text = tokenizer.decode_batch(tokens)[0].strip()
        if text:
            cleaned_text = clean_transcript(s2tw_converter.convert(text))
            utterances.append((segment_start, segment_end, cleaned_text))
            yield utterances[-1], utterances.copy()

    # Final yield with all utterances
    if utterances:
        yield None, utterances
    else:
        yield None, [(-1, -1, "No speech detected")]
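
if __name__ == "__main__":
    # Minimal usage sketch; not part of the original module. "sample.wav",
    # the 0.5 VAD threshold, and the "base" model name are placeholder
    # assumptions -- adjust all three for your own setup.
    for current, history in transcribe_file("sample.wav", vad_threshold=0.5, model_name="base"):
        if current is not None:
            start, end, text = current
            print(f"[{start:7.2f}s - {end:7.2f}s] {text}")
        else:
            # Final yield: `history` holds every utterance from the file.
            print(f"Done: {len(history)} utterance(s) transcribed.")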