|
import hashlib
import os
import re

import chromadb
import google.generativeai as genai
import requests
import streamlit as st
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
|
|
|
|
|
# Page layout is configured before anything else is rendered.
st.set_page_config(layout="wide")

# The Gemini API key is read from the environment so that it never lives in
# source control. NOTE(review): any key that was previously committed in this
# file should be treated as leaked and rotated.
_gemini_api_key = os.environ.get("GOOGLE_API_KEY")
if not _gemini_api_key:
    st.error(
        "GOOGLE_API_KEY environment variable is not set. "
        "Set it to your Gemini API key and restart the app."
    )
    # Nothing below can work without credentials, so end this script run.
    st.stop()
genai.configure(api_key=_gemini_api_key)

# On-disk location of the Chroma vector store shared by all functions below.
CHROMA_PATH = "chroma_db"
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
|
|
|
|
|
# Seed the per-session state on the first run; keys that already exist keep
# their current values across reruns. The dict literal is rebuilt on every
# script run, so a newly seeded chat history always gets its own fresh list.
for _state_key, _initial_value in {
    "scraped": False,
    "collection_name": "default_collection",
    "chat_history": [],
}.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
|
|
|
|
|
@st.cache_resource
def _load_embedding_model():
    """Load the sentence-embedding model once and reuse it across reruns.

    Streamlit re-executes this script on every user interaction; without the
    resource cache the model would be re-instantiated each time.
    """
    return SentenceTransformer("all-MiniLM-L6-v2")


embedding_model = _load_embedding_model()
|
|
|
# Only real http(s) URLs are stripped; ordinary words that merely start with
# "http" (e.g. "httpd") are left alone.
_URL_PATTERN = re.compile(r'https?://\S+')
_WHITESPACE_PATTERN = re.compile(r'\s+')


def clean_text(text):
    """Return *text* with URLs removed and whitespace normalised.

    Args:
        text: Raw text, typically extracted from an HTML page. ``None`` and
            the empty string are accepted.

    Returns:
        The text with every ``http://`` / ``https://`` URL deleted, each run
        of whitespace collapsed to a single space, and leading/trailing
        whitespace stripped. Empty or ``None`` input yields ``""``.
    """
    if not text:
        return ""
    without_urls = _URL_PATTERN.sub('', text)
    return _WHITESPACE_PATTERN.sub(' ', without_urls).strip()
