from __future__ import annotations

import shutil
import uuid
from pathlib import Path
from typing import Tuple

from fastapi import UploadFile

from ..core.config import get_settings

settings = get_settings()


def _mtime_or_zero(path: Path) -> float:
    """Return the modification time of *path*, or 0 if it cannot be stat'ed.

    A single ``stat()`` wrapped in ``try``/``except`` avoids the race of
    checking ``exists()`` first and then calling ``stat()`` on a file that
    was removed in between.
    """
    try:
        return path.stat().st_mtime
    except OSError:
        return 0.0


def cleanup_old_audio_files(max_files: int | None = None) -> None:
    """Remove the oldest audio files so that at most ``max_files`` remain.

    Args:
        max_files: Number of most recently modified files to keep. When
            falsy (``None`` or ``0``), ``settings.max_audio_files`` is used.

    The audio directory is created if it does not exist. Only regular files
    are considered; sub-directories are neither counted nor removed. Files
    that cannot be deleted are skipped (best effort). If the resolved limit
    is missing, zero or negative, nothing is deleted.
    """
    limit = max_files or settings.max_audio_files
    audio_dir = settings.audio_dir
    audio_dir.mkdir(parents=True, exist_ok=True)

    # A non-positive limit would make the "oldest files" slice meaningless
    # (a negative value would select an arbitrary number of files), so treat
    # it as "do not prune".
    if not limit or limit < 0:
        return

    # Directories cannot be unlinked, so counting them would keep the folder
    # permanently over the limit; restrict the candidates to regular files.
    files = sorted(
        (entry for entry in audio_dir.glob("*") if entry.is_file()),
        key=_mtime_or_zero,
    )

    excess = len(files) - limit
    if excess <= 0:
        return

    # The list is sorted oldest-first, so the first ``excess`` entries go.
    for old_file in files[:excess]:
        try:
            old_file.unlink()
        except OSError:
            # Best effort: the file may already be gone or be locked.
            continue


def save_upload_file(upload: UploadFile) -> Path:
    """Persist an UploadFile to the temporary directory and return its path.

    Args:
        upload: The uploaded file whose ``file`` stream is copied to disk.

    Returns:
        Path of the newly written file inside ``settings.tmp_dir``.

    Raises:
        Any exception raised while writing is propagated after the partially
        written file has been removed.
    """
    tmp_dir = settings.tmp_dir
    tmp_dir.mkdir(parents=True, exist_ok=True)

    # Only the extension of the client-supplied name is reused; the rest of
    # the file name is a random UUID.
    suffix = Path(upload.filename or "audio").suffix or ".mp3"
    temp_path = tmp_dir / f"upload_{uuid.uuid4().hex}{suffix}"

    try:
        with temp_path.open("wb") as buffer:
            shutil.copyfileobj(upload.file, buffer)
    except Exception:
        # Do not leave a truncated upload behind if the copy fails midway.
        try:
            temp_path.unlink()
        except OSError:
            pass
        raise

    return temp_path


def store_audio_file(audio_path: Path, prefix: str | None = None) -> Tuple[Path, str]:
    """Copy an audio file to the public static folder and return its path and URL.

    Args:
        audio_path: Existing audio file to publish.
        prefix: Leading part of the generated file name; defaults to
            ``"audio"`` when falsy.

    Returns:
        A ``(dest_path, url)`` tuple: the copy's location inside
        ``settings.audio_dir`` and its ``/media/...`` URL.
    """
    # Pruning runs before the copy (and also ensures the directory exists),
    # so the new file itself is never a candidate for removal in this call.
    cleanup_old_audio_files()

    prefix = prefix or "audio"
    suffix = audio_path.suffix or ".mp3"
    dest_filename = f"{prefix}_{uuid.uuid4().hex}{suffix}"
    dest_path = settings.audio_dir / dest_filename

    shutil.copy2(audio_path, dest_path)

    url = f"/media/{dest_filename}"
    return dest_path, url