Spaces:
Running
Running
import streamlit as st | |
from groq import Groq | |
from langchain_groq import ChatGroq | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_core.output_parsers import StrOutputParser | |
import edge_tts | |
import asyncio | |
import os | |
from typing import Optional | |
GROQ_API_KEY = os.getenv('GROQ_API_KEY') | |
class CodeAssistantBot: | |
def __init__(self): | |
self.client = Groq(api_key=GROQ_API_KEY) | |
self.model = ChatGroq(model="llama-3.3-70b-versatile", temperature=0.6) | |
# Initialize prompts | |
self.analysis_prompt = ChatPromptTemplate.from_messages([ | |
("system", | |
"""You are an expert code assistant. Analyze the code and context provided, | |
then give clear, helpful responses. Keep responses concise and focused on the code.""" | |
), | |
("user", """Code: {code} | |
Output: {output} | |
Error: {error} | |
Question: {question}""") | |
]) | |
self.summary_prompt = ChatPromptTemplate.from_messages([( | |
"system", | |
"""Summarize the conversation focusing on key technical points and insights. | |
Keep it brief and clear.""" | |
), ("user", "Conversation: {conversation}")]) | |
def analyze_code(self, code: str, output: str, error: str, | |
question: str) -> str: | |
try: | |
parser = StrOutputParser() | |
chain = self.analysis_prompt | self.model | parser | |
return chain.invoke({ | |
'code': code, | |
'output': output, | |
'error': error, | |
'question': question | |
}) | |
except Exception as e: | |
return f"Sorry, I encountered an error: {str(e)}" | |
def summarize_conversation(self, conversation: list) -> str: | |
try: | |
parser = StrOutputParser() | |
chain = self.summary_prompt | self.model | parser | |
formatted_conv = "\n".join( | |
[f"Q: {q}\nA: {a}" for q, a in conversation]) | |
return chain.invoke({'conversation': formatted_conv}) | |
except Exception as e: | |
return f"Could not generate summary: {str(e)}" | |
async def text_to_speech(text: str, filename: str): | |
voice = "fr-FR-VivienneMultilingualNeural" | |
communicate = edge_tts.Communicate(text, voice) | |
await communicate.save(filename) | |
def render_chatbot(code: str, output: str, error: str): | |
"""Render the chatbot UI in a fixed-height, scrollable panel""" | |
# Initialize session state | |
if "conversation" not in st.session_state: | |
st.session_state.conversation = [] | |
if "audio_count" not in st.session_state: | |
st.session_state.audio_count = 0 | |
# Create bot instance | |
bot = CodeAssistantBot() | |
# Apply CSS for scrollable chat window | |
st.markdown(""" | |
<style> | |
.chat-container { | |
height: 500px; /* Fixed height */ | |
overflow-y: auto; /* Scrollable */ | |
border: 1px solid #ddd; | |
padding: 10px; | |
background-color: #f9f9f9; | |
border-radius: 5px; | |
} | |
.chat-message { | |
padding: 10px; | |
border-radius: 5px; | |
margin-bottom: 10px; | |
} | |
.user-message { | |
background-color: #e3f2fd; | |
} | |
.bot-message { | |
background-color: #f5f5f5; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
# Chat history in a scrollable container | |
chat_container = st.container() | |
with chat_container: | |
st.markdown('<div class="chat-container">', unsafe_allow_html=True) | |
# Display all chat messages | |
for q, a in st.session_state.conversation: | |
st.markdown(f'<div class="chat-message user-message">You: {q}</div>', unsafe_allow_html=True) | |
st.markdown(f'<div class="chat-message bot-message">Assistant: {a}</div>', unsafe_allow_html=True) | |
# Close the chat container div | |
st.markdown('</div>', unsafe_allow_html=True) | |
# Input area at the bottom | |
col1, col2 = st.columns([4, 1]) | |
with col1: | |
user_input = st.text_input("Ask your Question here", key="chat_input", placeholder="Type your question here...") | |
with col2: | |
send_clicked = st.button("π") # Button to send the message | |
if user_input and send_clicked: | |
# Get response | |
response = bot.analyze_code(code, output, error, user_input) | |
st.session_state.conversation.append((user_input, response)) | |
# Generate summary and speech if conversation is long enough | |
if len(st.session_state.conversation) > 3: | |
with st.spinner("Generating conversation summary..."): | |
summary = bot.summarize_conversation(st.session_state.conversation) | |
audio_file = f"summary_{st.session_state.audio_count}.wav" | |
asyncio.run(text_to_speech(summary, audio_file)) | |
st.session_state.audio_count += 1 | |
with st.expander("π Conversation Summary", expanded=False): | |
st.markdown(summary) | |
st.audio(audio_file, format="audio/wav") | |
# **Auto-scroll to bottom (forcing UI refresh)** | |
st.markdown(""" | |
<script> | |
var chatDiv = window.parent.document.querySelector('.chat-container'); | |
if (chatDiv) { | |
chatDiv.scrollTop = chatDiv.scrollHeight; | |
} | |
</script> | |
""", unsafe_allow_html=True) | |