import logging
import asyncio
from typing import Dict, Any, Optional
import tempfile
import os
from pathlib import Path
import uuid

from core.document_parser import DocumentParser
from core.chunker import TextChunker
from core.text_preprocessor import TextPreprocessor
from services.vector_store_service import VectorStoreService
from services.document_store_service import DocumentStoreService
from services.embedding_service import EmbeddingService
from services.ocr_service import OCRService

logger = logging.getLogger(__name__)


class IngestionTool:
    def __init__(self, vector_store: VectorStoreService, document_store: DocumentStoreService,
                 embedding_service: EmbeddingService, ocr_service: OCRService):
        self.vector_store = vector_store
        self.document_store = document_store
        self.embedding_service = embedding_service
        self.ocr_service = ocr_service
        self.document_parser = DocumentParser()
        # Pass OCR service to document parser
        self.document_parser.ocr_service = ocr_service
        self.text_chunker = TextChunker()
        self.text_preprocessor = TextPreprocessor()

    async def process_document(self, file_path: str, file_type: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process a document through the full ingestion pipeline."""
        if task_id is None:
            task_id = str(uuid.uuid4())

        try:
            logger.info(f"Starting document processing for {file_path}")

            # Step 1: Parse the document
            filename = Path(file_path).name
            document = await self.document_parser.parse_document(file_path, filename)

            if not document.content:
                logger.warning(f"No content extracted from document {filename}")
                return {
                    "success": False,
                    "error": "No content could be extracted from the document",
                    "task_id": task_id
                }

            # Step 2: Store the document
            await self.document_store.store_document(document)

            # Step 3: Process content for embeddings
            chunks = await self._create_and_embed_chunks(document)

            if not chunks:
                logger.warning(f"No chunks created for document {document.id}")
                return {
                    "success": False,
                    "error": "Failed to create text chunks",
                    "task_id": task_id,
                    "document_id": document.id
                }

            # Step 4: Store embeddings
            success = await self.vector_store.add_chunks(chunks)

            if not success:
                logger.error(f"Failed to store embeddings for document {document.id}")
                return {
                    "success": False,
                    "error": "Failed to store embeddings",
                    "task_id": task_id,
                    "document_id": document.id
                }

            logger.info(f"Successfully processed document {document.id} with {len(chunks)} chunks")

            return {
                "success": True,
                "task_id": task_id,
                "document_id": document.id,
                "filename": document.filename,
                "chunks_created": len(chunks),
                "content_length": len(document.content),
                "doc_type": document.doc_type.value,
                "message": f"Successfully processed {filename}"
            }

        except Exception as e:
            logger.error(f"Error processing document {file_path}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id,
                "message": f"Failed to process document: {str(e)}"
            }

    async def _create_and_embed_chunks(self, document) -> list:
        """Create chunks and generate embeddings."""
        try:
            # Step 1: Create chunks
            chunks = self.text_chunker.chunk_document(
                document.id,
                document.content,
                method="recursive"
            )

            if not chunks:
                return []

            # Step 2: Optimize chunks for embedding
            optimized_chunks = self.text_chunker.optimize_chunks_for_embedding(chunks)

            # Step 3: Generate embeddings
            texts = [chunk.content for chunk in optimized_chunks]
            embeddings = await self.embedding_service.generate_embeddings(texts)

            # Step 4: Attach embeddings to their chunks
            embedded_chunks = []
            for i, chunk in enumerate(optimized_chunks):
                if i < len(embeddings):
                    chunk.embedding = embeddings[i]
                    embedded_chunks.append(chunk)

            return embedded_chunks

        except Exception as e:
            logger.error(f"Error creating and embedding chunks: {str(e)}")
            return []

    async def process_url(self, url: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process a document from a URL."""
        try:
            import requests
            from urllib.parse import urlparse

            # Download the file in a worker thread so the blocking request
            # does not stall the event loop
            response = await asyncio.to_thread(requests.get, url, timeout=30)
            response.raise_for_status()

            # Derive a filename from the URL path
            parsed_url = urlparse(url)
            filename = Path(parsed_url.path).name or "downloaded_file"

            # Create temporary file
            with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
                tmp_file.write(response.content)
                tmp_file_path = tmp_file.name

            try:
                # Process the downloaded file
                result = await self.process_document(tmp_file_path, "", task_id)
                result["source_url"] = url
                return result
            finally:
                # Clean up temporary file
                if os.path.exists(tmp_file_path):
                    os.unlink(tmp_file_path)

        except Exception as e:
            logger.error(f"Error processing URL {url}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4()),
                "source_url": url
            }

    async def process_text_content(self, content: str, filename: str = "text_content.txt",
                                   task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process raw text content directly."""
        try:
            from core.models import Document, DocumentType
            from datetime import datetime

            # Create document object
            document = Document(
                id=str(uuid.uuid4()),
                filename=filename,
                content=content,
                doc_type=DocumentType.TEXT,
                file_size=len(content.encode('utf-8')),
                created_at=datetime.utcnow(),
                metadata={
                    "source": "direct_text_input",
                    "content_length": len(content),
                    "word_count": len(content.split())
                }
            )

            # Store the document
            await self.document_store.store_document(document)

            # Process content for embeddings
            chunks = await self._create_and_embed_chunks(document)

            if chunks:
                await self.vector_store.add_chunks(chunks)

            return {
                "success": True,
                "task_id": task_id or str(uuid.uuid4()),
                "document_id": document.id,
                "filename": filename,
                "chunks_created": len(chunks),
                "content_length": len(content),
                "message": "Successfully processed text content"
            }

        except Exception as e:
            logger.error(f"Error processing text content: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4())
            }

    async def reprocess_document(self, document_id: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Reprocess an existing document (useful for updating embeddings)."""
        try:
            # Get the document
            document = await self.document_store.get_document(document_id)
            if not document:
                return {
                    "success": False,
                    "error": f"Document {document_id} not found",
                    "task_id": task_id or str(uuid.uuid4())
                }

            # Remove existing chunks from vector store
            await self.vector_store.delete_document(document_id)

            # Recreate and embed chunks
            chunks = await self._create_and_embed_chunks(document)

            if chunks:
                await self.vector_store.add_chunks(chunks)

            return {
                "success": True,
                "task_id": task_id or str(uuid.uuid4()),
                "document_id": document_id,
                "filename": document.filename,
                "chunks_created": len(chunks),
                "message": f"Successfully reprocessed {document.filename}"
            }

        except Exception as e:
            logger.error(f"Error reprocessing document {document_id}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4()),
                "document_id": document_id
            }

    async def batch_process_directory(self, directory_path: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process multiple documents from a directory."""
        try:
            directory = Path(directory_path)
            if not directory.exists() or not directory.is_dir():
                return {
                    "success": False,
                    "error": f"Directory {directory_path} does not exist",
                    "task_id": task_id or str(uuid.uuid4())
                }

            # Supported file extensions
            supported_extensions = {'.txt', '.pdf', '.docx', '.png', '.jpg', '.jpeg', '.bmp', '.tiff'}

            # Find all supported files; de-duplicate because the lower- and
            # upper-case globs can match the same file on case-insensitive
            # filesystems
            files_to_process = []
            for ext in supported_extensions:
                files_to_process.extend(directory.glob(f"*{ext}"))
                files_to_process.extend(directory.glob(f"*{ext.upper()}"))
            files_to_process = sorted(set(files_to_process))

            if not files_to_process:
                return {
                    "success": False,
                    "error": "No supported files found in directory",
                    "task_id": task_id or str(uuid.uuid4())
                }

            # Process files
            results = []
            successful = 0
            failed = 0

            for file_path in files_to_process:
                try:
                    result = await self.process_document(str(file_path), file_path.suffix)
                    results.append(result)

                    if result.get("success"):
                        successful += 1
                    else:
                        failed += 1

                except Exception as e:
                    failed += 1
                    results.append({
                        "success": False,
                        "error": str(e),
                        "filename": file_path.name
                    })

            return {
                "success": True,
                "task_id": task_id or str(uuid.uuid4()),
                "directory": str(directory),
                "total_files": len(files_to_process),
                "successful": successful,
                "failed": failed,
                "results": results,
                "message": f"Processed {successful}/{len(files_to_process)} files successfully"
            }

        except Exception as e:
            logger.error(f"Error batch processing directory {directory_path}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4())
            }
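

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): a minimal example of wiring up the tool and
# running one document through the pipeline. The no-argument service
# constructors below are assumptions, not the project's actual signatures;
# adjust the wiring and the sample path to your own setup.
# ---------------------------------------------------------------------------
async def _example_ingest(path: str) -> None:
    tool = IngestionTool(
        vector_store=VectorStoreService(),        # assumed no-arg constructor
        document_store=DocumentStoreService(),    # assumed no-arg constructor
        embedding_service=EmbeddingService(),     # assumed no-arg constructor
        ocr_service=OCRService(),                 # assumed no-arg constructor
    )
    result = await tool.process_document(path, Path(path).suffix)
    logger.info(result.get("message"))


if __name__ == "__main__":
    # Hypothetical sample path; replace with a real document before running.
    asyncio.run(_example_ingest("sample.pdf"))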