Spaces:
Running
Running
# app.py - Complete Enhanced ICodeGuru Chatbot | |
import base64
import datetime
import json
import logging
import os
import time
import uuid
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import List, Optional, Dict, Any

import nest_asyncio
import streamlit as st
import streamlit.components.v1 as components

# LangChain imports (your teammate's backend)
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.document_loaders import JSONLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_groq import ChatGroq
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.schema import Document
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain

# Enhanced components
from components import render_response_box, render_enhanced_response_box
from user_manager import UserManager, UserProfile
from chat_manager import ChatManager, ChatSession
# Apply asyncio patch for Streamlit compatibility
nest_asyncio.apply()

# ========== Configuration ==========
# The Groq key is read from the environment only; without it the script
# shows an error and halts before any other UI is built.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    st.error("β οΈ GROQ_API_KEY environment variable is not set!")
    st.stop()

GROQ_MODEL = "llama3-8b-8192"
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
CHROMA_PERSIST_DIR = "./chroma_db"
DOCS_DIR = "./docs"
USER_DATA_DIR = "./user_data"
CHAT_DATA_DIR = "./chat_data"

# Ensure directories exist. parents=True keeps this working if one of the
# paths above is ever changed to a nested location.
for directory in [USER_DATA_DIR, CHAT_DATA_DIR, DOCS_DIR]:
    Path(directory).mkdir(parents=True, exist_ok=True)
# ========== Page Configuration ==========
st.set_page_config(
    page_title="ICodeGuru AI Assistant",
    # Must be a real emoji (or something loadable as an image); Streamlit
    # rejects any other string as an invalid page icon.
    page_icon="🤖",
    layout="centered",
    initial_sidebar_state="expanded",
)

# Load CSS with error handling. The encoding is explicit so the stylesheet
# is read the same way regardless of the platform's default locale.
try:
    with open("style.css", encoding="utf-8") as f:
        st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
except FileNotFoundError:
    st.warning("style.css file not found. Using default styling.")
# ========== Initialize Managers ==========
def get_user_manager():
    """Create a ``UserManager``, passing ``USER_DATA_DIR`` to its constructor."""
    manager = UserManager(USER_DATA_DIR)
    return manager


def get_chat_manager():
    """Create a ``ChatManager``, passing ``CHAT_DATA_DIR`` to its constructor."""
    manager = ChatManager(CHAT_DATA_DIR)
    return manager


# Module-level manager instances used by the functions below.
user_manager = get_user_manager()
chat_manager = get_chat_manager()
# ========== Logo Function ==========
def get_base64_image(image_path):
    """Return the file at *image_path* as a base64 ``data:`` URL.

    The MIME type is guessed from the file extension and falls back to
    ``image/jpeg`` when the extension is unknown or not an image type.

    Args:
        image_path: Path of the image file to embed.

    Returns:
        A ``data:<mime>;base64,<payload>`` string, or ``""`` when the file is
        missing or cannot be read, so the caller can still render the page.
    """
    import mimetypes  # local import: only this helper needs it

    mime_type, _ = mimetypes.guess_type(str(image_path))
    if not mime_type or not mime_type.startswith("image/"):
        mime_type = "image/jpeg"

    try:
        with open(image_path, "rb") as img_file:
            encoded = base64.b64encode(img_file.read()).decode("ascii")
    except OSError:
        # Covers a missing file as well as a directory or unreadable path;
        # a broken logo should never take the whole page down.
        return ""
    return f"data:{mime_type};base64,{encoded}"
