import os
import sys
import logging
import time
import random
import traceback
from typing import Any, Dict, List, Optional

from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    CSVLoader,
    UnstructuredFileLoader,
    Docx2txtLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Make the project root importable so the `app.*` imports below resolve.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from app.config import CHUNK_SIZE, CHUNK_OVERLAP
from app.core.memory import MemoryManager


class DocumentProcessor:
    """Processes documents for ingestion into the vector database."""

    def __init__(self, memory_manager: MemoryManager):
        self.memory_manager = memory_manager
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP
        )
        logger.info(f"DocumentProcessor initialized with chunk size {CHUNK_SIZE}, overlap {CHUNK_OVERLAP}")

    def process_file(self, file_path: str) -> List[Document]:
        """Process a file and return a list of document chunks."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        _, extension = os.path.splitext(file_path)
        extension = extension.lower()

        logger.info(f"Processing file: {file_path} with extension {extension}")

        # Verify the file is readable before handing it to a loader.
        try:
            with open(file_path, 'rb') as f:
                f.read(1)
        except Exception as e:
            logger.error(f"Cannot read file {file_path}: {e}")
            raise IOError(f"File {file_path} exists but cannot be read: {str(e)}")

        try:
            if extension == '.pdf':
                loader = PyPDFLoader(file_path)
            elif extension == '.txt':
                loader = TextLoader(file_path)
            elif extension == '.csv':
                loader = CSVLoader(file_path)
            elif extension in ['.doc', '.docx']:
                loader = Docx2txtLoader(file_path)
            elif extension in ['.md', '.html', '.htm', '.xml', '.json']:
                # Markup and structured-text formats are treated as plain text.
                loader = TextLoader(file_path)
            else:
                # Unknown extension: fall back to the generic unstructured loader.
                logger.warning(f"No specific loader for {extension}, trying UnstructuredFileLoader")
                loader = UnstructuredFileLoader(file_path)

            documents = loader.load()

            if not documents:
                logger.warning(f"No content extracted from {file_path}")
                # Emit a placeholder document so downstream steps still get a chunk.
                documents = [Document(page_content=f"Empty file: {os.path.basename(file_path)}",
                                      metadata={"source": file_path})]

            chunks = self.text_splitter.split_documents(documents)

            logger.info(f"Split file into {len(chunks)} chunks")
            return chunks

        except Exception as e:
            logger.error(f"Error in document processing: {str(e)}")
            logger.error(traceback.format_exc())
            # Return an error document rather than raising so ingestion can continue.
            error_doc = Document(
                page_content=f"Error processing file {os.path.basename(file_path)}: {str(e)}",
                metadata={"source": file_path, "error": str(e)}
            )
            return [error_doc]

    def _retry_operation(self, operation, max_retries=3):
        """Retry an operation with jittered, linearly increasing backoff."""
        last_exception = None
        for attempt in range(max_retries):
            try:
                return operation()
            except Exception as e:
                last_exception = e
                if "already accessed by another instance" in str(e) and attempt < max_retries - 1:
                    wait_time = random.uniform(0.5, 2.0) * (attempt + 1)
                    logger.warning(f"Vector store access conflict, retrying ({attempt+1}/{max_retries}) in {wait_time:.2f}s...")
                    time.sleep(wait_time)
                elif attempt < max_retries - 1:
                    wait_time = random.uniform(0.5, 2.0) * (attempt + 1)
                    logger.warning(f"Operation failed ({str(e)}), retrying ({attempt+1}/{max_retries}) in {wait_time:.2f}s...")
                    time.sleep(wait_time)
                else:
                    # Out of retries: surface the original exception.
                    raise

        if last_exception:
            raise last_exception
        else:
            raise RuntimeError("Retry operation failed but no exception was captured")

    def ingest_file(self, file_path: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest a file into the vector database and return the chunk IDs."""
        try:
            chunks = self.process_file(file_path)

            if metadata is None:
                metadata = {}

            # Metadata shared by every chunk from this file.
            base_metadata = {
                "source": file_path,
                "file_name": os.path.basename(file_path),
                "ingestion_time": time.strftime("%Y-%m-%d %H:%M:%S")
            }
            base_metadata.update(metadata)

            texts = [chunk.page_content for chunk in chunks]
            metadatas = []

            for i, chunk in enumerate(chunks):
                chunk_metadata = base_metadata.copy()
                if hasattr(chunk, 'metadata'):
                    chunk_metadata.update(chunk.metadata)
                chunk_metadata["chunk_id"] = i
                chunk_metadata["total_chunks"] = len(chunks)
                metadatas.append(chunk_metadata)

            logger.info(f"Adding {len(texts)} chunks to vector database")

            if not texts:
                logger.warning("No text chunks extracted from file, adding placeholder")
                texts = [f"Empty file: {os.path.basename(file_path)}"]
                metadatas = [{"source": file_path, "file_name": os.path.basename(file_path), "empty_file": True}]

            def add_to_vectordb():
                return self.memory_manager.add_texts(texts, metadatas)

            try:
                ids = self._retry_operation(add_to_vectordb)
                logger.info(f"Successfully added chunks with IDs: {ids[:3] if len(ids) > 3 else ids}...")
            except Exception as e:
                logger.error(f"All attempts to add to vector DB failed: {e}")
                # Fall back to synthetic error IDs so callers still get one ID per chunk.
                ids = [f"error-{random.randint(1000, 9999)}" for _ in range(len(texts))]

            return ids
        except Exception as e:
            logger.error(f"Error ingesting file {file_path}: {str(e)}")
            logger.error(traceback.format_exc())
            return [f"error-{random.randint(1000, 9999)}"]

    def ingest_text(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest raw text into the vector database and return the chunk IDs."""
        try:
            if not text.strip():
                logger.warning("Empty text provided for ingestion")
                return ["empty-text-error"]

            if metadata is None:
                metadata = {}

            chunks = self.text_splitter.split_text(text)
            logger.info(f"Split text into {len(chunks)} chunks")

            if not chunks:
                chunks = ["Empty text input"]

            # Attach per-chunk metadata on top of the caller-supplied metadata.
            metadatas = []
            for i in range(len(chunks)):
                chunk_metadata = metadata.copy()
                chunk_metadata["chunk_id"] = i
                chunk_metadata["total_chunks"] = len(chunks)
                chunk_metadata["source"] = "direct_input"
                chunk_metadata["ingestion_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
                metadatas.append(chunk_metadata)

            def add_to_vectordb():
                return self.memory_manager.add_texts(chunks, metadatas)

            try:
                ids = self._retry_operation(add_to_vectordb)
                logger.info(f"Successfully added text chunks with IDs: {ids[:3] if len(ids) > 3 else ids}...")
            except Exception as e:
                logger.error(f"All attempts to add text to vector DB failed: {e}")
                # Fall back to synthetic error IDs so callers still get one ID per chunk.
                ids = [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks))]

            return ids
        except Exception as e:
            logger.error(f"Error ingesting text: {str(e)}")
            logger.error(traceback.format_exc())
            return [f"error-{random.randint(1000, 9999)}"]