from langchain_community.document_loaders import UnstructuredWordDocumentLoader, TextLoader, CSVLoader, UnstructuredMarkdownLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from concurrent.futures import ProcessPoolExecutor
from langchain_core.documents import Document
from app.settings import logger, settings
from app.core.chunks import Chunk
from datetime import datetime
from typing import Any
from uuid import uuid4
import asyncio
import nltk
import fitz
import os
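
# Pipeline overview:
#   load_documents()  -> file-type specific loaders read files into langchain Documents
#                        and queue them in `unprocessed`
#   generate_chunks() -> RecursiveCharacterTextSplitter splits each Document, then
#                        _chunkinize_sync maps every chunk back to its page and
#                        start/end line numbers; large files are chunked in a
#                        ProcessPoolExecutor, small ones in the default thread pool
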
class PDFLoader:
    def __init__(self, file_path: str):
        self.file_path = file_path

    def load(self) -> list[Document]:
        docs = []
        with fitz.open(self.file_path) as doc:
            for page in doc:
                text = page.get_text("text")
                metadata = {
                    "source": self.file_path,
                    "page": page.number,  # 0-based page index from PyMuPDF
                }
                docs.append(Document(page_content=text, metadata=metadata))
        return docs
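
# Binary search over the spans produced by precompute_lines: each entry covers a
# half-open [start, end) character range, and the returned line number is 1-based.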
def find_line_sync(splitted_text: list[dict], char: int) -> int:
    left, right = 0, len(splitted_text) - 1
    while left <= right:
        mid = (left + right) // 2
        line = splitted_text[mid]
        if line["start"] <= char < line["end"]:
            return mid + 1
        elif char < line["start"]:
            right = mid - 1
        else:
            left = mid + 1
    # Offset fell outside every [start, end) span (e.g. one past the final newline):
    # clamp to the nearest preceding line, keeping the result 1-based like the found case.
    return right + 1
def get_start_end_lines_sync(splitted_text: list[dict], start_char: int, end_char: int) -> tuple[int, int]:
    start = find_line_sync(splitted_text=splitted_text, char=start_char)
    end = find_line_sync(splitted_text=splitted_text, char=end_char)
    return (start, end)
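
# Module-level (and therefore picklable) worker used by DocumentProcessor.generate_chunks
# through a ProcessPoolExecutor: it turns the splitter output for one Document into
# Chunk records carrying file, page, character-offset and line-range metadata.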
def _chunkinize_sync(document: Document, text: list[Document], lines: list[dict]) -> list[Chunk]:
    output: list[Chunk] = []
    for chunk in text:
        start_l, end_l = get_start_end_lines_sync(
            splitted_text=lines,
            start_char=chunk.metadata.get("start_index", 0),
            end_char=chunk.metadata.get("start_index", 0)
            + len(chunk.page_content),
        )
        new_chunk = Chunk(
            id=uuid4(),
            filename=document.metadata.get("source", ""),
            page_number=document.metadata.get("page", 0),
            start_index=chunk.metadata.get("start_index", 0),
            start_line=start_l,
            end_line=end_l,
            text=chunk.page_content,
        )
        output.append(new_chunk)
    return output
class DocumentProcessor:
    def __init__(self):
        self.chunks_unsaved: list[Chunk] = []
        # queue of {"document": Document, "path": str} entries awaiting chunking
        self.unprocessed: asyncio.Queue[dict] = asyncio.Queue()
        self.max_workers = min(16, os.cpu_count() or 1)
        self.text_splitter = RecursiveCharacterTextSplitter(
            **settings.text_splitter.model_dump()
        )
        self.chunk_executor = ProcessPoolExecutor(max_workers=self.max_workers)
    async def check_size(self, file_path: str = "") -> bool:
        try:
            size = os.path.getsize(file_path)
        except Exception:
            size = 0
        # parallelize chunking only for files larger than ~1 MB
        return size > 1000000
    async def document_multiplexer(self, filepath: str, get_loader: bool = False, get_chunking_strategy: bool = False):
        loader = None
        parallelization = False
        if filepath.endswith(".pdf"):
            # PDFLoader yields one Document per page, so every page is processed as a separate unit
            loader = PDFLoader(file_path=filepath)
        elif filepath.endswith(".docx") or filepath.endswith(".doc"):
            loader = UnstructuredWordDocumentLoader(file_path=filepath)
        elif filepath.endswith(".txt"):
            loader = TextLoader(file_path=filepath)
        elif filepath.endswith(".csv"):
            loader = CSVLoader(file_path=filepath)
        elif filepath.endswith(".json"):
            loader = TextLoader(file_path=filepath)
        elif filepath.endswith(".md"):
            loader = UnstructuredMarkdownLoader(file_path=filepath)
        if filepath.endswith(".pdf"):
            # per-page PDF Documents are small, so they are always chunked sequentially
            parallelization = False
        else:
            parallelization = await self.check_size(file_path=filepath)
        if get_loader:
            return loader
        elif get_chunking_strategy:
            return parallelization
        else:
            raise RuntimeError("document_multiplexer requires get_loader or get_chunking_strategy")
    async def load_document(self, filepath: str, add_to_unprocessed: bool = False) -> None:
        if settings.debug:
            await logger.info(f"Loading document {os.path.basename(filepath)}, time - {datetime.now()}")
        loader = await self.document_multiplexer(filepath=filepath, get_loader=True)
        loop = asyncio.get_running_loop()
        if loader is None:
            raise RuntimeError("Unsupported file type")
        documents: list[Document] = []
        try:
            documents = await loop.run_in_executor(None, loader.load)
        except Exception as e:
            raise RuntimeError(f"File is corrupted - {e}") from e
        if add_to_unprocessed:
            for doc in documents:
                await self.unprocessed.put({"document": doc, "path": filepath})
    async def load_documents(self, documents: list[str]) -> None:
        for doc in documents:
            try:
                await self.load_document(filepath=doc, add_to_unprocessed=True)
            except Exception as e:
                await logger.error(f"Error at load_documents while loading {doc}: {e}")
    async def split_into_groups(self, original_list: list[Any], split_by: int = 15) -> list[list[Any]]:
        output = []
        for i in range(0, len(original_list), split_by):
            new_group = original_list[i: i + split_by]
            output.append(new_group)
        return output
    async def _chunkinize(self, document: Document, text: list[Document], lines: list[dict]) -> list[Chunk]:
        output: list[Chunk] = []
        for chunk in text:
            start_l, end_l = await self.get_start_end_lines(
                splitted_text=lines,
                start_char=chunk.metadata.get("start_index", 0),
                end_char=chunk.metadata.get("start_index", 0)
                + len(chunk.page_content),
            )
            new_chunk = Chunk(
                id=uuid4(),
                filename=document.metadata.get("source", ""),
                page_number=document.metadata.get("page", 0),
                start_index=chunk.metadata.get("start_index", 0),
                start_line=start_l,
                end_line=end_l,
                text=chunk.page_content,
            )
            output.append(new_chunk)
        return output
    async def precompute_lines(self, splitted_document: list[str]) -> list[dict]:
        loop = asyncio.get_running_loop()
        def compute_lines():
            current_start = 0
            output: list[dict] = []
            for i, line in enumerate(splitted_document):
                # "end" is exclusive and accounts for the newline stripped by splitlines()
                output.append({"id": i + 1, "start": current_start, "end": current_start + len(line) + 1, "text": line})
                current_start += len(line) + 1
            return output
        return await loop.run_in_executor(None, compute_lines)
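
    # Drains the `unprocessed` queue. Strategy "P" (parallel) splits the chunk list
    # into groups of 50 and fans them out to worker processes; "S" (sequential)
    # chunkinizes everything in the default thread pool.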
    async def generate_chunks(self):
        intermediate: list[Chunk] = []
        loop = asyncio.get_running_loop()
        while not self.unprocessed.empty():
            entity = await self.unprocessed.get()
            try:
                document, filepath = entity["document"], entity["path"]
                parallelization = await self.document_multiplexer(filepath=filepath, get_chunking_strategy=True)
                if settings.debug:
                    await logger.info(f"Strategy --> {'P' if parallelization else 'S'}")
                text = await loop.run_in_executor(None, self.text_splitter.split_documents, [document])
                lines: list[dict] = await self.precompute_lines(splitted_document=document.page_content.splitlines())
                if parallelization:
                    if settings.debug:
                        await logger.info("<------- Apply Parallel Execution ------->")
                        await logger.info(f"Document - {os.path.basename(filepath)}")
                    groups = await self.split_into_groups(original_list=text, split_by=50)
                    tasks = [
                        loop.run_in_executor(
                            self.chunk_executor,
                            _chunkinize_sync,
                            document,
                            group,
                            lines,
                        )
                        for group in groups
                    ]
                    results = await asyncio.gather(*tasks)
                    for chunks in results:
                        intermediate.extend(chunks)
                    if settings.debug:
                        await logger.info("<---------------- Done ----------------->")
                else:
                    chunks = await loop.run_in_executor(None, _chunkinize_sync, document, text, lines)
                    intermediate.extend(chunks)
            finally:
                self.unprocessed.task_done()
        self.chunks_unsaved.extend(intermediate)
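
    # Async wrappers around the *_sync helpers; the lookups run in the default
    # thread-pool executor instead of directly on the event loop.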
    async def find_line(self, splitted_text: list[dict], char: int) -> int:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, find_line_sync, splitted_text, char)
    async def get_start_end_lines(self, splitted_text: list[dict], start_char: int, end_char: int) -> tuple[int, int]:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, get_start_end_lines_sync, splitted_text, start_char, end_char)
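
    # One-off download of NLTK data (punkt tokenizer, averaged perceptron tagger);
    # both calls are blocking network downloads.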
    async def update_nltk(self) -> None:
        nltk.download("punkt")
        nltk.download("averaged_perceptron_tagger")
    async def get_and_save_unsaved_chunks(self) -> list[Chunk]:
        chunks_copy: list[Chunk] = self.chunks_unsaved.copy()
        await self.clear_unsaved_chunks()
        return chunks_copy
    async def clear_unsaved_chunks(self):
        self.chunks_unsaved = []
    async def get_all_chunks(self) -> list[Chunk]:
        return self.chunks_unsaved
# async def main():
#     print(f"Start time - {datetime.now()}")
#     proc = DocumentProcessor()
#     base = "/home/danil/Downloads/Tests/test"
#     docs = []
#     for i in range(8):
#         docs.append(base + str(i) + ".txt")
#     await proc.load_documents(docs)
#     await proc.generate_chunks()
#     chunks = await proc.get_and_save_unsaved_chunks()
#     print(len(chunks))
#     print(f"End time - {datetime.now()}")
# asyncio.run(main())