ab-av1 / app /main.py
opex792's picture
Update app/main.py
862949d verified
import os
import uuid
import hashlib
import subprocess
import shutil
import signal
import re
import math
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, List, Union
from enum import Enum
from fastapi import FastAPI, File, UploadFile, BackgroundTasks, HTTPException, Path as FastApiPath, Depends, Query, status
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel, Field, validator
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# --- Конфигурация и глобальные переменные ---
# Ключ API для доступа
API_KEY = os.getenv("API_KEY")
# --- ИЗМЕНЕНИЕ: Параметры для VFR -> CFR вынесены сюда ---
# Настройки для конвертации видео с переменной частотой кадров (VFR)
# в видео с постоянной частотой кадров (CFR).
# Этот шаг выполняется для стабилизации видео перед основным кодированием.
VFR_TO_CFR_CRF = 17 # Качество (чем ниже, тем лучше). 17 - очень высокое.
VFR_TO_CFR_PRESET = 4 # Скорость (чем ниже, тем качественнее, но медленнее).
# Директории для хранения данных
BASE_DATA_DIR = Path("data")
UPLOADS_DIR = BASE_DATA_DIR / "uploads"
OUTPUTS_DIR = BASE_DATA_DIR / "outputs"
LOGS_DIR = BASE_DATA_DIR / "logs"
# "Базы данных" в памяти
FILES_DB: Dict[str, Dict[str, Any]] = {}
TASKS_DB: Dict[str, Dict[str, Any]] = {}
# Настройки для очистки
MAX_AGE_HOURS = 24
MAX_TOTAL_SIZE_GB = 50
CLEANUP_INTERVAL_MINUTES = 30
# Планировщик для задачи очистки
scheduler = AsyncIOScheduler()
# --- Зависимость для проверки токена ---
async def verify_token(token: str = Query(..., description="Токен для доступа к API.")):
"""Проверяет, соответствует ли предоставленный токен ключу API сервера."""
if not API_KEY:
print("КРИТИЧЕСКАЯ ОШИБКА: Переменная окружения API_KEY не установлена!")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Ключ API не сконфигурирован на сервере."
)
if token != API_KEY:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный или отсутствующий токен API."
)
# --- Перечисления и константы ---
class Encoder(str, Enum):
av1 = "svt-av1"
x265 = "libx265"
x264 = "libx264"
class X265Preset(str, Enum):
ultrafast = "ultrafast"
superfast = "superfast"
veryfast = "veryfast"
faster = "faster"
fast = "fast"
medium = "medium"
slow = "slow"
slower = "slower"
veryslow = "veryslow"
placebo = "placebo"
# --- Модели данных для API (Pydantic) ---
class UploadResponse(BaseModel):
message: str
file_hash: str
file_path: str
original_filename: str
is_new: bool
class BaseTaskRequest(BaseModel):
input_hash: str = Field(..., description="SHA256 хеш загруженного файла.")
encoder: Encoder = Field(Encoder.av1, description="Выбор кодека.")
extra_options: str = Field("", description="Дополнительные опции для ab-av1.")
class AutoEncodeRequest(BaseTaskRequest):
preset: Union[int, X265Preset] = Field(3, description="Пресет для кодировщика. Для AV1: 0-12 (int). Для x265: 'slow', 'medium' и т.д. (str).")
min_vmaf: int = Field(96, description="Целевой минимальный VMAF.", ge=0, le=100)
@validator('preset')
def preset_must_match_encoder(cls, v, values):
encoder = values.get('encoder')
if encoder == Encoder.av1 and not isinstance(v, int):
raise ValueError('Для AV1 пресет должен быть числом (0-12).')
if encoder == Encoder.x265 and not isinstance(v, X265Preset):
raise ValueError("Для x265 пресет должен быть строкой (например, 'slow').")
if isinstance(v, int) and (v < 0 or v > 12):
raise ValueError('Пресет для AV1 должен быть в диапазоне от 0 до 12.')
return v
class CrfSearchRequest(AutoEncodeRequest):
pass
class EncodeRequest(BaseTaskRequest):
preset: Union[int, X265Preset] = Field(3, description="Пресет для кодировщика. Для AV1: 0-12 (int). Для x265: 'slow', 'medium' и т.д. (str).")
crf: float = Field(20, description="Значение CRF (Constant Rate Factor). Допускаются дробные значения.")
@validator('preset')
def preset_must_match_encoder(cls, v, values):
encoder = values.get('encoder')
if encoder == Encoder.av1 and not isinstance(v, int):
raise ValueError('Для AV1 пресет должен быть числом (0-12).')
if encoder == Encoder.x265 and not isinstance(v, X265Preset):
raise ValueError("Для x265 пресет должен быть строкой (например, 'slow').")
if isinstance(v, int) and (v < 0 or v > 12):
raise ValueError('Пресет для AV1 должен быть в диапазоне от 0 до 12.')
return v
class TaskStatusResponse(BaseModel):
task_id: str
task_type: str
status: str
input_hash: str
command: str
created_at: datetime
started_at: datetime | None
finished_at: datetime | None
output_path: str | None
log_path: str
last_log_line: str | None
encoder: Encoder | None = None
preset: Union[int, str] | None = None
crf: float | None = None
min_vmaf: int | None = None
extra_options: str | None = None
class TaskCreateResponse(BaseModel):
message: str
task_id: str
status_url: str
log_url: str
manage_url: str
# --- Логика очистки (Janitor) ---
async def cleanup_files():
"""Периодическая задача для удаления старых файлов и контроля размера хранилища."""
print(f"[{datetime.utcnow()}] Запуск задачи очистки...")
now = datetime.utcnow()
max_age_limit = now - timedelta(hours=MAX_AGE_HOURS)
max_size_bytes = MAX_TOTAL_SIZE_GB * (1024**3)
all_managed_files: List[Path] = []
for dir_path in [UPLOADS_DIR, OUTPUTS_DIR, LOGS_DIR]:
all_managed_files.extend(list(dir_path.glob("**/*")))
active_files = set()
for task_id, task in TASKS_DB.items():
if task['status'] in ['pending', 'running']:
if task['input_hash'] in FILES_DB:
active_files.add(Path(FILES_DB[task['input_hash']]['path']))
if task.get('output_path'):
active_files.add(Path(task['output_path']))
if task.get('log_path'):
active_files.add(Path(task['log_path']))
files_to_delete = set()
files_with_meta = []
for file_path in all_managed_files:
if not file_path.is_file() or file_path in active_files:
continue
try:
mtime = datetime.utcfromtimestamp(file_path.stat().st_mtime)
if mtime < max_age_limit:
files_to_delete.add(file_path)
else:
files_with_meta.append({'path': file_path, 'mtime': mtime, 'size': file_path.stat().st_size})
except FileNotFoundError:
continue
current_total_size = sum(f.stat().st_size for f in all_managed_files if f.is_file())
if current_total_size > max_size_bytes:
files_with_meta.sort(key=lambda x: x['mtime'])
size_to_free = current_total_size - max_size_bytes
freed_size = 0
for file_meta in files_with_meta:
if freed_size >= size_to_free:
break
if file_meta['path'] not in files_to_delete and file_meta['path'] not in active_files:
files_to_delete.add(file_meta['path'])
freed_size += file_meta['size']
deleted_count = 0
for file_path in files_to_delete:
try:
file_path.unlink()
deleted_count += 1
if str(file_path).startswith(str(UPLOADS_DIR)):
file_hash_to_del = file_path.stem
FILES_DB.pop(file_hash_to_del, None)
elif str(file_path).startswith(str(OUTPUTS_DIR)) or str(file_path).startswith(str(LOGS_DIR)):
task_id_to_del = file_path.stem
TASKS_DB.pop(task_id_to_del, None)
except Exception as e:
print(f"Ошибка при удалении файла {file_path}: {e}")
if deleted_count > 0:
print(f"Очистка завершена. Удалено {deleted_count} файлов.")
# --- Инициализация FastAPI приложения ---
app = FastAPI(
title="ab-av1 API Server",
description="REST API для асинхронного кодирования видео. **Интерактивная документация: `/docs`**.",
version="2.8.0", # Версия обновлена
)
@app.on_event("startup")
async def on_startup():
"""Создаем директории, проверяем API_KEY и запускаем планировщик."""
if not API_KEY:
print("КРИТИЧЕСКАЯ ОШИБКА: Переменная окружения API_KEY не установлена. Сервер не сможет авторизовать запросы.")
else:
print("Ключ API успешно загружен.")
for dir_path in [UPLOADS_DIR, OUTPUTS_DIR, LOGS_DIR]:
dir_path.mkdir(parents=True, exist_ok=True)
scheduler.add_job(cleanup_files, 'interval', minutes=CLEANUP_INTERVAL_MINUTES)
scheduler.start()
print("Планировщик очистки файлов запущен.")
@app.on_event("shutdown")
async def on_shutdown():
"""Останавливаем планировщик и активные процессы при выключении."""
scheduler.shutdown()
print("Планировщик остановлен.")
for task_id, task in TASKS_DB.items():
if task.get('process') and task['status'] == 'running':
print(f"Остановка процесса для задачи {task_id}...")
task['process'].terminate()
try:
task['process'].wait(timeout=5)
except subprocess.TimeoutExpired:
task['process'].kill()
# --- Вспомогательные функции ---
def run_simple_task(task_id: str, command: list[str], log_path: Path):
"""Выполняет простую одношаговую команду в фоне (для encode и crf-search)."""
task = TASKS_DB[task_id]
task["started_at"] = datetime.utcnow()
task["status"] = "running"
try:
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
text=True, encoding='utf-8', bufsize=1, preexec_fn=os.setsid
)
task['process'] = process
with open(log_path, "w", encoding='utf-8') as log_file:
log_file.write(f"Executing command: {' '.join(command)}\n\n")
for line in iter(process.stdout.readline, ''):
log_file.write(line)
log_file.flush()
task["last_log_line"] = line.strip()
process.wait()
if task.get('was_cancelled'):
task["status"] = "cancelled"
elif process.returncode == 0:
task["status"] = "completed"
else:
task["status"] = "failed"
task["last_log_line"] = f"Процесс завершился с кодом {process.returncode}"
except Exception as e:
task["status"] = "failed"
error_message = f"Произошло исключение Python: {e}"
task["last_log_line"] = error_message
with open(log_path, "a", encoding='utf-8') as log_file:
log_file.write(f"\n--- PYTHON EXCEPTION ---\n{error_message}")
finally:
task["finished_at"] = datetime.utcnow()
task.pop('process', None)
def run_auto_encode_workflow(task_id: str, request: AutoEncodeRequest):
"""
Выполняет сложный воркфлоу для auto-encode с проверкой VFR.
"""
task = TASKS_DB[task_id]
log_path = Path(task["log_path"])
original_input_path = Path(FILES_DB[request.input_hash]["path"])
temp_cfr_path = None
def _log_message(message: str, level: str = "INFO"):
line = f"[{datetime.utcnow().strftime('%H:%M:%S')}] [{level}] {message}\n"
print(f"Task {task_id}: {line.strip()}")
with open(log_path, "a", encoding='utf-8') as log_file:
log_file.write(line)
task["last_log_line"] = message
def _run_sub_task(command: list[str], sub_task_name: str) -> str:
_log_message(f"Запуск подзадачи: {sub_task_name}")
_log_message(f"Команда: {' '.join(command)}")
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
text=True, encoding='utf-8', bufsize=1, preexec_fn=os.setsid
)
task['process'] = process
full_output = []
with open(log_path, "a", encoding='utf-8') as log_file:
for line in iter(process.stdout.readline, ''):
log_file.write(line)
log_file.flush()
full_output.append(line)
if line.strip():
task["last_log_line"] = line.strip()
process.wait()
task.pop('process', None)
if process.returncode != 0:
raise RuntimeError(f"Подзадача '{sub_task_name}' завершилась с ошибкой (код: {process.returncode}). Смотрите лог для деталей.")
return "".join(full_output)
try:
task["status"] = "running"
task["started_at"] = datetime.utcnow()
with open(log_path, "w", encoding='utf-8') as f:
f.write(f"Starting auto-encode workflow for task {task_id}\n")
if not shutil.which("mediainfo"):
raise RuntimeError("Утилита 'mediainfo' не найдена в системном PATH. Пожалуйста, установите ее.")
_log_message("Шаг 1/4: Проверка на Variable Frame Rate (VFR)")
vfr_check_cmd = ["mediainfo", "--Inform=Video;%FrameRate_Mode%", str(original_input_path)]
vfr_check = subprocess.run(vfr_check_cmd, capture_output=True, text=True, check=True)
frame_rate_mode = vfr_check.stdout.strip()
crf_search_input = original_input_path
if "VFR" in frame_rate_mode.upper():
_log_message(f"Обнаружен VFR ({frame_rate_mode}). Начало конвертации в CFR.")
max_fps_res_cmd = ["mediainfo", "--Inform=Video;%FrameRate_Maximum%", str(original_input_path)]
max_fps_res = subprocess.run(max_fps_res_cmd, capture_output=True, text=True, check=True)
target_fps = math.ceil(float(max_fps_res.stdout.strip()))
_log_message(f"Целевой FPS для CFR: {target_fps}")
temp_cfr_path = OUTPUTS_DIR / f"{task_id}_temp_cfr.mp4"
# --- ИЗМЕНЕНИЕ: Используются переменные из конфигурации ---
cfr_cmd = [
"ffmpeg", "-i", str(original_input_path), "-vf", f"fps={target_fps}",
"-c:v", "libsvtav1",
"-crf", str(VFR_TO_CFR_CRF),
"-preset", str(VFR_TO_CFR_PRESET),
"-c:a", "copy", "-c:s", "copy", str(temp_cfr_path)
]
_run_sub_task(cfr_cmd, "VFR в CFR конвертация")
crf_search_input = temp_cfr_path
_log_message("Конвертация в CFR успешно завершена.")
else:
_log_message(f"Обнаружен CFR ({frame_rate_mode}). Конвертация не требуется.")
_log_message("Шаг 2/4: Поиск оптимального CRF (crf-search)")
preset_value = request.preset.value if isinstance(request.preset, Enum) else str(request.preset)
search_cmd = [
"ab-av1", "crf-search", "-i", str(crf_search_input),
"-e", request.encoder.value, "--preset", preset_value,
"--min-vmaf", str(request.min_vmaf)
]
if request.extra_options:
search_cmd.extend(request.extra_options.split())
search_output = _run_sub_task(search_cmd, "Поиск CRF")
found_crf = None
match = re.search(r"encode .*--crf ([\d.]+)", search_output)
if match:
found_crf = match.group(1)
else:
match = re.search(r"^crf ([\d.]+) VMAF", search_output, re.MULTILINE)
if match:
found_crf = match.group(1)
if not found_crf:
raise RuntimeError("Не удалось найти рекомендованный CRF в выводе crf-search.")
_log_message(f"Найден оптимальный CRF: {found_crf}")
_log_message("Шаг 3/4: Финальное кодирование")
final_output_path = Path(task["output_path"])
encode_cmd = [
"ab-av1", "encode", "-i", str(crf_search_input), "-o", str(final_output_path),
"--crf", found_crf, "-e", request.encoder.value, "--preset", preset_value
]
if request.extra_options:
encode_cmd.extend(request.extra_options.split())
_run_sub_task(encode_cmd, "Финальное кодирование")
task["status"] = "completed"
_log_message("Шаг 4/4: Задача успешно завершена!")
except Exception as e:
task["status"] = "failed"
error_message = f"Воркфлоу завершился с ошибкой: {e}"
_log_message(error_message, level="ERROR")
finally:
if temp_cfr_path and temp_cfr_path.exists():
_log_message("Очистка временных файлов...")
temp_cfr_path.unlink()
task["finished_at"] = datetime.utcnow()
task.pop('process', None)
def create_task(task_type: str, request: BaseTaskRequest, background_tasks: BackgroundTasks):
"""Общая функция для создания и запуска любого типа задачи."""
if request.input_hash not in FILES_DB:
raise HTTPException(status_code=404, detail=f"Файл с хешем {request.input_hash} не найден.")
input_path = Path(FILES_DB[request.input_hash]["path"])
if not input_path.exists():
raise HTTPException(status_code=404, detail=f"Исходный файл для хеша {request.input_hash} был удален.")
task_id = str(uuid.uuid4())
output_path = OUTPUTS_DIR / f"{task_id}.mp4"
log_path = LOGS_DIR / f"{task_id}.log"
command = []
command_str = ""
if task_type in ["encode", "crf-search"]:
preset_value = request.preset.value if isinstance(request.preset, Enum) else str(request.preset)
command = ["ab-av1", task_type, "-i", str(input_path)]
command.extend(["-e", request.encoder.value, "--preset", preset_value])
if task_type == "encode":
crf_value = request.crf if hasattr(request, 'crf') else 20
command.extend(["-o", str(output_path), "--crf", str(crf_value)])
else: # crf-search
min_vmaf_value = request.min_vmaf if hasattr(request, 'min_vmaf') else 96
command.extend(["--min-vmaf", str(min_vmaf_value)])
if request.extra_options:
command.extend(request.extra_options.split())
command_str = " ".join(command)
else:
command_str = f"auto-encode workflow for {request.input_hash}"
TASKS_DB[task_id] = {
"task_id": task_id,
"task_type": task_type,
"status": "pending",
"input_hash": request.input_hash,
"command": command_str,
"created_at": datetime.utcnow(),
"started_at": None,
"finished_at": None,
"output_path": str(output_path) if task_type != "crf-search" else None,
"log_path": str(log_path),
"last_log_line": "Задача поставлена в очередь.",
"encoder": request.encoder,
"preset": request.preset,
"extra_options": request.extra_options,
"crf": getattr(request, 'crf', None),
"min_vmaf": getattr(request, 'min_vmaf', None),
}
if task_type == "auto-encode":
background_tasks.add_task(run_auto_encode_workflow, task_id, request)
else:
background_tasks.add_task(run_simple_task, task_id, command, log_path)
return TaskCreateResponse(
message=f"Задача '{task_type}' успешно создана.",
task_id=task_id, status_url=f"/tasks/{task_id}/status",
log_url=f"/tasks/{task_id}/log", manage_url="/manage"
)
# --- Эндпоинты API ---
@app.get("/", summary="Статус API")
def read_root():
return {"status": "ok"}
@app.post("/upload", response_model=UploadResponse, summary="Загрузка видеофайла", dependencies=[Depends(verify_token)])
async def upload_file(file: UploadFile = File(...)):
contents = await file.read()
file_hash = hashlib.sha256(contents).hexdigest()
if file_hash in FILES_DB and Path(FILES_DB[file_hash]["path"]).exists():
return UploadResponse(
message="Файл с таким хешем уже существует.",
file_hash=file_hash,
file_path=FILES_DB[file_hash]["path"],
original_filename=FILES_DB[file_hash]["original_filename"],
is_new=False
)
file_extension = Path(file.filename).suffix
saved_path = UPLOADS_DIR / f"{file_hash}{file_extension}"
with open(saved_path, "wb") as f:
f.write(contents)
FILES_DB[file_hash] = {
"path": str(saved_path),
"original_filename": file.filename,
"uploaded_at": datetime.utcnow(),
"size": len(contents)
}
return UploadResponse(
message="Файл успешно загружен.",
file_hash=file_hash,
file_path=str(saved_path),
original_filename=file.filename,
is_new=True
)
@app.post("/tasks/auto-encode", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи auto-encode с логикой VFR", dependencies=[Depends(verify_token)])
async def task_auto_encode(request: AutoEncodeRequest, background_tasks: BackgroundTasks):
return create_task("auto-encode", request, background_tasks)
@app.post("/tasks/crf-search", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи crf-search", dependencies=[Depends(verify_token)])
async def task_crf_search(request: CrfSearchRequest, background_tasks: BackgroundTasks):
return create_task("crf-search", request, background_tasks)
@app.post("/tasks/encode", response_model=TaskCreateResponse, status_code=202, summary="Запуск задачи encode", dependencies=[Depends(verify_token)])
async def task_encode(request: EncodeRequest, background_tasks: BackgroundTasks):
return create_task("encode", request, background_tasks)
@app.get("/tasks/{task_id}/status", response_model=TaskStatusResponse, summary="Получить статус задачи", dependencies=[Depends(verify_token)])
def get_task_status(task_id: str = FastApiPath(..., description="ID задачи")):
if task_id not in TASKS_DB:
raise HTTPException(status_code=404, detail="Задача не найдена.")
task_info = TASKS_DB[task_id].copy()
task_info.pop('process', None)
return task_info
@app.get("/tasks/{task_id}/log", summary="Получить полный лог задачи", dependencies=[Depends(verify_token)])
def get_task_log(task_id: str = FastApiPath(..., description="ID задачи")):
if task_id not in TASKS_DB:
raise HTTPException(status_code=404, detail="Задача не найдена.")
log_path = Path(TASKS_DB[task_id]["log_path"])
if not log_path.exists():
return JSONResponse(status_code=404, content={"detail": "Файл лога не найден."})
return FileResponse(log_path, media_type="text/plain", filename=log_path.name)
@app.get("/download/{task_id}", summary="Скачать результат кодирования", dependencies=[Depends(verify_token)])
def download_result(task_id: str = FastApiPath(..., description="ID задачи")):
if task_id not in TASKS_DB:
raise HTTPException(status_code=404, detail="Задача не найдена.")
task = TASKS_DB[task_id]
if task["status"] != "completed":
raise HTTPException(status_code=400, detail=f"Задача еще не завершена. Статус: {task['status']}.")
output_path = task.get("output_path")
if not output_path or not Path(output_path).exists():
raise HTTPException(status_code=404, detail="Выходной файл не найден.")
output_path = Path(output_path)
input_hash = task["input_hash"]
original_filename = Path(FILES_DB[input_hash]["original_filename"]).stem
return FileResponse(output_path, filename=f"{original_filename}_{task['task_type']}_{task_id[:8]}{output_path.suffix}")
# --- Эндпоинты для управления ---
@app.get("/manage", summary="Получить списки задач и файлов (JSON)", dependencies=[Depends(verify_token)])
async def get_management_page():
"""
Возвращает JSON со списками всех задач и загруженных файлов.
"""
tasks_list = []
for task_id, task_data in TASKS_DB.items():
task_info = task_data.copy()
task_info.pop('process', None)
tasks_list.append(task_info)
sorted_tasks = sorted(tasks_list, key=lambda x: x['created_at'], reverse=True)
return {"tasks": sorted_tasks, "files": FILES_DB}
@app.post("/manage/task/{task_id}/cancel", summary="Прервать выполняющуюся задачу", dependencies=[Depends(verify_token)])
def cancel_task(task_id: str = FastApiPath(..., description="ID задачи для прерывания")):
if task_id not in TASKS_DB:
raise HTTPException(status_code=404, detail="Задача не найдена.")
task = TASKS_DB[task_id]
if task['status'] != 'running' or 'process' not in task:
raise HTTPException(status_code=400, detail="Задачу нельзя прервать (она не выполняется).")
print(f"Прерывание задачи {task_id} (PID: {task['process'].pid})...")
task['was_cancelled'] = True
os.killpg(os.getpgid(task['process'].pid), signal.SIGTERM)
return {"message": f"Команда на прерывание задачи {task_id} отправлена."}
@app.delete("/manage/task/{task_id}", summary="Удалить задачу и ее файлы", dependencies=[Depends(verify_token)])
def delete_task(task_id: str = FastApiPath(..., description="ID задачи для удаления")):
if task_id not in TASKS_DB:
raise HTTPException(status_code=404, detail="Задача не найдена.")
task = TASKS_DB[task_id]
if task['status'] == 'running':
raise HTTPException(status_code=400, detail="Сначала прервите выполняющуюся задачу.")
if task.get('output_path') and Path(task['output_path']).exists():
Path(task['output_path']).unlink(missing_ok=True)
if task.get('log_path') and Path(task['log_path']).exists():
Path(task['log_path']).unlink(missing_ok=True)
del TASKS_DB[task_id]
return {"message": f"Задача {task_id} и ее файлы были удалены."}
@app.delete("/manage/file/{file_hash}", summary="Удалить загруженный файл", dependencies=[Depends(verify_token)])
def delete_file(file_hash: str = FastApiPath(..., description="Хеш файла для удаления")):
if file_hash not in FILES_DB:
raise HTTPException(status_code=404, detail="Файл не найден.")
for task in TASKS_DB.values():
if task['input_hash'] == file_hash and task['status'] in ['running', 'pending']:
raise HTTPException(status_code=400, detail=f"Нельзя удалить файл, так как он используется активной задачей {task['task_id']}.")
file_path = Path(FILES_DB[file_hash]['path'])
if file_path.exists():
file_path.unlink()
del FILES_DB[file_hash]
return {"message": f"Файл {file_hash} был удален."}