Spaces:
Running
Running
import os | |
import uuid | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
from langchain_qdrant import Qdrant | |
from qdrant_client import QdrantClient, models | |
from dotenv import load_dotenv | |
# Load variables from a local .env file (if present) into the process
# environment before any of them are read below.
load_dotenv()

# Mirror GEMINI_API_KEY into GOOGLE_API_KEY (presumably the name the Google
# GenAI client looks up -- confirm against the library docs). os.environ only
# accepts str values, so assigning the None returned for an unset variable
# would raise TypeError at import time; copy the key only when it exists.
_gemini_api_key = os.getenv("GEMINI_API_KEY")
if _gemini_api_key is not None:
    os.environ["GOOGLE_API_KEY"] = _gemini_api_key

# Qdrant connection settings; each is None when the variable is not set.
QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
QDRANT_COLLECTION_NAME = os.getenv("QDRANT_COLLECTION_NAME")
class VectorDatabaseSearch:
    """Index PDF documents in a Qdrant collection and search them by similarity.

    The constructor connects to Qdrant using the module-level ``QDRANT_URL``
    and ``QDRANT_API_KEY`` settings, creates the collection when it is
    missing, and wraps it in a LangChain ``Qdrant`` vector store backed by
    Google Generative AI embeddings.
    """

    # Name of the embedding model used for both indexing and querying.
    EMBEDDING_MODEL = "models/embedding-001"
    # Vector size used when creating the collection. Assumed to match the
    # output dimension of EMBEDDING_MODEL -- confirm if the model changes.
    EMBEDDING_DIMENSION = 768
    # Number of points requested per scroll page in get_book_info().
    _SCROLL_PAGE_SIZE = 1000

    def __init__(self, collection_name=QDRANT_COLLECTION_NAME):
        """Connect to Qdrant and prepare the vector store.

        Args:
            collection_name: Name of the Qdrant collection to use. Defaults
                to the module-level ``QDRANT_COLLECTION_NAME``.
        """
        self.collection_name = collection_name
        self.embeddings = GoogleGenerativeAIEmbeddings(model=self.EMBEDDING_MODEL)
        self.client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
        self._initialize_collection()
        self.vectorstore = Qdrant(
            client=self.client,
            collection_name=collection_name,
            embeddings=self.embeddings,
        )

    def _collection_exists(self):
        """Return True when ``self.collection_name`` is listed by the server."""
        existing = self.client.get_collections().collections
        return any(c.name == self.collection_name for c in existing)

    def _initialize_collection(self):
        """Create the Qdrant collection if it doesn't exist.

        Errors are printed rather than raised, so construction continues
        on a best-effort basis even when the server cannot be reached.
        """
        try:
            if not self._collection_exists():
                self.client.create_collection(
                    collection_name=self.collection_name,
                    vectors_config=models.VectorParams(
                        size=self.EMBEDDING_DIMENSION,
                        distance=models.Distance.COSINE,
                    ),
                )
                print(f"Created collection: {self.collection_name}")
        except Exception as e:
            print(f"Error initializing collection: {e}")

    def add_pdf(self, pdf_path):
        """Split a PDF into chunks and add them to the vector database.

        Args:
            pdf_path: Path to the PDF file. The file name without its
                extension is stored as the ``source`` of every chunk.

        Returns:
            True when the chunks were added, False when any step failed
            (the error is printed).
        """
        try:
            docs = PyPDFLoader(pdf_path).load()
            splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
            split_docs = splitter.split_documents(docs)

            book_name = os.path.splitext(os.path.basename(pdf_path))[0]
            print(f"Processing {book_name} with {len(split_docs)} chunks")

            for doc in split_docs:
                # Replace the loader's metadata with a fixed, minimal schema
                # so that search() and get_book_info() can rely on it.
                # NOTE(review): the loader's page numbers may be 0-based while
                # the fallback here is 1 -- confirm the intended convention.
                doc.metadata = {
                    "source": book_name,
                    "page": doc.metadata.get("page", 1),
                    "id": str(uuid.uuid4()),
                }

            self.vectorstore.add_documents(split_docs)
            print(f"Added {len(split_docs)} chunks from {book_name}")
            return True
        except Exception as e:
            print(f"Error adding PDF: {e}")
            return False

    def search(self, query, top_k=5):
        """Search documents based on query.

        Args:
            query: Free-text query to embed and match against stored chunks.
            top_k: Maximum number of results to return.

        Returns:
            A list of dicts with the keys ``source``, ``page``, ``content``
            (first 500 characters of the chunk) and ``confidence`` (the
            similarity score scaled by 100 and rounded to two decimals).
            An empty list is returned when the search fails.
        """
        try:
            results = self.vectorstore.similarity_search_with_score(query, k=top_k)
            formatted = []
            for doc, score in results:
                # Points written by other tools may lack our metadata keys;
                # tolerate that instead of letting one KeyError discard
                # every result of the query.
                metadata = doc.metadata or {}
                formatted.append({
                    "source": metadata.get("source", "Unknown"),
                    "page": metadata.get("page"),
                    "content": doc.page_content[:500],
                    "confidence": round(score * 100, 2),
                })
            return formatted
        except Exception as e:
            print(f"Search error: {e}")
            return []

    def get_book_info(self):
        """Retrieve the list of unique book sources in the collection.

        Scrolls through every point in the collection, page by page, and
        collects the ``source`` value found either under the payload's
        ``metadata`` dict or at the top level of the payload.

        Returns:
            A list of unique source names (unordered). An empty list is
            returned when the collection does not exist or an error occurs.
        """
        try:
            if not self._collection_exists():
                print(f"Collection {self.collection_name} does not exist yet")
                return []

            books = set()
            total_points = 0
            offset = None
            while True:
                # scroll() returns one page plus the offset of the next page;
                # a single call would silently cap the result at one page.
                points, offset = self.client.scroll(
                    collection_name=self.collection_name,
                    limit=self._SCROLL_PAGE_SIZE,
                    offset=offset,
                    with_payload=True,
                    with_vectors=False,  # Only payloads are inspected.
                )
                total_points += len(points)

                for point in points:
                    payload = getattr(point, "payload", None)
                    if not payload:
                        continue
                    # Support both payload layouts: nested under 'metadata'
                    # or stored flat on the payload itself.
                    metadata = payload.get("metadata")
                    if isinstance(metadata, dict) and "source" in metadata:
                        books.add(metadata["source"])
                    elif "source" in payload:
                        books.add(payload["source"])

                # A None offset means there are no further pages.
                if offset is None:
                    break

            print(f"Retrieved {total_points} points from collection")
            print(f"Found {len(books)} unique books")
            return list(books)
        except Exception as e:
            print(f"Error retrieving book info: {e}")
            return []