Luigi's picture
Add speaker name detection feature
77e98bd
from __future__ import annotations
import json
from pathlib import Path
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from fastapi.responses import StreamingResponse
from ..models.export import SummaryExportRequest, TranscriptExportRequest
from ..models.summarization import SummaryRequest, SpeakerNameDetectionRequest
from ..models.transcription import TranscriptionRequest
from ..core.config import get_settings
from ..services import config_service, export_service, podcast_service
from ..services.asr_service import iter_transcription_events
from ..services.file_service import save_upload_file, store_audio_file
from ..services.summarization_service import iter_summary_events
# Router that collects the endpoints registered in this module; every route
# path is served under the "/api" prefix.
router = APIRouter(prefix="/api")
@router.get("/config/models")
def fetch_model_catalog():
    """Return whatever ``config_service.get_model_catalog`` produces."""
    catalog = config_service.get_model_catalog()
    return catalog
@router.post("/transcribe")
def transcribe_audio(
    audio: UploadFile | None = File(default=None),
    options: str = Form("{}"),
    source: str | None = Form(default=None),
):
    """Stream transcription events for an uploaded or already-stored audio file.

    The response is newline-delimited JSON: each event yielded by
    ``iter_transcription_events`` is serialized with ``json.dumps`` and
    terminated by a newline.

    Args:
        audio: Optional uploaded file. When present it takes precedence over
            ``source``.
        options: JSON object (as a string) whose keys are passed as keyword
            arguments to ``TranscriptionRequest``. An empty value is treated
            as ``{}``.
        source: Optional reference to a file inside the configured audio
            directory. Only its final path component is used.

    Raises:
        HTTPException: 422 when ``options`` is not valid JSON, is not a JSON
            object, or is rejected by ``TranscriptionRequest``; 404 when
            ``source`` does not name a regular file in the audio directory;
            400 when neither ``audio`` nor ``source`` is supplied.
    """
    # Reject bad client input explicitly instead of letting it surface as a
    # 500. json.JSONDecodeError and pydantic's ValidationError both derive
    # from ValueError; TypeError covers JSON that is not an object (``**``
    # on a non-mapping).
    try:
        payload = TranscriptionRequest(**json.loads(options or "{}"))
    except (ValueError, TypeError) as exc:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid transcription options: {exc}",
        ) from exc

    cleanup_temp = False
    if audio is not None:
        temp_path = save_upload_file(audio)
        try:
            _, audio_url = store_audio_file(temp_path)
        except Exception:
            # The streaming ``finally`` below is never reached on this path,
            # so remove the temporary upload here before propagating.
            temp_path.unlink(missing_ok=True)
            raise
        cleanup_temp = True
    elif source:
        # Keep only the last path component so the lookup stays inside
        # audio_dir.
        filename = Path(source).name
        candidate_path = get_settings().audio_dir / filename
        # is_file() rather than exists(): names such as ".." or "" resolve to
        # a directory, which exists but is not a transcribable audio file.
        if not candidate_path.is_file():
            raise HTTPException(status_code=404, detail="Audio source not found")
        temp_path = candidate_path
        audio_url = source
    else:
        raise HTTPException(status_code=400, detail="Either audio upload or source is required")

    def event_stream():
        try:
            for event in iter_transcription_events(temp_path, audio_url, payload):
                yield json.dumps(event, ensure_ascii=False) + "\n"
        finally:
            # Only uploads own a temporary file; stored sources are kept.
            if cleanup_temp:
                temp_path.unlink(missing_ok=True)

    return StreamingResponse(event_stream(), media_type="application/x-ndjson")
@router.post("/summarize")
def summarize_text(request: SummaryRequest):
    """Stream summary events for ``request`` as newline-delimited JSON.

    Each item yielded by ``iter_summary_events`` becomes one JSON document
    followed by a newline.
    """

    def ndjson_lines():
        # The event iterator is created lazily, on first iteration of the
        # response body.
        for item in iter_summary_events(request):
            serialized = json.dumps(item, ensure_ascii=False)
            yield serialized + "\n"

    return StreamingResponse(ndjson_lines(), media_type="application/x-ndjson")
