# app/core/processor.py
from langchain_community.document_loaders import (
    UnstructuredWordDocumentLoader,
    TextLoader,
    CSVLoader,
    UnstructuredMarkdownLoader,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from concurrent.futures import ProcessPoolExecutor
from langchain_core.documents import Document
from app.settings import logger, settings
from app.core.chunks import Chunk
from datetime import datetime
from typing import Any
from uuid import uuid4
import asyncio
import nltk
import fitz
import os


class PDFLoader:
    def __init__(self, file_path: str):
        self.file_path = file_path

    def load(self) -> list[Document]:
        docs = []
        with fitz.open(self.file_path) as doc:
            for page in doc:
                text = page.get_text("text")
                metadata = {
                    "source": self.file_path,
                    "page": page.number,
                }
                docs.append(Document(page_content=text, metadata=metadata))
        return docs
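
# A minimal usage sketch (the file name is illustrative): PDFLoader mimics the
# LangChain loader interface and returns one Document per PDF page, carrying the
# source path and the zero-based page number in its metadata.
#
#   pages = PDFLoader("example.pdf").load()
#   print(pages[0].metadata)  # {"source": "example.pdf", "page": 0}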


def find_line_sync(splitted_text: list[dict], char: int) -> int:
    # Binary search over precomputed line spans; returns the 1-based line number
    # whose span contains the character offset `char`.
    left, right = 0, len(splitted_text) - 1
    while left <= right:
        mid = (left + right) // 2
        line = splitted_text[mid]
        if line["start"] <= char < line["end"]:
            return mid + 1
        elif char < line["start"]:
            right = mid - 1
        else:
            left = mid + 1
    # Offset lies past the last span: clamp to the last 1-based line number.
    return right + 1
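
# For illustration (span values assumed): with
#   spans = [{"start": 0, "end": 10}, {"start": 10, "end": 25}]
# find_line_sync(spans, 4) returns 1 and find_line_sync(spans, 12) returns 2.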


def get_start_end_lines_sync(splitted_text: list[dict], start_char: int, end_char: int) -> tuple[int, int]:
    start = find_line_sync(splitted_text=splitted_text, char=start_char)
    end = find_line_sync(splitted_text=splitted_text, char=end_char)
    return (start, end)


def _chunkinize_sync(document: Document, text: list[Document], lines: list[dict]) -> list[Chunk]:
    output: list[Chunk] = []
    for chunk in text:
        start_l, end_l = get_start_end_lines_sync(
            splitted_text=lines,
            start_char=chunk.metadata.get("start_index", 0),
            end_char=chunk.metadata.get("start_index", 0) + len(chunk.page_content),
        )
        new_chunk = Chunk(
            id=uuid4(),
            filename=document.metadata.get("source", ""),
            page_number=document.metadata.get("page", 0),
            start_index=chunk.metadata.get("start_index", 0),
            start_line=start_l,
            end_line=end_l,
            text=chunk.page_content,
        )
        output.append(new_chunk)
    return output
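
# _chunkinize_sync is kept at module level so it can be pickled and shipped to the
# ProcessPoolExecutor workers used by DocumentProcessor.generate_chunks below.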


class DocumentProcessor:
    def __init__(self):
        self.chunks_unsaved: list[Chunk] = []
        # Each queue item is {"document": Document, "path": str} (see load_document).
        self.unprocessed: asyncio.Queue[dict] = asyncio.Queue()
        self.max_workers = min(16, os.cpu_count() or 1)
        self.text_splitter = RecursiveCharacterTextSplitter(
            **settings.text_splitter.model_dump()
        )
        self.chunk_executor = ProcessPoolExecutor(max_workers=self.max_workers)

    async def check_size(self, file_path: str = "") -> bool:
        try:
            size = os.path.getsize(file_path)
        except Exception:
            size = 0
        # Files larger than ~1 MB are chunked with the parallel strategy (see generate_chunks).
        return size > 1_000_000

    async def document_multiplexer(self, filepath: str, get_loader: bool = False, get_chunking_strategy: bool = False):
        loader = None
        parallelization = False
        if filepath.endswith(".pdf"):
            # PDFLoader splits the file into pages, each processed as a separate document.
            loader = PDFLoader(file_path=filepath)
        elif filepath.endswith(".docx") or filepath.endswith(".doc"):
            loader = UnstructuredWordDocumentLoader(file_path=filepath)
        elif filepath.endswith(".txt"):
            loader = TextLoader(file_path=filepath)
        elif filepath.endswith(".csv"):
            loader = CSVLoader(file_path=filepath)
        elif filepath.endswith(".json"):
            loader = TextLoader(file_path=filepath)
        elif filepath.endswith(".md"):
            loader = UnstructuredMarkdownLoader(file_path=filepath)
        # PDFs are always chunked sequentially; other formats switch to the
        # parallel strategy once the file is large enough.
        if not filepath.endswith(".pdf"):
            parallelization = await self.check_size(file_path=filepath)
        if get_loader:
            return loader
        elif get_chunking_strategy:
            return parallelization
        else:
            raise RuntimeError("document_multiplexer called without get_loader or get_chunking_strategy")

    async def load_document(self, filepath: str, add_to_unprocessed: bool = False) -> None:
        if settings.debug:
            await logger.info(f"Loading document {os.path.basename(filepath)}, time - {datetime.now()}")
        loader = await self.document_multiplexer(filepath=filepath, get_loader=True)
        loop = asyncio.get_running_loop()
        if loader is None:
            raise RuntimeError("Unsupported type of file")
        documents: list[Document] = []
        try:
            documents = await loop.run_in_executor(None, loader.load)
        except Exception as e:
            raise RuntimeError(f"File is corrupted - {e}") from e
        if add_to_unprocessed:
            for doc in documents:
                await self.unprocessed.put({"document": doc, "path": filepath})

    async def load_documents(self, documents: list[str]) -> None:
        for doc in documents:
            try:
                await self.load_document(filepath=doc, add_to_unprocessed=True)
            except Exception as e:
                await logger.error(f"Error in load_documents while loading {doc}: {e}")

    async def split_into_groups(self, original_list: list[Any], split_by: int = 15) -> list[list[Any]]:
        output = []
        for i in range(0, len(original_list), split_by):
            new_group = original_list[i: i + split_by]
            output.append(new_group)
        return output
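
    # For illustration: await self.split_into_groups([1, 2, 3, 4, 5], split_by=2)
    # returns [[1, 2], [3, 4], [5]].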

    async def _chunkinize(self, document: Document, text: list[Document], lines: list[dict]) -> list[Chunk]:
        # Async counterpart of the module-level _chunkinize_sync.
        output: list[Chunk] = []
        for chunk in text:
            start_l, end_l = await self.get_start_end_lines(
                splitted_text=lines,
                start_char=chunk.metadata.get("start_index", 0),
                end_char=chunk.metadata.get("start_index", 0) + len(chunk.page_content),
            )
            new_chunk = Chunk(
                id=uuid4(),
                filename=document.metadata.get("source", ""),
                page_number=document.metadata.get("page", 0),
                start_index=chunk.metadata.get("start_index", 0),
                start_line=start_l,
                end_line=end_l,
                text=chunk.page_content,
            )
            output.append(new_chunk)
        return output

    async def precompute_lines(self, splitted_document: list[str]) -> list[dict]:
        loop = asyncio.get_running_loop()

        def compute_lines():
            current_start = 0
            output: list[dict] = []
            for i, line in enumerate(splitted_document):
                # "end" adds 1 to cover the newline stripped by splitlines(), keeping spans contiguous.
                output.append({"id": i + 1, "start": current_start, "end": current_start + len(line) + 1, "text": line})
                current_start += len(line) + 1
            return output

        return await loop.run_in_executor(None, compute_lines)
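
    # For illustration: await self.precompute_lines(["ab", "cd"]) returns
    #   [{"id": 1, "start": 0, "end": 3, "text": "ab"},
    #    {"id": 2, "start": 3, "end": 6, "text": "cd"}]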

    async def generate_chunks(self):
        intermediate: list[Chunk] = []
        loop = asyncio.get_running_loop()
        while not self.unprocessed.empty():
            entity = await self.unprocessed.get()
            try:
                document, filepath = entity["document"], entity["path"]
                parallelization = await self.document_multiplexer(filepath=filepath, get_chunking_strategy=True)
                if settings.debug:
                    await logger.info(f"Strategy --> {'P' if parallelization else 'S'}")
                text = await loop.run_in_executor(None, self.text_splitter.split_documents, [document])
                lines: list[dict] = await self.precompute_lines(splitted_document=document.page_content.splitlines())
                if parallelization:
                    if settings.debug:
                        await logger.info("<------- Apply Parallel Execution ------->")
                        await logger.info(f"Document - {os.path.basename(filepath)}")
                    groups = await self.split_into_groups(original_list=text, split_by=50)
                    tasks = [
                        loop.run_in_executor(
                            self.chunk_executor,
                            _chunkinize_sync,
                            document,
                            group,
                            lines,
                        )
                        for group in groups
                    ]
                    results = await asyncio.gather(*tasks)
                    for chunks in results:
                        intermediate.extend(chunks)
                    if settings.debug:
                        await logger.info("<---------------- Done ----------------->")
                else:
                    chunks = await loop.run_in_executor(None, _chunkinize_sync, document, text, lines)
                    intermediate.extend(chunks)
            finally:
                self.unprocessed.task_done()
        self.chunks_unsaved.extend(intermediate)
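
    # Typical end-to-end flow (paths are illustrative):
    #   processor = DocumentProcessor()
    #   await processor.load_documents(["report.txt", "notes.md"])
    #   await processor.generate_chunks()
    #   chunks = await processor.get_and_save_unsaved_chunks()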

    async def find_line(self, splitted_text: list[dict], char: int) -> int:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, find_line_sync, splitted_text, char)

    async def get_start_end_lines(self, splitted_text: list[dict], start_char: int, end_char: int) -> tuple[int, int]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, get_start_end_lines_sync, splitted_text, start_char, end_char)

    async def update_nltk(self) -> None:
        nltk.download("punkt")
        nltk.download("averaged_perceptron_tagger")

    async def get_and_save_unsaved_chunks(self) -> list[Chunk]:
        chunks_copy: list[Chunk] = self.chunks_unsaved.copy()
        await self.clear_unsaved_chunks()
        return chunks_copy

    async def clear_unsaved_chunks(self):
        self.chunks_unsaved = []

    async def get_all_chunks(self) -> list[Chunk]:
        return self.chunks_unsaved


# async def main():
#     print(f"Start time - {datetime.now()}")
#     proc = DocumentProcessor()
#     base = "/home/danil/Downloads/Tests/test"
#     docs = []
#     for i in range(8):
#         docs.append(base + str(i) + ".txt")
#     await proc.load_documents(docs)
#     await proc.generate_chunks()
#     chunks = await proc.get_and_save_unsaved_chunks()
#     print(len(chunks))
#     print(f"End time - {datetime.now()}")
#
# asyncio.run(main())