|
|
|
import hashlib
import math
import os
import re
import secrets
import shutil
import signal
import subprocess
import uuid
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Union

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, File, UploadFile, BackgroundTasks, HTTPException, Path as FastApiPath, Depends, Query, status
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel, Field, validator
|
|
|
|
|
|
|
|
|
# Shared secret for all protected endpoints; read once at import time.
API_KEY = os.getenv("API_KEY")


# ffmpeg settings for the VFR -> CFR intermediate produced by the
# auto-encode workflow (near-lossless SVT-AV1 pass).
VFR_TO_CFR_CRF = 17
VFR_TO_CFR_PRESET = 4


# On-disk layout: uploads (inputs named by hash), outputs and logs (by task id).
BASE_DATA_DIR = Path("data")
UPLOADS_DIR = BASE_DATA_DIR / "uploads"
OUTPUTS_DIR = BASE_DATA_DIR / "outputs"
LOGS_DIR = BASE_DATA_DIR / "logs"


# In-memory registries of uploads and tasks.
# NOTE(review): lost on restart — files on disk then become orphans until
# cleanup_files removes them by age/size.
FILES_DB: Dict[str, Dict[str, Any]] = {}
TASKS_DB: Dict[str, Dict[str, Any]] = {}


# Retention policy enforced by the periodic cleanup job.
MAX_AGE_HOURS = 24
MAX_TOTAL_SIZE_GB = 50
CLEANUP_INTERVAL_MINUTES = 30


# Async scheduler that drives cleanup_files.
scheduler = AsyncIOScheduler()
|
|
|
|
|
|
|
async def verify_token(token: str = Query(..., description="Токен для доступа к API.")):
    """Validate the ``token`` query parameter against the server's API key.

    Raises:
        HTTPException 500: API_KEY is not configured on the server.
        HTTPException 401: the supplied token does not match.
    """
    if not API_KEY:
        print("КРИТИЧЕСКАЯ ОШИБКА: Переменная окружения API_KEY не установлена!")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Ключ API не сконфигурирован на сервере."
        )
    # Constant-time comparison: a plain `!=` short-circuits on the first
    # differing byte, leaking key material through response timing.
    if not secrets.compare_digest(token, API_KEY):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный или отсутствующий токен API."
        )
|
|
|
|
|
|
|
class Encoder(str, Enum):
    """Video encoders accepted by ab-av1's ``-e`` flag."""
    av1 = "svt-av1"
    x265 = "libx265"
    x264 = "libx264"
|
|
|
class X265Preset(str, Enum):
    """Named speed/quality presets for libx265 (fastest to slowest)."""
    ultrafast = "ultrafast"
    superfast = "superfast"
    veryfast = "veryfast"
    faster = "faster"
    fast = "fast"
    medium = "medium"
    slow = "slow"
    slower = "slower"
    veryslow = "veryslow"
    placebo = "placebo"
|
|
|
|
|
|
|
class UploadResponse(BaseModel):
    """Response for /upload: where the file landed and whether it was new."""
    message: str
    file_hash: str          # SHA-256 of the uploaded content (dedup key)
    file_path: str          # server-side storage path
    original_filename: str
    is_new: bool            # False when an identical file already existed
|
|
|
class BaseTaskRequest(BaseModel):
    """Fields common to every task type; input_hash must match an upload."""
    input_hash: str = Field(..., description="SHA256 хеш загруженного файла.")
    encoder: Encoder = Field(Encoder.av1, description="Выбор кодека.")
    extra_options: str = Field("", description="Дополнительные опции для ab-av1.")
|
|
|
class AutoEncodeRequest(BaseTaskRequest):
    """Request body for the auto-encode workflow (VFR check + crf-search + encode)."""
    preset: Union[int, X265Preset] = Field(3, description="Пресет для кодировщика. Для AV1: 0-12 (int). Для x265: 'slow', 'medium' и т.д. (str).")
    min_vmaf: int = Field(96, description="Целевой минимальный VMAF.", ge=0, le=100)

    @validator('preset')
    def preset_must_match_encoder(cls, v, values):
        # Cross-field check: the preset's Python type must fit the encoder.
        chosen_encoder = values.get('encoder')
        if chosen_encoder == Encoder.av1 and not isinstance(v, int):
            raise ValueError('Для AV1 пресет должен быть числом (0-12).')
        if chosen_encoder == Encoder.x265 and not isinstance(v, X265Preset):
            raise ValueError("Для x265 пресет должен быть строкой (например, 'slow').")
        if isinstance(v, int) and not (0 <= v <= 12):
            raise ValueError('Пресет для AV1 должен быть в диапазоне от 0 до 12.')
        return v
