PopovDanil's picture
try 1
48ec4db
from app.backend.controllers.chats import list_user_chats, verify_ownership_rights
from app.backend.controllers.users import get_current_user
from app.settings import BASE_DIR, logger, settings
from fastapi.templating import Jinja2Templates
from app.core.rag_generator import RagSystem
from app.backend.models.users import User
from fastapi import Request, UploadFile
from uuid import uuid4
import markdown
import aiofiles
import asyncio
import shutil
import os
rag = None
def initialize_rag() -> RagSystem:
global rag
if rag is None:
print("Start RAG initialization")
rag = RagSystem()
return rag
async def extend_context(context: dict, selected: int = None):
user = await get_current_user(context.get("request"))
navbar = {
"navbar": False,
"navbar_path": "components/navbar.html",
"navbar_context": {
"chats": [],
"user": {"role": "user" if user else "guest", "instance": user},
},
}
sidebar = {
"sidebar": True,
"sidebar_path": "components/sidebar.html",
"sidebar_context": {
"selected": selected if selected is not None else None,
"chat_groups": await list_user_chats(user.id) if user else [],
},
}
footer = {"footer": False, "footer_context": None}
context.update(**navbar)
context.update(**footer)
context.update(**sidebar)
return context
async def protect_chat(user: User, chat_id: str) -> bool:
return await verify_ownership_rights(user, chat_id)
async def save_documents(
files: list[UploadFile],
user: User,
chat_id: str,
) -> list[str]:
storage = os.path.join(
BASE_DIR,
"chats_storage",
f"user_id={user.id}",
f"chat_id={chat_id}",
"documents",
)
docs = []
if files is None or len(files) == 0:
return
await aiofiles.os.makedirs(os.path.join(storage, "pdfs"), exist_ok=True)
if settings.debug:
await logger.info(f"Documents for saving: {len(files)}")
for file in files:
content = await file.read()
if file.filename.endswith(".pdf"):
saved_file = os.path.join(storage, "pdfs", str(uuid4()) + ".pdf")
else:
saved_file = os.path.join(
storage, str(uuid4()) + "." + file.filename.split(".")[-1]
)
async with aiofiles.open(saved_file, "wb") as f:
await f.write(content)
docs.append(saved_file)
return docs
async def get_pdf_path(path: str) -> str:
parts = path.split("chats_storage")
if len(parts) < 2:
return ""
return "chats_storage" + "".join(parts[1:])
async def construct_collection_name(user: User, chat_id: int) -> str:
return f"user_id_{user.id}_chat_id_{chat_id}"
async def create_collection(user: User, chat_id: int, RAG: RagSystem) -> None:
if RAG is None:
raise RuntimeError("RAG was not initialized")
await RAG.create_new_collection(await construct_collection_name(user, chat_id))
if settings.debug:
for collection in await rag.get_collections_names():
await logger.info(collection)
async def lines_to_markdown(lines: list[str]) -> list[str]:
loop = asyncio.get_running_loop()
return await asyncio.gather(*[
loop.run_in_executor(None, markdown.markdown, line)
for line in lines
])
# <----------------------- Handlers ----------------------->
async def PDFHandler(
request: Request, path: str, page: int, templates
) -> Jinja2Templates.TemplateResponse:
url_path = await get_pdf_path(path=path)
if settings.debug:
await logger.info(f"PDF path - {path}, url-path - {url_path}")
current_template = "pages/show_pdf.html"
return templates.TemplateResponse(
current_template,
await extend_context(
{
"request": request,
"page": str(page or 1),
"url_path": url_path,
"user": await get_current_user(request),
}
),
)
async def TextHandler(
request: Request, path: str, lines: str, templates
) -> Jinja2Templates.TemplateResponse:
file_content = ""
async with aiofiles.open(path, "r") as f:
file_content = await f.read()
start_line, end_line = map(int, lines.split("-"))
text_before_citation = []
text_after_citation = []
citation = []
anchor_added = False
for index, line in enumerate(file_content.split("\n")):
if line == "" or line == "\n":
continue
if index + 1 < start_line:
text_before_citation.append(line)
elif end_line < index + 1:
text_after_citation.append(line)
else:
anchor_added = True
citation.append(line)
current_template = "pages/show_text.html"
return templates.TemplateResponse(
current_template,
await extend_context(
{
"request": request,
"text_before_citation": await lines_to_markdown(text_before_citation),
"text_after_citation": await lines_to_markdown(text_after_citation),
"citation": await lines_to_markdown(citation),
"anchor_added": anchor_added,
"user": await get_current_user(request),
}
),
)