import os import uuid import hashlib import subprocess import shutil import signal import re import math from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Any, List, Union from enum import Enum from fastapi import FastAPI, File, UploadFile, BackgroundTasks, HTTPException, Path as FastApiPath, Depends, Query, status from fastapi.responses import JSONResponse, FileResponse from pydantic import BaseModel, Field, validator from apscheduler.schedulers.asyncio import AsyncIOScheduler # --- Конфигурация и глобальные переменные --- # Ключ API для доступа API_KEY = os.getenv("API_KEY") # --- ИЗМЕНЕНИЕ: Параметры для VFR -> CFR вынесены сюда --- # Настройки для конвертации видео с переменной частотой кадров (VFR) # в видео с постоянной частотой кадров (CFR). # Этот шаг выполняется для стабилизации видео перед основным кодированием. VFR_TO_CFR_CRF = 17 # Качество (чем ниже, тем лучше). 17 - очень высокое. VFR_TO_CFR_PRESET = 4 # Скорость (чем ниже, тем качественнее, но медленнее). # Директории для хранения данных BASE_DATA_DIR = Path("data") UPLOADS_DIR = BASE_DATA_DIR / "uploads" OUTPUTS_DIR = BASE_DATA_DIR / "outputs" LOGS_DIR = BASE_DATA_DIR / "logs" # "Базы данных" в памяти FILES_DB: Dict[str, Dict[str, Any]] = {} TASKS_DB: Dict[str, Dict[str, Any]] = {} # Настройки для очистки MAX_AGE_HOURS = 24 MAX_TOTAL_SIZE_GB = 50 CLEANUP_INTERVAL_MINUTES = 30 # Планировщик для задачи очистки scheduler = AsyncIOScheduler() # --- Зависимость для проверки токена --- async def verify_token(token: str = Query(..., description="Токен для доступа к API.")): """Проверяет, соответствует ли предоставленный токен ключу API сервера.""" if not API_KEY: print("КРИТИЧЕСКАЯ ОШИБКА: Переменная окружения API_KEY не установлена!") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Ключ API не сконфигурирован на сервере." ) if token != API_KEY: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Неверный или отсутствующий токен API." ) # --- Перечисления и константы --- class Encoder(str, Enum): av1 = "svt-av1" x265 = "libx265" x264 = "libx264" class X265Preset(str, Enum): ultrafast = "ultrafast" superfast = "superfast" veryfast = "veryfast" faster = "faster" fast = "fast" medium = "medium" slow = "slow" slower = "slower" veryslow = "veryslow" placebo = "placebo" # --- Модели данных для API (Pydantic) --- class UploadResponse(BaseModel): message: str file_hash: str file_path: str original_filename: str is_new: bool class BaseTaskRequest(BaseModel): input_hash: str = Field(..., description="SHA256 хеш загруженного файла.") encoder: Encoder = Field(Encoder.av1, description="Выбор кодека.") extra_options: str = Field("", description="Дополнительные опции для ab-av1.") class AutoEncodeRequest(BaseTaskRequest): preset: Union[int, X265Preset] = Field(3, description="Пресет для кодировщика. Для AV1: 0-12 (int). Для x265: 'slow', 'medium' и т.д. (str).") min_vmaf: int = Field(96, description="Целевой минимальный VMAF.", ge=0, le=100) @validator('preset') def preset_must_match_encoder(cls, v, values): encoder = values.get('encoder') if encoder == Encoder.av1 and not isinstance(v, int): raise ValueError('Для AV1 пресет должен быть числом (0-12).') if encoder == Encoder.x265 and not isinstance(v, X265Preset): raise ValueError("Для x265 пресет должен быть строкой (например, 'slow').") if isinstance(v, int) and (v < 0 or v > 12): raise ValueError('Пресет для AV1 должен быть в диапазоне от 0 до 12.') return v class CrfSearchRequest(AutoEncodeRequest): pass class EncodeRequest(BaseTaskRequest): preset: Union[int, X265Preset] = Field(3, description="Пресет для кодировщика. Для AV1: 0-12 (int). Для x265: 'slow', 'medium' и т.д. (str).") crf: float = Field(20, description="Значение CRF (Constant Rate Factor). Допускаются дробные значения.") @validator('preset') def preset_must_match_encoder(cls, v, values): encoder = values.get('encoder') if encoder == Encoder.av1 and not isinstance(v, int): raise ValueError('Для AV1 пресет должен быть числом (0-12).') if encoder == Encoder.x265 and not isinstance(v, X265Preset): raise ValueError("Для x265 пресет должен быть строкой (например, 'slow').") if isinstance(v, int) and (v < 0 or v > 12): raise ValueError('Пресет для AV1 должен быть в диапазоне от 0 до 12.') return v class TaskStatusResponse(BaseModel): task_id: str task_type: str status: str input_hash: str command: str created_at: datetime started_at: datetime | None finished_at: datetime | None output_path: str | None log_path: str last_log_line: str | None encoder: Encoder | None = None preset: Union[int, str] | None = None crf: float | None = None min_vmaf: int | None = None extra_options: str | None = None class TaskCreateResponse(BaseModel): message: str task_id: str status_url: str log_url: str manage_url: str # --- Логика очистки (Janitor) --- async def cleanup_files(): """Периодическая задача для удаления старых файлов и контроля размера хранилища.""" print(f"[{datetime.utcnow()}] Запуск задачи очистки...") now = datetime.utcnow() max_age_limit = now - timedelta(hours=MAX_AGE_HOURS) max_size_bytes = MAX_TOTAL_SIZE_GB * (1024**3) all_managed_files: List[Path] = [] for dir_path in [UPLOADS_DIR, OUTPUTS_DIR, LOGS_DIR]: all_managed_files.extend(list(dir_path.glob("**/*"))) active_files = set() for task_id, task in TASKS_DB.items(): if task['status'] in ['pending', 'running']: if task['input_hash'] in FILES_DB: active_files.add(Path(FILES_DB[task['input_hash']]['path'])) if task.get('output_path'): active_files.add(Path(task['output_path'])) if task.get('log_path'): active_files.add(Path(task['log_path'])) files_to_delete = set() files_with_meta = [] for file_path in all_managed_files: if not file_path.is_file() or file_path in active_files: continue try: mtime = datetime.utcfromtimestamp(file_path.stat().st_mtime) if mtime < max_age_limit: files_to_delete.add(file_path) else: files_with_meta.append({'path': file_path, 'mtime': mtime, 'size': file_path.stat().st_size}) except FileNotFoundError: continue current_total_size = sum(f.stat().st_size for f in all_managed_files if f.is_file()) if current_total_size > max_size_bytes: files_with_meta.sort(key=lambda x: x['mtime']) size_to_free = current_total_size - max_size_bytes freed_size = 0 for file_meta in files_with_meta: if freed_size >= size_to_free: break if file_meta['path'] not in files_to_delete and file_meta['path'] not in active_files: files_to_delete.add(file_meta['path']) freed_size += file_meta['size'] deleted_count = 0 for file_path in files_to_delete: try: file_path.unlink() deleted_count += 1 if str(file_path).startswith(str(UPLOADS_DIR)): file_hash_to_del = file_path.stem FILES_DB.pop(file_hash_to_del, None) elif str(file_path).startswith(str(OUTPUTS_DIR)) or str(file_path).startswith(str(LOGS_DIR)): task_id_to_del = file_path.stem TASKS_DB.pop(task_id_to_del, None) except Exception as e: print(f"Ошибка при удалении файла {file_path}: {e}") if deleted_count > 0: print(f"Очистка завершена. Удалено {deleted_count} файлов.") # --- Инициализация FastAPI приложения --- app = FastAPI( title="ab-av1 API Server", description="REST API для асинхронного кодирования видео. **Интерактивная документация: `/docs`**.", version="2.8.0", # Версия обновлена ) @app.on_event("startup") async def on_startup(): """Создаем директории, проверяем API_KEY и запускаем планировщик.""" if not API_KEY: print("КРИТИЧЕСКАЯ ОШИБКА: Переменная окружения API_KEY не установлена. Сервер не сможет авторизовать запросы.") else: print("Ключ API успешно загружен.") for dir_path in [UPLOADS_DIR, OUTPUTS_DIR, LOGS_DIR]: dir_path.mkdir(parents=True, exist_ok=True) scheduler.add_job(cleanup_files, 'interval', minutes=CLEANUP_INTERVAL_MINUTES) scheduler.start() print("Планировщик очистки файлов запущен.") @app.on_event("shutdown") async def on_shutdown(): """Останавливаем планировщик и активные процессы при выключении.""" scheduler.shutdown() print("Планировщик остановлен.") for task_id, task in TASKS_DB.items(): if task.get('process') and task['status'] == 'running': print(f"Остановка процесса для задачи {task_id}...") task['process'].terminate() try: task['process'].wait(timeout=5) except subprocess.TimeoutExpired: task['process'].kill() # --- Вспомогательные функции --- def run_simple_task(task_id: str, command: list[str], log_path: Path): """Выполняет простую одношаговую команду в фоне (для encode и crf-search).""" task = TASKS_DB[task_id] task["started_at"] = datetime.utcnow() task["status"] = "running" try: process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding='utf-8', bufsize=1, preexec_fn=os.setsid ) task['process'] = process with open(log_path, "w", encoding='utf-8') as log_file: log_file.write(f"Executing command: {' '.join(command)}\n\n") for line in iter(process.stdout.readline, ''): log_file.write(line) log_file.flush() task["last_log_line"] = line.strip() process.wait() if task.get('was_cancelled'): task["status"] = "cancelled" elif process.returncode == 0: task["status"] = "completed" else: task["status"] = "failed" task["last_log_line"] = f"Процесс завершился с кодом {process.returncode}" except Exception as e: task["status"] = "failed" error_message = f"Произошло исключение Python: {e}" task["last_log_line"] = error_message with open(log_path, "a", encoding='utf-8') as log_file: log_file.write(f"\n--- PYTHON EXCEPTION ---\n{error_message}") finally: task["finished_at"] = datetime.utcnow() task.pop('process', None) def run_auto_encode_workflow(task_id: str, request: AutoEncodeRequest): """ Выполняет сложный воркфлоу для auto-encode с проверкой VFR. """ task = TASKS_DB[task_id] log_path = Path(task["log_path"]) original_input_path = Path(FILES_DB[request.input_hash]["path"]) temp_cfr_path = None def _log_message(message: str, level: str = "INFO"): line = f"[{datetime.utcnow().strftime('%H:%M:%S')}] [{level}] {message}\n" print(f"Task {task_id}: {line.strip()}") with open(log_path, "a", encoding='utf-8') as log_file: log_file.write(line) task["last_log_line"] = message def _run_sub_task(command: list[str], sub_task_name: str) -> str: _log_message(f"Запуск подзадачи: {sub_task_name}") _log_message(f"Команда: {' '.join(command)}") process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding='utf-8', bufsize=1, preexec_fn=os.setsid ) task['process'] = process full_output = [] with open(log_path, "a", encoding='utf-8') as log_file: for line in iter(process.stdout.readline, ''): log_file.write(line) log_file.flush() full_output.append(line) if line.strip(): task["last_log_line"] = line.strip() process.wait() task.pop('process', None) if process.returncode != 0: raise RuntimeError(f"Подзадача '{sub_task_name}' завершилась с ошибкой (код: {process.returncode}). Смотрите лог для деталей.") return "".join(full_output) try: task["status"] = "running" task["started_at"] = datetime.utcnow() with open(log_path, "w", encoding='utf-8') as f: f.write(f"Starting auto-encode workflow for task {task_id}\n") if not shutil.which("mediainfo"): raise RuntimeError("Утилита 'mediainfo' не найдена в системном PATH. Пожалуйста, установите ее.") _log_message("Шаг 1/4: Проверка на Variable Frame Rate (VFR)") vfr_check_cmd = ["mediainfo", "--Inform=Video;%FrameRate_Mode%", str(original_input_path)] vfr_check = subprocess.run(vfr_check_cmd, capture_output=True, text=True, check=True) frame_rate_mode = vfr_check.stdout.strip() crf_search_input = original_input_path if "VFR" in frame_rate_mode.upper(): _log_message(f"Обнаружен VFR ({frame_rate_mode}). Начало конвертации в CFR.") max_fps_res_cmd = ["mediainfo", "--Inform=Video;%FrameRate_Maximum%", str(original_input_path)] max_fps_res = subprocess.run(max_fps_res_cmd, capture_output=True, text=True, check=True) target_fps = math.ceil(float(max_fps_res.stdout.strip())) _log_message(f"Целевой FPS для CFR: {target_fps}") temp_cfr_path = OUTPUTS_DIR / f"{task_id}_temp_cfr.mp4" # --- ИЗМЕНЕНИЕ: Используются переменные из конфигурации --- cfr_cmd = [ "ffmpeg", "-i", str(original_input_path), "-vf", f"fps={target_fps}", "-c:v", "libsvtav1", "-crf", str(VFR_TO_CFR_CRF), "-preset", str(VFR_TO_CFR_PRESET), "-c:a", "copy", "-c:s", "copy", str(temp_cfr_path) ] _run_sub_task(cfr_cmd, "VFR в CFR конвертация") crf_search_input = temp_cfr_path _log_message("Конвертация в CFR успешно завершена.") else: _log_message(f"Обнаружен CFR ({frame_rate_mode}). Конвертация не требуется.") _log_message("Шаг 2/4: Поиск оптимального CRF (crf-search)") preset_value = request.preset.value if isinstance(request.preset, Enum) else str(request.preset) search_cmd = [ "ab-av1", "crf-search", "-i", str(crf_search_input), "-e", request.encoder.value, "--preset", preset_value, "--min-vmaf", str(request.min_vmaf) ] if request.extra_options: search_cmd.extend(request.extra_options.split()) search_output = _run_sub_task(search_cmd, "Поиск CRF") found_crf = None match = re.search(r"encode .*--crf ([\d.]+)", search_output) if match: found_crf = match.group(1) else: match = re.search(r"^crf ([\d.]+) VMAF", search_output, re.MULTILINE) if match: found_crf = match.group(1) if not found_crf: raise RuntimeError("Не удалось найти рекомендованный CRF в выводе crf-search.") _log_message(f"Найден оптимальный CRF: {found_crf}") _log_message("Шаг 3/4: Финальное кодирование") final_output_path = Path(task["output_path"]) encode_cmd = [ "ab-av1", "encode", "-i", str(crf_search_input), "-o", str(final_output_path), "--crf", found_crf, "-e", request.encoder.value, "--preset", preset_value ] if request.extra_options: encode_cmd.extend(request.extra_options.split()) _run_sub_task(encode_cmd, "Финальное кодирование") task["status"] = "completed" _log_message("Шаг 4/4: Задача успешно завершена!") except Exception as e: task["status"] = "failed" error_message = f"Воркфлоу завершился с ошибкой: {e}" _log_message(error_message, level="ERROR") finally: if temp_cfr_path and temp_cfr_path.exists(): _log_message("Очистка временных файлов...") temp_cfr_path.unlink() task["finished_at"] = datetime.utcnow() task.pop('process', None) def create_task(task_type: str, request: BaseTaskRequest, background_tasks: BackgroundTasks): """Общая функция для создания и запуска любого типа задачи.""" if request.input_hash not in FILES_DB: raise HTTPException(status_code=404, detail=f"Файл с хешем {request.input_hash} не найден.") input_path = Path(FILES_DB[request.input_hash]["path"]) if not input_path.exists(): raise HTTPException(status_code=404, detail=f"Исходный файл для хеша {request.input_hash} был удален.") task_id = str(uuid.uuid4()) output_path = OUTPUTS_DIR / f"{task_id}.mp4" log_path = LOGS_DIR / f"{task_id}.log" command = [] command_str = "" if task_type in ["encode", "crf-search"]: preset_value = request.preset.value if isinstance(request.preset, Enum) else str(request.preset) command = ["ab-av1", task_type, "-i", str(input_path)] command.extend(["-e", request.encoder.value, "--preset", preset_value]) if task_type == "encode": crf_value = request.crf if hasattr(request, 'crf') else 20 command.extend(["-o", str(output_path), "--crf", str(crf_value)]) else: # crf-search min_vmaf_value = request.min_vmaf if hasattr(request, 'min_vmaf') else 96 command.extend(["--min-vmaf", str(min_vmaf_value)]) if request.extra_options: command.extend(request.extra_options.split()) command_str = " ".join(command) else: command_str = f"auto-encode workflow for {request.input_hash}" TASKS_DB[task_id] = { "task_id": task_id, "task_type": task_type, "status": "pending", "input_hash": request.input_hash, "command": command_str, "created_at": datetime.utcnow(), "started_at": None, "finished_at": None, "output_path": str(output_path) if task_type != "crf-search" else None, "log_path": str(log_path), "last_log_line": "Задача поставлена в очередь.", "encoder": request.encoder, "preset": request.preset, "extra_options": request.extra_options, "crf": getattr(request, 'crf', None), "min_vmaf": getattr(request, 'min_vmaf', None), } if task_type == "auto-encode": background_tasks.add_task(run_auto_encode_workflow, task_id, request) else: background_tasks.add_task(run_simple_task, task_id, command, log_path) return TaskCreateResponse( message=f"Задача '{task_type}' успешно создана.", task_id=task_id, status_url=f"/tasks/{task_id}/status", log_url=f"/tasks/{task_id}/log", manage_url="/manage" ) # --- Эндпоинты API --- @app.get("/", summary="Статус API") def read_root(): return {"status": "ok"} @app.post("/upload", response_model=UploadResponse, summary="Загрузка видеофайла", dependencies=[Depends(verify_token)]) async def upload_file(file: UploadFile = File(...)): contents = await file.read() file_hash = hashlib.sha256(contents).hexdigest() if file_hash in FILES_DB and Path(FILES_DB[file_hash]["path"]).exists(): return UploadResponse( message="Файл с таким хешем уже существует.", file_hash=file_hash, file_path=FILES_DB[file_hash]["path"], original_filename=FILES_DB[file_hash]["original_filename"], is_new=False ) file_extension = Path(file.filename).suffix saved_path = UPLOADS_DIR / f"{file_hash}{file_extension}" with open(saved_path, "wb") as f: f.write(contents) FILES_DB[file_hash] = { "path": str(saved_path), "original_filename": file.filename, "uploaded_at": datetime.utcnow(), "size": len(contents) } return UploadResponse( message="Файл успешно загружен.", file_hash=file_hash, file_path=str(saved_path), original_filename=file.filename, is_new=True ) @app.post("/tasks/auto-encode", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи auto-encode с логикой VFR", dependencies=[Depends(verify_token)]) async def task_auto_encode(request: AutoEncodeRequest, background_tasks: BackgroundTasks): return create_task("auto-encode", request, background_tasks) @app.post("/tasks/crf-search", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи crf-search", dependencies=[Depends(verify_token)]) async def task_crf_search(request: CrfSearchRequest, background_tasks: BackgroundTasks): return create_task("crf-search", request, background_tasks) @app.post("/tasks/encode", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи encode", dependencies=[Depends(verify_token)]) async def task_encode(request: EncodeRequest, background_tasks: BackgroundTasks): return create_task("encode", request, background_tasks) @app.get("/tasks/{task_id}/status", response_model=TaskStatusResponse, summary="Получить статус задачи", dependencies=[Depends(verify_token)]) def get_task_status(task_id: str = FastApiPath(..., description="ID задачи")): if task_id not in TASKS_DB: raise HTTPException(status_code=404, detail="Задача не найдена.") task_info = TASKS_DB[task_id].copy() task_info.pop('process', None) return task_info @app.get("/tasks/{task_id}/log", summary="Получить полный лог задачи", dependencies=[Depends(verify_token)]) def get_task_log(task_id: str = FastApiPath(..., description="ID задачи")): if task_id not in TASKS_DB: raise HTTPException(status_code=404, detail="Задача не найдена.") log_path = Path(TASKS_DB[task_id]["log_path"]) if not log_path.exists(): return JSONResponse(status_code=404, content={"detail": "Файл лога не найден."}) return FileResponse(log_path, media_type="text/plain", filename=log_path.name) @app.get("/download/{task_id}", summary="Скачать результат кодирования", dependencies=[Depends(verify_token)]) def download_result(task_id: str = FastApiPath(..., description="ID задачи")): if task_id not in TASKS_DB: raise HTTPException(status_code=404, detail="Задача не найдена.") task = TASKS_DB[task_id] if task["status"] != "completed": raise HTTPException(status_code=400, detail=f"Задача еще не завершена. Статус: {task['status']}.") output_path = task.get("output_path") if not output_path or not Path(output_path).exists(): raise HTTPException(status_code=404, detail="Выходной файл не найден.") output_path = Path(output_path) input_hash = task["input_hash"] original_filename = Path(FILES_DB[input_hash]["original_filename"]).stem return FileResponse(output_path, filename=f"{original_filename}_{task['task_type']}_{task_id[:8]}{output_path.suffix}") # --- Эндпоинты для управления --- @app.get("/manage", summary="Получить списки задач и файлов (JSON)", dependencies=[Depends(verify_token)]) async def get_management_page(): """ Возвращает JSON со списками всех задач и загруженных файлов. """ tasks_list = [] for task_id, task_data in TASKS_DB.items(): task_info = task_data.copy() task_info.pop('process', None) tasks_list.append(task_info) sorted_tasks = sorted(tasks_list, key=lambda x: x['created_at'], reverse=True) return {"tasks": sorted_tasks, "files": FILES_DB} @app.post("/manage/task/{task_id}/cancel", summary="Прервать выполняющуюся задачу", dependencies=[Depends(verify_token)]) def cancel_task(task_id: str = FastApiPath(..., description="ID задачи для прерывания")): if task_id not in TASKS_DB: raise HTTPException(status_code=404, detail="Задача не найдена.") task = TASKS_DB[task_id] if task['status'] != 'running' or 'process' not in task: raise HTTPException(status_code=400, detail="Задачу нельзя прервать (она не выполняется).") print(f"Прерывание задачи {task_id} (PID: {task['process'].pid})...") task['was_cancelled'] = True os.killpg(os.getpgid(task['process'].pid), signal.SIGTERM) return {"message": f"Команда на прерывание задачи {task_id} отправлена."} @app.delete("/manage/task/{task_id}", summary="Удалить задачу и ее файлы", dependencies=[Depends(verify_token)]) def delete_task(task_id: str = FastApiPath(..., description="ID задачи для удаления")): if task_id not in TASKS_DB: raise HTTPException(status_code=404, detail="Задача не найдена.") task = TASKS_DB[task_id] if task['status'] == 'running': raise HTTPException(status_code=400, detail="Сначала прервите выполняющуюся задачу.") if task.get('output_path') and Path(task['output_path']).exists(): Path(task['output_path']).unlink(missing_ok=True) if task.get('log_path') and Path(task['log_path']).exists(): Path(task['log_path']).unlink(missing_ok=True) del TASKS_DB[task_id] return {"message": f"Задача {task_id} и ее файлы были удалены."} @app.delete("/manage/file/{file_hash}", summary="Удалить загруженный файл", dependencies=[Depends(verify_token)]) def delete_file(file_hash: str = FastApiPath(..., description="Хеш файла для удаления")): if file_hash not in FILES_DB: raise HTTPException(status_code=404, detail="Файл не найден.") for task in TASKS_DB.values(): if task['input_hash'] == file_hash and task['status'] in ['running', 'pending']: raise HTTPException(status_code=400, detail=f"Нельзя удалить файл, так как он используется активной задачей {task['task_id']}.") file_path = Path(FILES_DB[file_hash]['path']) if file_path.exists(): file_path.unlink() del FILES_DB[file_hash] return {"message": f"Файл {file_hash} был удален."}