|
|
|
class CrfSearchRequest(AutoEncodeRequest):
    """Request body for /tasks/crf-search (identical fields to auto-encode)."""
    pass
|
|
|
class EncodeRequest(BaseTaskRequest):
    """Request body for a direct encode at an explicitly chosen CRF."""
    preset: Union[int, X265Preset] = Field(3, description="Пресет для кодировщика. Для AV1: 0-12 (int). Для x265: 'slow', 'medium' и т.д. (str).")
    crf: float = Field(20, description="Значение CRF (Constant Rate Factor). Допускаются дробные значения.")

    @validator('preset')
    def preset_must_match_encoder(cls, v, values):
        # Cross-field check: the preset's Python type must fit the encoder.
        chosen_encoder = values.get('encoder')
        if chosen_encoder == Encoder.av1 and not isinstance(v, int):
            raise ValueError('Для AV1 пресет должен быть числом (0-12).')
        if chosen_encoder == Encoder.x265 and not isinstance(v, X265Preset):
            raise ValueError("Для x265 пресет должен быть строкой (например, 'slow').")
        if isinstance(v, int) and not (0 <= v <= 12):
            raise ValueError('Пресет для AV1 должен быть в диапазоне от 0 до 12.')
        return v
|
|
|
class TaskStatusResponse(BaseModel):
    """Public view of a task record (the live process handle is stripped)."""
    task_id: str
    task_type: str              # "encode" | "crf-search" | "auto-encode"
    status: str                 # pending | running | completed | failed | cancelled
    input_hash: str
    command: str                # exact CLI, or a workflow description for auto-encode
    created_at: datetime
    started_at: datetime | None
    finished_at: datetime | None
    output_path: str | None     # None for crf-search tasks (no output file)
    log_path: str
    last_log_line: str | None

    # Echo of the encoding parameters the task was created with.
    encoder: Encoder | None = None
    preset: Union[int, str] | None = None
    crf: float | None = None
    min_vmaf: int | None = None
    extra_options: str | None = None
|
|
|
|
|
class TaskCreateResponse(BaseModel):
    """Returned on 202: the new task id plus URLs to poll and manage it."""
    message: str
    task_id: str
    status_url: str
    log_url: str
    manage_url: str
