from langchain_community.document_loaders import PyPDFLoader, UnstructuredWordDocumentLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from app.models import Embedder
from app.chunks import Chunk
import nltk  # used for the proper tokenizer workflow
from uuid import uuid4  # for generating unique ids (uuid4 is used since it generates ids from pseudo-random numbers, unlike uuid1 and the others)
import numpy as np
from app.settings import logging, text_splitter_config, embedder_model

# TODO: replace PyPDFLoader since it is completely unusable OR try to fix it
class DocumentProcessor:
    '''
    TODO: determine the most suitable chunk size
    chunks -> the list of chunks from loaded files
    chunks_unsaved -> the list of recently added chunks that have not been saved to the db yet
    processed -> the list of files that have already been split into chunks
    unprocessed -> !processed
    text_splitter -> text splitting strategy
    '''

    def __init__(self):
        self.chunks: list[Chunk] = []
        self.chunks_unsaved: list[Chunk] = []
        self.processed: list[Document] = []
        self.unprocessed: list[Document] = []
        self.embedder = Embedder(embedder_model)
        self.text_splitter = RecursiveCharacterTextSplitter(**text_splitter_config)

    def cosine_similarity(self, vec1, vec2):
        '''
        Measures the cosine similarity between two vectors
        '''
        return vec1 @ vec2 / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
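
    # A quick sanity check (hypothetical vectors, not part of the pipeline):
    # identical directions give ~1.0 and orthogonal directions give ~0.0, e.g.
    #   proc.cosine_similarity(np.array([1.0, 0.0]), np.array([1.0, 0.0]))  # -> 1.0
    #   proc.cosine_similarity(np.array([1.0, 0.0]), np.array([0.0, 1.0]))  # -> 0.0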

    def update_most_relevant_chunk(self, chunk: tuple[np.float64, Chunk],
                                   relevant_chunks: list[tuple[np.float64, Chunk]], mx_len=15):
        '''
        Updates the list of the most relevant chunks without interacting with the db
        Keeps relevant_chunks sorted by similarity in descending order, capped at mx_len entries
        '''
        relevant_chunks.append(chunk)
        for i in range(len(relevant_chunks) - 1, 0, -1):
            if relevant_chunks[i][0] > relevant_chunks[i - 1][0]:
                relevant_chunks[i], relevant_chunks[i - 1] = relevant_chunks[i - 1], relevant_chunks[i]
            else:
                break
        if len(relevant_chunks) > mx_len:
            del relevant_chunks[-1]
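
    # A minimal sketch of the expected behaviour (the chunk objects are hypothetical):
    #   relevant = [(0.9, chunk_a), (0.5, chunk_b)]
    #   proc.update_most_relevant_chunk((0.7, chunk_c), relevant)
    #   # relevant is now [(0.9, chunk_a), (0.7, chunk_c), (0.5, chunk_b)]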

    def load_document(self, filepath: str, add_to_unprocessed: bool = False) -> list[Document]:
        '''
        Loads one file - extracts text from the file
        TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader
        TODO: Play with .pdf and extracting text from images
        TODO: Try chunking with an llm
        add_to_unprocessed -> if true, adds the loaded file to the list of unprocessed (unchunked) files
        '''
        loader = None
        if filepath.endswith(".pdf"):
            # splits each pdf into pages, so a presentation becomes one Document per slide
            loader = PyPDFLoader(file_path=filepath)
        elif filepath.endswith(".docx") or filepath.endswith(".doc"):
            # loader = Docx2txtLoader(file_path=filepath)  # try it later, since UnstructuredWordDocumentLoader is extremely slow
            loader = UnstructuredWordDocumentLoader(file_path=filepath)
        elif filepath.endswith(".txt"):
            loader = TextLoader(file_path=filepath)
        if loader is None:
            raise RuntimeError("Unsupported type of file")
        # we cannot assume a single Document per file, since pdfs are split into several documents
        documents: list[Document] = []
        try:
            documents = loader.load()
        except Exception:
            raise RuntimeError("File is corrupted")
        if add_to_unprocessed:
            for doc in documents:
                self.unprocessed.append(doc)
        return documents
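
    # A usage sketch (the path is a hypothetical placeholder):
    #   pages = proc.load_document("docs/report.pdf", add_to_unprocessed=True)
    #   # a 3-page pdf yields 3 Document objects, all appended to proc.unprocessed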

    def load_documents(self, documents: list[str], add_to_unprocessed: bool = False) -> list[Document]:
        '''
        Similar to load_document, but for multiple files
        add_to_unprocessed -> if true, adds the loaded files to the list of unprocessed (unchunked) files
        '''
        extracted_documents: list[Document] = []
        for doc in documents:
            temp_storage: list[Document] = []
            try:
                # add_to_unprocessed stays False here: the flag is handled below, and passing
                # True would append every document to self.unprocessed twice
                temp_storage = self.load_document(filepath=doc, add_to_unprocessed=False)
            except Exception as e:
                logging.error("Error at load_documents while loading %s", doc, exc_info=e)
                continue
            for extrc_doc in temp_storage:
                extracted_documents.append(extrc_doc)
                if add_to_unprocessed:
                    self.unprocessed.append(extrc_doc)
        return extracted_documents
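
    # Same idea, batched (paths are hypothetical); files that fail to load are logged and skipped:
    #   docs = proc.load_documents(["a.pdf", "missing.txt"], add_to_unprocessed=True)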

    def generate_chunks(self, query: str = "", embedding: bool = False):
        '''
        Generates chunks with the recursive splitter from the list of unprocessed files,
        adds those files to the list of processed ones, and clears unprocessed
        When embedding is true, also returns the chunks most relevant to the query
        TODO: try to split text with another llm (not really needed, but we should at least try it)
        '''
        most_relevant = []
        if embedding:
            query_embedded = self.embedder.encode(query)
        for document in self.unprocessed:
            self.processed.append(document)
            split_docs: list[Document] = self.text_splitter.split_documents([document])
            lines: list[str] = document.page_content.split("\n")
            for chunk in split_docs:
                start_l, end_l = self.get_start_end_lines(
                    splitted_text=lines,
                    start_char=chunk.metadata.get("start_index", 0),
                    end_char=chunk.metadata.get("start_index", 0) + len(chunk.page_content)
                )
                new_chunk = Chunk(
                    id=uuid4(),
                    filename=document.metadata.get("source", ""),
                    page_number=document.metadata.get("page", 0),
                    start_index=chunk.metadata.get("start_index", 0),
                    start_line=start_l,
                    end_line=end_l,
                    text=chunk.page_content
                )
                if embedding:
                    chunk_embedded = self.embedder.encode(new_chunk.text)
                    similarity = self.cosine_similarity(query_embedded, chunk_embedded)
                    self.update_most_relevant_chunk((similarity, new_chunk), most_relevant)
                self.chunks.append(new_chunk)
                self.chunks_unsaved.append(new_chunk)
        self.unprocessed = []
        logging.debug("generated %d unsaved chunks", len(self.chunks_unsaved))
        return most_relevant
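
    # A usage sketch of embedding-based retrieval (the query text is hypothetical):
    #   proc.load_document("docs/report.pdf", add_to_unprocessed=True)
    #   top = proc.generate_chunks(query="revenue in 2023", embedding=True)
    #   # top is a list of (similarity, Chunk) pairs in descending order, at most 15 entries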

    def get_start_end_lines(self, splitted_text: list[str], start_char: int, end_char: int,
                            debug_mode: bool = False) -> tuple[int, int]:
        '''
        Determines the lines where the chunk starts and ends (1-based indexing)
        Some magic happens here. To be honest, I understood it only after the 7th attempt
        TODO: invent a more efficient way
        splitted_text -> original text split by newline characters
        start_char -> index of the character where the current chunk starts
        end_char -> index of the character where the current chunk ends
        debug_mode -> flag that enables logging useful info about the process
        '''
        if debug_mode:
            logging.info(splitted_text)
        start, end, char_ct = 0, 0, 0
        iter_count = 1
        for i, line in enumerate(splitted_text):
            if debug_mode:
                logging.info(
                    f"start={start_char}, current={char_ct}, end_current={char_ct + len(line) + 1}, "
                    f"end={end_char}, len={len(line)}, iter={iter_count}\n")
            # each line spans [char_ct, char_ct + len(line)] in the flat text, plus one position for the trailing newline
            if char_ct <= start_char <= char_ct + len(line) + 1:
                start = i + 1
            if char_ct <= end_char <= char_ct + len(line) + 1:
                end = i + 1
                break
            iter_count += 1
            char_ct += len(line) + 1
        if debug_mode:
            logging.info(f"result => {start} {end}\n\n\n")
        return start, end
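
    # A worked example: for lines ["ab", "cd", "ef"] the flat text is "ab\ncd\nef",
    # so character 3 ('c') falls on line 2 and character 7 ('f') on line 3:
    #   proc.get_start_end_lines(["ab", "cd", "ef"], start_char=3, end_char=7)  # -> (2, 3)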

    def update_nltk(self) -> None:
        '''
        Note: it should be used only once to download the tokenizers, further usage is not recommended
        '''
        nltk.download('punkt')
        nltk.download('averaged_perceptron_tagger')
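
    # One-time setup (requires network access):
    #   DocumentProcessor().update_nltk()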

    # For now the system works as follows: we keep recently loaded chunks in two lists:
    #   chunks - all chunks, even the ones that haven't been saved to the db yet
    #   chunks_unsaved - chunks that have been added recently
    # I do not know whether we really need to store all chunks added in the
    # current session, but chunks_unsaved is used to avoid duplications while saving to the db.
    def clear_unsaved_chunks(self):
        self.chunks_unsaved = []

    def get_all_chunks(self) -> list[Chunk]:
        return self.chunks

    def get_and_save_unsaved_chunks(self) -> list[Chunk]:
        '''
        If we want to save chunks to the db, we need to clear the temp storage to avoid duplications
        '''
        # copy the unsaved batch before clearing it, so callers can persist exactly the new chunks
        chunks_copy: list[Chunk] = self.chunks_unsaved.copy()
        self.clear_unsaved_chunks()
        return chunks_copy
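
# A minimal end-to-end sketch; the file paths and query are hypothetical placeholders,
# and it assumes Chunk exposes its constructor fields (filename, start_line, ...) as attributes.
if __name__ == "__main__":
    proc = DocumentProcessor()
    proc.load_documents(["docs/report.pdf", "docs/notes.txt"], add_to_unprocessed=True)
    top = proc.generate_chunks(query="quarterly revenue", embedding=True)
    for similarity, chunk in top:
        print(similarity, chunk.filename, chunk.start_line, chunk.end_line)
    db_batch = proc.get_and_save_unsaved_chunks()  # persist db_batch; the temp list is now clear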