# VoxSum: src/server/services/asr_service.py
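"""ASR service for the VoxSum server.

Streams JSON-serializable event dicts ("ready", "status", "utterance",
"complete") during transcription, with optional speaker diarization.
"""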
from __future__ import annotations

from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

import soundfile as sf
from fastapi import HTTPException

from src.asr import transcribe_file
from src.diarization import (
    get_diarization_stats,
    init_speaker_embedding_extractor,
    merge_consecutive_utterances,
    merge_transcription_with_diarization,
    perform_speaker_diarization_on_utterances,
)
from src.utils import sensevoice_models

from ..core.config import get_settings
from ..models.transcription import DiarizationOptions, TranscriptionRequest

settings = get_settings()


def _serialize_utterance(utt: Tuple[float, float, str], speaker: Optional[int] = None) -> Dict[str, object]:
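    """Serialize a (start, end, text) utterance into a JSON-safe dict.

    A speaker id is included only when diarization assigned one.
    """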
start, end, text = utt
payload: Dict[str, object] = {
"start": round(float(start), 3),
"end": round(float(end), 3),
"text": text,
}
if speaker is not None:
payload["speaker"] = int(speaker)
    return payload


def _prepare_model_name(options: TranscriptionRequest) -> str:
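    """Map a friendly model name to the identifier the selected backend loads."""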
if options.backend == "sensevoice":
        # sensevoice_models maps friendly model names to their repo IDs
return sensevoice_models.get(options.model_name, options.model_name)
    return options.model_name


def iter_transcription_events(
audio_path: Path,
audio_url: str,
options: TranscriptionRequest,
) -> Iterable[Dict[str, object]]:
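    """Stream transcription (and optional diarization) events for one file.

    Event types: "ready" (backend/model resolved), "status" (user-facing
    progress messages), "utterance" (one transcribed segment), and
    "complete" (full transcript plus optional diarization payload).
    """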
model_name = _prepare_model_name(options)
try:
generator = transcribe_file(
audio_path=str(audio_path),
vad_threshold=options.vad_threshold,
model_name=model_name,
backend=options.backend,
language=options.language,
textnorm=options.textnorm,
)
yield {
"type": "ready",
"audioUrl": audio_url,
"backend": options.backend,
"model": model_name,
}
yield {
"type": "status",
"message": "Transcribing audio...",
}
final_utterances: List[Tuple[float, float, str]] = []
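        # transcribe_file yields (current_utterance, all_utterances, progress)
        # tuples; stream each new utterance to the client as it arrives.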
for current_utterance, all_utterances, progress in generator:
if current_utterance:
start, end, text = current_utterance
yield {
"type": "utterance",
"utterance": _serialize_utterance((start, end, text)),
"index": len(all_utterances) - 1,
"progress": round(progress, 1),
}
final_utterances = list(all_utterances)
# Final event with transcript and optional diarization
diarization_payload = None
if options.diarization.enable:
yield {
"type": "status",
"message": "Performing speaker diarization...",
}
diarization_gen = _run_diarization(audio_path, final_utterances, options.diarization)
for event in diarization_gen:
if event["type"] == "progress":
yield event
elif event["type"] == "result":
diarization_payload = event["payload"]
break
transcript_text = "\n".join([utt[2] for utt in final_utterances])
yield {
"type": "complete",
"utterances": [_serialize_utterance(utt) for utt in final_utterances],
"transcript": transcript_text,
"diarization": diarization_payload,
}
    except Exception as exc:  # pragma: no cover - defensive guard
        raise HTTPException(status_code=500, detail=f"Transcription failed: {exc}") from exc


def _run_diarization(
audio_path: Path,
utterances: List[Tuple[float, float, str]],
options: DiarizationOptions,
):
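    """Yield diarization progress events, then a single result event.

    The final event is {"type": "result", "payload": ...}; payload is None
    when there are no utterances, the embedding extractor cannot be
    initialized, or diarization produces no segments.
    """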
if not utterances:
yield {"type": "result", "payload": None}
return
extractor_result = init_speaker_embedding_extractor(
cluster_threshold=options.cluster_threshold,
num_speakers=options.num_speakers,
)
if not extractor_result:
yield {"type": "result", "payload": None}
return
embedding_extractor, config_dict = extractor_result
audio, sample_rate = sf.read(str(audio_path), dtype="float32")
if audio.ndim > 1:
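        # Downmix multi-channel audio to mono by averaging channels.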
audio = audio.mean(axis=1)
if sample_rate != 16000:
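        # The diarization pipeline below expects 16 kHz audio.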
# Lazy import to avoid mandatory dependency during module import
from scipy.signal import resample
target_num_samples = int(len(audio) * 16000 / sample_rate)
audio = resample(audio, target_num_samples)
sample_rate = 16000
diarization_gen = perform_speaker_diarization_on_utterances(
audio=audio,
sample_rate=sample_rate,
utterances=utterances,
embedding_extractor=embedding_extractor,
config_dict=config_dict,
progress_callback=None,
)
diarization_segments = None
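    # The diarization generator yields float progress values in [0, 1]; the
    # final segment list arrives either as a yielded item or as the
    # generator's return value (surfaced via StopIteration).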
try:
while True:
item = next(diarization_gen)
if isinstance(item, float):
yield {"type": "progress", "stage": "diarization", "progress": round(item * 100, 1)}
else:
diarization_segments = item
break
except StopIteration as e:
diarization_segments = e.value
if not diarization_segments:
yield {"type": "result", "payload": None}
return
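    # Attach speaker ids to utterances, merge consecutive same-speaker
    # utterances separated by gaps of up to 1 second, then compute stats.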
merged = merge_transcription_with_diarization(utterances, diarization_segments)
merged = merge_consecutive_utterances(merged, max_gap=1.0)
stats = get_diarization_stats(merged)
yield {"type": "result", "payload": {
"utterances": [
_serialize_utterance((start, end, text), speaker)
for start, end, text, speaker in merged
],
"stats": stats,
}}
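

# Usage sketch (an assumption, not part of this module): a FastAPI route
# could stream these events as server-sent events. The route path, `app`
# object, and upload handling below are hypothetical.
#
#   import json
#   from fastapi.responses import StreamingResponse
#
#   @app.post("/api/transcribe")
#   async def transcribe(request: TranscriptionRequest):
#       audio_path = Path("...")  # resolved from the uploaded file (elided)
#       events = iter_transcription_events(audio_path, "/audio/...", request)
#       stream = (f"data: {json.dumps(e)}\n\n" for e in events)
#       return StreamingResponse(stream, media_type="text/event-stream")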