@router.get("/podcast/search")
def search_podcast(query: str):
    """Forward ``query`` to ``podcast_service.search_series`` and return its result."""
    matches = podcast_service.search_series(query)
    return matches
@router.get("/podcast/episodes")
def get_podcast_episodes(feed_url: str):
    """Forward ``feed_url`` to ``podcast_service.list_episodes`` and return its result."""
    episodes = podcast_service.list_episodes(feed_url)
    return episodes
@router.post("/podcast/download")
def download_episode(payload: dict):
    """Download an episode through ``podcast_service.download_episode``.

    Args:
        payload: Request body. The audio URL is read from ``"audioUrl"``,
            falling back to ``"audio_url"``; ``"title"`` defaults to
            ``"Episode"`` when the key is absent.

    Raises:
        HTTPException: 400 when neither URL key holds a truthy value.
    """
    episode_url = payload.get("audioUrl") or payload.get("audio_url")
    if not episode_url:
        raise HTTPException(status_code=400, detail="audioUrl is required")

    episode_title = payload.get("title", "Episode")
    return podcast_service.download_episode(episode_url, episode_title)
@router.post("/youtube/fetch")
def fetch_youtube_audio(payload: dict):
    """Fetch audio for a video URL through ``podcast_service.fetch_youtube_audio``.

    Args:
        payload: Request body. The URL is read from ``"url"``, falling back
            to ``"youtubeUrl"`` when the first is missing or falsy.

    Raises:
        HTTPException: 400 when neither key holds a truthy value.
    """
    video_url = payload.get("url")
    if not video_url:
        video_url = payload.get("youtubeUrl")
    if not video_url:
        raise HTTPException(status_code=400, detail="url is required")
    return podcast_service.fetch_youtube_audio(video_url)
@router.post("/export/transcript")
def export_transcript(payload: TranscriptExportRequest):
    """Return the exported transcript as a downloadable attachment.

    ``export_service.generate_transcript_export`` supplies the text content,
    the download filename and the MIME type. The content is UTF-8 encoded
    and sent with a ``Content-Disposition: attachment`` header.
    """
    from urllib.parse import quote

    content, filename, mime_type = export_service.generate_transcript_export(payload)

    # Encode the filename for the RFC 6266 ``filename*`` parameter.
    # safe="" is required: quote() leaves "/" unescaped by default, but "/"
    # is not a permitted literal character in an RFC 5987 extended value.
    encoded_filename = quote(filename, safe="")
    content_disposition = f"attachment; filename*=UTF-8''{encoded_filename}"

    return StreamingResponse(
        iter([content.encode("utf-8")]),
        media_type=mime_type,
        headers={"Content-Disposition": content_disposition},
    )
@router.post("/export/summary")
def export_summary(payload: SummaryExportRequest):
    """Return the exported summary as a downloadable attachment.

    ``export_service.generate_summary_export`` supplies the text content,
    the download filename and the MIME type. The content is UTF-8 encoded
    and sent with a ``Content-Disposition: attachment`` header.
    """
    from urllib.parse import quote

    content, filename, mime_type = export_service.generate_summary_export(payload)

    # Encode the filename for the RFC 6266 ``filename*`` parameter.
    # safe="" is required: quote() leaves "/" unescaped by default, but "/"
    # is not a permitted literal character in an RFC 5987 extended value.
    encoded_filename = quote(filename, safe="")
    content_disposition = f"attachment; filename*=UTF-8''{encoded_filename}"

    return StreamingResponse(
        iter([content.encode("utf-8")]),
        media_type=mime_type,
        headers={"Content-Disposition": content_disposition},
    )
@router.post("/detect-speaker-names")
def detect_speaker_names(request: SpeakerNameDetectionRequest):
    """Run speaker-name detection on the request's utterances.

    Passes ``request.utterances`` and ``request.llm_model`` to
    ``src.summarization.detect_speaker_names`` and returns its result
    unchanged.
    """
    # Imported at call time under an alias so it stays distinct from this
    # endpoint's own name.
    from src.summarization import detect_speaker_names as run_detection

    return run_detection(request.utterances, request.llm_model)