|
|
|
|
|
async def cleanup_files():
    """Periodic job: delete stale files and keep total storage under quota.

    Two passes over uploads/outputs/logs:
      1. age — anything older than MAX_AGE_HOURS is scheduled for deletion;
      2. size — if the directories still exceed MAX_TOTAL_SIZE_GB, the oldest
         surviving files are scheduled until the quota fits.
    Files referenced by pending/running tasks are never touched, and the
    in-memory registries are kept consistent with what gets removed.
    """
    print(f"[{datetime.utcnow()}] Запуск задачи очистки...")

    now = datetime.utcnow()
    max_age_limit = now - timedelta(hours=MAX_AGE_HOURS)
    max_size_bytes = MAX_TOTAL_SIZE_GB * (1024**3)

    # Everything currently under management (recursive over the three dirs).
    all_managed_files: List[Path] = []
    for dir_path in [UPLOADS_DIR, OUTPUTS_DIR, LOGS_DIR]:
        all_managed_files.extend(list(dir_path.glob("**/*")))

    # Paths that must survive cleanup: inputs/outputs/logs of live tasks.
    active_files = set()
    for task_id, task in TASKS_DB.items():
        if task['status'] in ['pending', 'running']:
            if task['input_hash'] in FILES_DB:
                active_files.add(Path(FILES_DB[task['input_hash']]['path']))
            if task.get('output_path'):
                active_files.add(Path(task['output_path']))
            if task.get('log_path'):
                active_files.add(Path(task['log_path']))

    files_to_delete = set()
    files_with_meta = []  # survivors of the age pass; candidates for the size pass

    for file_path in all_managed_files:
        if not file_path.is_file() or file_path in active_files:
            continue
        try:
            mtime = datetime.utcfromtimestamp(file_path.stat().st_mtime)
            if mtime < max_age_limit:
                files_to_delete.add(file_path)
            else:
                files_with_meta.append({'path': file_path, 'mtime': mtime, 'size': file_path.stat().st_size})
        except FileNotFoundError:
            # File vanished between glob and stat — nothing to clean up.
            continue

    # NOTE(review): this total still counts the age-expired files selected
    # above, so the size pass can free slightly more than strictly required.
    current_total_size = sum(f.stat().st_size for f in all_managed_files if f.is_file())
    if current_total_size > max_size_bytes:
        files_with_meta.sort(key=lambda x: x['mtime'])  # oldest first

        size_to_free = current_total_size - max_size_bytes
        freed_size = 0

        for file_meta in files_with_meta:
            if freed_size >= size_to_free:
                break
            if file_meta['path'] not in files_to_delete and file_meta['path'] not in active_files:
                files_to_delete.add(file_meta['path'])
                freed_size += file_meta['size']

    deleted_count = 0
    for file_path in files_to_delete:
        try:
            file_path.unlink()
            deleted_count += 1
            # Keep the registries consistent with disk: uploads are named
            # <hash><ext>, outputs/logs are named <task_id>.<ext>, so the
            # stem recovers the corresponding registry key.
            if str(file_path).startswith(str(UPLOADS_DIR)):
                file_hash_to_del = file_path.stem
                FILES_DB.pop(file_hash_to_del, None)
            elif str(file_path).startswith(str(OUTPUTS_DIR)) or str(file_path).startswith(str(LOGS_DIR)):
                task_id_to_del = file_path.stem
                TASKS_DB.pop(task_id_to_del, None)
        except Exception as e:
            print(f"Ошибка при удалении файла {file_path}: {e}")

    if deleted_count > 0:
        print(f"Очистка завершена. Удалено {deleted_count} файлов.")
|
|
|
|
|
# The FastAPI application; interactive docs are served at /docs.
app = FastAPI(
    title="ab-av1 API Server",
    description="REST API для асинхронного кодирования видео. **Интерактивная документация: `/docs`**.",
    version="2.8.0",
)
|
|
|
@app.on_event("startup")
async def on_startup():
    """Create data directories, report API_KEY presence, start the cleanup scheduler."""
    if API_KEY:
        print("Ключ API успешно загружен.")
    else:
        print("КРИТИЧЕСКАЯ ОШИБКА: Переменная окружения API_KEY не установлена. Сервер не сможет авторизовать запросы.")

    for directory in (UPLOADS_DIR, OUTPUTS_DIR, LOGS_DIR):
        directory.mkdir(parents=True, exist_ok=True)

    scheduler.add_job(cleanup_files, 'interval', minutes=CLEANUP_INTERVAL_MINUTES)
    scheduler.start()
    print("Планировщик очистки файлов запущен.")
|
|
|
@app.on_event("shutdown")
async def on_shutdown():
    """Stop the scheduler and terminate any still-running encode processes."""
    scheduler.shutdown()
    print("Планировщик остановлен.")
    for task_id, task in TASKS_DB.items():
        proc = task.get('process')
        if not proc or task['status'] != 'running':
            continue
        print(f"Остановка процесса для задачи {task_id}...")
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Graceful stop failed within 5s — force-kill the child.
            proc.kill()
