test / app /api.py
Andrchest's picture
final try 67
145f400
from fastapi import FastAPI, UploadFile, Form, File, HTTPException, Response, Request, Depends
import uuid
from app.backend.models.users import User
from fastapi.staticfiles import StaticFiles
import os
from app.rag_generator import RagSystem
from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse
from app.settings import base_path, url_user_not_required
from typing import Optional
from app.response_parser import add_links
from app.document_validator import path_is_valid
from app.backend.controllers.users import create_user, authenticate_user, check_cookie, clear_cookie, get_current_user
from app.backend.controllers.schemas import SUser
from app.backend.controllers.chats import create_new_chat
from fastapi.templating import Jinja2Templates
from app.backend.models.db_service import check_tables
# TODO: implement a better TextHandler
# TODO: optionally implement DocHandler
# Startup marker kept from the original debugging output.
print(3333333333333333333333333333333333333333333333333333)

# Fail fast (KeyError) when required configuration is missing, but report only
# that each variable is present: echoing the database URL or the API key would
# leak credentials into stdout and any log collector attached to it.
for _required_var in ("DATABASE_URL", "GEMINI_API_KEY"):
    _ = os.environ[_required_var]
    print(f"{_required_var} is set")

# Application object; every route and middleware in this module registers on it.
api = FastAPI()

# Shared RagSystem instance; stays None until initialize_rag() creates it.
rag = None

# Files saved under temp_storage/pdfs are served at /pdfs/<filename>.
api.mount("/pdfs", StaticFiles(directory=os.path.join(base_path, "temp_storage", "pdfs")), name="pdfs")
# Frontend assets are served at /static.
api.mount("/static", StaticFiles(directory=os.path.join(base_path, "frontend", "static")), name="static")
templates = Jinja2Templates(directory=os.path.join(base_path, "frontend", "templates"))

# Prints whatever check_tables() returns at import time
# (presumably a database table check -- confirm in db_service).
print(check_tables())
def initialize_rag() -> RagSystem:
    """
    Return the module-level RagSystem, creating it on the first call.

    The instance is stored in the global `rag`, so later calls reuse it.
    """
    global rag
    if rag is not None:
        return rag
    rag = RagSystem()
    return rag
def extend_context(context: dict):
    """
    Update the response context in place and return it.

    Adds the navbar context (role plus the value returned by get_current_user
    for the request stored under "request") and the footer context (none).
    """
    current_user = get_current_user(context.get("request"))
    role = "user" if current_user else "guest"

    # Navbar keys first, footer keys second -- same insertion order as before.
    context.update(
        navbar=True,
        navbar_path="components/navbar.html",
        navbar_context={
            "user": {
                "role": role,
                "instance": current_user,
            }
        },
    )
    context.update(footer=False, footer_context=None)
    return context
def PDFHandler(request: Request, path: str, page: int) -> HTMLResponse:
    """
    Render the PDF viewer page for the document at `path`.

    Only the file name of `path` is used: the viewer loads the document from
    the "/pdfs" mount. A falsy `page` is rendered as page "1".
    """
    page_context = {
        "request": request,
        "page": str(page or 1),
        "url_path": f"/pdfs/{os.path.basename(path)}",
        "user": get_current_user(request),
    }
    return templates.TemplateResponse("pages/show_pdf.html", extend_context(page_context))
def TextHandler(request: Request, path: str, lines: str) -> HTMLResponse:
    """
    Render a text document split into the part before, inside and after a citation.

    Args:
        request: incoming request, forwarded to the template context.
        path: filesystem path of the text file to show.
        lines: inclusive 1-based line range of the citation, formatted "<start>-<end>".

    Returns:
        The rendered "pages/show_text.html" template.

    Raises:
        HTTPException: 400 when `lines` is not of the form "<start>-<end>".
    """
    # Validate the range first so a malformed query string produces a client
    # error instead of an unhandled ValueError (HTTP 500).
    try:
        start_line, end_line = map(int, lines.split('-'))
    except (AttributeError, ValueError) as error:
        raise HTTPException(
            status_code=400,
            detail="lines must have the form '<start>-<end>'",
        ) from error

    # Explicit encoding: the default depends on the platform locale.
    # NOTE(review): assumes stored documents are UTF-8 -- confirm against the upload path.
    with open(path, "r", encoding="utf-8") as f:
        file_content = f.read()

    text_before_citation = []
    text_after_citation = []
    citation = []

    # Line numbers count every physical line (blank ones included) so they match
    # the numbering of the file itself; blank lines are just not rendered.
    for line_number, line in enumerate(file_content.split('\n'), start=1):
        if not line:
            continue

        if line_number < start_line:
            text_before_citation.append(line)
        elif line_number > end_line:
            text_after_citation.append(line)
        else:
            citation.append(line)

    # True when at least one non-blank line fell inside the cited range.
    anchor_added = bool(citation)

    current_template = "pages/show_text.html"
    return templates.TemplateResponse(
        current_template,
        extend_context({
            "request": request,
            "text_before_citation": text_before_citation,
            "text_after_citation": text_after_citation,
            "citation": citation,
            "anchor_added": anchor_added,
            "user": get_current_user(request)
        })
    )