# ========== User Authentication ==========
def render_user_auth():
    """Render the sidebar login, profile-creation and logout controls.

    Reads and writes ``st.session_state.user_id`` and
    ``st.session_state.current_user``. When nobody is logged in, the sidebar
    offers either a login picker over the existing usernames or a
    profile-creation form. When a user is logged in, it shows a greeting and
    a Logout button, which also drops ``current_session_id``.
    """
    if 'user_id' not in st.session_state:
        st.session_state.user_id = None

    if st.session_state.user_id:
        # User is logged in
        user = st.session_state.get('current_user')
        if user:
            st.sidebar.markdown(f"### π Welcome, {user.display_name}!")
            st.sidebar.markdown(f"**Level:** {user.expertise_level}")
        if st.sidebar.button("Logout"):
            st.session_state.user_id = None
            st.session_state.current_user = None
            if 'current_session_id' in st.session_state:
                del st.session_state.current_session_id
            st.rerun()
        return

    st.sidebar.markdown("### π€ User Profile")
    auth_option = st.sidebar.radio("Choose option:", ["Login", "Create New Profile"])

    if auth_option == "Create New Profile":
        with st.sidebar.form("create_profile"):
            username = st.text_input("Username", placeholder="Enter username")
            display_name = st.text_input("Display Name", placeholder="Your display name")
            expertise_level = st.selectbox(
                "Programming Experience",
                ["Beginner", "Intermediate", "Advanced", "Expert"],
            )
            preferred_languages = st.multiselect(
                "Preferred Languages",
                ["Python", "JavaScript", "Java", "C++", "C#", "Go", "Rust", "PHP", "Ruby"],
            )
            learning_goals = st.text_area(
                "Learning Goals",
                placeholder="What do you want to learn?",
            )
            if st.form_submit_button("Create Profile"):
                # Whitespace-only input must not count as a valid name.
                username = (username or "").strip()
                display_name = (display_name or "").strip()
                if not (username and display_name):
                    st.error("Username and Display Name are required!")
                elif username in (user_manager.get_all_usernames() or []):
                    # Login selects a profile by username, so it must be unique.
                    st.error(f"Username '{username}' is already taken. Please choose another.")
                else:
                    try:
                        profile = UserProfile(
                            user_id=str(uuid.uuid4()),
                            username=username,
                            display_name=display_name,
                            expertise_level=expertise_level,
                            preferred_languages=preferred_languages,
                            learning_goals=learning_goals,
                        )
                        user_manager.create_user(profile)
                    except Exception as e:
                        st.error(f"Error creating profile: {str(e)}")
                    else:
                        # Kept outside the try so the rerun signal is never
                        # mistaken for a profile-creation failure.
                        st.session_state.user_id = profile.user_id
                        st.session_state.current_user = profile
                        st.rerun()
    else:  # Login
        existing_users = user_manager.get_all_usernames()
        if existing_users:
            selected_username = st.sidebar.selectbox("Select Username", existing_users)
            if st.sidebar.button("Login"):
                profile = user_manager.get_user_by_username(selected_username)
                if profile:
                    st.session_state.user_id = profile.user_id
                    st.session_state.current_user = profile
                    st.rerun()
        else:
            st.sidebar.info("No existing profiles. Create a new one!")