|
|
|
|
|
|
|
def run_simple_task(task_id: str, command: list[str], log_path: Path):
    """Run a one-shot ab-av1 command in the background (encode / crf-search).

    Streams the child's combined stdout/stderr into ``log_path``, mirrors the
    latest line into the task record for status polling, and records the
    final status: completed / failed / cancelled.
    """
    task = TASKS_DB[task_id]
    task["started_at"] = datetime.utcnow()
    task["status"] = "running"
    try:
        # start_new_session=True puts the child in its own process group so
        # cancel_task() can signal the whole group. Unlike the previous
        # preexec_fn=os.setsid it is safe in threaded programs (per the
        # subprocess docs) — FastAPI runs sync background tasks in threads.
        # errors='replace' keeps the reader alive if the encoder emits bytes
        # that are not valid UTF-8.
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            text=True, encoding='utf-8', errors='replace', bufsize=1,
            start_new_session=True
        )
        task['process'] = process  # exposed so cancel_task can signal it

        with open(log_path, "w", encoding='utf-8') as log_file:
            log_file.write(f"Executing command: {' '.join(command)}\n\n")
            for line in iter(process.stdout.readline, ''):
                log_file.write(line)
                log_file.flush()
                task["last_log_line"] = line.strip()

        process.wait()

        if task.get('was_cancelled'):
            # cancel_task set this flag before signalling the process group.
            task["status"] = "cancelled"
        elif process.returncode == 0:
            task["status"] = "completed"
        else:
            task["status"] = "failed"
            task["last_log_line"] = f"Процесс завершился с кодом {process.returncode}"

    except Exception as e:
        task["status"] = "failed"
        error_message = f"Произошло исключение Python: {e}"
        task["last_log_line"] = error_message
        with open(log_path, "a", encoding='utf-8') as log_file:
            log_file.write(f"\n--- PYTHON EXCEPTION ---\n{error_message}")
    finally:
        task["finished_at"] = datetime.utcnow()
        task.pop('process', None)  # drop the non-serializable handle
