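"""
Process email archive files into retrieval indexes.

Supports dense retrieval (ChromaDB, via BuildRetriever) and sparse retrieval
(BM25S). Archive files are preprocessed to drop quoted reply lines and to
truncate very long emails before they are embedded or indexed.
"""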
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from datetime import datetime
import tempfile
import os

# Local modules
from retriever import BuildRetriever, db_dir
from mods.bm25s_retriever import BM25SRetriever


def ProcessFile(file_path, search_type: str = "dense", compute_mode: str = "remote"):
    """
    Wrapper function to process file for dense or sparse search

    Args:
        file_path: File to process
        search_type: Type of search to use. Options: "dense", "sparse"
        compute_mode: Compute mode for embeddings (remote or local)
    """

    # Preprocess: remove quoted reply lines before indexing
    temp_fd, cleaned_temp_file = tempfile.mkstemp(suffix=".txt", prefix="preproc_")
    with open(file_path, "r", encoding="utf-8", errors="ignore") as infile, open(
        cleaned_temp_file, "w", encoding="utf-8"
    ) as outfile:
        for line in infile:
            # Skip quoted lines: '>' as the first non-whitespace character
            if line.lstrip().startswith(">"):
                continue
            outfile.write(line)
    try:
        os.close(temp_fd)
    except Exception:
        pass

    # Truncate the number of lines per email and the length of each line to avoid this error in openai/_base_client.py:
    # BadRequestError: Error code: 400 - 'message': 'Requested 312872 tokens, max 300000 tokens per request', 'type': 'max_tokens_per_request'
    temp_fd2, truncated_temp_file = tempfile.mkstemp(suffix=".txt", prefix="truncated_")
    with open(cleaned_temp_file, "r", encoding="utf-8") as infile:
        content = infile.read()
    # Split into emails using '\n\n\nFrom' as the separator
    emails = content.split("\n\n\nFrom")
    processed_emails = []
    for i, email in enumerate(emails):
        lines = email.splitlines()
        # Truncate each line to 1000 characters and each email to 200 lines
        # NOTE: 1000 characters is reasonable for a long non-word-wrapped paragraph
        truncated_lines = [line[:1000] for line in lines[:200]]
        # Add [Email truncated] line to truncated emails
        if len(lines) > len(truncated_lines):
            truncated_lines.append("[Email truncated]")
        processed_emails.append("\n".join(truncated_lines))
    # Join emails back together with '\n\n\nFrom'
    result = "\n\n\nFrom".join(processed_emails)
    # Add two blank lines to the first email so all emails have the same formatting
    # (needed for removing prepended source file names in evals)
    result = "\n\n" + result
    with open(truncated_temp_file, "w", encoding="utf-8") as outfile:
        outfile.write(result)
    try:
        os.close(temp_fd2)
    except Exception:
        pass

    try:
        if search_type == "sparse":
            # Handle sparse search with BM25
            ProcessFileSparse(truncated_temp_file, file_path)
        elif search_type == "dense":
            # Handle dense search with ChromaDB
            ProcessFileDense(truncated_temp_file, file_path, compute_mode)
        else:
            raise ValueError(f"Unsupported search type: {search_type}")
    finally:
        # Clean up the temporary files
        try:
            os.remove(cleaned_temp_file)
            os.remove(truncated_temp_file)
        except Exception:
            pass


def ProcessFileDense(cleaned_temp_file, file_path, compute_mode):
    """
    Process file for dense vector search using ChromaDB
    """
    # Get a retriever instance
    retriever = BuildRetriever(compute_mode, "dense")
    # Load cleaned text file
    loader = TextLoader(cleaned_temp_file)
    documents = loader.load()
    # Use original file path for "source" key in metadata
    documents[0].metadata["source"] = file_path
    # Add file timestamp to metadata
    mod_time = os.path.getmtime(file_path)
    timestamp = datetime.fromtimestamp(mod_time).isoformat()
    documents[0].metadata["timestamp"] = timestamp
    ## Add documents to vectorstore
    # retriever.add_documents(documents)
    # Split the document into batches for addition to ChromaDB
    #   https://github.com/chroma-core/chroma/issues/1049
    #   https://cookbook.chromadb.dev/strategies/batching
    batch_size = 1000
    # Split emails
    emails = documents[0].page_content.split("\n\n\nFrom")
    documents_batch = documents
    for i in range(0, len(emails), batch_size):
        emails_batch = emails[i : i + batch_size]
        # Join emails back together
        page_content = "\n\n\nFrom".join(emails_batch)
        # Restore the leading "From" that split() consumed from batches after the first
        if i > 0:
            page_content = "From" + page_content
        documents_batch[0].page_content = page_content
        # Add documents to vectorstore
        retriever.add_documents(documents_batch)


def ProcessFileSparse(cleaned_temp_file, file_path):
    """
    Process file for sparse search using BM25
    """
    # Load text file to document
    loader = TextLoader(cleaned_temp_file)
    documents = loader.load()

    # Split archive file into emails for BM25
    # The separator is two blank lines followed by "From"; chunk_size=1 keeps each
    # email as its own chunk instead of merging emails together
    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n\nFrom"], chunk_size=1, chunk_overlap=0
    )
    ## Using 'EmailFrom' as the separator (requires preprocessing)
    # splitter = RecursiveCharacterTextSplitter(separators=["EmailFrom"])
    emails = splitter.split_documents(documents)

    # Use original file path for "source" key in metadata
    for email in emails:
        email.metadata["source"] = file_path

    # Create or update BM25 index
    bm25_persist_directory = f"{db_dir}/bm25"
    try:
        # Update BM25 index if it exists
        retriever = BM25SRetriever.from_persisted_directory(bm25_persist_directory)
        # Get new emails - ones which have not been indexed
        new_emails = [email for email in emails if email not in retriever.docs]
        if len(new_emails) > 0:
            # Create new BM25 index with all emails
            # NOTE: Adding new documents to an existing index is not possible:
            # https://github.com/xhluca/bm25s/discussions/20
            all_emails = retriever.docs + new_emails
            BM25SRetriever.from_documents(
                documents=all_emails,
                persist_directory=bm25_persist_directory,
            )
            print(f"BM25S: added {len(new_emails)} new emails from {file_path}")
        else:
            print(f"BM25S: no change for {file_path}")
    except (FileNotFoundError, OSError):
        # Create new BM25 index
        BM25SRetriever.from_documents(
            documents=emails,
            persist_directory=bm25_persist_directory,
        )
        print(f"BM25S: started with {len(emails)} emails from {file_path}")