import os
import sys
import logging
import time
import random
import traceback
from typing import List, Dict, Any, Optional, Callable
from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    CSVLoader,
    UnstructuredFileLoader,
    Docx2txtLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add project root to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from app.config import CHUNK_SIZE, CHUNK_OVERLAP
from app.core.memory import MemoryManager


class DocumentProcessor:
"""Processes documents for ingestion into the vector database."""
def __init__(self, memory_manager: MemoryManager):
self.memory_manager = memory_manager
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=CHUNK_OVERLAP
)
logger.info(f"DocumentProcessor initialized with chunk size {CHUNK_SIZE}, overlap {CHUNK_OVERLAP}")

    def process_file(self, file_path: str) -> List[Document]:
        """Process a file and return a list of document chunks."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get the file extension
        _, extension = os.path.splitext(file_path)
        extension = extension.lower()
        logger.info(f"Processing file: {file_path} with extension {extension}")

        # Verify file is readable
        try:
            with open(file_path, 'rb') as f:
                # Just check if we can read from it
                f.read(1)
        except Exception as e:
            logger.error(f"Cannot read file {file_path}: {e}")
            raise IOError(f"File {file_path} exists but cannot be read: {str(e)}")

        # Load the file using the appropriate loader
        try:
            if extension == '.pdf':
                loader = PyPDFLoader(file_path)
            elif extension == '.txt':
                loader = TextLoader(file_path)
            elif extension == '.csv':
                loader = CSVLoader(file_path)
            elif extension in ['.doc', '.docx']:
                loader = Docx2txtLoader(file_path)
            elif extension in ['.md', '.html', '.htm', '.xml', '.json']:
                # Dedicated loaders could be added for these formats
                loader = TextLoader(file_path)
            else:
                # Try generic loader as fallback for unsupported types
                logger.warning(f"No specific loader for {extension}, trying UnstructuredFileLoader")
                loader = UnstructuredFileLoader(file_path)

            # Load and split the documents
            documents = loader.load()
            if not documents:
                logger.warning(f"No content extracted from {file_path}")
                # Create a minimal document if empty to avoid errors
                documents = [Document(page_content=f"Empty file: {os.path.basename(file_path)}",
                                      metadata={"source": file_path})]

            chunks = self.text_splitter.split_documents(documents)
            logger.info(f"Split file into {len(chunks)} chunks")
            return chunks
        except Exception as e:
            logger.error(f"Error in document processing: {str(e)}")
            logger.error(traceback.format_exc())
            # Create a minimal document to represent the error
            error_doc = Document(
                page_content=f"Error processing file {os.path.basename(file_path)}: {str(e)}",
                metadata={"source": file_path, "error": str(e)}
            )
            return [error_doc]

    def _retry_operation(self, operation: Callable[[], Any], max_retries: int = 3) -> Any:
        """Retry an operation with a randomized, increasing backoff between attempts."""
        last_exception = None
        for attempt in range(max_retries):
            try:
                return operation()
            except Exception as e:
                last_exception = e
                if "already accessed by another instance" in str(e) and attempt < max_retries - 1:
                    wait_time = random.uniform(0.5, 2.0) * (attempt + 1)
                    logger.warning(f"Vector store access conflict, retrying ({attempt+1}/{max_retries}) in {wait_time:.2f}s...")
                    time.sleep(wait_time)
                elif attempt < max_retries - 1:
                    # For other errors, also retry but with a different message
                    wait_time = random.uniform(0.5, 2.0) * (attempt + 1)
                    logger.warning(f"Operation failed ({str(e)}), retrying ({attempt+1}/{max_retries}) in {wait_time:.2f}s...")
                    time.sleep(wait_time)
                else:
                    # Final attempt failed, re-raise
                    raise

        # Defensive fallback: re-raise the last captured exception if the loop
        # somehow exits without returning or raising
        if last_exception:
            raise last_exception
        else:
            raise RuntimeError("Retry operation failed but no exception was captured")

    def ingest_file(self, file_path: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest a file into the vector database."""
        try:
            # Process the file
            chunks = self.process_file(file_path)

            # Add metadata to each chunk
            if metadata is None:
                metadata = {}

            # Add file path to metadata
            base_metadata = {
                "source": file_path,
                "file_name": os.path.basename(file_path),
                "ingestion_time": time.strftime("%Y-%m-%d %H:%M:%S")
            }
            base_metadata.update(metadata)

            # Prepare chunks and metadatas
            texts = [chunk.page_content for chunk in chunks]
            metadatas = []
            for i, chunk in enumerate(chunks):
                chunk_metadata = base_metadata.copy()
                if hasattr(chunk, 'metadata'):
                    chunk_metadata.update(chunk.metadata)
                chunk_metadata["chunk_id"] = i
                chunk_metadata["total_chunks"] = len(chunks)
                metadatas.append(chunk_metadata)

            # Store in vector database with retry mechanism
            logger.info(f"Adding {len(texts)} chunks to vector database")

            # Handle empty texts to avoid errors
            if not texts:
                logger.warning("No text chunks extracted from file, adding placeholder")
                texts = [f"Empty file: {os.path.basename(file_path)}"]
                metadatas = [{"source": file_path, "file_name": os.path.basename(file_path), "empty_file": True}]

            def add_to_vectordb():
                return self.memory_manager.add_texts(texts, metadatas)

            try:
                ids = self._retry_operation(add_to_vectordb)
                logger.info(f"Successfully added chunks with IDs: {ids[:3] if len(ids) > 3 else ids}...")
            except Exception as e:
                logger.error(f"All attempts to add to vector DB failed: {e}")
                # Return placeholder IDs
                ids = [f"error-{random.randint(1000, 9999)}" for _ in range(len(texts))]

            return ids
        except Exception as e:
            logger.error(f"Error ingesting file {file_path}: {str(e)}")
            logger.error(traceback.format_exc())
            # Return placeholder IDs if there's an error
            return [f"error-{random.randint(1000, 9999)}"]

    def ingest_text(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest raw text into the vector database."""
        try:
            if not text.strip():
                logger.warning("Empty text provided for ingestion")
                return ["empty-text-error"]

            if metadata is None:
                metadata = {}

            # Split the text
            chunks = self.text_splitter.split_text(text)
            logger.info(f"Split text into {len(chunks)} chunks")

            # If text splitting produced no chunks (unusual), create one
            if not chunks:
                chunks = ["Empty text input"]

            # Prepare metadatas
            metadatas = []
            for i in range(len(chunks)):
                chunk_metadata = metadata.copy()
                chunk_metadata["chunk_id"] = i
                chunk_metadata["total_chunks"] = len(chunks)
                chunk_metadata["source"] = "direct_input"
                chunk_metadata["ingestion_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
                metadatas.append(chunk_metadata)

            # Store in vector database with retry mechanism
            def add_to_vectordb():
                return self.memory_manager.add_texts(chunks, metadatas)

            try:
                ids = self._retry_operation(add_to_vectordb)
                logger.info(f"Successfully added text chunks with IDs: {ids[:3] if len(ids) > 3 else ids}...")
            except Exception as e:
                logger.error(f"All attempts to add text to vector DB failed: {e}")
                # Return placeholder IDs
                ids = [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks))]

            return ids
        except Exception as e:
            logger.error(f"Error ingesting text: {str(e)}")
            logger.error(traceback.format_exc())
            # Return placeholder IDs if there's an error
            return [f"error-{random.randint(1000, 9999)}"]