# ========== Enhanced LangChain RAG System ==========
class EnhancedLangChainRAGSystem:
    """Retrieval-augmented question answering built from LangChain parts.

    Construction wires up a HuggingFace embedding model, a Chroma collection
    persisted under ``CHROMA_PERSIST_DIR``, a Groq chat model and a
    ``ConversationalRetrievalChain`` that shares one conversation memory.
    """

    COLLECTION_NAME = "icodeguru_knowledge"
    _logger = logging.getLogger(__name__)

    def __init__(self):
        self.embeddings = None
        self.vectorstore = None
        self.llm = None
        self.retrieval_chain = None
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer",
        )
        self.setup_components()

    def setup_components(self):
        """Setup all LangChain components."""
        self.embeddings = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL,
            model_kwargs={'device': 'cpu'},
            encode_kwargs={'normalize_embeddings': True},
        )
        self.llm = ChatGroq(
            groq_api_key=GROQ_API_KEY,
            model_name=GROQ_MODEL,
            temperature=0.1,
            max_tokens=1024,
        )
        self.load_vectorstore()
        self.setup_retrieval_chain()

    def _open_vectorstore(self):
        """Return a Chroma handle on the persisted knowledge collection."""
        return Chroma(
            persist_directory=CHROMA_PERSIST_DIR,
            embedding_function=self.embeddings,
            collection_name=self.COLLECTION_NAME,
        )

    def load_vectorstore(self):
        """Load existing vectorstore or create empty one."""
        self.vectorstore = self._open_vectorstore()

    @staticmethod
    def _escape_braces(value) -> str:
        """Double ``{``/``}`` so *value* is literal text in a prompt template."""
        return str(value).replace("{", "{{").replace("}", "}}")

    def _build_prompt_template(self) -> str:
        """Return the QA prompt template, personalized for the current user.

        Profile fields are user-supplied text, so their braces are escaped;
        otherwise a name such as ``{x}`` would be read as a template variable.
        """
        user = st.session_state.get('current_user')
        if user:
            esc = self._escape_braces
            languages = (
                ', '.join(user.preferred_languages)
                if user.preferred_languages
                else 'None specified'
            )
            user_context = (
                "\n"
                "User Profile Context:\n"
                f"- Name: {esc(user.display_name)}\n"
                f"- Experience Level: {esc(user.expertise_level)}\n"
                f"- Preferred Languages: {esc(languages)}\n"
                f"- Learning Goals: {esc(user.learning_goals or 'None specified')}\n"
                "Please tailor your response to match the user's experience level and preferences.\n"
            )
        else:
            user_context = "User profile not available. Provide general guidance."

        return (
            "You are an expert assistant for iCodeGuru, a programming education platform.\n"
            f"{user_context}\n"
            "Use the following context to answer the user's question comprehensively and accurately.\n"
            "Always provide relevant video links, website links, or resources when available in the context.\n"
            "Refer strictly to the provided context. If the answer isn't found in the context, "
            "explicitly say: \"The provided knowledge base doesn't contain this information.\"\n"
            "Context: {context}\n"
            "Chat History: {chat_history}\n"
            "Human: {question}"
        )

    def setup_retrieval_chain(self):
        """Setup the conversational retrieval chain with personalization.

        On failure the chain is left as ``None`` (``get_answer`` then returns
        a placeholder message) and the error is logged.
        """
        prompt = PromptTemplate(
            template=self._build_prompt_template(),
            input_variables=["context", "chat_history", "question"],
        )
        try:
            retriever = self.vectorstore.as_retriever(
                search_type="similarity",
                search_kwargs={"k": 4},
            )
            self.retrieval_chain = ConversationalRetrievalChain.from_llm(
                llm=self.llm,
                retriever=retriever,
                memory=self.memory,
                combine_docs_chain_kwargs={"prompt": prompt},
                return_source_documents=True,
                verbose=False,
            )
        except Exception:
            self._logger.exception("Could not build the retrieval chain")
            self.retrieval_chain = None

    def load_and_process_documents(self) -> List[Document]:
        """Load and process JSON documents from the docs directory.

        Each loaded document gets ``source_file`` and ``file_path`` metadata.
        A file that fails to load is logged and skipped.

        Returns:
            The loaded documents; empty when the directory is missing or
            holds no ``.json`` files.
        """
        documents = []
        if not os.path.isdir(DOCS_DIR):
            return documents

        json_files = [f for f in os.listdir(DOCS_DIR) if f.endswith('.json')]
        for filename in json_files:
            file_path = os.path.join(DOCS_DIR, filename)
            try:
                loader = JSONLoader(
                    file_path=file_path,
                    jq_schema='.[]',
                    text_content=False,
                )
                file_docs = loader.load()
            except Exception:
                self._logger.warning("Skipping unreadable document %s", file_path, exc_info=True)
                continue
            for doc in file_docs:
                doc.metadata['source_file'] = filename
                doc.metadata['file_path'] = file_path
            documents.extend(file_docs)
        return documents

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """Split documents into smaller chunks."""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=100,
            length_function=len,
            separators=["\n\n", "\n", " ", ""],
        )
        return text_splitter.split_documents(documents)

    def clear_knowledge_base(self):
        """Clear the existing knowledge base.

        Best effort: a failure is logged and the current store is kept.
        """
        try:
            if self.vectorstore:
                self.vectorstore.delete_collection()
            self.vectorstore = self._open_vectorstore()
        except Exception:
            self._logger.exception("Could not clear the knowledge base")

    def ingest_documents(self):
        """Complete document ingestion pipeline.

        Returns:
            ``True`` when documents were loaded, chunked and stored;
            ``False`` when there was nothing to ingest or storing failed.
        """
        documents = self.load_and_process_documents()
        if not documents:
            return False
        chunks = self.split_documents(documents)
        if not chunks:
            return False
        try:
            self.clear_knowledge_base()
            self.vectorstore.add_documents(chunks)
            self.vectorstore.persist()
            # The retriever must point at the freshly created store.
            self.setup_retrieval_chain()
            return True
        except Exception:
            self._logger.exception("Document ingestion failed")
            return False

    def _document_count(self) -> int:
        """Return how many entries the store holds, or 0 if that is unknown."""
        try:
            return self.vectorstore._collection.count()
        except Exception:
            pass
        # Fallback when the underlying collection handle is unavailable.
        try:
            test_results = self.vectorstore.similarity_search("test", k=1)
            return len(test_results) if test_results else 0
        except Exception:
            return 0

    def get_answer(self, question: str) -> dict:
        """Get answer for a user question.

        Args:
            question: The user's question.

        Returns:
            A dict with at least ``"answer"`` and ``"source_documents"``.
            A placeholder answer is returned when the chain is unavailable,
            the knowledge base is empty, or the chain raises.
        """
        if not self.retrieval_chain:
            return {
                "answer": "β οΈ Knowledge base is initializing. Please try again in a moment.",
                "source_documents": [],
            }
        try:
            if self._document_count() == 0:
                return {
                    "answer": "I'm ready to help! However, I don't have any specific documents loaded in my knowledge base right now. I can still answer general programming questions based on my training. Feel free to ask anything!",
                    "source_documents": [],
                }
            return self.retrieval_chain({"question": question})
        except Exception:
            self._logger.exception("Retrieval chain failed to answer")
            return {
                "answer": "I apologize, but I encountered an issue processing your question. Could you please try rephrasing it?",
                "source_documents": [],
            }

    def reset_conversation(self):
        """Reset the conversation memory."""
        self.memory.clear()