|
|
|
def run_auto_encode_workflow(task_id: str, request: AutoEncodeRequest):
    """Run the multi-step auto-encode workflow with a VFR pre-check.

    Steps (mirrored in the log):
      1. mediainfo detects whether the source is Variable Frame Rate.
      2. If VFR, ffmpeg converts it to a CFR intermediate at the source's
         maximum frame rate, rounded up.
      3. ab-av1 crf-search finds the lowest CRF meeting request.min_vmaf.
      4. ab-av1 encode produces the final output at that CRF.
    All progress is appended to the task's log file and mirrored into the
    task record ("last_log_line") for status polling.
    """
    task = TASKS_DB[task_id]
    log_path = Path(task["log_path"])
    original_input_path = Path(FILES_DB[request.input_hash]["path"])
    temp_cfr_path = None  # set only if a CFR intermediate gets created

    def _log_message(message: str, level: str = "INFO"):
        # Timestamped line -> stdout + log file + task record (for polling).
        line = f"[{datetime.utcnow().strftime('%H:%M:%S')}] [{level}] {message}\n"
        print(f"Task {task_id}: {line.strip()}")
        with open(log_path, "a", encoding='utf-8') as log_file:
            log_file.write(line)
        task["last_log_line"] = message

    def _run_sub_task(command: list[str], sub_task_name: str) -> str:
        # Run one child process, stream its output into the log, and return
        # the captured text so callers can parse it (e.g. crf-search result).
        _log_message(f"Запуск подзадачи: {sub_task_name}")
        _log_message(f"Команда: {' '.join(command)}")

        # NOTE(review): preexec_fn is documented as not thread-safe;
        # start_new_session=True is the safer equivalent — confirm and switch.
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            text=True, encoding='utf-8', bufsize=1, preexec_fn=os.setsid
        )
        task['process'] = process  # exposed so cancel_task can signal the group

        full_output = []
        with open(log_path, "a", encoding='utf-8') as log_file:
            for line in iter(process.stdout.readline, ''):
                log_file.write(line)
                log_file.flush()
                full_output.append(line)
                if line.strip():
                    task["last_log_line"] = line.strip()

        process.wait()
        task.pop('process', None)

        if process.returncode != 0:
            raise RuntimeError(f"Подзадача '{sub_task_name}' завершилась с ошибкой (код: {process.returncode}). Смотрите лог для деталей.")

        return "".join(full_output)

    try:
        task["status"] = "running"
        task["started_at"] = datetime.utcnow()
        with open(log_path, "w", encoding='utf-8') as f:
            f.write(f"Starting auto-encode workflow for task {task_id}\n")

        if not shutil.which("mediainfo"):
            raise RuntimeError("Утилита 'mediainfo' не найдена в системном PATH. Пожалуйста, установите ее.")

        _log_message("Шаг 1/4: Проверка на Variable Frame Rate (VFR)")
        vfr_check_cmd = ["mediainfo", "--Inform=Video;%FrameRate_Mode%", str(original_input_path)]
        vfr_check = subprocess.run(vfr_check_cmd, capture_output=True, text=True, check=True)
        frame_rate_mode = vfr_check.stdout.strip()

        crf_search_input = original_input_path

        if "VFR" in frame_rate_mode.upper():
            _log_message(f"Обнаружен VFR ({frame_rate_mode}). Начало конвертации в CFR.")

            # Use the stream's maximum frame rate (rounded up) as the CFR target.
            max_fps_res_cmd = ["mediainfo", "--Inform=Video;%FrameRate_Maximum%", str(original_input_path)]
            max_fps_res = subprocess.run(max_fps_res_cmd, capture_output=True, text=True, check=True)
            target_fps = math.ceil(float(max_fps_res.stdout.strip()))
            _log_message(f"Целевой FPS для CFR: {target_fps}")

            temp_cfr_path = OUTPUTS_DIR / f"{task_id}_temp_cfr.mp4"

            # Near-lossless intermediate (module constants VFR_TO_CFR_*);
            # audio and subtitle streams are copied through untouched.
            cfr_cmd = [
                "ffmpeg", "-i", str(original_input_path), "-vf", f"fps={target_fps}",
                "-c:v", "libsvtav1",
                "-crf", str(VFR_TO_CFR_CRF),
                "-preset", str(VFR_TO_CFR_PRESET),
                "-c:a", "copy", "-c:s", "copy", str(temp_cfr_path)
            ]
            _run_sub_task(cfr_cmd, "VFR в CFR конвертация")
            crf_search_input = temp_cfr_path
            _log_message("Конвертация в CFR успешно завершена.")
        else:
            _log_message(f"Обнаружен CFR ({frame_rate_mode}). Конвертация не требуется.")

        _log_message("Шаг 2/4: Поиск оптимального CRF (crf-search)")

        preset_value = request.preset.value if isinstance(request.preset, Enum) else str(request.preset)

        search_cmd = [
            "ab-av1", "crf-search", "-i", str(crf_search_input),
            "-e", request.encoder.value, "--preset", preset_value,
            "--min-vmaf", str(request.min_vmaf)
        ]
        if request.extra_options:
            search_cmd.extend(request.extra_options.split())

        search_output = _run_sub_task(search_cmd, "Поиск CRF")

        # ab-av1 prints its recommendation in (at least) two formats; try the
        # "encode ... --crf N" suggestion first, then "crf N VMAF ..." lines.
        found_crf = None
        match = re.search(r"encode .*--crf ([\d.]+)", search_output)
        if match:
            found_crf = match.group(1)
        else:
            match = re.search(r"^crf ([\d.]+) VMAF", search_output, re.MULTILINE)
            if match:
                found_crf = match.group(1)

        if not found_crf:
            raise RuntimeError("Не удалось найти рекомендованный CRF в выводе crf-search.")
        _log_message(f"Найден оптимальный CRF: {found_crf}")

        _log_message("Шаг 3/4: Финальное кодирование")
        final_output_path = Path(task["output_path"])
        encode_cmd = [
            "ab-av1", "encode", "-i", str(crf_search_input), "-o", str(final_output_path),
            "--crf", found_crf, "-e", request.encoder.value, "--preset", preset_value
        ]
        if request.extra_options:
            encode_cmd.extend(request.extra_options.split())

        _run_sub_task(encode_cmd, "Финальное кодирование")

        task["status"] = "completed"
        _log_message("Шаг 4/4: Задача успешно завершена!")

    except Exception as e:
        task["status"] = "failed"
        error_message = f"Воркфлоу завершился с ошибкой: {e}"
        _log_message(error_message, level="ERROR")
    finally:
        # Always remove the CFR intermediate and close out the task record.
        if temp_cfr_path and temp_cfr_path.exists():
            _log_message("Очистка временных файлов...")
            temp_cfr_path.unlink()
        task["finished_at"] = datetime.utcnow()
        task.pop('process', None)
