"""Pinecone-backed vector store for per-user video-analysis documents."""
import logging
import os
from typing import List, Optional

# NOTE: the bare name ``Pinecone`` imported here is shadowed by the Pinecone
# client class imported from ``pinecone`` below; the LangChain vector store is
# therefore also exposed under the unambiguous alias ``PineconeVectorStore``.
from langchain_community.vectorstores import Pinecone
from langchain_community.vectorstores import Pinecone as PineconeVectorStore
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from pinecone import Pinecone, ServerlessSpec
# Module-level logger for the online vector store utilities.
logger = logging.getLogger("app.utils.online_vector_store")
logger.setLevel(logging.INFO)

# Attach the stream handler only when the logger has none yet, so that
# re-running this setup does not produce duplicated log lines.
if not logger.handlers:
    _stream_handler = logging.StreamHandler()
    _stream_handler.setFormatter(
        logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
    )
    logger.addHandler(_stream_handler)
class OnlineVectorStore:
    """Per-user document store backed by a Pinecone index.

    Documents are embedded with ``OpenAIEmbeddings`` and stored in a single
    Pinecone index, using one namespace per user (``user_<user_id>``).

    When ``PINECONE_API_KEY`` is not set, or index initialisation fails,
    ``use_pinecone`` is ``False`` and every public method logs a warning and
    returns ``False`` / an empty list instead of raising.
    """

    # Settings used when the index has to be created.
    EMBEDDING_DIMENSION = 1536  # OpenAI embeddings dimension
    INDEX_METRIC = "cosine"
    INDEX_CLOUD = "aws"
    INDEX_REGION = "us-east-1"

    def __init__(self):
        """Read configuration from the environment and set up the index."""
        self.pinecone_api_key = os.getenv("PINECONE_API_KEY")
        # Kept as a public attribute; no method of this class reads it.
        self.pinecone_environment = os.getenv("PINECONE_ENVIRONMENT", "gcp-starter")
        self.index_name = "dubsway-video-ai"

        self.use_pinecone = bool(self.pinecone_api_key)
        if not self.use_pinecone:
            # NOTE: this class implements no local fallback; operations are
            # simply skipped while ``use_pinecone`` is False.
            logger.warning("PINECONE_API_KEY not found. Using fallback local storage.")
            return
        self._initialize_pinecone()

    @staticmethod
    def _namespace(user_id: int) -> str:
        """Return the Pinecone namespace that holds *user_id*'s vectors."""
        return f"user_{user_id}"

    def _open_user_store(self, user_id: int):
        """Return a LangChain vector store bound to the user's namespace."""
        return PineconeVectorStore.from_existing_index(
            index_name=self.index_name,
            embedding=OpenAIEmbeddings(),
            namespace=self._namespace(user_id),
        )

    def _initialize_pinecone(self):
        """Initialize Pinecone client and create index if needed.

        Any failure is logged and disables Pinecone for this instance
        (``use_pinecone = False``) rather than propagating.
        """
        try:
            pc = Pinecone(api_key=self.pinecone_api_key)

            if self.index_name in pc.list_indexes().names():
                logger.info(f"Using existing Pinecone index: {self.index_name}")
                return

            logger.info(f"Creating Pinecone index: {self.index_name}")
            pc.create_index(
                name=self.index_name,
                dimension=self.EMBEDDING_DIMENSION,
                metric=self.INDEX_METRIC,
                spec=ServerlessSpec(
                    cloud=self.INDEX_CLOUD,
                    region=self.INDEX_REGION,
                ),
            )
            logger.info(f"Pinecone index {self.index_name} created successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Pinecone: {e}")
            self.use_pinecone = False

    def add_documents(self, documents: List[Document], user_id: int) -> bool:
        """Tag *documents* with the user's id and store them.

        Each document's metadata is updated in place with ``user_id`` and
        ``source='video_analysis'`` before upload.

        Returns:
            ``True`` when the documents were written to Pinecone; ``False``
            when the list is empty, Pinecone is unavailable, or an error
            occurred (errors are logged, not raised).
        """
        try:
            if not documents:
                logger.warning("No documents to add")
                return False

            for doc in documents:
                # Guard against a missing *or* ``None`` metadata mapping.
                if getattr(doc, "metadata", None) is None:
                    doc.metadata = {}
                doc.metadata["user_id"] = user_id
                doc.metadata["source"] = "video_analysis"

            if not self.use_pinecone:
                logger.warning("Pinecone not available, skipping vector storage")
                return False
            return self._add_to_pinecone(documents, user_id)
        except Exception as e:
            logger.error(f"Failed to add documents to vector store: {e}")
            return False

    def _add_to_pinecone(self, documents: List[Document], user_id: int) -> bool:
        """Embed *documents* and upsert them into the user's namespace.

        Returns ``True`` on success, ``False`` (after logging) on any error.
        """
        try:
            # Must be the LangChain vector store, not the Pinecone client class
            # (which has no ``from_documents``).
            PineconeVectorStore.from_documents(
                documents=documents,
                embedding=OpenAIEmbeddings(),
                index_name=self.index_name,
                namespace=self._namespace(user_id),
            )
            logger.info(f"Successfully added {len(documents)} documents to Pinecone for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to add documents to Pinecone: {e}")
            return False

    def search(self, query: str, user_id: int, k: int = 5) -> List[Document]:
        """Return up to *k* documents of *user_id* most similar to *query*.

        Returns an empty list when Pinecone is unavailable or the search
        fails (the error is logged).
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, returning empty results")
                return []

            results = self._open_user_store(user_id).similarity_search(
                query=query,
                k=k,
                filter={"user_id": user_id},
            )
            logger.info(f"Found {len(results)} similar documents for user {user_id}")
            return results
        except Exception as e:
            logger.error(f"Failed to search vector store: {e}")
            return []

    def get_user_documents(self, user_id: int, limit: int = 50) -> List[Document]:
        """Return up to *limit* documents stored for *user_id*.

        Implemented as a similarity search with an empty query string.
        Returns an empty list when Pinecone is unavailable or the lookup
        fails (the error is logged).
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, returning empty results")
                return []

            # NOTE(review): relies on the embeddings backend accepting an
            # empty query; confirm, since a rejection would surface here as
            # an empty result list.
            results = self._open_user_store(user_id).similarity_search(
                query="",
                k=limit,
                filter={"user_id": user_id},
            )
            logger.info(f"Retrieved {len(results)} documents for user {user_id}")
            return results
        except Exception as e:
            logger.error(f"Failed to get user documents: {e}")
            return []

    def delete_user_documents(self, user_id: int) -> bool:
        """Delete every vector in *user_id*'s namespace.

        Returns ``True`` on success; ``False`` when Pinecone is unavailable
        or the deletion fails (the error is logged).
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, skipping deletion")
                return False

            pc = Pinecone(api_key=self.pinecone_api_key)
            index = pc.Index(self.index_name)
            # ``delete_all=True`` is what requests removal of the whole
            # namespace; a bare ``namespace=`` selects nothing to delete.
            index.delete(delete_all=True, namespace=self._namespace(user_id))
            logger.info(f"Successfully deleted all documents for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to delete user documents: {e}")
            return False
# Shared module-level instance. NOTE: it is constructed at import time, so
# OnlineVectorStore.__init__ (including its Pinecone index check when
# PINECONE_API_KEY is set) runs as soon as this module is imported.
vector_store = OnlineVectorStore()