# Scraped page header (not part of the program): revision a408f4b, file size 6,867 bytes.
import os
import logging
from typing import List, Optional
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Pinecone
from pinecone import Pinecone, ServerlessSpec
# Module logger: INFO level, one stream handler with a timestamped format.
logger = logging.getLogger("app.utils.online_vector_store")
logger.setLevel(logging.INFO)

# Attach the handler only when the logger has none yet, so running this setup
# again for the same named logger does not duplicate every log line.
if not logger.handlers:
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
class OnlineVectorStore:
    """Per-user document store backed by a Pinecone index.

    Documents are embedded with ``OpenAIEmbeddings`` and written to the index
    ``self.index_name`` under one namespace per user (``user_<id>``). When
    ``PINECONE_API_KEY`` is not set, or the client fails to initialise, the
    store is disabled: writes return ``False`` and reads return ``[]``.
    The data methods log their own errors and return those same fallback
    values instead of raising.
    """

    # Dimension of the index created on first use.
    # NOTE(review): 1536 is the output size of OpenAI's text-embedding-ada-002 /
    # text-embedding-3-small; confirm it matches the model OpenAIEmbeddings()
    # is configured with, or upserts will be rejected.
    _EMBEDDING_DIMENSION = 1536

    def __init__(self):
        self.pinecone_api_key = os.getenv("PINECONE_API_KEY")
        # Kept for backward compatibility; not read anywhere in this class.
        self.pinecone_environment = os.getenv("PINECONE_ENVIRONMENT", "gcp-starter")
        self.index_name = "dubsway-video-ai"

        if not self.pinecone_api_key:
            logger.warning("PINECONE_API_KEY not found. Vector storage is disabled.")
            self.use_pinecone = False
        else:
            self.use_pinecone = True
            # May reset use_pinecone to False if the client cannot be set up.
            self._initialize_pinecone()

    def _initialize_pinecone(self):
        """Ensure the Pinecone index exists, creating it if necessary.

        On any failure the error is logged and ``use_pinecone`` is set to
        ``False`` so later calls become no-ops instead of raising.
        """
        try:
            # Module-level ``Pinecone`` is the pinecone client class here
            # (imported after, and therefore shadowing, the LangChain name).
            pc = Pinecone(api_key=self.pinecone_api_key)
            if self.index_name in pc.list_indexes().names():
                logger.info(f"Using existing Pinecone index: {self.index_name}")
                return

            logger.info(f"Creating Pinecone index: {self.index_name}")
            pc.create_index(
                name=self.index_name,
                dimension=self._EMBEDDING_DIMENSION,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            logger.info(f"Pinecone index {self.index_name} created successfully")
        except Exception as e:
            logger.exception(f"Failed to initialize Pinecone: {e}")
            self.use_pinecone = False

    @staticmethod
    def _namespace(user_id: int) -> str:
        """Return the Pinecone namespace that isolates ``user_id``'s vectors."""
        return f"user_{user_id}"

    @staticmethod
    def _vector_store_cls():
        """Return LangChain's Pinecone vector-store wrapper class.

        Imported locally because the module-level name ``Pinecone`` is bound
        to the pinecone *client* class, which has no ``from_documents`` or
        ``from_existing_index``.
        """
        from langchain_community.vectorstores import Pinecone as PineconeVectorStore

        return PineconeVectorStore

    def _open_user_store(self, user_id: int):
        """Open the existing index as a LangChain store scoped to ``user_id``."""
        return self._vector_store_cls().from_existing_index(
            index_name=self.index_name,
            embedding=OpenAIEmbeddings(),
            namespace=self._namespace(user_id),
        )

    def add_documents(self, documents: List[Document], user_id: int) -> bool:
        """Tag ``documents`` with ``user_id`` and store them for that user.

        The documents' ``metadata`` dicts are modified in place (``user_id``
        and ``source`` keys are set) even when Pinecone is unavailable.

        Returns:
            ``True`` if the documents were written to Pinecone, otherwise
            ``False`` (empty input, Pinecone disabled, or an error).
        """
        try:
            if not documents:
                logger.warning("No documents to add")
                return False

            for doc in documents:
                # Also covers a metadata attribute that exists but is None.
                if getattr(doc, "metadata", None) is None:
                    doc.metadata = {}
                doc.metadata["user_id"] = user_id
                doc.metadata["source"] = "video_analysis"

            if not self.use_pinecone:
                logger.warning("Pinecone not available, skipping vector storage")
                return False
            return self._add_to_pinecone(documents, user_id)
        except Exception as e:
            logger.exception(f"Failed to add documents to vector store: {e}")
            return False

    def _add_to_pinecone(self, documents: List[Document], user_id: int) -> bool:
        """Embed ``documents`` and upsert them into ``user_id``'s namespace.

        Returns ``True`` on success, ``False`` (after logging) on any error.
        """
        try:
            self._vector_store_cls().from_documents(
                documents=documents,
                embedding=OpenAIEmbeddings(),
                index_name=self.index_name,
                namespace=self._namespace(user_id),
            )
            logger.info(f"Successfully added {len(documents)} documents to Pinecone for user {user_id}")
            return True
        except Exception as e:
            logger.exception(f"Failed to add documents to Pinecone: {e}")
            return False

    def search(self, query: str, user_id: int, k: int = 5) -> List[Document]:
        """Return up to ``k`` of ``user_id``'s documents most similar to ``query``.

        Returns ``[]`` when Pinecone is disabled or an error occurs.
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, returning empty results")
                return []

            store = self._open_user_store(user_id)
            # The namespace already isolates the user; the metadata filter is
            # a second guard on the same user_id.
            results = store.similarity_search(
                query=query,
                k=k,
                filter={"user_id": user_id},
            )
            logger.info(f"Found {len(results)} similar documents for user {user_id}")
            return results
        except Exception as e:
            logger.exception(f"Failed to search vector store: {e}")
            return []

    def get_user_documents(self, user_id: int, limit: int = 50) -> List[Document]:
        """Return up to ``limit`` documents stored for ``user_id``.

        Returns ``[]`` when Pinecone is disabled or an error occurs.
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, returning empty results")
                return []

            store = self._open_user_store(user_id)
            # NOTE(review): the empty query is embedded like any other text, so
            # this returns at most `limit` nearest matches rather than a
            # guaranteed full listing. Confirm the embedding API accepts "".
            results = store.similarity_search(
                query="",
                k=limit,
                filter={"user_id": user_id},
            )
            logger.info(f"Retrieved {len(results)} documents for user {user_id}")
            return results
        except Exception as e:
            logger.exception(f"Failed to get user documents: {e}")
            return []

    def delete_user_documents(self, user_id: int) -> bool:
        """Delete every vector in ``user_id``'s namespace.

        Returns ``True`` on success, ``False`` when Pinecone is disabled or an
        error occurs.
        """
        try:
            if not self.use_pinecone:
                logger.warning("Pinecone not available, skipping deletion")
                return False

            pc = Pinecone(api_key=self.pinecone_api_key)
            index = pc.Index(self.index_name)
            # delete() needs ids, a filter, or delete_all=True; the namespace
            # alone does not select any records.
            index.delete(delete_all=True, namespace=self._namespace(user_id))
            logger.info(f"Successfully deleted all documents for user {user_id}")
            return True
        except Exception as e:
            logger.exception(f"Failed to delete user documents: {e}")
            return False
# Shared module-level instance, created at import time. Construction reads
# PINECONE_API_KEY and, when it is set, contacts Pinecone to ensure the index
# exists.
vector_store = OnlineVectorStore()