""" Refactored preprocessing pipeline for all RAG methods. Uses utils.py functions and supports multiple retrieval methods. Directory Layout: /data/ # Original PDFs, HTML /embeddings/ # FAISS, Chroma, DPR vector stores /graph/ # Graph database files /metadata/ # Image metadata (SQLite or MongoDB) """ import logging from pathlib import Path from config import * from utils import ( DocumentLoader, TextPreprocessor, VectorStoreManager, ImageProcessor, ImageData ) logger = logging.getLogger(__name__) # Ensure all directories exist ensure_directories() def preprocess_for_method(method: str, documents: list): """Preprocess documents for a specific retrieval method.""" print(f"\n{'='*50}") print(f"Preprocessing for method: {method}") print(f"{'='*50}") try: # Initialize processors text_processor = TextPreprocessor() vector_manager = VectorStoreManager() # Preprocess text chunks for this method chunks = text_processor.preprocess_for_method(documents, method) if method == 'vanilla': # Build FAISS index with OpenAI embeddings index, metadata = vector_manager.build_faiss_index(chunks, method="vanilla") vector_manager.save_index(index, metadata, method) elif method == 'dpr': # Build FAISS index with sentence transformer embeddings index, metadata = vector_manager.build_faiss_index(chunks, method="dpr") vector_manager.save_index(index, metadata, method) elif method == 'bm25': # Build BM25 index bm25_index = vector_manager.build_bm25_index(chunks) vector_manager.save_index(bm25_index, chunks, method) elif method == 'graph': # Build NetworkX graph graph = vector_manager.build_graph_index(chunks) vector_manager.save_index(graph, None, method) elif method == 'context_stuffing': # Save full documents for context stuffing vector_manager.save_index(None, chunks, method) else: raise ValueError(f"Unknown method: {method}") print(f"Successfully preprocessed for method '{method}'") except Exception as e: logger.error(f"Error preprocessing for {method}: {e}") raise def extract_and_process_images(documents: list): """Extract images from documents and process them.""" print("\n" + "="*50) print("Extracting and processing images...") print("="*50) image_processor = ImageProcessor() processed_count = 0 filtered_count = 0 filter_reasons = {} for doc in documents: if 'images' in doc and doc['images']: for image_info in doc['images']: try: # Check if image should be filtered out should_filter, reason = image_processor.should_filter_image(image_info['image_path']) if should_filter: filtered_count += 1 filter_reasons[reason] = filter_reasons.get(reason, 0) + 1 print(f" Filtered: {image_info['image_id']} - {reason}") # Optionally delete the filtered image file try: import os os.remove(image_info['image_path']) print(f" Deleted: {image_info['image_path']}") except Exception as e: logger.warning(f"Could not delete filtered image {image_info['image_path']}: {e}") continue # Classify image classification = image_processor.classify_image(image_info['image_path']) # Generate embedding (placeholder for now) # embedding = embed_image_clip([image_info['image_path']])[0] # Create ImageData object image_data = ImageData( image_path=image_info['image_path'], image_id=image_info['image_id'], classification=classification, metadata={ 'source': doc['source'], 'page': image_info.get('page'), 'extracted_from': doc['path'] } ) # Store in database image_processor.store_image_metadata(image_data) processed_count += 1 except Exception as e: logger.error(f"Error processing image {image_info['image_id']}: {e}") continue # Print filtering summary 
def extract_and_process_images(documents: list):
    """Extract images from documents and process them."""
    print("\n" + "=" * 50)
    print("Extracting and processing images...")
    print("=" * 50)

    image_processor = ImageProcessor()
    processed_count = 0
    filtered_count = 0
    filter_reasons = {}

    for doc in documents:
        if not doc.get('images'):
            continue
        for image_info in doc['images']:
            try:
                # Check whether the image should be filtered out
                should_filter, reason = image_processor.should_filter_image(image_info['image_path'])
                if should_filter:
                    filtered_count += 1
                    filter_reasons[reason] = filter_reasons.get(reason, 0) + 1
                    print(f"  Filtered: {image_info['image_id']} - {reason}")

                    # Optionally delete the filtered image file
                    try:
                        os.remove(image_info['image_path'])
                        print(f"    Deleted: {image_info['image_path']}")
                    except OSError as e:
                        logger.warning(f"Could not delete filtered image {image_info['image_path']}: {e}")
                    continue

                # Classify image
                classification = image_processor.classify_image(image_info['image_path'])

                # Generate embedding (placeholder for now)
                # embedding = embed_image_clip([image_info['image_path']])[0]

                # Create ImageData object
                image_data = ImageData(
                    image_path=image_info['image_path'],
                    image_id=image_info['image_id'],
                    classification=classification,
                    metadata={
                        'source': doc['source'],
                        'page': image_info.get('page'),
                        'extracted_from': doc['path'],
                    },
                )

                # Store in database
                image_processor.store_image_metadata(image_data)
                processed_count += 1

            except Exception as e:
                logger.error(f"Error processing image {image_info['image_id']}: {e}")
                continue

    # Print filtering summary
    if filtered_count > 0:
        print("\nImage Filtering Summary:")
        print(f"  Total filtered: {filtered_count}")
        for reason, count in filter_reasons.items():
            print(f"  {reason}: {count}")
        print()

    if processed_count > 0:
        print(f"Processed and stored metadata for {processed_count} images")
    else:
        print("No images found in documents")


def main():
    """Main preprocessing pipeline."""
    # Validate configuration
    try:
        validate_api_key()
    except ValueError as e:
        print(f"Error: {e}")
        return

    # Print configuration
    print_config()
    print("\nStarting preprocessing pipeline...")

    # Load documents using utils
    print("\nLoading documents...")
    loader = DocumentLoader()
    documents = loader.load_text_documents()
    print(f"Loaded {len(documents)} documents")

    # Preprocess for each retrieval method; a failure in one method
    # should not stop the others.
    methods = ['vanilla', 'dpr', 'bm25', 'graph', 'context_stuffing']
    for method in methods:
        try:
            preprocess_for_method(method, documents)
        except Exception as e:
            print(f"Error preprocessing for {method}: {e}")
            traceback.print_exc()

    # Extract and process images
    try:
        extract_and_process_images(documents)
    except Exception as e:
        print(f"Error processing images: {e}")
        traceback.print_exc()

    print("\n" + "=" * 50)
    print("Preprocessing complete!")
    print("=" * 50)


if __name__ == "__main__":
    main()
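
# Quick-iteration sketch: rebuild a single index without running the full
# pipeline. The module name "preprocess" is an assumption; adjust it to this
# file's actual name.
#
#     from preprocess import preprocess_for_method
#     from utils import DocumentLoader
#
#     documents = DocumentLoader().load_text_documents()
#     preprocess_for_method('bm25', documents)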