|
|
|
def create_task(task_type: str, request: BaseTaskRequest, background_tasks: BackgroundTasks):
    """Shared factory: validate the input file, register a task record, queue the worker."""
    file_entry = FILES_DB.get(request.input_hash)
    if file_entry is None:
        raise HTTPException(status_code=404, detail=f"Файл с хешем {request.input_hash} не найден.")

    input_path = Path(file_entry["path"])
    if not input_path.exists():
        raise HTTPException(status_code=404, detail=f"Исходный файл для хеша {request.input_hash} был удален.")

    task_id = str(uuid.uuid4())
    output_path = OUTPUTS_DIR / f"{task_id}.mp4"
    log_path = LOGS_DIR / f"{task_id}.log"

    command = []
    if task_type in ("encode", "crf-search"):
        # The preset is either an enum member (x265) or an int (AV1).
        if isinstance(request.preset, Enum):
            preset_value = request.preset.value
        else:
            preset_value = str(request.preset)

        command = [
            "ab-av1", task_type, "-i", str(input_path),
            "-e", request.encoder.value, "--preset", preset_value,
        ]
        if task_type == "encode":
            command += ["-o", str(output_path), "--crf", str(getattr(request, 'crf', 20))]
        else:
            command += ["--min-vmaf", str(getattr(request, 'min_vmaf', 96))]
        if request.extra_options:
            command += request.extra_options.split()
        command_str = " ".join(command)
    else:
        # auto-encode builds its commands inside the workflow itself.
        command_str = f"auto-encode workflow for {request.input_hash}"

    TASKS_DB[task_id] = {
        "task_id": task_id,
        "task_type": task_type,
        "status": "pending",
        "input_hash": request.input_hash,
        "command": command_str,
        "created_at": datetime.utcnow(),
        "started_at": None,
        "finished_at": None,
        # crf-search only produces a recommendation in the log, no file.
        "output_path": None if task_type == "crf-search" else str(output_path),
        "log_path": str(log_path),
        "last_log_line": "Задача поставлена в очередь.",
        "encoder": request.encoder,
        "preset": request.preset,
        "extra_options": request.extra_options,
        "crf": getattr(request, 'crf', None),
        "min_vmaf": getattr(request, 'min_vmaf', None),
    }

    if task_type == "auto-encode":
        background_tasks.add_task(run_auto_encode_workflow, task_id, request)
    else:
        background_tasks.add_task(run_simple_task, task_id, command, log_path)

    return TaskCreateResponse(
        message=f"Задача '{task_type}' успешно создана.",
        task_id=task_id,
        status_url=f"/tasks/{task_id}/status",
        log_url=f"/tasks/{task_id}/log",
        manage_url="/manage",
    )
|
|
|
|
|
|
|
@app.get("/", summary="Статус API")
def read_root():
    """Liveness probe: report that the API is up."""
    payload = {"status": "ok"}
    return payload
