# dubswayAgenticV2 / app/utils/online_vector_store.py
# Author: peace2024 — commit a408f4b ("app clean")
import logging
import os
from typing import List, Optional

from langchain_community.vectorstores import Pinecone
# LangChain's Pinecone vector-store wrapper under an unambiguous name: the bare
# ``Pinecone`` imported on the line above is rebound to the pinecone *client*
# class by the last import below, so it must not be used for the wrapper.
from langchain_community.vectorstores import Pinecone as PineconeVectorStore
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from pinecone import Pinecone, ServerlessSpec
# Module logger at INFO level with its own timestamped StreamHandler.
# The ``if not logger.handlers`` guard attaches the handler only when the
# logger has none yet, so executing this setup again does not stack duplicates.
logger = logging.getLogger("app.utils.online_vector_store")
logger.setLevel(logging.INFO)
if not logger.handlers:
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
class OnlineVectorStore:
    """Per-user document store backed by a Pinecone index.

    A user's documents are written to, searched in, and deleted from the
    Pinecone namespace ``user_<user_id>`` of the index ``dubsway-video-ai``.
    They are also tagged with ``user_id`` and ``source`` metadata.

    Error policy: the public methods catch all exceptions, log them, and
    report failure as ``False`` (writes/deletes) or ``[]`` (reads).
    ``use_pinecone`` is False in two cases: ``PINECONE_API_KEY`` is unset, or
    index initialisation fails. Every operation then logs a warning and
    returns the same failure value.
    """

    # Vector width of the index created in ``_initialize_pinecone``. It must
    # match the output size of the OpenAIEmbeddings model used for writes and
    # queries (originally documented as the OpenAI embeddings dimension).
    EMBEDDING_DIMENSION = 1536

    def __init__(self):
        self.pinecone_api_key = os.getenv("PINECONE_API_KEY")
        # NOTE(review): read but never used in this class; kept because it is
        # a public attribute other code might inspect.
        self.pinecone_environment = os.getenv("PINECONE_ENVIRONMENT", "gcp-starter")
        self.index_name = "dubsway-video-ai"

        if not self.pinecone_api_key:
            # There is no local fallback: without a key every operation is a no-op.
            logger.warning("PINECONE_API_KEY not found. Vector storage is disabled.")
            self.use_pinecone = False
        else:
            self.use_pinecone = True
            self._initialize_pinecone()

    @staticmethod
    def _user_namespace(user_id: int) -> str:
        """Return the Pinecone namespace that holds *user_id*'s vectors."""
        return f"user_{user_id}"

    def _initialize_pinecone(self):
        """Ensure the Pinecone index exists, creating it if it is missing.

        On any failure the error is logged and ``use_pinecone`` is switched
        off, so later calls degrade to no-ops instead of raising.
        """
        try:
            pc = Pinecone(api_key=self.pinecone_api_key)
            if self.index_name in pc.list_indexes().names():
                logger.info(f"Using existing Pinecone index: {self.index_name}")
                return

            logger.info(f"Creating Pinecone index: {self.index_name}")
            pc.create_index(
                name=self.index_name,
                dimension=self.EMBEDDING_DIMENSION,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            logger.info(f"Pinecone index {self.index_name} created successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Pinecone: {e}")
            self.use_pinecone = False

    def _get_vector_store(self, user_id: int):
        """Return a LangChain store bound to the existing index and *user_id*'s namespace.

        May raise; callers are expected to wrap this in their own error handling.
        """
        return PineconeVectorStore.from_existing_index(
            index_name=self.index_name,
            embedding=OpenAIEmbeddings(),
            namespace=self._user_namespace(user_id),
        )

    def add_documents(self, documents: List[Document], user_id: int) -> bool:
        """Tag *documents* with ownership metadata and store them for *user_id*.

        Each document's ``metadata`` is mutated in place: ``user_id`` and
        ``source='video_analysis'`` are set. This happens even when Pinecone
        is unavailable.

        Returns:
            True if the documents were written. False in three cases: the
            list is empty, Pinecone is unavailable, or any step failed.
        """
        try:
            if not documents:
                logger.warning("No documents to add")
                return False

            for doc in documents:
                # Tolerate objects whose metadata attribute is missing or None.
                if getattr(doc, "metadata", None) is None:
                    doc.metadata = {}
                doc.metadata["user_id"] = user_id
                doc.metadata["source"] = "video_analysis"

            if not self.use_pinecone:
                logger.warning("Pinecone not available, skipping vector storage")
                return False
            return self._add_to_pinecone(documents, user_id)
        except Exception as e:
            logger.error(f"Failed to add documents to vector store: {e}")
            return False

    def _add_to_pinecone(self, documents: List[Document], user_id: int) -> bool:
        """Embed *documents* and write them to *user_id*'s Pinecone namespace.

        Returns True on success, False (after logging) on any failure.
        """
        try:
            # Must be the LangChain wrapper: the bare name ``Pinecone`` in this
            # module is the pinecone client class, which has no from_documents.
            PineconeVectorStore.from_documents(
                documents=documents,
                embedding=OpenAIEmbeddings(),
                index_name=self.index_name,
                namespace=self._user_namespace(user_id),
            )
            logger.info(f"Successfully added {len(documents)} documents to Pinecone for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to add documents to Pinecone: {e}")
            return False

    def search(self, query: str, user_id: int, k: int = 5) -> List[Document]:
        """Return up to *k* of *user_id*'s documents most similar to *query*.

        Returns an empty list if Pinecone is unavailable or the search fails.
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, returning empty results")
                return []

            results = self._get_vector_store(user_id).similarity_search(
                query=query,
                k=k,
                # Redundant with the per-user namespace (add_documents tags every
                # document with its user_id), but a cheap second ownership guard.
                filter={"user_id": user_id},
            )
            logger.info(f"Found {len(results)} similar documents for user {user_id}")
            return results
        except Exception as e:
            logger.error(f"Failed to search vector store: {e}")
            return []

    def get_user_documents(self, user_id: int, limit: int = 50) -> List[Document]:
        """Return up to *limit* documents stored for *user_id*.

        Returns an empty list if Pinecone is unavailable or the lookup fails.
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, returning empty results")
                return []

            # NOTE(review): "listing" is approximated by a similarity search on
            # an empty query capped at *limit*. This relies on the embedding
            # backend accepting "", and it is not guaranteed to return every
            # document when the user has more than *limit* of them.
            results = self._get_vector_store(user_id).similarity_search(
                query="",
                k=limit,
                filter={"user_id": user_id},
            )
            logger.info(f"Retrieved {len(results)} documents for user {user_id}")
            return results
        except Exception as e:
            logger.error(f"Failed to get user documents: {e}")
            return []

    def delete_user_documents(self, user_id: int) -> bool:
        """Delete every vector in *user_id*'s Pinecone namespace.

        Returns True on success, False if Pinecone is unavailable or the
        delete call failed.
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, skipping deletion")
                return False

            index = Pinecone(api_key=self.pinecone_api_key).Index(self.index_name)
            # ``delete_all=True`` is what wipes a namespace. A delete call that
            # passes only ``namespace`` names no ids, filter, or delete_all, so
            # it does not select any vectors for deletion.
            index.delete(delete_all=True, namespace=self._user_namespace(user_id))
            logger.info(f"Successfully deleted all documents for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to delete user documents: {e}")
            return False
# Global instance shared by importers of this module.
# NOTE: this runs OnlineVectorStore.__init__ at import time. When
# PINECONE_API_KEY is set, that calls the Pinecone API to list (and possibly
# create) the index, so importing this module performs network calls.
vector_store: OnlineVectorStore = OnlineVectorStore()