Andrchest's picture
Single commit for HF2
365de9c
from fastapi import (
FastAPI,
UploadFile,
Form,
File,
HTTPException,
Response,
Request,
Depends,
)
from fastapi.responses import (
FileResponse,
RedirectResponse,
StreamingResponse,
JSONResponse,
)
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from app.backend.controllers.users import (
create_user,
authenticate_user,
check_cookie,
clear_cookie,
get_current_user,
get_latest_chat,
)
from app.backend.controllers.chats import (
create_new_chat,
get_chat_with_messages,
update_title,
)
from app.backend.controllers.messages import register_message
from app.backend.schemas import SUser
from app.backend.models.users import User
from app.core.utils import (
TextHandler,
PDFHandler,
protect_chat,
extend_context,
initialize_rag,
save_documents,
construct_collection_name,
create_collection,
)
from app.settings import BASE_DIR, url_user_not_required
from app.core.document_validator import path_is_valid
from app.core.response_parser import add_links
from typing import Optional
import os
# TODO: implement a better TextHandler
# TODO: optionally implement DocHandler
api = FastAPI()
api.mount(
"/chats_storage",
StaticFiles(directory=os.path.join(BASE_DIR, "chats_storage")),
name="chats_storage",
)
api.mount(
"/static",
StaticFiles(directory=os.path.join(BASE_DIR, "app", "frontend", "static")),
name="static",
)
templates = Jinja2Templates(
directory=os.path.join(BASE_DIR, "app", "frontend", "templates")
)
rag = initialize_rag()
# NOTE: carefully read documentation to require_user
# <--------------------------------- Middleware --------------------------------->
"""
Special class to have an opportunity to redirect user to login page in middleware
"""
class AwaitableResponse:
def __init__(self, response: Response):
self.response = response
def __await__(self):
yield
return self.response
"""
TODO: remove KOSTYLY -> find better way to skip requesting to login while showing pdf
Middleware that requires user to log in into the system before accessing any utl
NOTE: For now it is applied to all routes, but if you want to skip any, add it to the
url_user_not_required list in settings.py (/ should be removed)
"""
@api.middleware("http")
async def require_user(request: Request, call_next):
print(request.url.path, request.method, request.url.port)
awaitable_response = AwaitableResponse(RedirectResponse("/login", status_code=303))
stripped_path = request.url.path.strip("/")
if (
stripped_path in url_user_not_required
or stripped_path.startswith("pdfs")
or "static/styles.css" in stripped_path
or "favicon.ico" in stripped_path
):
return await call_next(request)
user = get_current_user(request)
if user is None:
return await awaitable_response
response = await call_next(request)
return response
# <--------------------------------- Common routes --------------------------------->
@api.get("/health")
async def health_check():
return {"status": "ok"}
@api.get("/")
def root(request: Request):
current_template = "pages/main.html"
return templates.TemplateResponse(
current_template, extend_context({"request": request})
)
@api.post("/message_with_docs")
async def send_message(
request: Request,
files: list[UploadFile] = File(None),
prompt: str = Form(...),
chat_id=Form(None),
user: User = Depends(get_current_user),
) -> StreamingResponse:
# response = ""
status = 200
try:
collection_name = construct_collection_name(user, chat_id)
register_message(content=prompt, sender="user", chat_id=int(chat_id))
await save_documents(
collection_name, files=files, RAG=rag, user=user, chat_id=chat_id
)
# response = rag.generate_response_stream(collection_name=collection_name, user_prompt=prompt, stream=True)
# async def stream_response():
# async for chunk in response:
# yield chunk.json()
return StreamingResponse(
rag.generate_response_stream(
collection_name=collection_name, user_prompt=prompt, stream=True
),
status,
media_type="text/event-stream",
)
except Exception as e:
status = 500
print(e)
@api.post("/replace_message")
async def replace_message(request: Request):
data = await request.json()
updated_message = add_links(data.get("message", ""))
register_message(
content=updated_message, sender="assistant", chat_id=int(data.get("chat_id", 0))
)
return JSONResponse({"updated_message": updated_message})
@api.get("/viewer")
def show_document(
request: Request,
path: str,
page: Optional[int] = 1,
lines: Optional[str] = "1-1",
start: Optional[int] = 0,
):
if not path_is_valid(path):
return HTTPException(status_code=404, detail="Document not found")
ext = path.split(".")[-1]
if ext == "pdf":
return PDFHandler(request, path=path, page=page, templates=templates)
elif ext in ("txt", "csv", "md", "json"):
return TextHandler(request, path=path, lines=lines, templates=templates)
elif ext in ("docx", "doc"):
return TextHandler(
request, path=path, lines=lines, templates=templates
) # should be a bit different handler
else:
return FileResponse(path=path)
# <--------------------------------- Get --------------------------------->
@api.get("/new_user")
def new_user_post(request: Request):
current_template = "pages/registration.html"
return templates.TemplateResponse(
current_template, extend_context({"request": request})
)
@api.get("/login")
def login_get(request: Request):
current_template = "pages/login.html"
return templates.TemplateResponse(
current_template, extend_context({"request": request})
)
@api.get("/cookie_test")
def test_cookie(request: Request):
return check_cookie(request)
"""
Use only for testing. For now, provides user info for logged ones, and redirects to
login in other case
"""
@api.get("/test")
def test(request: Request, user: User = Depends(get_current_user)):
return {
"user": {
"email": user.email,
"password_hash": user.password_hash,
# "chats": user.chats, # Note: it will rise error since due to the optimization associated fields are not loaded
# it is just a reference, but the session is closed, however you are trying to get access to the data through this session
}
}
@api.post("/chats/id={chat_id}/history")
def show_chat_history(request: Request, chat_id: int):
chat = get_chat_with_messages(chat_id)
user = get_current_user(request)
update_title(chat["chat_id"])
if not protect_chat(user, chat_id):
raise HTTPException(401, "Yod do not have rights to use this chat!")
context = chat
return context
@api.get("/chats/id={chat_id}")
def show_chat(request: Request, chat_id: int):
current_template = "pages/chat.html"
chat = get_chat_with_messages(chat_id)
user = get_current_user(request)
update_title(chat["chat_id"])
if not protect_chat(user, chat_id):
raise HTTPException(401, "Yod do not have rights to use this chat!")
context = extend_context({"request": request, "user": user}, selected=chat_id)
context.update(chat)
return templates.TemplateResponse(current_template, context)
@api.get("/logout")
def logout(response: Response):
return clear_cookie(response)
@api.get("/last_user_chat")
def last_user_chat(request: Request, user: User = Depends(get_current_user)):
chat = get_latest_chat(user)
url = None
if chat is None:
print("new_chat")
new_chat = create_new_chat("new chat", user)
url = new_chat.get("url")
try:
create_collection(user, new_chat.get("chat_id"), rag)
except Exception as e:
raise HTTPException(500, e)
else:
url = f"/chats/id={chat.id}"
return RedirectResponse(url, status_code=303)
# <--------------------------------- Post --------------------------------->
@api.post("/new_user")
def new_user(response: Response, user: SUser):
return create_user(response, user.email, user.password)
class LoginData(BaseModel):
email: str
password: str
@api.post("/login")
def login_post(response: Response, user_data: LoginData):
try:
# Validate the user data against the SUser schema for regular users
# This enforces email format and password complexity for non-admins
user_schema = SUser(email=user_data.email, password=user_data.password)
except ValueError as e:
# If validation fails, return a detailed error
raise HTTPException(status_code=422, detail=f"Validation error: {e}")
# If validation passes, proceed with the standard authentication process
return authenticate_user(response, user_schema.email, user_schema.password)
@api.post("/new_chat")
def create_chat(
request: Request,
title: Optional[str] = "new chat",
user: User = Depends(get_current_user),
):
new_chat = create_new_chat(title, user)
url = new_chat.get("url")
chat_id = new_chat.get("chat_id")
if url is None or chat_id is None:
raise HTTPException(500, "New chat was not created")
try:
create_collection(user, chat_id, rag)
except Exception as e:
raise HTTPException(500, e)
return RedirectResponse(url, status_code=303)
if __name__ == "__main__":
pass