|
|
|
def split_content_into_chunks(content, chunk_size=1000, chunk_overlap=200):
    """Split *content* into overlapping chunks for embedding.

    Args:
        content: The full text to split.
        chunk_size: Maximum length of each chunk, measured with ``len``.
        chunk_overlap: Number of characters shared between neighbouring
            chunks.

    Returns:
        The list of ``Document`` chunks produced by
        ``RecursiveCharacterTextSplitter.split_documents``.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    return text_splitter.split_documents([Document(page_content=content)])
|
|
|
def add_chunks_to_db(chunks, collection_name):
    """Embed *chunks* and upsert them into the named Chroma collection.

    Args:
        chunks: Iterable of objects exposing a ``page_content`` string.
        collection_name: Name of the collection to create or reuse.

    Returns:
        None. An empty *chunks* sequence is a no-op.
    """
    # Checked first so that an empty page never reaches the vector store
    # with empty id/document lists.
    if not chunks:
        return

    # IDs are derived from the chunk text instead of its position, so a later
    # scrape does not overwrite unrelated chunks that happened to share an
    # index. Keying by ID also removes duplicate chunks within this batch.
    unique_docs = {}
    for chunk in chunks:
        text = chunk.page_content
        doc_id = hashlib.sha256(text.encode("utf-8")).hexdigest()
        unique_docs.setdefault(doc_id, text)

    ids = list(unique_docs)
    documents = list(unique_docs.values())
    # encode() returns an array by default; convert it to plain lists.
    embeddings = embedding_model.encode(documents).tolist()

    collection = chroma_client.get_or_create_collection(name=collection_name)
    collection.upsert(ids=ids, documents=documents, embeddings=embeddings)
|
|
|
def scrape_text(url):
    """Download *url*, extract its text and index it in the vector store.

    On success the session flag ``scraped`` is set to ``True``.

    Args:
        url: Address of the page to fetch.

    Returns:
        A human-readable status string. Failure messages always start with
        ``"Error scraping"``.
    """
    # Only the network call can raise RequestException, so the try block is
    # limited to it. The timeout stops an unresponsive host from hanging the
    # app indefinitely.
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        return f"Error scraping {url}: {e}"

    soup = BeautifulSoup(response.text, 'html.parser')
    # Drop non-content markup so its source never ends up in the index.
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()

    # A separator keeps words from adjacent tags from being glued together;
    # clean_text collapses the extra spaces afterwards.
    text = clean_text(soup.get_text(separator=" "))
    if not text:
        return f"Error scraping {url}: no readable text found on the page."

    chunks = split_content_into_chunks(text)
    add_chunks_to_db(chunks, st.session_state.collection_name)

    st.session_state.scraped = True
    return "Scraping and processing complete. You can now ask questions!"
|
|
|
_NO_INFO_MESSAGE = "I don't have enough information to answer that question."


def ask_question(query, collection_name, n_results=2):
    """Answer *query* using chunks retrieved from the named collection.

    Args:
        query: The user's question.
        collection_name: Chroma collection to search.
        n_results: Number of nearest chunks to pass to the model as context.

    Returns:
        The model's answer text. When nothing is retrieved, the standard
        "not enough information" sentence is returned without calling the
        model; when the model returns no usable text, a short explanatory
        message is returned instead.
    """
    collection = chroma_client.get_or_create_collection(name=collection_name)
    # encode() returns an array by default; convert it to a plain list.
    query_embedding = embedding_model.encode(query).tolist()
    results = collection.query(query_embeddings=[query_embedding], n_results=n_results)
    documents = results.get("documents") or [[]]
    top_chunks = documents[0] if documents else []

    if not top_chunks:
        # With no context the prompt instructs the model to say exactly this,
        # so the API call is skipped.
        return _NO_INFO_MESSAGE

    # Join the chunks as plain text rather than embedding a Python list repr
    # (brackets, quotes, escaped newlines) in the prompt.
    context = "\n\n---\n\n".join(top_chunks)
    system_prompt = (
        "You are a helpful assistant. Answer only from the provided context.\n"
        f'If you lack information, say: "{_NO_INFO_MESSAGE}"\n'
        "Context:\n"
        f"{context}\n"
    )

    model = genai.GenerativeModel('gemini-2.0-flash')
    response = model.generate_content(system_prompt + "\nUser Query: " + query)
    try:
        return response.text
    except ValueError:
        # NOTE(review): .text is expected to raise ValueError when the
        # response has no text part (e.g. a blocked response) - confirm
        # against the installed google-generativeai version.
        return (
            "The model did not return a usable answer. "
            "Please try rephrasing your question."
        )
|
|
|
|
|
# Sidebar: chat-history reset and the scraping form.
with st.sidebar:
    st.header("Database Management")
    if st.button("Clear Chat History"):
        st.session_state.chat_history = []
        st.rerun()

    st.header("Step 1: Scrape a Website")
    # Strip stray whitespace that commonly comes along with a pasted URL.
    url = st.text_input("Enter URL:").strip()
    # The button is only rendered once a URL has been entered.
    if url and st.button("Scrape & Process"):
        with st.spinner("Scraping..."):
            status = scrape_text(url)
        # scrape_text reports failures as a string starting with "Error";
        # show those as errors rather than in a green success box.
        if status.startswith("Error"):
            st.error(status)
        else:
            st.success(status)
|
|
|
|
|
# Main area: the chat interface, shown only after a page has been scraped.
st.title("Web Scraper & Q&A Chatbot")

if not st.session_state.scraped:
    st.info("Please scrape a website first.")
else:
    st.subheader("Step 2: Ask Questions")

    # Replay the stored conversation.
    for entry in st.session_state.chat_history:
        with st.chat_message(entry["role"]):
            st.write(entry["content"])

    question = st.chat_input("Ask your question here")
    if question:
        history = st.session_state.chat_history
        history.append({"role": "user", "content": question})
        with st.spinner("Searching..."):
            reply = ask_question(question, st.session_state.collection_name)
        history.append({"role": "assistant", "content": reply})

        # Keep only the six most recent messages, then rerun so the new
        # exchange is drawn by the replay loop above.
        st.session_state.chat_history = history[-6:]
        st.rerun()