import os import hashlib from typing import List from io import BytesIO from fastapi import APIRouter, UploadFile, File, HTTPException, Depends, BackgroundTasks from fastapi.concurrency import run_in_threadpool from services.document_service import check_if_hash_exists from services.document_service import full_process_and_ingest_pipeline,convert_to_text_content from config import ALLOWED_EXTENSIONS from dependencies import get_current_user from schemas.user import UserOut import logging logger = logging.getLogger(__name__) router = APIRouter() def calculate_hash_from_content(content: bytes) -> str: """Tính toán SHA256 hash từ nội dung bytes của file.""" return hashlib.sha256(content).hexdigest() @router.post("/upload", status_code=202) async def upload_and_ingest_documents( background_tasks: BackgroundTasks, current_user: UserOut = Depends(get_current_user), files: List[UploadFile] = File(..., description="Một hoặc nhiều file tài liệu cần upload.") ): """ Endpoint đã được tối ưu để upload và xử lý tài liệu. - Xử lý file trong bộ nhớ để tránh lỗi permission và tăng tốc. - Trích xuất text ngay trong request để báo lỗi sớm (fail-fast). - Đẩy các tác vụ xử lý nặng (NLP, embedding, ingest) vào nền. """ if not files: raise HTTPException(status_code=400, detail="No files were uploaded.") accepted_files = [] skipped_files = [] for file in files: try: # 1. KIỂM TRA ĐỊNH DẠNG file_extension = os.path.splitext(file.filename)[1].lower() if file_extension not in ALLOWED_EXTENSIONS: skipped_files.append({"filename": file.filename, "reason": "Unsupported file type"}) continue # 2. ĐỌC NỘI DUNG FILE VÀO BỘ NHỚ file_content = await file.read() if not file_content: skipped_files.append({"filename": file.filename, "reason": "Empty file"}) continue # 3. TÍNH HASH & KIỂM TRA TRÙNG LẶP file_hash = calculate_hash_from_content(file_content) if await check_if_hash_exists(file_hash): skipped_files.append({"filename": file.filename, "reason": "Duplicate file (content already exists)"}) continue # 4. TRÍCH XUẤT TEXT (FAIL-FAST) - Chạy trong thread pool để không block # Hàm convert_to_text_content cần được sửa để nhận BytesIO try: raw_content = await run_in_threadpool(convert_to_text_content, BytesIO(file_content), file.filename) if not raw_content or not raw_content.strip(): raise ValueError("Extracted content is empty.") except Exception as extraction_error: logger.error(f"Failed to extract text from {file.filename}: {extraction_error}") skipped_files.append({"filename": file.filename, "reason": f"Failed to extract content: {extraction_error}"}) continue # 5. THÊM TÁC VỤ NỀN VỚI DỮ LIỆU TEXT # Chỉ truyền dữ liệu cơ bản, không truyền đối tượng phức tạp background_tasks.add_task( full_process_and_ingest_pipeline, raw_content=raw_content, filename=file.filename, file_hash=file_hash ) accepted_files.append({"filename": file.filename, "hash": file_hash}) except Exception as e: logger.error(f"Error preparing {file.filename} for background processing: {e}", exc_info=True) skipped_files.append({"filename": file.filename, "reason": f"Unexpected server error: {str(e)}"}) if not accepted_files: raise HTTPException( status_code=400, detail={"message": "No valid new files were accepted for processing.", "skipped_files": skipped_files} ) return { "message": f"Request received. Accepted {len(accepted_files)} files for background processing.", "accepted_files": accepted_files, "skipped_files": skipped_files }