# Initialize the RAG system
def get_rag_system():
    """Return this browser session's RAG system, building it on first use.

    The instance is kept in ``st.session_state`` so reruns reuse it instead
    of reloading the embedding model, and so calls such as
    ``get_rag_system().reset_conversation()`` act on the same object that
    answers questions.

    The cached instance is refreshed when its inputs change:

    * a different logged-in user or chat session clears the conversation
      memory and rebuilds the chain (the prompt embeds the user's profile);
    * a different ``GROQ_MODEL`` updates the LLM and rebuilds the chain.
    """
    state = st.session_state
    scope = (state.get("user_id"), state.get("current_session_id"))

    system = state.get("_rag_system")
    if system is None:
        # Fresh instance already reflects the current user and GROQ_MODEL.
        system = EnhancedLangChainRAGSystem()
        state["_rag_system"] = system
        state["_rag_system_scope"] = scope
        return system

    needs_rebuild = False
    if state.get("_rag_system_scope") != scope:
        # History from another user or chat must not leak into this one.
        system.reset_conversation()
        state["_rag_system_scope"] = scope
        needs_rebuild = True
    if getattr(system.llm, "model_name", GROQ_MODEL) != GROQ_MODEL:
        system.llm.model_name = GROQ_MODEL
        needs_rebuild = True
    if needs_rebuild:
        system.setup_retrieval_chain()
    return system
# ========== Session Management ==========
def initialize_chat_session():
    """Make sure ``st.session_state.messages`` matches the active chat.

    With no ``current_session_id`` yet, a session is created for the
    logged-in user (if any) and the message list starts empty. Otherwise the
    stored session is fetched and, when found, its messages are copied into
    session state as plain dicts.
    """
    state = st.session_state

    if 'current_session_id' in state:
        # Load existing session messages
        stored = chat_manager.get_session(state.current_session_id)
        if stored:
            state.messages = [
                {
                    "role": entry.role,
                    "content": entry.content,
                    "message_id": entry.message_id,
                    "rating": entry.rating,
                    "is_bookmarked": entry.is_bookmarked,
                    "source_documents": entry.source_documents,
                }
                for entry in stored.messages
            ]
        return

    owner_id = state.get('user_id')
    if owner_id:
        state.current_session_id = chat_manager.create_session(owner_id)
    state.messages = []
# ========== Chat History Management ==========
def render_chat_history_sidebar():
    """List up to ten of the user's chat sessions with open/delete buttons.

    Does nothing when no user is logged in or the user has no sessions.
    Opening a session makes it current and reloads its messages; deleting
    the current session also forgets ``current_session_id``. Both actions
    trigger a rerun.
    """
    if not st.session_state.get('user_id'):
        return

    history = chat_manager.get_user_sessions(st.session_state.user_id)
    if not history:
        return

    st.sidebar.markdown("### π¬ Chat History")
    for entry in history[:10]:  # first ten entries as returned by the manager
        if len(entry.title) > 30:
            label = entry.title[:30] + "..."
        else:
            label = entry.title

        open_col, delete_col = st.sidebar.columns([3, 1])
        with open_col:
            open_clicked = st.button(label, key=f"session_{entry.session_id}")
            if open_clicked:
                st.session_state.current_session_id = entry.session_id
                initialize_chat_session()
                st.rerun()
        with delete_col:
            delete_clicked = st.button(
                "ποΈ",
                key=f"delete_{entry.session_id}",
                help="Delete session",
            )
            if delete_clicked:
                chat_manager.delete_session(entry.session_id)
                if st.session_state.get('current_session_id') == entry.session_id:
                    del st.session_state.current_session_id
                st.rerun()