|
|
|
@app.post("/upload", response_model=UploadResponse, summary="Загрузка видеофайла", dependencies=[Depends(verify_token)])
async def upload_file(file: UploadFile = File(...)):
    """Upload a video file, deduplicating by SHA-256 of its content.

    The body is streamed to a temporary file in 1 MiB chunks while the hash
    is computed incrementally, so large videos never have to fit in memory
    (the old code did ``await file.read()`` of the whole upload at once).
    If a file with the same hash already exists on disk it is reused.
    """
    hasher = hashlib.sha256()
    total_size = 0
    # Stream to a uniquely named temp file first: the final (hash-based)
    # name is only known after the whole body has been read.
    temp_path = UPLOADS_DIR / f".upload_{uuid.uuid4().hex}.tmp"
    try:
        with open(temp_path, "wb") as out:
            while chunk := await file.read(1024 * 1024):
                hasher.update(chunk)
                total_size += len(chunk)
                out.write(chunk)
        file_hash = hasher.hexdigest()

        if file_hash in FILES_DB and Path(FILES_DB[file_hash]["path"]).exists():
            # Duplicate content: discard the temp copy and reuse the record.
            temp_path.unlink(missing_ok=True)
            return UploadResponse(
                message="Файл с таким хешем уже существует.",
                file_hash=file_hash,
                file_path=FILES_DB[file_hash]["path"],
                original_filename=FILES_DB[file_hash]["original_filename"],
                is_new=False
            )

        # file.filename may be None for some clients; fall back to no suffix.
        file_extension = Path(file.filename).suffix if file.filename else ""
        saved_path = UPLOADS_DIR / f"{file_hash}{file_extension}"
        temp_path.replace(saved_path)
    except Exception:
        # Never leave a half-written temp file behind.
        temp_path.unlink(missing_ok=True)
        raise

    FILES_DB[file_hash] = {
        "path": str(saved_path),
        "original_filename": file.filename,
        "uploaded_at": datetime.utcnow(),
        "size": total_size
    }

    return UploadResponse(
        message="Файл успешно загружен.",
        file_hash=file_hash,
        file_path=str(saved_path),
        original_filename=file.filename,
        is_new=True
    )
