import nltk nltk.download('punkt') nltk.download('punkt_tab') # SECTIONED URL LIST (in case we want to tag later) url_dict = { "Website Designing": [ "https://www.imageonline.co.in/website-designing-mumbai.html", "https://www.imageonline.co.in/domain-hosting-services-india.html", "https://www.imageonline.co.in/best-seo-company-mumbai.html", "https://www.imageonline.co.in/wordpress-blog-designing-india.html", "https://www.imageonline.co.in/social-media-marketing-company-mumbai.html", "https://www.imageonline.co.in/website-template-customization-india.html", "https://www.imageonline.co.in/regular-website-maintanence-services.html", "https://www.imageonline.co.in/mobile-app-designing-mumbai.html", "https://www.imageonline.co.in/web-application-screen-designing.html" ], "Website Development": [ "https://www.imageonline.co.in/website-development-mumbai.html", "https://www.imageonline.co.in/open-source-customization.html", "https://www.imageonline.co.in/ecommerce-development-company-mumbai.html", "https://www.imageonline.co.in/website-with-content-management-system.html", "https://www.imageonline.co.in/web-application-development-india.html" ], "Mobile App Development": [ "https://www.imageonline.co.in/mobile-app-development-company-mumbai.html" ], "About Us": [ "https://www.imageonline.co.in/about-us.html", "https://www.imageonline.co.in/vision.html", "https://www.imageonline.co.in/team.html" ], "Testimonials": [ "https://www.imageonline.co.in/testimonial.html" ] } import trafilatura import requests # Function to extract clean text using trafilatura def extract_clean_text(url): """ Fetch and extract clean main content from a URL using trafilatura. Returns None if content couldn't be extracted. """ try: downloaded = trafilatura.fetch_url(url) if downloaded: content = trafilatura.extract(downloaded, include_comments=False, include_tables=False) return content except Exception as e: print(f"Error fetching {url}: {e}") return None # Scrape data and prepare for RAG with metadata scraped_data = [] for section, urls in url_dict.items(): for url in urls: print(f"๐ŸŸฉ Scraping: {url}") text = extract_clean_text(url) if text: print(f"โœ… Extracted {len(text)} characters.\n") scraped_data.append({ "content": text, "metadata": { "source": url, "section": section } }) else: print(f"โŒ Failed to extract content from {url}.\n") print(f"Total pages scraped: {len(scraped_data)}") import tiktoken from nltk.tokenize import sent_tokenize # Initialize GPT tokenizer (cl100k_base works with Together.ai and OpenAI APIs) tokenizer = tiktoken.get_encoding("cl100k_base") def chunk_text(text, max_tokens=400): """ Chunk text into overlapping segments based on sentence boundaries and token limits. """ sentences = sent_tokenize(text) chunks = [] current_chunk = [] for sentence in sentences: current_chunk.append(sentence) tokens = tokenizer.encode(" ".join(current_chunk)) if len(tokens) > max_tokens: # Finalize current chunk without last sentence current_chunk.pop() chunks.append(" ".join(current_chunk).strip()) current_chunk = [sentence] # Start new chunk with overflow sentence # Append final chunk if current_chunk: chunks.append(" ".join(current_chunk).strip()) return chunks chunked_data = [] for item in scraped_data: text = item["content"] metadata = item["metadata"] chunks = chunk_text(text, max_tokens=400) for chunk in chunks: chunked_data.append({ "content": chunk, "metadata": metadata # Keep the same URL + section for each chunk }) # Extract text chunks from chunked_data for embedding texts_to_embed = [item["content"] for item in chunked_data] from sentence_transformers import SentenceTransformer # Load the embedding model embedding_model = SentenceTransformer("BAAI/bge-base-en-v1.5") def embed_chunks(text_list, model): """ Generate embeddings for a list of text chunks. """ return model.encode(text_list, convert_to_numpy=True) # Generate embeddings embeddings = embed_chunks(texts_to_embed, embedding_model) print(f"โœ… Generated {len(embeddings)} embeddings") print(f"๐Ÿ”น Shape of first embedding: {embeddings[0].shape}") import chromadb import uuid # Initialize ChromaDB client (persistent storage) chroma_client = chromadb.PersistentClient(path="./chroma_store") # Create or get collection collection = chroma_client.get_or_create_collection(name="imageonline_chunks") # Extract documents, embeddings, metadatas documents = [item["content"] for item in chunked_data] metadatas = [item["metadata"] for item in chunked_data] ids = [str(uuid.uuid4()) for _ in documents] # Safety check assert len(documents) == len(embeddings) == len(metadatas), "Data length mismatch!" # Add to ChromaDB collection.add( documents=documents, embeddings=embeddings.tolist(), metadatas=metadatas, ids=ids ) # Sample query query = "web design company" query_embedding = embedding_model.encode([query])[0] # Query ChromaDB results = collection.query( query_embeddings=[query_embedding.tolist()], n_results=3 ) # Display results for i in range(len(results['documents'][0])): print(f"\n๐Ÿ” Match {i+1}:") print(f"Content: {results['documents'][0][i][:200]}...") print(f"๐Ÿ“Ž Metadata: {results['metadatas'][0][i]}") from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnableLambda, RunnablePassthrough from langchain_core.output_parsers import StrOutputParser from langchain_together import ChatTogether from langchain_community.vectorstores import Chroma from langchain_community.embeddings import HuggingFaceEmbeddings # Initialize vectorstore embedding_function = HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5") vectorstore = Chroma( client=chroma_client, # from your previous chroma setup collection_name="imageonline_chunks", embedding_function=embedding_function ) # Create retriever retriever = vectorstore.as_retriever(search_kwargs={"k": 3}) def retrieve_and_format(query): docs = retriever.get_relevant_documents(query) context_strings = [] for doc in docs: content = doc.page_content metadata = doc.metadata source = metadata.get("source", "") section = metadata.get("section", "") context_strings.append(f"[{section}] {content}\n(Source: {source})") return "\n\n".join(context_strings) llm = ChatTogether( model="meta-llama/Llama-3-8b-chat-hf", temperature=0.3, max_tokens=1024, top_p=0.7, together_api_key="a36246d65d8290f43667350b364c5b6bb8562eb50a4b947eec5bd7e79f2dffc6" # Replace before deployment or use os.getenv ) prompt = ChatPromptTemplate.from_template(""" You are an expert assistant for ImageOnline Web Solutions. Answer the user's query based ONLY on the following context: {context} Query: {question} """) rag_chain = ( {"context": RunnableLambda(retrieve_and_format), "question": RunnablePassthrough()} | prompt | llm | StrOutputParser() ) import gradio as gr def chat_interface(message, history): history = history or [] # Display user message history.append(("๐Ÿง‘ You: " + message, "โณ Generating response...")) try: # Call RAG pipeline answer = rag_chain.invoke(message) # Replace placeholder with actual response history[-1] = ("๐Ÿง‘ You: " + message, "๐Ÿค– Bot: " + answer) except Exception as e: error_msg = f"โš ๏ธ Error: {str(e)}" history[-1] = ("๐Ÿง‘ You: " + message, f"๐Ÿค– Bot: {error_msg}") return history, history def launch_gradio(): with gr.Blocks() as demo: gr.Markdown("# ๐Ÿ’ฌ ImageOnline RAG Chatbot") gr.Markdown("Ask about Website Designing, App Development, SEO, Hosting, etc.") chatbot = gr.Chatbot() state = gr.State([]) with gr.Row(): msg = gr.Textbox(placeholder="Ask your question here...", show_label=False, scale=8) send_btn = gr.Button("๐Ÿ“จ Send", scale=1) msg.submit(chat_interface, inputs=[msg, state], outputs=[chatbot, state]) send_btn.click(chat_interface, inputs=[msg, state], outputs=[chatbot, state]) with gr.Row(): clear_btn = gr.Button("๐Ÿงน Clear Chat") clear_btn.click(fn=lambda: ([], []), outputs=[chatbot, state]) return demo if __name__ == "__main__": demo = launch_gradio() demo.launch()