from langchain_community.document_loaders import PyPDFLoader, UnstructuredWordDocumentLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from app.models import Embedder
from app.chunks import Chunk
import nltk  # used for the proper tokenizer workflow
from uuid import uuid4  # for generating a unique id as hex (uuid4 is used as it generates ids from pseudo-random numbers, unlike uuid1 and the others)
import numpy as np
from app.settings import logging, text_splitter_config, embedder_model
# TODO: replace PyPDFLoader since it is completely unusable OR try to fix it
class DocumentProcessor:
    '''
    TODO: determine the most suitable chunk size
    chunks -> the list of chunks from loaded files
    chunks_unsaved -> the list of recently added chunks that have not been saved to the db yet
    processed -> the list of files that have already been split into chunks
    unprocessed -> !processed
    text_splitter -> text splitting strategy
    '''
def __init__(self):
self.chunks: list[Chunk] = []
self.chunks_unsaved: list[Chunk] = []
self.processed: list[Document] = []
self.unprocessed: list[Document] = []
self.embedder = Embedder(embedder_model)
self.text_splitter = RecursiveCharacterTextSplitter(**text_splitter_config)
    '''
    Measures the cosine similarity between two vectors
    '''
def cosine_similarity(self, vec1, vec2):
return vec1 @ vec2 / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
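    # A quick sanity check of the formula cos(theta) = (a . b) / (|a| * |b|):
    # orthogonal vectors score 0.0, parallel ones 1.0. The vectors below are
    # illustrative values only, not data used anywhere in the app:
    #   dp = DocumentProcessor()
    #   dp.cosine_similarity(np.array([1.0, 0.0]), np.array([0.0, 1.0]))  # -> 0.0
    #   dp.cosine_similarity(np.array([2.0, 0.0]), np.array([1.0, 0.0]))  # -> 1.0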
    '''
    Maintains a bounded, similarity-sorted list of the most relevant chunks without interacting with the db
    '''
    def update_most_relevant_chunk(self, chunk: tuple[np.float64, Chunk],
                                   relevant_chunks: list[tuple[np.float64, Chunk]],
                                   mx_len: int = 15):
relevant_chunks.append(chunk)
for i in range(len(relevant_chunks) - 1, 0, -1):
if relevant_chunks[i][0] > relevant_chunks[i - 1][0]:
relevant_chunks[i], relevant_chunks[i - 1] = relevant_chunks[i - 1], relevant_chunks[i]
else:
break
if len(relevant_chunks) > mx_len:
del relevant_chunks[-1]
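    # An example of the bookkeeping above (hypothetical similarity values, not real data):
    # starting from relevant_chunks = [(0.9, a), (0.5, b)], inserting (0.7, c) bubbles it
    # one position left, giving [(0.9, a), (0.7, c), (0.5, b)]; once the list grows past
    # mx_len entries, the least similar tail entry is dropped.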
    '''
    Loads one file - extracts the text from it
    TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader
    TODO: Play with .pdf and extracting text from images
    TODO: Try chunking with an llm
    add_to_unprocessed -> if true, adds the loaded file to the list of unprocessed (unchunked) files
    '''
def load_document(self, filepath: str, add_to_unprocessed: bool = False) -> list[Document]:
loader = None
        if filepath.endswith(".pdf"):
            loader = PyPDFLoader(
                file_path=filepath)  # splits the file into pages (e.g. the slides of a presentation) and processes each as a separate document
elif filepath.endswith(".docx") or filepath.endswith(".doc"):
            # loader = Docx2txtLoader(file_path=filepath) ## try it later, since UnstructuredWordDocumentLoader is extremely slow
loader = UnstructuredWordDocumentLoader(file_path=filepath)
elif filepath.endswith(".txt"):
loader = TextLoader(file_path=filepath)
        if loader is None:
            raise RuntimeError("Unsupported file type")
        documents: list[Document] = []  # We cannot assign a single value here since a .pdf is split into several documents
        try:
            documents = loader.load()
        except Exception as e:
            raise RuntimeError("File is corrupted") from e
if add_to_unprocessed:
for doc in documents:
self.unprocessed.append(doc)
return documents
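    # A minimal usage sketch (the path is a hypothetical example):
    #   dp = DocumentProcessor()
    #   docs = dp.load_document("notes/lecture01.pdf", add_to_unprocessed=True)
    #   # docs holds one Document per pdf page; each one is also queued in dp.unprocessed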
    '''
    Similar to load_document, but for multiple files
    add_to_unprocessed -> if true, adds the loaded files to the list of unprocessed (unchunked) files
    '''
def load_documents(self, documents: list[str], add_to_unprocessed: bool = False) -> list[Document]:
extracted_documents: list[Document] = []
for doc in documents:
temp_storage: list[Document] = []
try:
                temp_storage = self.load_document(filepath=doc,
                                                  add_to_unprocessed=False)  # In some cases it should be True, but I cannot think of any :(
except Exception as e:
logging.error("Error at load_documents while loading %s", doc, exc_info=e)
continue
for extrc_doc in temp_storage:
extracted_documents.append(extrc_doc)
if add_to_unprocessed:
self.unprocessed.append(extrc_doc)
return extracted_documents
    '''
    Generates chunks with the recursive splitter from the list of unprocessed files, adds those files to the list of processed ones, and clears unprocessed
    query -> the query to rank chunks against (only used when embedding is true)
    embedding -> if true, embeds every chunk and returns the ones most relevant to the query
    TODO: try to split text with another llm (not really needed, but we should at least try it)
    '''
def generate_chunks(self, query: str = "", embedding: bool = False):
most_relevant = []
if embedding:
query_embedded = self.embedder.encode(query)
for document in self.unprocessed:
self.processed.append(document)
            split_docs: list[Document] = self.text_splitter.split_documents([document])
            lines: list[str] = document.page_content.split("\n")
            for chunk in split_docs:
start_l, end_l = self.get_start_end_lines(
splitted_text=lines,
start_char=chunk.metadata.get("start_index", 0),
end_char=chunk.metadata.get("start_index", 0) + len(chunk.page_content)
)
                new_chunk = Chunk(
                    id=uuid4(),
                    filename=document.metadata.get("source", ""),
                    page_number=document.metadata.get("page", 0),
                    start_index=chunk.metadata.get("start_index", 0),
                    start_line=start_l,
                    end_line=end_l,
                    text=chunk.page_content
                )
                if embedding:
                    chunk_embedded = self.embedder.encode(new_chunk.text)
                    similarity = self.cosine_similarity(query_embedded, chunk_embedded)
                    self.update_most_relevant_chunk((similarity, new_chunk), most_relevant)
                self.chunks.append(new_chunk)
                self.chunks_unsaved.append(new_chunk)
self.unprocessed = []
        logging.debug("Unsaved chunks: %d", len(self.chunks_unsaved))
return most_relevant
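    # A minimal relevance-retrieval sketch (the query text is an assumed example):
    #   dp.load_document("notes/lecture01.pdf", add_to_unprocessed=True)
    #   top = dp.generate_chunks(query="what is cosine similarity?", embedding=True)
    #   # top is a similarity-sorted list of at most 15 (similarity, Chunk) pairs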
    '''
    Determines the lines where the chunk starts and ends (1-based indexing)
    Some magic stuff here. To be honest, I only understood it after the 7th attempt
    TODO: invent a more efficient way
    splitted_text -> the original text split by \n
    start_char -> index of the character where the current chunk starts
    end_char -> index of the character where the current chunk ends
    debug_mode -> flag which enables printing useful info about the process
    '''
    def get_start_end_lines(self, splitted_text: list[str], start_char: int, end_char: int,
                            debug_mode: bool = False) -> tuple[int, int]:
if debug_mode:
logging.info(splitted_text)
start, end, char_ct = 0, 0, 0
iter_count = 1
for i, line in enumerate(splitted_text):
if debug_mode:
logging.info(
f"start={start_char}, current={char_ct}, end_current={char_ct + len(line) + 1}, end={end_char}, len={len(line)}, iter={iter_count}\n")
if char_ct <= start_char <= char_ct + len(line) + 1:
start = i + 1
if char_ct <= end_char <= char_ct + len(line) + 1:
end = i + 1
break
iter_count += 1
char_ct += len(line) + 1
if debug_mode:
logging.info(f"result => {start} {end}\n\n\n")
return start, end
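    # Worked example (illustrative only): for splitted_text = ["abc", "de"] the original
    # text was "abc\nde", so characters 0-3 fall on line 1 and 4-6 on line 2; hence
    # get_start_end_lines(["abc", "de"], start_char=1, end_char=5) returns (1, 2).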
    '''
    Note: it should be used only once to download the tokenizers; further usage is not recommended
    '''
def update_nltk(self) -> None:
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
    '''
    For now the system works as follows: we keep recently loaded chunks in two lists:
    chunks - all chunks, even the ones that haven't been saved to the db yet
    chunks_unsaved - chunks that have been added recently
    I do not know whether we really need to store all chunks added in the
    current session, but chunks_unsaved is used to avoid duplications while saving to the db.
    '''
def clear_unsaved_chunks(self):
self.chunks_unsaved = []
def get_all_chunks(self) -> list[Chunk]:
return self.chunks
    '''
    If we want to save chunks to the db, we need to clear the temp storage to avoid duplications
    '''
    def get_and_save_unsaved_chunks(self) -> list[Chunk]:
        chunks_copy: list[Chunk] = self.chunks_unsaved.copy()
        self.clear_unsaved_chunks()
        return chunks_copy
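# A minimal end-to-end sketch of the intended workflow; the file path is a
# hypothetical placeholder, while Embedder and the db layer come from the app package:
#   processor = DocumentProcessor()
#   processor.load_documents(["docs/example.txt"], add_to_unprocessed=True)
#   processor.generate_chunks()
#   fresh = processor.get_and_save_unsaved_chunks()  # hand these to the db layer
#   all_chunks = processor.get_all_chunks()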