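"""
index.py: Build the search indexes for R-help-chat.

ProcessFile() preprocesses a mail archive file (removing quoted lines and
truncating overly long emails) and adds it to the dense (ChromaDB) or the
sparse (BM25) index via ProcessFileDense() or ProcessFileSparse().
"""
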
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from datetime import datetime
import tempfile
import os
# Local modules
from retriever import BuildRetriever, db_dir
from mods.bm25s_retriever import BM25SRetriever


def ProcessFile(file_path, search_type: str = "dense", compute_mode: str = "remote"):
    """
    Wrapper function to process a file for dense or sparse search

    Args:
        file_path: File to process
        search_type: Type of search to use. Options: "dense", "sparse"
        compute_mode: Compute mode for embeddings ("remote" or "local")
    """
# Preprocess: remove quoted lines and handle email boundaries
temp_fd, cleaned_temp_file = tempfile.mkstemp(suffix=".txt", prefix="preproc_")
with open(file_path, "r", encoding="utf-8", errors="ignore") as infile, open(
cleaned_temp_file, "w", encoding="utf-8"
) as outfile:
for line in infile:
# Remove lines that start with '>' or whitespace before '>'
if line.lstrip().startswith(">"):
continue
outfile.write(line)
    # Close the descriptor returned by mkstemp (the file itself was opened by name above)
    try:
        os.close(temp_fd)
    except Exception:
        pass
    # Truncate email line count and line length to avoid an error in openai/_base_client.py:
    # BadRequestError: Error code: 400 - 'message': 'Requested 312872 tokens, max 300000 tokens per request', 'type': 'max_tokens_per_request'
temp_fd2, truncated_temp_file = tempfile.mkstemp(suffix=".txt", prefix="truncated_")
with open(cleaned_temp_file, "r", encoding="utf-8") as infile:
content = infile.read()
# Split into emails using '\n\n\nFrom' as the separator
emails = content.split("\n\n\nFrom")
processed_emails = []
    for email in emails:
lines = email.splitlines()
# Truncate each line to 1000 characters and each email to 200 lines
# NOTE: 1000 characters is reasonable for a long non-word-wrapped paragraph
truncated_lines = [line[:1000] for line in lines[:200]]
# Add [Email truncated] line to truncated emails
if len(lines) > len(truncated_lines):
truncated_lines.append("[Email truncated]")
processed_emails.append("\n".join(truncated_lines))
# Join emails back together with '\n\n\nFrom'
result = "\n\n\nFrom".join(processed_emails)
# Add two blank lines to the first email so all emails have the same formatting
# (needed for removing prepended source file names in evals)
result = "\n\n" + result
with open(truncated_temp_file, "w", encoding="utf-8") as outfile:
outfile.write(result)
try:
os.close(temp_fd2)
except Exception:
pass
try:
if search_type == "sparse":
# Handle sparse search with BM25
ProcessFileSparse(truncated_temp_file, file_path)
elif search_type == "dense":
# Handle dense search with ChromaDB
ProcessFileDense(truncated_temp_file, file_path, compute_mode)
else:
raise ValueError(f"Unsupported search type: {search_type}")
finally:
# Clean up the temporary files
try:
os.remove(cleaned_temp_file)
os.remove(truncated_temp_file)
except Exception:
pass


def ProcessFileDense(cleaned_temp_file, file_path, compute_mode):
    """
    Process a file for dense vector search using ChromaDB
    """
# Get a retriever instance
retriever = BuildRetriever(compute_mode, "dense")
# Load cleaned text file
loader = TextLoader(cleaned_temp_file)
documents = loader.load()
# Use original file path for "source" key in metadata
documents[0].metadata["source"] = file_path
# Add file timestamp to metadata
mod_time = os.path.getmtime(file_path)
timestamp = datetime.fromtimestamp(mod_time).isoformat()
documents[0].metadata["timestamp"] = timestamp
    # Add documents to the vectorstore in batches of emails rather than with a single
    # retriever.add_documents(documents) call, which can exceed ChromaDB's maximum batch size:
    # https://github.com/chroma-core/chroma/issues/1049
    # https://cookbook.chromadb.dev/strategies/batching
    batch_size = 1000
# Split emails
emails = documents[0].page_content.split("\n\n\nFrom")
documents_batch = documents
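    # NOTE: documents_batch aliases documents (no copy is made); each iteration below
    # overwrites page_content while reusing the source and timestamp metadata set above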
for i in range(0, len(emails), batch_size):
emails_batch = emails[i : i + batch_size]
# Join emails back together
page_content = "\n\n\nFrom".join(emails_batch)
documents_batch[0].page_content = page_content
# Add documents to vectorstore
retriever.add_documents(documents_batch)


def ProcessFileSparse(cleaned_temp_file, file_path):
    """
    Process a file for sparse search using BM25
    """
# Load text file to document
loader = TextLoader(cleaned_temp_file)
documents = loader.load()
# Split archive file into emails for BM25
    # Use two blank lines followed by "From" as the separator; chunk_size=1 forces a
    # split at every separator, and each email stays intact because there are no
    # fallback separators for oversized chunks
    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n\nFrom"], chunk_size=1, chunk_overlap=0
    )
    ## Alternative: use 'EmailFrom' as the separator (requires preprocessing)
    # splitter = RecursiveCharacterTextSplitter(separators=["EmailFrom"])
emails = splitter.split_documents(documents)
# Use original file path for "source" key in metadata
for email in emails:
email.metadata["source"] = file_path
    # Create or update BM25 index
    bm25_persist_directory = f"{db_dir}/bm25"
    try:
        # Update BM25 index if it exists
        retriever = BM25SRetriever.from_persisted_directory(bm25_persist_directory)
# Get new emails - ones which have not been indexed
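        # (langchain Documents compare by page_content and metadata, so re-processing
        # an unchanged archive should yield no new emails here)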
new_emails = [email for email in emails if email not in retriever.docs]
if len(new_emails) > 0:
# Create new BM25 index with all emails
# NOTE: Adding new documents to an existing index is not possible:
# https://github.com/xhluca/bm25s/discussions/20
all_emails = retriever.docs + new_emails
BM25SRetriever.from_documents(
documents=all_emails,
persist_directory=bm25_persist_directory,
)
print(f"BM25S: added {len(new_emails)} new emails from {file_path}")
else:
print(f"BM25S: no change for {file_path}")
except (FileNotFoundError, OSError):
# Create new BM25 index
BM25SRetriever.from_documents(
documents=emails,
persist_directory=bm25_persist_directory,
)
print(f"BM25S: started with {len(emails)} emails from {file_path}")
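

# Example usage: a minimal sketch; the archive path below is hypothetical and the
# actual pipeline may invoke ProcessFile differently.
if __name__ == "__main__":
    archive_path = "data/R-help/2024-January.txt"  # hypothetical local mail archive
    # Build the dense (ChromaDB) index and then the sparse (BM25) index for one archive
    ProcessFile(archive_path, search_type="dense", compute_mode="remote")
    ProcessFile(archive_path, search_type="sparse")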