from langchain_text_splitters import TextSplitter, RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from datetime import datetime
import tempfile
import os
# Local modules
from retriever import BuildRetriever, db_dir
from mods.bm25s_retriever import BM25SRetriever


def ProcessFile(file_path, search_type: str = "dense", compute_mode: str = "remote"):
"""
Wrapper function to process file for dense or sparse search
Args:
file_path: File to process
search_type: Type of search to use. Options: "dense", "sparse"
compute_mode: Compute mode for embeddings (remote or local)
"""
# Preprocess: remove quoted lines and handle email boundaries
temp_fd, cleaned_temp_file = tempfile.mkstemp(suffix=".txt", prefix="preproc_")
with open(file_path, "r", encoding="utf-8", errors="ignore") as infile, open(
cleaned_temp_file, "w", encoding="utf-8"
) as outfile:
for line in infile:
# Remove lines that start with '>' or whitespace before '>'
if line.lstrip().startswith(">"):
continue
outfile.write(line)
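    # mkstemp returns an already-open OS file descriptor; close it here because
    # the temp file was written through a separate open() call above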
try:
os.close(temp_fd)
except Exception:
pass
    # Truncate email line count and line length to avoid an error raised in openai/_base_client.py:
    # BadRequestError: Error code: 400 - {'message': 'Requested 312872 tokens, max 300000 tokens per request', 'type': 'max_tokens_per_request'}
temp_fd2, truncated_temp_file = tempfile.mkstemp(suffix=".txt", prefix="truncated_")
with open(cleaned_temp_file, "r", encoding="utf-8") as infile:
content = infile.read()
# Split into emails using '\n\n\nFrom' as the separator
emails = content.split("\n\n\nFrom")
processed_emails = []
for i, email in enumerate(emails):
lines = email.splitlines()
# Truncate each line to 1000 characters and each email to 200 lines
# NOTE: 1000 characters is reasonable for a long non-word-wrapped paragraph
truncated_lines = [line[:1000] for line in lines[:200]]
# Add [Email truncated] line to truncated emails
if len(lines) > len(truncated_lines):
truncated_lines.append("[Email truncated]")
processed_emails.append("\n".join(truncated_lines))
# Join emails back together with '\n\n\nFrom'
result = "\n\n\nFrom".join(processed_emails)
# Add two blank lines to the first email so all emails have the same formatting
# (needed for removing prepended source file names in evals)
result = "\n\n" + result
with open(truncated_temp_file, "w", encoding="utf-8") as outfile:
outfile.write(result)
try:
os.close(temp_fd2)
except Exception:
pass
try:
if search_type == "sparse":
# Handle sparse search with BM25
ProcessFileSparse(truncated_temp_file, file_path)
elif search_type == "dense":
# Handle dense search with ChromaDB
ProcessFileDense(truncated_temp_file, file_path, compute_mode)
else:
raise ValueError(f"Unsupported search type: {search_type}")
finally:
# Clean up the temporary files
try:
os.remove(cleaned_temp_file)
os.remove(truncated_temp_file)
except Exception:
pass


def ProcessFileDense(cleaned_temp_file, file_path, compute_mode):
"""
Process file for dense vector search using ChromaDB
"""
# Get a retriever instance
retriever = BuildRetriever(compute_mode, "dense")
# Load cleaned text file
loader = TextLoader(cleaned_temp_file)
documents = loader.load()
# Use original file path for "source" key in metadata
documents[0].metadata["source"] = file_path
# Add file timestamp to metadata
mod_time = os.path.getmtime(file_path)
timestamp = datetime.fromtimestamp(mod_time).isoformat()
documents[0].metadata["timestamp"] = timestamp
## Add documents to vectorstore
# retriever.add_documents(documents)
# Split the document into batches for addition to ChromaDB
# https://github.com/chroma-core/chroma/issues/1049
# https://cookbook.chromadb.dev/strategies/batching
batch_size = 1000
    # Split page content into individual emails for batching
    emails = documents[0].page_content.split("\n\n\nFrom")
    # Reuse the loaded Document object, swapping in each batch's content below
    documents_batch = documents
for i in range(0, len(emails), batch_size):
emails_batch = emails[i : i + batch_size]
# Join emails back together
page_content = "\n\n\nFrom".join(emails_batch)
documents_batch[0].page_content = page_content
# Add documents to vectorstore
retriever.add_documents(documents_batch)


def ProcessFileSparse(cleaned_temp_file, file_path):
"""
Process file for sparse search using BM25
"""
# Load text file to document
loader = TextLoader(cleaned_temp_file)
documents = loader.load()
    # Split archive file into individual emails for BM25
    # Separator is two blank lines followed by "From"; chunk_size=1 forces a split
    # at every separator while leaving each email intact (no further splitting)
splitter = RecursiveCharacterTextSplitter(
separators=["\n\n\nFrom"], chunk_size=1, chunk_overlap=0
)
    ## Using 'EmailFrom' as the separator (requires preprocessing)
# splitter = RecursiveCharacterTextSplitter(separators=["EmailFrom"])
emails = splitter.split_documents(documents)
# Use original file path for "source" key in metadata
for email in emails:
email.metadata["source"] = file_path
    # Create or update BM25 index
    bm25_persist_directory = f"{db_dir}/bm25"
    try:
        # Update BM25 index if it exists
        retriever = BM25SRetriever.from_persisted_directory(bm25_persist_directory)
# Get new emails - ones which have not been indexed
new_emails = [email for email in emails if email not in retriever.docs]
if len(new_emails) > 0:
# Create new BM25 index with all emails
# NOTE: Adding new documents to an existing index is not possible:
# https://github.com/xhluca/bm25s/discussions/20
all_emails = retriever.docs + new_emails
BM25SRetriever.from_documents(
documents=all_emails,
persist_directory=bm25_persist_directory,
)
print(f"BM25S: added {len(new_emails)} new emails from {file_path}")
else:
print(f"BM25S: no change for {file_path}")
except (FileNotFoundError, OSError):
# Create new BM25 index
BM25SRetriever.from_documents(
documents=emails,
persist_directory=bm25_persist_directory,
)
print(f"BM25S: started with {len(emails)} emails from {file_path}")