from __future__ import annotations import json from pathlib import Path from fastapi import APIRouter, File, Form, HTTPException, UploadFile from fastapi.responses import StreamingResponse from ..models.export import SummaryExportRequest, TranscriptExportRequest from ..models.summarization import SummaryRequest, SpeakerNameDetectionRequest from ..models.transcription import TranscriptionRequest from ..core.config import get_settings from ..services import config_service, export_service, podcast_service from ..services.asr_service import iter_transcription_events from ..services.file_service import save_upload_file, store_audio_file from ..services.summarization_service import iter_summary_events router = APIRouter(prefix="/api") @router.get("/config/models") def fetch_model_catalog(): return config_service.get_model_catalog() @router.post("/transcribe") def transcribe_audio( audio: UploadFile | None = File(default=None), options: str = Form("{}"), source: str | None = Form(default=None), ): payload = TranscriptionRequest(**json.loads(options or "{}")) cleanup_temp = False if audio is not None: temp_path = save_upload_file(audio) _, audio_url = store_audio_file(temp_path) cleanup_temp = True elif source: filename = Path(source).name candidate_path = get_settings().audio_dir / filename if not candidate_path.exists(): raise HTTPException(status_code=404, detail="Audio source not found") temp_path = candidate_path audio_url = source else: raise HTTPException(status_code=400, detail="Either audio upload or source is required") def event_stream(): try: for event in iter_transcription_events(temp_path, audio_url, payload): yield json.dumps(event, ensure_ascii=False) + "\n" finally: if cleanup_temp: temp_path.unlink(missing_ok=True) return StreamingResponse(event_stream(), media_type="application/x-ndjson") @router.post("/summarize") def summarize_text(request: SummaryRequest): def event_stream(): for event in iter_summary_events(request): yield json.dumps(event, ensure_ascii=False) + "\n" return StreamingResponse(event_stream(), media_type="application/x-ndjson") @router.get("/podcast/search") def search_podcast(query: str): return podcast_service.search_series(query) @router.get("/podcast/episodes") def get_podcast_episodes(feed_url: str): return podcast_service.list_episodes(feed_url) @router.post("/podcast/download") def download_episode(payload: dict): audio_url = payload.get("audioUrl") or payload.get("audio_url") title = payload.get("title", "Episode") if not audio_url: raise HTTPException(status_code=400, detail="audioUrl is required") return podcast_service.download_episode(audio_url, title) @router.post("/youtube/fetch") def fetch_youtube_audio(payload: dict): url = payload.get("url") or payload.get("youtubeUrl") if not url: raise HTTPException(status_code=400, detail="url is required") return podcast_service.fetch_youtube_audio(url) @router.post("/export/transcript") def export_transcript(payload: TranscriptExportRequest): content, filename, mime_type = export_service.generate_transcript_export(payload) # Properly encode filename for Content-Disposition header (RFC 6266) import urllib.parse encoded_filename = urllib.parse.quote(filename) content_disposition = f"attachment; filename*=UTF-8''{encoded_filename}" return StreamingResponse( iter([content.encode("utf-8")]), media_type=mime_type, headers={"Content-Disposition": content_disposition}, ) @router.post("/export/summary") def export_summary(payload: SummaryExportRequest): content, filename, mime_type = export_service.generate_summary_export(payload) # Properly encode filename for Content-Disposition header (RFC 6266) import urllib.parse encoded_filename = urllib.parse.quote(filename) content_disposition = f"attachment; filename*=UTF-8''{encoded_filename}" return StreamingResponse( iter([content.encode("utf-8")]), media_type=mime_type, headers={"Content-Disposition": content_disposition}, ) @router.post("/detect-speaker-names") def detect_speaker_names(request: SpeakerNameDetectionRequest): from src.summarization import detect_speaker_names as detect_names result = detect_names(request.utterances, request.llm_model) return result