def DocHandler():
    """
    Optional handler.

    Placeholder with no behaviour yet: takes no arguments and returns None.
    """
    return None
# # <--------------------------------- Middleware --------------------------------->
# # NOTE: carefully read documentation to require_user
#
# '''
# Special class to have an opportunity to redirect user to login page in middleware
# '''
# class AwaitableResponse:
# def __init__(self, response: Response):
# self.response = response
#
# def __await__(self):
# yield
# return self.response
#
#
# '''
# TODO: remove KOSTYLY -> find better way to skip requesting to login while showing pdf
#
# Middleware that requires the user to log in to the system before accessing any URL
#
# NOTE: For now it is applied to all routes, but if you want to skip any, add it to the
# url_user_not_required list in settings.py (/ should be removed)
# '''
# @api.middleware("http")
# async def require_user(request: Request, call_next):
# print(request.url.path, request.method)
#
# awaitable_response = AwaitableResponse(RedirectResponse("/login", status_code=303))
# stripped_path = request.url.path.strip('/')
#
# if stripped_path in url_user_not_required \
# or stripped_path.startswith("pdfs") \
# or "static/styles.css" in stripped_path \
# or "favicon.ico" in stripped_path:
# return await call_next(request)
#
# user = get_current_user(request)
# if user is None:
# return await awaitable_response
#
# response = await call_next(request)
# return response
@api.middleware("http")
async def print_rout(request: Request, call_next):
    """
    Log the URL and HTTP method of every request, then pass it down the chain.

    Declared `async` and awaiting `call_next` explicitly: the previous sync
    version returned the un-awaited coroutine and only worked because the
    framework awaits whatever the middleware function returns.
    """
    print(request.url, request.method)
    return await call_next(request)
# <--------------------------------- Common routes --------------------------------->
# @api.get("/")
# def root(request: Request):
# current_template = "pages/main.html"
# return templates.TemplateResponse(current_template, extend_context({"request": request}))
@api.get("/")
def root(request: Request):
    """Render the chat page ("pages/chat.html") for the current visitor."""
    page_context = extend_context({
        "request": request,
        "user": get_current_user(request),
    })
    return templates.TemplateResponse("pages/chat.html", page_context)
@api.post("/message_with_docs")
async def create_prompt(request: Request, files: list[UploadFile] = File(None), prompt: str = Form(...)):
    """
    Store uploaded documents, hand them to the RAG system and answer `prompt`.

    Args:
        request: incoming request (unused here, kept for the route signature).
        files: optional uploaded documents; may be None when nothing is attached.
        prompt: the user's message.

    Returns:
        {"response": <answer with links added>, "status": 200} on success.

    Raises:
        HTTPException: 500 when storing the files or generating the answer fails.
    """
    docs = []
    rag = initialize_rag()

    print(444444444444444444444444444444444444444444444)

    try:
        temp_storage = os.path.join(base_path, "temp_storage")
        pdf_storage = os.path.join(temp_storage, "pdfs")
        # Creates temp_storage as well; done once instead of per file, and
        # covers the "pdfs" subdirectory that PDF uploads are written into.
        os.makedirs(pdf_storage, exist_ok=True)

        # `files` is None when the form carries no attachments.
        for file in files or []:
            # Skip form parts without a file name (None or "") rather than
            # storing a nameless file.
            if not file.filename:
                continue

            content = await file.read()

            if file.filename.endswith('.pdf'):
                saved_file = os.path.join(pdf_storage, str(uuid.uuid4()) + ".pdf")
            else:
                # The file name is client-controlled: take the extension from the
                # base name only so it can never contain a path separator.
                extension = os.path.basename(file.filename).split('.')[-1]
                saved_file = os.path.join(temp_storage, str(uuid.uuid4()) + "." + extension)

            with open(saved_file, "wb") as f:
                f.write(content)

            docs.append(saved_file)

        if docs:
            rag.upload_documents(docs)

        response_raw = rag.generate_response(user_prompt=prompt)
        response = add_links(response_raw)

        print(response)

        return {"response": response, "status": 200}
    except Exception as e:
        print("!!!ERROR!!!")
        print(e)
        # Report the failure to the client instead of silently returning
        # `null` with HTTP 200; details stay in the server log only.
        raise HTTPException(status_code=500, detail="Failed to process the message") from e
