import os
import logging
from typing import List, Optional

from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
# The LangChain vector-store wrapper and the Pinecone client class are both
# called ``Pinecone``. Alias the wrapper so the client import below does not
# shadow it (previously ``Pinecone.from_documents`` resolved to the client
# class, which has no such method, and ``PineconeVectorStore`` was undefined).
# NOTE(review): this community wrapper is deprecated upstream in favour of
# ``langchain_pinecone.PineconeVectorStore``; consider migrating.
from langchain_community.vectorstores import Pinecone as PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec

# Setup logger
logger = logging.getLogger("app.utils.online_vector_store")
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)


class OnlineVectorStore:
    """Per-user document store backed by a Pinecone index.

    Documents are embedded with ``OpenAIEmbeddings`` and stored in the
    ``dubsway-video-ai`` index, one namespace per user (``user_<id>``).
    When ``PINECONE_API_KEY`` is unset, or index initialization fails, every
    public operation becomes a logged no-op returning ``False`` or ``[]``.
    All public methods log and swallow exceptions rather than raising.
    """

    def __init__(self):
        self.pinecone_api_key = os.getenv("PINECONE_API_KEY")
        # NOTE(review): read but never used by this class; the index is created
        # with a fixed AWS us-east-1 serverless spec (see _initialize_pinecone).
        self.pinecone_environment = os.getenv("PINECONE_ENVIRONMENT", "gcp-starter")
        self.index_name = "dubsway-video-ai"
        # Embeddings client, created on first use and reused across calls.
        self._embeddings: Optional[OpenAIEmbeddings] = None

        if not self.pinecone_api_key:
            # NOTE(review): no local fallback store exists in this class; all
            # operations become no-ops. The log text is kept for compatibility.
            logger.warning("PINECONE_API_KEY not found. Using fallback local storage.")
            self.use_pinecone = False
        else:
            self.use_pinecone = True
            self._initialize_pinecone()

    def _initialize_pinecone(self):
        """Ensure the Pinecone index exists, creating it if necessary.

        On any failure the error is logged and ``use_pinecone`` is set to
        False so later calls degrade to no-ops.
        """
        try:
            pc = Pinecone(api_key=self.pinecone_api_key)

            if self.index_name not in pc.list_indexes().names():
                logger.info(f"Creating Pinecone index: {self.index_name}")
                pc.create_index(
                    name=self.index_name,
                    # Must match the embedding model's output size; 1536 is
                    # the dimension this code assumes for OpenAIEmbeddings.
                    dimension=1536,
                    metric="cosine",
                    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
                )
                logger.info(f"Pinecone index {self.index_name} created successfully")
            else:
                logger.info(f"Using existing Pinecone index: {self.index_name}")
        except Exception as e:
            logger.error(f"Failed to initialize Pinecone: {e}")
            self.use_pinecone = False

    def _get_embeddings(self) -> OpenAIEmbeddings:
        """Return the shared embeddings client, creating it on first use."""
        if self._embeddings is None:
            self._embeddings = OpenAIEmbeddings()
        return self._embeddings

    def _get_user_vector_store(self, user_id: int) -> PineconeVectorStore:
        """Open a LangChain view of the existing index scoped to the user's namespace."""
        return PineconeVectorStore.from_existing_index(
            index_name=self.index_name,
            embedding=self._get_embeddings(),
            namespace=f"user_{user_id}",
        )

    def add_documents(self, documents: List[Document], user_id: int) -> bool:
        """Tag documents with their owner and store them in Pinecone.

        Each document's ``metadata`` is mutated in place: ``user_id`` and
        ``source='video_analysis'`` are set.

        Args:
            documents: Documents to embed and store.
            user_id: Owner; also selects the ``user_<id>`` namespace.

        Returns:
            True if the documents were written. False if the list is empty,
            Pinecone is unavailable, or the write failed (errors are logged).
        """
        try:
            if not documents:
                logger.warning("No documents to add")
                return False

            for doc in documents:
                # Covers both a missing attribute and an explicit None.
                if getattr(doc, "metadata", None) is None:
                    doc.metadata = {}
                doc.metadata["user_id"] = user_id
                doc.metadata["source"] = "video_analysis"

            if self.use_pinecone:
                return self._add_to_pinecone(documents, user_id)
            logger.warning("Pinecone not available, skipping vector storage")
            return False
        except Exception as e:
            logger.error(f"Failed to add documents to vector store: {e}")
            return False

    def _add_to_pinecone(self, documents: List[Document], user_id: int) -> bool:
        """Embed and upsert documents into the user's namespace; True on success."""
        try:
            PineconeVectorStore.from_documents(
                documents=documents,
                embedding=self._get_embeddings(),
                index_name=self.index_name,
                namespace=f"user_{user_id}",
            )
            logger.info(f"Successfully added {len(documents)} documents to Pinecone for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to add documents to Pinecone: {e}")
            return False

    def search(self, query: str, user_id: int, k: int = 5) -> List[Document]:
        """Return up to ``k`` of the user's documents most similar to ``query``.

        Returns ``[]`` when Pinecone is unavailable or the search fails.
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, returning empty results")
                return []

            store = self._get_user_vector_store(user_id)
            # The namespace already isolates users; the metadata filter is an
            # additional guard against cross-user leakage.
            results = store.similarity_search(
                query=query,
                k=k,
                filter={"user_id": user_id},
            )
            logger.info(f"Found {len(results)} similar documents for user {user_id}")
            return results
        except Exception as e:
            logger.error(f"Failed to search vector store: {e}")
            return []

    def get_user_documents(self, user_id: int, limit: int = 50) -> List[Document]:
        """Return up to ``limit`` of the user's stored documents.

        Implemented as a similarity search with an empty query, so at most
        ``limit`` documents come back. It is not a full listing when the user
        has more. Returns ``[]`` when Pinecone is unavailable or the query fails.
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, returning empty results")
                return []

            store = self._get_user_vector_store(user_id)
            results = store.similarity_search(
                query="",
                k=limit,
                filter={"user_id": user_id},
            )
            logger.info(f"Retrieved {len(results)} documents for user {user_id}")
            return results
        except Exception as e:
            logger.error(f"Failed to get user documents: {e}")
            return []

    def delete_user_documents(self, user_id: int) -> bool:
        """Delete every vector in the user's namespace; True on success."""
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, skipping deletion")
                return False

            pc = Pinecone(api_key=self.pinecone_api_key)
            index = pc.Index(self.index_name)
            # Pinecone's delete requires ids, a filter, or delete_all=True;
            # a bare namespace argument is rejected.
            index.delete(delete_all=True, namespace=f"user_{user_id}")
            logger.info(f"Successfully deleted all documents for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to delete user documents: {e}")
            return False


# Global instance
vector_store = OnlineVectorStore()