|
|
|
@app.post("/tasks/auto-encode", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи auto-encode с логикой VFR", dependencies=[Depends(verify_token)])
async def task_auto_encode(request: AutoEncodeRequest, background_tasks: BackgroundTasks):
    """Queue the full auto-encode workflow (VFR check, crf-search, final encode)."""
    return create_task("auto-encode", request, background_tasks)
|
|
|
@app.post("/tasks/crf-search", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи crf-search", dependencies=[Depends(verify_token)])
async def task_crf_search(request: CrfSearchRequest, background_tasks: BackgroundTasks):
    """Queue a crf-search task (finds the best CRF; produces no output file)."""
    return create_task("crf-search", request, background_tasks)
|
|
|
@app.post("/tasks/encode", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи encode", dependencies=[Depends(verify_token)])
async def task_encode(request: EncodeRequest, background_tasks: BackgroundTasks):
    """Queue a direct encode at the caller-chosen CRF."""
    return create_task("encode", request, background_tasks)
|
|
|
@app.get("/tasks/{task_id}/status", response_model=TaskStatusResponse, summary="Получить статус задачи", dependencies=[Depends(verify_token)])
def get_task_status(task_id: str = FastApiPath(..., description="ID задачи")):
    """Return the stored record for one task, without the live process handle."""
    record = TASKS_DB.get(task_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Задача не найдена.")
    # Copy so popping the handle does not mutate the live record.
    sanitized = dict(record)
    sanitized.pop('process', None)
    return sanitized
|
|
|
@app.get("/tasks/{task_id}/log", summary="Получить полный лог задачи", dependencies=[Depends(verify_token)])
def get_task_log(task_id: str = FastApiPath(..., description="ID задачи")):
    """Serve the task's full log file as plain text."""
    record = TASKS_DB.get(task_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Задача не найдена.")
    log_path = Path(record["log_path"])
    if log_path.exists():
        return FileResponse(log_path, media_type="text/plain", filename=log_path.name)
    return JSONResponse(status_code=404, content={"detail": "Файл лога не найден."})
|
|
|
@app.get("/download/{task_id}", summary="Скачать результат кодирования", dependencies=[Depends(verify_token)])
def download_result(task_id: str = FastApiPath(..., description="ID задачи")):
    """Send the encoded output of a completed task as a file download.

    The download name combines the original upload's base name, the task
    type and a short task-id prefix; falls back to the input hash when the
    upload record has already been evicted by cleanup.
    """
    if task_id not in TASKS_DB:
        raise HTTPException(status_code=404, detail="Задача не найдена.")
    task = TASKS_DB[task_id]
    if task["status"] != "completed":
        raise HTTPException(status_code=400, detail=f"Задача еще не завершена. Статус: {task['status']}.")
    output_path = task.get("output_path")
    if not output_path or not Path(output_path).exists():
        raise HTTPException(status_code=404, detail="Выходной файл не найден.")

    output_path = Path(output_path)
    input_hash = task["input_hash"]
    # BUGFIX: cleanup_files can remove the FILES_DB entry while the task
    # record survives; the previous direct indexing raised KeyError (HTTP 500).
    file_entry = FILES_DB.get(input_hash)
    if file_entry:
        base_name = Path(file_entry["original_filename"]).stem
    else:
        base_name = input_hash[:12]

    return FileResponse(output_path, filename=f"{base_name}_{task['task_type']}_{task_id[:8]}{output_path.suffix}")
|
|
|
|
|
|
|
@app.get("/manage", summary="Получить списки задач и файлов (JSON)", dependencies=[Depends(verify_token)])
async def get_management_page():
    """Return JSON with every task (newest first, process handle stripped) and every uploaded file."""
    tasks_list = [
        {key: value for key, value in task_data.items() if key != 'process'}
        for task_data in TASKS_DB.values()
    ]
    tasks_list.sort(key=lambda t: t['created_at'], reverse=True)
    return {"tasks": tasks_list, "files": FILES_DB}
|
|
|
|
|
@app.post("/manage/task/{task_id}/cancel", summary="Прервать выполняющуюся задачу", dependencies=[Depends(verify_token)])
def cancel_task(task_id: str = FastApiPath(..., description="ID задачи для прерывания")):
    """Ask a running task's process group to terminate (SIGTERM)."""
    if task_id not in TASKS_DB:
        raise HTTPException(status_code=404, detail="Задача не найдена.")
    task = TASKS_DB[task_id]
    process = task.get('process')
    if task['status'] != 'running' or process is None:
        raise HTTPException(status_code=400, detail="Задачу нельзя прервать (она не выполняется).")

    print(f"Прерывание задачи {task_id} (PID: {process.pid})...")
    task['was_cancelled'] = True  # lets the worker report "cancelled" instead of "failed"
    try:
        # Signal the whole process group so ffmpeg/vmaf grandchildren die too
        # (the worker started the child in its own session/group).
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
    except ProcessLookupError:
        # BUGFIX: the process may exit between the status check and the
        # signal; that race used to bubble up as an HTTP 500.
        pass

    return {"message": f"Команда на прерывание задачи {task_id} отправлена."}
|
|
|
@app.delete("/manage/task/{task_id}", summary="Удалить задачу и ее файлы", dependencies=[Depends(verify_token)])
def delete_task(task_id: str = FastApiPath(..., description="ID задачи для удаления")):
    """Delete a non-running task's record plus its output and log files."""
    if task_id not in TASKS_DB:
        raise HTTPException(status_code=404, detail="Задача не найдена.")

    task = TASKS_DB[task_id]
    if task['status'] == 'running':
        raise HTTPException(status_code=400, detail="Сначала прервите выполняющуюся задачу.")

    # unlink(missing_ok=True) already tolerates absent files, so the old
    # exists() pre-checks were redundant and racy (TOCTOU).
    for path_key in ('output_path', 'log_path'):
        path_value = task.get(path_key)
        if path_value:
            Path(path_value).unlink(missing_ok=True)

    del TASKS_DB[task_id]

    return {"message": f"Задача {task_id} и ее файлы были удалены."}
|
|
|
@app.delete("/manage/file/{file_hash}", summary="Удалить загруженный файл", dependencies=[Depends(verify_token)])
def delete_file(file_hash: str = FastApiPath(..., description="Хеш файла для удаления")):
    """Delete an uploaded source file unless an active task still uses it."""
    if file_hash not in FILES_DB:
        raise HTTPException(status_code=404, detail="Файл не найден.")

    # Refuse while any pending/running task reads this input.
    for task in TASKS_DB.values():
        in_use = task['input_hash'] == file_hash and task['status'] in ('running', 'pending')
        if in_use:
            raise HTTPException(status_code=400, detail=f"Нельзя удалить файл, так как он используется активной задачей {task['task_id']}.")

    stored_path = Path(FILES_DB[file_hash]['path'])
    if stored_path.exists():
        stored_path.unlink()

    del FILES_DB[file_hash]

    return {"message": f"Файл {file_hash} был удален."}
|
|
|
|