# ========== Enhanced Sidebar Features ==========
def render_enhanced_sidebar():
    """Render enhanced sidebar with all features.

    Always renders the authentication controls. For a logged-in user it also
    renders chat history, a New Chat button, model selection, knowledge-base
    refresh, chat export, usage metrics and up to five bookmarked responses.
    Selecting a different model updates the module-level ``GROQ_MODEL``.
    """
    global GROQ_MODEL

    # User Authentication
    render_user_auth()
    if not st.session_state.get('user_id'):
        return

    # Chat History
    render_chat_history_sidebar()
    st.sidebar.markdown("---")

    # New Chat Button
    if st.sidebar.button("π New Chat", type="primary"):
        user_id = st.session_state.user_id
        session_id = chat_manager.create_session(user_id)
        st.session_state.current_session_id = session_id
        st.session_state.messages = []
        get_rag_system().reset_conversation()
        st.rerun()

    # Model Selection
    st.sidebar.markdown("### π§ AI Settings")
    model_options = ["llama3-8b-8192", "llama3-70b-8192"]
    selected_model = st.sidebar.selectbox("Choose LLM Model", model_options, index=0)
    if selected_model != GROQ_MODEL:
        GROQ_MODEL = selected_model
        get_rag_system().llm.model_name = selected_model

    # Knowledge Base Management
    st.sidebar.markdown("### π Knowledge Base")
    if st.sidebar.button("π Refresh Knowledge Base"):
        with st.spinner("Refreshing knowledge base..."):
            success = get_rag_system().ingest_documents()
        if success:
            st.sidebar.success("β Knowledge base refreshed!")
        else:
            st.sidebar.warning("β οΈ No documents found to load")

    # Export Chat History
    st.sidebar.markdown("### π€ Export")
    if st.sidebar.button("π Export Chat History"):
        if st.session_state.get('current_session_id'):
            export_data = chat_manager.export_chat_history(
                st.session_state.user_id,
                st.session_state.current_session_id,
            )
            if export_data:
                st.sidebar.download_button(
                    label="β¬οΈ Download JSON",
                    data=json.dumps(export_data, indent=2),
                    file_name=f"chat_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                    mime="application/json",
                )

    # User Statistics
    st.sidebar.markdown("### π Your Stats")
    # NOTE(review): user_stats is fetched but not displayed; the call is kept
    # in case it has side effects - confirm against UserManager.
    user_stats = user_manager.get_user_stats(st.session_state.user_id)
    chat_stats = chat_manager.get_chat_statistics(st.session_state.user_id)
    col1, col2 = st.sidebar.columns(2)
    with col1:
        st.metric("Total Chats", chat_stats.get('total_sessions', 0))
    with col2:
        st.metric("Messages", chat_stats.get('total_messages', 0))
    st.sidebar.metric("Bookmarks", chat_stats.get('bookmarked_messages', 0))

    # Bookmarked Messages
    bookmarked = chat_manager.get_bookmarked_messages(st.session_state.user_id)
    if bookmarked:
        st.sidebar.markdown("### π Bookmarked Responses")
        for bookmark in bookmarked[:5]:  # first five entries as returned
            content = bookmark['message']['content']
            # Only add an ellipsis when text was actually cut off, matching
            # how session titles are shortened in the chat history list.
            if len(content) > 50:
                message_preview = content[:50] + "..."
            else:
                message_preview = content or "..."
            if st.sidebar.button(message_preview, key=f"bookmark_{bookmark['message']['message_id']}"):
                # Show full bookmarked message
                st.sidebar.write(content)
# ========== Message Rating Handler ==========
def handle_component_value():
    """Apply a pending rating or bookmark action stored in session state.

    Reads ``st.session_state.component_value``; when it is set, forwards a
    ``rate_message`` or ``bookmark_message`` action to the chat manager and
    then resets the value to ``None``. Other actions are only cleared.
    """
    pending = st.session_state.get('component_value')
    if not pending:
        return

    action = pending.get('action')
    if action == 'rate_message':
        chat_manager.rate_message(
            pending['session_id'],
            pending['message_id'],
            pending['rating'],
        )
    elif action == 'bookmark_message':
        chat_manager.bookmark_message(
            pending['session_id'],
            pending['message_id'],
            pending['is_bookmarked'],
        )

    # Clear the component value
    st.session_state.component_value = None
