Abhishek Ranjan
feat: implement rag chatbot for artisan
7f8188c
"""
extract documents and store them in a vector db as a collection
"""
import asyncio
import logging
import os
import time
from concurrent.futures import ProcessPoolExecutor
from itertools import chain

from dotenv import load_dotenv
from langchain_core.documents import Document
from langchain_experimental.text_splitter import SemanticChunker
from langchain_milvus import Milvus
from langchain_openai import AzureOpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from rich.pretty import pprint

from .web_scrape import scrape_main
load_dotenv()
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
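
# Embedding model shared by the semantic chunker and the Milvus vector store.
# Requires AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY in the environment (.env).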
embedding_model = AzureOpenAIEmbeddings(
    azure_deployment="text-embedding-ada-002",
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version="2023-05-15",
)
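

# The collection uses an HNSW index with cosine similarity; M and efConstruction
# trade index size and build time against recall.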
def get_milvus_vector_store(drop_old: bool = False) -> Milvus:
    """Connect to Milvus; pass drop_old=True only when rebuilding the collection from scratch."""
    # drop_old defaults to False so that building a retriever does not wipe the ingested collection.
    return Milvus(
        embedding_function=embedding_model,
        collection_name="test",
        connection_args={"uri": os.getenv("MILVUS_URI")},
        auto_id=True,
        drop_old=drop_old,
        index_params={
            "index_type": "HNSW",
            "metric_type": "COSINE",
            "params": {"M": 8, "efConstruction": 64},
        },
    )
def get_vs_as_retriever():
    """Expose the Milvus collection as a top-k similarity retriever."""
    return get_milvus_vector_store().as_retriever(
        search_type="similarity", search_kwargs={"k": 5}
    )
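

# Example usage of the retriever (a sketch; the actual RAG chain wiring is assumed
# to live elsewhere in this service):
#     retriever = get_vs_as_retriever()
#     docs = retriever.invoke("How does email warmup work?")

# Pages scraped into the knowledge base: marketing pages, product pages,
# help-center articles, and pricing.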
URLS_TO_SCRAPE = [
    "https://www.artisan.co",
    "https://www.artisan.co/about",
    "https://www.artisan.co/sales-ai",
    "https://www.artisan.co/ai-sales-agent",
    "https://www.artisan.co/products/linkedin-outreach",
    "https://www.artisan.co/products/email-warmup",
    "https://www.artisan.co/products/sales-automation",
    "https://www.artisan.co/products/email-personalization",
    "https://www.artisan.co/features/email-deliverability",
    "https://help.artisan.co/articles/7415399613-can-i-schedule-when-emails-go-out",
    "https://help.artisan.co/articles/5365244006-what-is-email-warmup",
    "https://help.artisan.co/articles/8442274387-ava-is-sending-strange-messages-from-my-email",
    "https://help.artisan.co/articles/1195138264-is-there-a-limit-to-the-amount-of-leads-i-can-have-in-my-csv-file",
    "https://help.artisan.co/articles/5617649387-help-i-can-t-turn-on-my-campaign",
    "https://help.artisan.co/articles/1048710797-how-does-website-visitor-identification-work",
    "https://help.artisan.co/articles/3886727025-generate-sample-email",
    "https://help.artisan.co/articles/6218358204-running-ava-on-copilot-vs-autopilot",
    "https://help.artisan.co/articles/9265896700-adding-delegates-and-team-members",
    "https://help.artisan.co/articles/2734968853-how-to-create-a-campaign",
    "https://help.artisan.co/articles/7633990298-how-to-integrate-artisan-with-your-crm",
    "https://help.artisan.co/articles/6092562650-how-do-i-upload-a-csv-file-of-my-own-leads",
    "https://help.artisan.co/articles/4356675492-how-do-i-add-variables-to-my-email",
    "https://help.artisan.co/articles/3551943296-how-do-i-request-a-script-tag-for-my-watchtower-campaign",
    "https://help.artisan.co/articles/9602711709-how-do-i-integrate-ava-with-slack",
    "https://www.artisan.co/pricing",
]
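

# Two-stage chunking: split each page at embedding-similarity boundaries first,
# then recursively re-split any chunk that is still larger than 5000 characters.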
def chunk_parent_document(document: Document) -> list[Document]:
    """Chunk one scraped page: semantic split first, then size-bound any oversized chunks."""
    semantic_text_splitter = SemanticChunker(
        embeddings=embedding_model, min_chunk_size=100
    )
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=250,
        is_separator_regex=True,  # the markdown-style separators below are regex patterns
        separators=[
            "\n#{1,6} ",
            "\n\\*\\*\\*+\n",
            "\n---+\n",
            "\n___+\n",
            "\n\n",
            "\n",
            "\\.",
            "\\?",
            "\\!",
        ],
    )
    chunked_documents = semantic_text_splitter.split_documents([document])
    # Drop empty or whitespace-only chunks produced by the semantic splitter.
    chunked_documents = [
        chunked_doc_item
        for chunked_doc_item in chunked_documents
        if chunked_doc_item.page_content.strip()
    ]
    final_chunked_documents = []
    for chunked_doc in chunked_documents:
        pprint(chunked_doc)
        if len(chunked_doc.page_content) > 5000:
            # Chunk is still too large to embed comfortably; re-split it by size.
            sub_chunked_documents = text_splitter.split_documents([chunked_doc])
            final_chunked_documents.extend(sub_chunked_documents)
        else:
            final_chunked_documents.append(chunked_doc)
    return final_chunked_documents
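

# Ingestion flow: scrape the pages asynchronously, chunk each page in a separate
# worker process, then insert the chunks into Milvus in batches of 50.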
async def ingest_urls(chunk_executor: ProcessPoolExecutor, milvus_vector_store: Milvus):
    """Scrape URLS_TO_SCRAPE, chunk the pages, and add the chunks to Milvus."""
    lc_documents = await scrape_main(URLS_TO_SCRAPE)

    start_time = time.time()
    chunked_documents = list(chunk_executor.map(chunk_parent_document, lc_documents))
    chunked_documents = list(chain.from_iterable(chunked_documents))
    end_time = time.time()
    logger.info(f"Time taken to chunk documents: {end_time - start_time} seconds")

    start_time = time.time()
    if chunked_documents:
        _ = milvus_vector_store.add_documents(chunked_documents, batch_size=50)
    logger.info(
        f"Time taken to ingest documents into Milvus: {time.time() - start_time} seconds"
    )
if __name__ == "__main__":
    # Re-ingestion rebuilds the collection, so drop the old one only here.
    milvus_vector_store = get_milvus_vector_store(drop_old=True)
    with ProcessPoolExecutor() as chunk_executor:
        asyncio.run(ingest_urls(chunk_executor, milvus_vector_store))