import os
import sys
import logging
import time
import random
import traceback
from typing import List, Dict, Any, Optional

from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    CSVLoader,
    UnstructuredFileLoader,
    Docx2txtLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add project root to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from app.config import CHUNK_SIZE, CHUNK_OVERLAP
from app.core.memory import MemoryManager


class DocumentProcessor:
    """Processes documents for ingestion into the vector database."""

    def __init__(self, memory_manager: MemoryManager):
        self.memory_manager = memory_manager
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP
        )
        logger.info(
            f"DocumentProcessor initialized with chunk size {CHUNK_SIZE}, overlap {CHUNK_OVERLAP}"
        )

    def process_file(self, file_path: str) -> List[Document]:
        """Process a file and return a list of document chunks."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get the file extension
        _, extension = os.path.splitext(file_path)
        extension = extension.lower()
        logger.info(f"Processing file: {file_path} with extension {extension}")

        # Verify the file is readable before handing it to a loader
        try:
            with open(file_path, 'rb') as f:
                # Just check if we can read from it
                f.read(1)
        except Exception as e:
            logger.error(f"Cannot read file {file_path}: {e}")
            raise IOError(f"File {file_path} exists but cannot be read: {str(e)}")

        # Load the file using the appropriate loader
        try:
            if extension == '.pdf':
                loader = PyPDFLoader(file_path)
            elif extension == '.txt':
                loader = TextLoader(file_path)
            elif extension == '.csv':
                loader = CSVLoader(file_path)
            elif extension in ['.doc', '.docx']:
                loader = Docx2txtLoader(file_path)
            elif extension in ['.md', '.html', '.htm', '.xml', '.json']:
                # Dedicated loaders could be added for these formats
                loader = TextLoader(file_path)
            else:
                # Try the generic loader as a fallback for unsupported types
                logger.warning(f"No specific loader for {extension}, trying UnstructuredFileLoader")
                loader = UnstructuredFileLoader(file_path)

            # Load and split the documents
            documents = loader.load()

            if not documents:
                logger.warning(f"No content extracted from {file_path}")
                # Create a minimal document if empty to avoid errors downstream
                documents = [Document(
                    page_content=f"Empty file: {os.path.basename(file_path)}",
                    metadata={"source": file_path}
                )]

            chunks = self.text_splitter.split_documents(documents)
            logger.info(f"Split file into {len(chunks)} chunks")
            return chunks

        except Exception as e:
            logger.error(f"Error in document processing: {str(e)}")
            logger.error(traceback.format_exc())
            # Create a minimal document to represent the error
            error_doc = Document(
                page_content=f"Error processing file {os.path.basename(file_path)}: {str(e)}",
                metadata={"source": file_path, "error": str(e)}
            )
            return [error_doc]

    def _retry_operation(self, operation, max_retries: int = 3):
        """Retry an operation with exponential backoff."""
        last_exception = None

        for attempt in range(max_retries):
            try:
                return operation()
            except Exception as e:
                last_exception = e
                if "already accessed by another instance" in str(e) and attempt < max_retries - 1:
                    # Vector store lock contention: back off and retry
                    wait_time = random.uniform(0.5, 2.0) * (attempt + 1)
                    logger.warning(
                        f"Vector store access conflict, retrying ({attempt+1}/{max_retries}) in {wait_time:.2f}s..."
                    )
                    time.sleep(wait_time)
                elif attempt < max_retries - 1:
                    # For other errors, also retry but with a different message
                    wait_time = random.uniform(0.5, 2.0) * (attempt + 1)
                    logger.warning(
                        f"Operation failed ({str(e)}), retrying ({attempt+1}/{max_retries}) in {wait_time:.2f}s..."
                    )
                    time.sleep(wait_time)
                else:
                    # Last attempt: re-raise
                    raise

        # If we get here with a last_exception, re-raise it
        if last_exception:
            raise last_exception
        raise RuntimeError("Retry operation failed but no exception was captured")

    def ingest_file(self, file_path: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest a file into the vector database and return the chunk IDs."""
        try:
            # Process the file
            chunks = self.process_file(file_path)

            # Add metadata to each chunk
            if metadata is None:
                metadata = {}

            # Add file path to metadata
            base_metadata = {
                "source": file_path,
                "file_name": os.path.basename(file_path),
                "ingestion_time": time.strftime("%Y-%m-%d %H:%M:%S"),
            }
            base_metadata.update(metadata)

            # Prepare chunks and metadatas
            texts = [chunk.page_content for chunk in chunks]
            metadatas = []
            for i, chunk in enumerate(chunks):
                chunk_metadata = base_metadata.copy()
                if hasattr(chunk, 'metadata'):
                    chunk_metadata.update(chunk.metadata)
                chunk_metadata["chunk_id"] = i
                chunk_metadata["total_chunks"] = len(chunks)
                metadatas.append(chunk_metadata)

            # Store in vector database with retry mechanism
            logger.info(f"Adding {len(texts)} chunks to vector database")

            # Handle empty texts to avoid errors
            if not texts:
                logger.warning("No text chunks extracted from file, adding placeholder")
                texts = [f"Empty file: {os.path.basename(file_path)}"]
                metadatas = [{
                    "source": file_path,
                    "file_name": os.path.basename(file_path),
                    "empty_file": True,
                }]

            def add_to_vectordb():
                return self.memory_manager.add_texts(texts, metadatas)

            try:
                ids = self._retry_operation(add_to_vectordb)
                logger.info(f"Successfully added chunks with IDs: {ids[:3] if len(ids) > 3 else ids}...")
            except Exception as e:
                logger.error(f"All attempts to add to vector DB failed: {e}")
                # Return placeholder IDs
                ids = [f"error-{random.randint(1000, 9999)}" for _ in range(len(texts))]

            return ids

        except Exception as e:
            logger.error(f"Error ingesting file {file_path}: {str(e)}")
            logger.error(traceback.format_exc())
            # Return a placeholder ID if there's an error
            return [f"error-{random.randint(1000, 9999)}"]

    def ingest_text(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest raw text into the vector database and return the chunk IDs."""
        try:
            if not text.strip():
                logger.warning("Empty text provided for ingestion")
                return ["empty-text-error"]

            if metadata is None:
                metadata = {}

            # Split the text
            chunks = self.text_splitter.split_text(text)
            logger.info(f"Split text into {len(chunks)} chunks")

            # If text splitting produced no chunks (unusual), create one
            if not chunks:
                chunks = ["Empty text input"]

            # Prepare metadatas
            metadatas = []
            for i in range(len(chunks)):
                chunk_metadata = metadata.copy()
                chunk_metadata["chunk_id"] = i
                chunk_metadata["total_chunks"] = len(chunks)
                chunk_metadata["source"] = "direct_input"
                chunk_metadata["ingestion_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
                metadatas.append(chunk_metadata)

            # Store in vector database with retry mechanism
            def add_to_vectordb():
                return self.memory_manager.add_texts(chunks, metadatas)

            try:
                ids = self._retry_operation(add_to_vectordb)
                logger.info(f"Successfully added text chunks with IDs: {ids[:3] if len(ids) > 3 else ids}...")
            except Exception as e:
                logger.error(f"All attempts to add text to vector DB failed: {e}")
                # Return placeholder IDs
                ids = [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks))]

            return ids

        except Exception as e:
            logger.error(f"Error ingesting text: {str(e)}")
            logger.error(traceback.format_exc())
            # Return a placeholder ID if there's an error
            return [f"error-{random.randint(1000, 9999)}"]