# app/processor.py
from langchain_community.document_loaders import PyPDFLoader, UnstructuredWordDocumentLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from app.models import Embedder
from app.chunks import Chunk
import nltk  # only used to download tokenizer data (see update_nltk)
from uuid import uuid4  # uuid4 is used for unique ids because it is generated from pseudo-random numbers, unlike uuid1 and the others
import numpy as np
from app.settings import logging, text_splitter_config, embedder_model
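
# NOTE: text_splitter_config and embedder_model are defined in app.settings and are not
# shown in this file. generate_chunks below relies on chunk.metadata["start_index"],
# which RecursiveCharacterTextSplitter only fills in when add_start_index=True, so the
# config is expected to look roughly like the following (values are illustrative only):
#
#   text_splitter_config = {
#       "chunk_size": 1000,       # TODO from the class docstring: tune the chunk size
#       "chunk_overlap": 100,
#       "add_start_index": True,  # required for the start_index metadata used below
#   }
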
# TODO: replace PyPDFLoader since it is completely unusable OR try to fix it
class DocumentProcessor:
    '''
    TODO: determine the most suitable chunk size
    chunks -> the list of chunks from loaded files
    chunks_unsaved -> the list of recently added chunks that have not been saved to the db yet
    processed -> the list of files that have already been split into chunks
    unprocessed -> !processed
    text_splitter -> the text splitting strategy
    '''

    def __init__(self):
        self.chunks: list[Chunk] = []
        self.chunks_unsaved: list[Chunk] = []
        self.processed: list[Document] = []
        self.unprocessed: list[Document] = []
        self.embedder = Embedder(embedder_model)
        self.text_splitter = RecursiveCharacterTextSplitter(**text_splitter_config)

    def cosine_similarity(self, vec1, vec2):
        '''
        Measures the cosine similarity between two vectors
        '''
        return vec1 @ vec2 / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

    def update_most_relevant_chunk(self, chunk: tuple[np.float64, Chunk],
                                   relevant_chunks: list[tuple[np.float64, Chunk]],
                                   mx_len=15):
        '''
        Updates the list of the most relevant chunks without interacting with the db
        chunk -> a (similarity, Chunk) pair
        relevant_chunks -> list of pairs kept sorted by descending similarity
        mx_len -> maximum number of entries to keep
        '''
        # Append the new pair and bubble it up while its similarity is higher than
        # that of its predecessor (a single insertion-sort step).
        relevant_chunks.append(chunk)
        for i in range(len(relevant_chunks) - 1, 0, -1):
            if relevant_chunks[i][0] > relevant_chunks[i - 1][0]:
                relevant_chunks[i], relevant_chunks[i - 1] = relevant_chunks[i - 1], relevant_chunks[i]
            else:
                break
        # Drop the lowest-scoring entry once the list exceeds its capacity.
        if len(relevant_chunks) > mx_len:
            del relevant_chunks[-1]
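
    # Illustrative example (made-up values): with mx_len=3 and relevant_chunks equal to
    # [(0.9, a), (0.7, b), (0.5, c)], inserting (0.8, d) bubbles it up past the last two
    # entries and the lowest-scoring pair (0.5, c) is then dropped, leaving
    # [(0.9, a), (0.8, d), (0.7, b)].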

    def load_document(self, filepath: str, add_to_unprocessed: bool = False) -> list[Document]:
        '''
        Loads one file - extracts the text from it
        TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader
        TODO: Play with .pdf and extraction of text from images
        TODO: Try chunking with an llm
        add_to_unprocessed -> if true, the loaded file is added to the list of unprocessed (unchunked) files
        '''
        loader = None
        if filepath.endswith(".pdf"):
            # splits each presentation into slides/pages and processes them as separate documents
            loader = PyPDFLoader(file_path=filepath)
        elif filepath.endswith(".docx") or filepath.endswith(".doc"):
            # loader = Docx2txtLoader(file_path=filepath)  # try it later, since UnstructuredWordDocumentLoader is extremely slow
            loader = UnstructuredWordDocumentLoader(file_path=filepath)
        elif filepath.endswith(".txt"):
            loader = TextLoader(file_path=filepath)
        if loader is None:
            raise RuntimeError("Unsupported type of file")
        # We cannot assign a single document here since .pdf files are split into several documents
        documents: list[Document] = []
        try:
            documents = loader.load()
        except Exception as e:
            raise RuntimeError("File is corrupted") from e
        if add_to_unprocessed:
            for doc in documents:
                self.unprocessed.append(doc)
        return documents

    def load_documents(self, documents: list[str], add_to_unprocessed: bool = False) -> list[Document]:
        '''
        Similar to load_document, but for multiple files
        add_to_unprocessed -> if true, the loaded files are added to the list of unprocessed (unchunked) files
        '''
        extracted_documents: list[Document] = []
        for doc in documents:
            temp_storage: list[Document] = []
            try:
                # In some cases add_to_unprocessed should be True here, but I cannot think of one :(
                temp_storage = self.load_document(filepath=doc, add_to_unprocessed=False)
            except Exception as e:
                logging.error("Error at load_documents while loading %s", doc, exc_info=e)
                continue
            for extrc_doc in temp_storage:
                extracted_documents.append(extrc_doc)
                if add_to_unprocessed:
                    self.unprocessed.append(extrc_doc)
        return extracted_documents

    def generate_chunks(self, query: str = "", embedding: bool = False):
        '''
        Generates chunks with the recursive splitter from the list of unprocessed files,
        adds those files to the list of processed ones, and clears the unprocessed list
        TODO: try to split text with another llm (not really needed, but we should at least try it)
        '''
        most_relevant = []
        if embedding:
            query_embedded = self.embedder.encode(query)
        for document in self.unprocessed:
            self.processed.append(document)
            text: list[Document] = self.text_splitter.split_documents([document])
            lines: list[str] = document.page_content.split("\n")
            for chunk in text:
                start_l, end_l = self.get_start_end_lines(
                    splitted_text=lines,
                    start_char=chunk.metadata.get("start_index", 0),
                    end_char=chunk.metadata.get("start_index", 0) + len(chunk.page_content)
                )
                new_chunk = Chunk(
                    id=uuid4(),
                    filename=document.metadata.get("source", ""),
                    page_number=document.metadata.get("page", 0),
                    start_index=chunk.metadata.get("start_index", 0),
                    start_line=start_l,
                    end_line=end_l,
                    text=chunk.page_content
                )
                if embedding:
                    chunk_embedded = self.embedder.encode(new_chunk.text)
                    similarity = self.cosine_similarity(query_embedded, chunk_embedded)
                    self.update_most_relevant_chunk((similarity, new_chunk), most_relevant)
                self.chunks.append(new_chunk)
                self.chunks_unsaved.append(new_chunk)
        self.unprocessed = []
        logging.info("Generated %d unsaved chunks", len(self.chunks_unsaved))
        return most_relevant
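
    # The list returned above is kept in descending order of similarity, each entry being a
    # (similarity, Chunk) pair, so a caller can take e.g. the five best chunk texts as
    # retrieval context with: [chunk.text for _, chunk in most_relevant[:5]].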

    def get_start_end_lines(self, splitted_text: list[str], start_char: int, end_char: int,
                            debug_mode: bool = False) -> tuple[int, int]:
        '''
        Determines the lines where the chunk starts and ends (1-based indexing)
        Some magic happens here; to be honest, I understood it only after the 7th attempt
        TODO: invent a more efficient way
        splitted_text -> the original text split into lines by newline characters
        start_char -> index of the character where the current chunk starts
        end_char -> index of the character where the current chunk ends
        debug_mode -> flag that enables logging useful info about the process
        '''
        if debug_mode:
            logging.info(splitted_text)
        start, end, char_ct = 0, 0, 0
        iter_count = 1
        for i, line in enumerate(splitted_text):
            if debug_mode:
                logging.info(
                    f"start={start_char}, current={char_ct}, end_current={char_ct + len(line) + 1}, "
                    f"end={end_char}, len={len(line)}, iter={iter_count}\n")
            # len(line) + 1 accounts for the newline removed by split("\n")
            if char_ct <= start_char <= char_ct + len(line) + 1:
                start = i + 1
            if char_ct <= end_char <= char_ct + len(line) + 1:
                end = i + 1
                break
            iter_count += 1
            char_ct += len(line) + 1
        if debug_mode:
            logging.info(f"result => {start} {end}\n\n\n")
        return start, end
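
    # Worked example: for splitted_text == ["abc", "de", "fgh"] the original text is
    # "abc\nde\nfgh"; line 1 covers characters 0-2 (the newline sits at 3), line 2 covers
    # 4-5, line 3 covers 7-9. A chunk with start_char=4 and end_char=9 maps to (2, 3).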

    def update_nltk(self) -> None:
        '''
        Note: it should be used only once to download tokenizers, further usage is not recommended
        '''
        nltk.download('punkt')
        nltk.download('averaged_perceptron_tagger')

    # For now the system works as follows: recently loaded chunks are kept in two lists:
    # chunks - all chunks, even the ones that have not been saved to the db yet
    # chunks_unsaved - chunks that have been added recently
    # I do not know whether we really need to store all chunks added in the current
    # session, but chunks_unsaved is used to avoid duplication while saving to the db.
    def clear_unsaved_chunks(self):
        self.chunks_unsaved = []

    def get_all_chunks(self) -> list[Chunk]:
        return self.chunks

    def get_and_save_unsaved_chunks(self) -> list[Chunk]:
        '''
        If we want to save chunks to the db, we need to clear the temp storage afterwards to avoid duplication
        '''
        # Copy chunks_unsaved (not chunks), otherwise already persisted chunks would be written again
        chunks_copy: list[Chunk] = self.chunks_unsaved.copy()
        self.clear_unsaved_chunks()
        return chunks_copy
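

# Minimal usage sketch (illustrative, not part of the original module): the file names and
# the query are made up, and it assumes Chunk exposes its constructor fields
# (filename, start_line, end_line, text) as attributes.
if __name__ == "__main__":
    processor = DocumentProcessor()
    # Extract text from the files and queue them for chunking.
    processor.load_documents(["notes.txt", "report.docx"], add_to_unprocessed=True)
    # Split the queued documents into chunks and rank them against a query.
    top_chunks = processor.generate_chunks(query="What is the project deadline?", embedding=True)
    for similarity, chunk in top_chunks:
        print(f"{similarity:.3f} {chunk.filename}:{chunk.start_line}-{chunk.end_line}")
    # Hand the freshly created chunks to the persistence layer exactly once.
    unsaved = processor.get_and_save_unsaved_chunks()
    print(f"{len(unsaved)} chunks are ready to be written to the db")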