# finally:
# for file in files:
# temp_storage = os.path.join(base_path, "temp_storage")
# saved_file = os.path.join(temp_storage, file.filename)
# os.remove(saved_file)
@api.get("/viewer")
def show_document(request: Request, path: str, page: Optional[int] = 1, lines: Optional[str] = "1-1", start: Optional[int] = 0):
    """
    Show a stored document with a viewer chosen by its file extension.

    Args:
        request: incoming request, forwarded to the handlers.
        path: path of the document to show; must pass path_is_valid().
        page: page to open for PDF documents.
        lines: cited line range ("<start>-<end>") for text documents.
        start: accepted for URL compatibility; not used by this function.

    Returns:
        The PDF viewer page, the text viewer page, or the raw file for any
        other extension.

    Raises:
        HTTPException: 404 when `path` is rejected by path_is_valid().
    """
    if not path_is_valid(path):
        # Must be raised, not returned: a returned HTTPException is treated as
        # ordinary response data and never produces a 404 status.
        raise HTTPException(status_code=404, detail="Document not found")

    ext = path.split(".")[-1]
    if ext == 'pdf':
        return PDFHandler(request, path=path, page=page)
    elif ext in ('txt', 'csv', 'md'):
        return TextHandler(request, path=path, lines=lines)
    elif ext in ('docx', 'doc'):
        return TextHandler(request, path=path, lines=lines)  # should be a bit different handler
    else:
        return FileResponse(path=path)
# <--------------------------------- Get --------------------------------->
@api.get("/new_user")
def new_user(request: Request):
    """Render the registration page ("pages/registration.html")."""
    page_context = extend_context({"request": request})
    return templates.TemplateResponse("pages/registration.html", page_context)
@api.get("/login")
def login(request: Request):
    """Render the login page ("pages/login.html")."""
    page_context = extend_context({"request": request})
    return templates.TemplateResponse("pages/login.html", page_context)
@api.get("/cookie_test")
def test_cookie(request: Request):
    """Return whatever check_cookie() reports for the incoming request."""
    cookie_report = check_cookie(request)
    return cookie_report
@api.get("/test")
def test(request: Request, user: User = Depends(get_current_user)):
    """
    Use only for testing. Provides user info for logged-in users and redirects
    to the login page otherwise.

    Returns:
        {"user": {"email": ..., "password_hash": ...}} for a logged-in user,
        or a 303 redirect to "/login" when no user is resolved.
    """
    # Without this guard a guest request fails on `user.email` with an
    # AttributeError (HTTP 500) instead of the promised redirect.
    if not user:
        return RedirectResponse("/login", status_code=303)

    # NOTE(review): this exposes the password hash in the response; keep the
    # route out of production builds.
    return {
        "user": {
            "email": user.email,
            "password_hash": user.password_hash,
            # "chats": user.chats,  # Note: this would raise an error: associated fields are not
            # loaded (optimization), and the session the reference belongs to is already closed
        }
    }
@api.get("/chats/id={chat_id}")
def show_chat(chat_id: int):
    """Echo the chat id taken from the URL back as {"chat_id": <id>}."""
    payload = {"chat_id": chat_id}
    return payload
@api.get("/logout")
def logout(response: Response):
    """Pass the response to clear_cookie() and return its result."""
    logout_result = clear_cookie(response)
    return logout_result
# <--------------------------------- Post --------------------------------->
@api.post("/new_user")
def new_user(response: Response, user: SUser):
    """Create a user from the submitted email and password via create_user()."""
    email, password = user.email, user.password
    return create_user(response, email, password)
@api.post("/login")
def login(response: Response, user: SUser):
    """Authenticate the submitted email and password via authenticate_user()."""
    email, password = user.email, user.password
    return authenticate_user(response, email, password)
@api.post("/new_chat")
def create_chat(request: Request, title: Optional[str] = "new chat", user: User = Depends(get_current_user)):
    """
    Create a chat titled `title` for `user` and redirect (303) to the URL
    returned by create_new_chat().
    """
    return RedirectResponse(create_new_chat(title, user), status_code=303)