# ========== Main App Logic ==========
def main():
    """Main application logic.

    Processes pending component actions, renders the header and sidebar and,
    for a logged-in user, replays the current chat and handles a newly
    submitted prompt: the question and the generated answer are stored via
    the chat manager, shown on screen and appended to session state.
    """
    # Handle component interactions
    handle_component_value()

    # Display logo and header
    image_data_url = get_base64_image("10001.jpeg")
    header_html = (
        "\n"
        '<div class="custom-header">\n'
        f'<h1><img src="{image_data_url}" class="chatbot-logo" alt="Bot" /> ICodeGuru AI Assistant</h1>\n'
        "</div>\n"
    )
    st.markdown(header_html, unsafe_allow_html=True)

    # Render enhanced sidebar
    render_enhanced_sidebar()

    # Check if user is logged in
    if not st.session_state.get('user_id'):
        st.info("π Please login or create a profile to start chatting!")
        return

    # Initialize RAG system (only once a user is logged in, so visitors on
    # the login screen do not pay for building it)
    rag_system = get_rag_system()

    # Initialize chat session
    initialize_chat_session()

    def generate_response(user_query):
        """Generate AI response using LangChain system.

        Returns:
            An ``(answer_text, source_file_names)`` tuple on every path, so
            the caller can always unpack two values.
        """
        if not user_query or not user_query.strip():
            return "Please provide a valid question.", []
        try:
            response = rag_system.get_answer(user_query)
            answer = response.get(
                "answer",
                "I apologize, but I couldn't generate a response. Please try again.",
            )
            source_docs = response.get("source_documents", [])
            if source_docs:
                parts = ["\n\nπ **Sources:**\n"]
                # Only the first two sources are previewed in the answer text.
                for i, doc in enumerate(source_docs[:2], 1):
                    source_file = doc.metadata.get('source_file', 'Unknown')
                    text = doc.page_content
                    content_preview = text[:100] + "..." if len(text) > 100 else text
                    parts.append(f"{i}. {source_file}: {content_preview}\n")
                answer += "".join(parts)
            return answer, [doc.metadata.get('source_file', '') for doc in source_docs]
        except Exception:
            logging.getLogger(__name__).exception("Failed to generate a response")
            return "I apologize, but I encountered an issue processing your question. Could you please try again.", []

    # Display chat messages
    for i, msg in enumerate(st.session_state.messages):
        with st.chat_message(msg["role"]):
            if msg["role"] == "assistant":
                message_id = msg.get("message_id", f"msg-{i}")
                session_id = st.session_state.get("current_session_id", "")
                render_enhanced_response_box(
                    msg["content"],
                    message_id,
                    session_id,
                    is_bookmarked=msg.get("is_bookmarked", False),
                    rating=msg.get("rating"),
                    show_actions=True,
                )
            else:
                st.markdown(msg["content"])

    # Chat input
    prompt = st.chat_input("Type your message...")
    if not prompt:
        return

    # Add user message to session
    user_message_id = chat_manager.add_message(
        st.session_state.current_session_id,
        "user",
        prompt,
    )
    # Add to session state
    st.session_state.messages.append({
        "role": "user",
        "content": prompt,
        "message_id": user_message_id,
    })
    with st.chat_message("user"):
        st.markdown(prompt)

    # Generate and display assistant response
    with st.chat_message("assistant"):
        with st.spinner("Thinking..."):
            full_response, source_docs = generate_response(prompt)
        # Add assistant message to session
        assistant_message_id = chat_manager.add_message(
            st.session_state.current_session_id,
            "assistant",
            full_response,
            source_docs,
        )
        # Display response with enhanced box
        render_enhanced_response_box(
            full_response,
            assistant_message_id,
            st.session_state.current_session_id,
            is_bookmarked=False,
            rating=None,
            show_actions=True,
        )

    # Add to session state
    st.session_state.messages.append({
        "role": "assistant",
        "content": full_response,
        "message_id": assistant_message_id,
        "rating": None,
        "is_bookmarked": False,
        "source_documents": source_docs,
    })
    # Update user chat count
    user_manager.increment_chat_count(st.session_state.user_id)


if __name__ == "__main__":
    main()