# app/core/ingestion.py
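"""Document ingestion for the vector database.

Loads files with format-specific LangChain loaders, splits them into
overlapping chunks, and stores the chunks (with per-chunk metadata)
through MemoryManager.
"""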
import os
import sys
import logging
import time
import random
import traceback
from typing import Any, Dict, List, Optional

from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    CSVLoader,
    UnstructuredFileLoader,
    Docx2txtLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Add project root to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from app.config import CHUNK_SIZE, CHUNK_OVERLAP
from app.core.memory import MemoryManager
class DocumentProcessor:
    """Processes documents for ingestion into the vector database."""

    def __init__(self, memory_manager: MemoryManager):
        self.memory_manager = memory_manager
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
        )
        logger.info(
            f"DocumentProcessor initialized with chunk size {CHUNK_SIZE}, "
            f"overlap {CHUNK_OVERLAP}"
        )
    def process_file(self, file_path: str) -> List[Document]:
        """Process a file and return a list of Document chunks."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get the file extension
        _, extension = os.path.splitext(file_path)
        extension = extension.lower()
        logger.info(f"Processing file: {file_path} with extension {extension}")

        # Verify the file is readable before handing it to a loader
        try:
            with open(file_path, 'rb') as f:
                f.read(1)  # just check that we can read from it
        except Exception as e:
            logger.error(f"Cannot read file {file_path}: {e}")
            raise IOError(f"File {file_path} exists but cannot be read: {e}")

        # Load the file using the appropriate loader
        try:
            if extension == '.pdf':
                loader = PyPDFLoader(file_path)
            elif extension == '.txt':
                loader = TextLoader(file_path)
            elif extension == '.csv':
                loader = CSVLoader(file_path)
            elif extension in ('.doc', '.docx'):
                loader = Docx2txtLoader(file_path)
            elif extension in ('.md', '.html', '.htm', '.xml', '.json'):
                # Dedicated loaders could be added for these formats
                loader = TextLoader(file_path)
            else:
                # Fall back to the generic loader for unsupported types
                logger.warning(f"No specific loader for {extension}, trying UnstructuredFileLoader")
                loader = UnstructuredFileLoader(file_path)

            # Load and split the documents
            documents = loader.load()
            if not documents:
                logger.warning(f"No content extracted from {file_path}")
                # Create a minimal document so downstream code has something to work with
                documents = [Document(
                    page_content=f"Empty file: {os.path.basename(file_path)}",
                    metadata={"source": file_path},
                )]

            chunks = self.text_splitter.split_documents(documents)
            logger.info(f"Split file into {len(chunks)} chunks")
            return chunks
        except Exception as e:
            logger.error(f"Error in document processing: {e}")
            logger.error(traceback.format_exc())
            # Return a minimal document that records the error
            error_doc = Document(
                page_content=f"Error processing file {os.path.basename(file_path)}: {e}",
                metadata={"source": file_path, "error": str(e)},
            )
            return [error_doc]
    def _retry_operation(self, operation, max_retries: int = 3):
        """Retry an operation with linearly increasing backoff and jitter.

        Any exception triggers a retry; vector store access conflicts
        ("already accessed by another instance") get a more specific log
        message. Wait times are drawn from 0.5-2.0s and scaled by the
        attempt number, so retries wait roughly 0.5-2s, then 1-4s, then 1.5-6s.
        """
        last_exception = None
        for attempt in range(max_retries):
            try:
                return operation()
            except Exception as e:
                last_exception = e
                if attempt >= max_retries - 1:
                    # Out of retries: re-raise the final exception
                    raise
                wait_time = random.uniform(0.5, 2.0) * (attempt + 1)
                if "already accessed by another instance" in str(e):
                    logger.warning(
                        f"Vector store access conflict, retrying "
                        f"({attempt + 1}/{max_retries}) in {wait_time:.2f}s..."
                    )
                else:
                    logger.warning(
                        f"Operation failed ({e}), retrying "
                        f"({attempt + 1}/{max_retries}) in {wait_time:.2f}s..."
                    )
                time.sleep(wait_time)
        # Defensive fallback: the loop above always returns or raises first
        if last_exception:
            raise last_exception
        raise RuntimeError("Retry operation failed but no exception was captured")
    def ingest_file(self, file_path: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest a file into the vector database and return the chunk IDs."""
        try:
            # Process the file into chunks
            chunks = self.process_file(file_path)

            # Merge caller-supplied metadata with file-level metadata
            if metadata is None:
                metadata = {}
            base_metadata = {
                "source": file_path,
                "file_name": os.path.basename(file_path),
                "ingestion_time": time.strftime("%Y-%m-%d %H:%M:%S"),
            }
            base_metadata.update(metadata)

            # Prepare texts and per-chunk metadata
            texts = [chunk.page_content for chunk in chunks]
            metadatas = []
            for i, chunk in enumerate(chunks):
                chunk_metadata = base_metadata.copy()
                if hasattr(chunk, 'metadata'):
                    chunk_metadata.update(chunk.metadata)
                chunk_metadata["chunk_id"] = i
                chunk_metadata["total_chunks"] = len(chunks)
                metadatas.append(chunk_metadata)

            # Store in the vector database with the retry mechanism
            logger.info(f"Adding {len(texts)} chunks to vector database")

            # Guard against empty input to avoid vector-store errors
            if not texts:
                logger.warning("No text chunks extracted from file, adding placeholder")
                texts = [f"Empty file: {os.path.basename(file_path)}"]
                metadatas = [{
                    "source": file_path,
                    "file_name": os.path.basename(file_path),
                    "empty_file": True,
                }]

            def add_to_vectordb():
                return self.memory_manager.add_texts(texts, metadatas)

            try:
                ids = self._retry_operation(add_to_vectordb)
                logger.info(f"Successfully added chunks with IDs: {ids[:3]}...")
            except Exception as e:
                logger.error(f"All attempts to add to vector DB failed: {e}")
                # Fall back to placeholder IDs rather than propagating the error
                ids = [f"error-{random.randint(1000, 9999)}" for _ in range(len(texts))]
            return ids
        except Exception as e:
            logger.error(f"Error ingesting file {file_path}: {e}")
            logger.error(traceback.format_exc())
            # Return a placeholder ID so callers always get a list back
            return [f"error-{random.randint(1000, 9999)}"]
    def ingest_text(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[str]:
        """Ingest raw text into the vector database and return the chunk IDs."""
        try:
            if not text.strip():
                logger.warning("Empty text provided for ingestion")
                return ["empty-text-error"]
            if metadata is None:
                metadata = {}

            # Split the text into chunks
            chunks = self.text_splitter.split_text(text)
            logger.info(f"Split text into {len(chunks)} chunks")

            # If splitting produced no chunks (unusual), create one placeholder
            if not chunks:
                chunks = ["Empty text input"]

            # Prepare per-chunk metadata
            metadatas = []
            for i in range(len(chunks)):
                chunk_metadata = metadata.copy()
                chunk_metadata["chunk_id"] = i
                chunk_metadata["total_chunks"] = len(chunks)
                chunk_metadata["source"] = "direct_input"
                chunk_metadata["ingestion_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
                metadatas.append(chunk_metadata)

            # Store in the vector database with the retry mechanism
            def add_to_vectordb():
                return self.memory_manager.add_texts(chunks, metadatas)

            try:
                ids = self._retry_operation(add_to_vectordb)
                logger.info(f"Successfully added text chunks with IDs: {ids[:3]}...")
            except Exception as e:
                logger.error(f"All attempts to add text to vector DB failed: {e}")
                # Fall back to placeholder IDs rather than propagating the error
                ids = [f"error-{random.randint(1000, 9999)}" for _ in range(len(chunks))]
            return ids
        except Exception as e:
            logger.error(f"Error ingesting text: {e}")
            logger.error(traceback.format_exc())
            # Return a placeholder ID so callers always get a list back
            return [f"error-{random.randint(1000, 9999)}"]