Spaces:
Running
Running
# main.py | |
import os | |
import streamlit as st | |
import anthropic | |
from requests import JSONDecodeError | |
# Updated imports for latest LangChain | |
from langchain_huggingface import HuggingFaceEmbeddings | |
from langchain_community.vectorstores import SupabaseVectorStore | |
from langchain_openai import ChatOpenAI | |
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint | |
# Updated memory and chain imports | |
from langchain.memory import ConversationBufferMemory | |
from langchain.chains import create_retrieval_chain | |
from langchain.chains.combine_documents import create_stuff_documents_chain | |
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
from langchain_core.messages import HumanMessage, AIMessage | |
from supabase import Client, create_client | |
from streamlit.logger import get_logger | |
from stats import get_usage, add_usage | |
# βββββββ supabase + secrets ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
supabase_url = st.secrets.SUPABASE_URL | |
supabase_key = st.secrets.SUPABASE_KEY | |
openai_api_key = st.secrets.openai_api_key | |
anthropic_api_key = st.secrets.anthropic_api_key | |
hf_api_key = st.secrets.hf_api_key | |
username = st.secrets.username | |
supabase: Client = create_client(supabase_url, supabase_key) | |
logger = get_logger(__name__) | |
# βββββββ embeddings (Updated to use langchain-huggingface) βββββββββββββββββββββ | |
embeddings = HuggingFaceEmbeddings( | |
model_name="BAAI/bge-large-en-v1.5", | |
model_kwargs={"device": "cpu"}, | |
encode_kwargs={"normalize_embeddings": True} | |
) | |
# βββββββ vector store ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
vector_store = SupabaseVectorStore( | |
client=supabase, | |
embedding=embeddings, | |
query_name="match_documents", | |
table_name="documents", | |
) | |
# βββββββ LLM setup ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
model = "HuggingFaceTB/SmolLM3-3B" | |
temperature = 0.1 | |
max_tokens = 500 | |
import re | |
def clean_response(answer: str) -> str: | |
"""Clean up AI response by removing unwanted artifacts and formatting.""" | |
if not answer: | |
return answer | |
# Remove thinking tags and content | |
answer = re.sub(r'<think>.*?</think>', '', answer, flags=re.DOTALL) | |
answer = re.sub(r'<thinking>.*?</thinking>', '', answer, flags=re.DOTALL) | |
# Remove other common AI response artifacts | |
answer = re.sub(r'\[.*?\]', '', answer, flags=re.DOTALL) | |
answer = re.sub(r'\{.*?\}', '', answer, flags=re.DOTALL) | |
answer = re.sub(r'```.*?```', '', answer, flags=re.DOTALL) | |
answer = re.sub(r'---.*?---', '', answer, flags=re.DOTALL) | |
# Remove excessive whitespace and newlines | |
answer = re.sub(r'\s+', ' ', answer).strip() | |
# Remove common AI-generated prefixes/suffixes | |
answer = re.sub(r'^(Assistant:|AI:|Grok:)\s*', '', answer, flags=re.IGNORECASE) | |
answer = re.sub(r'\s*(Sincerely,.*|Best regards,.*|Regards,.*)$', '', answer, flags=re.IGNORECASE) | |
return answer | |
def create_conversational_rag_chain(): | |
"""Create a modern conversational RAG chain using LCEL.""" | |
# Create the HuggingFace LLM | |
llm = ChatOpenAI( | |
base_url=f"https://router.huggingface.co/hf-inference/models/{model}/v1", | |
api_key=hf_api_key, | |
model=model, | |
temperature=temperature, | |
max_tokens=max_tokens, | |
timeout=30, | |
max_retries=3, | |
) | |
# Create retriever | |
retriever = vector_store.as_retriever( | |
search_kwargs={"score_threshold": 0.6, "k": 4, "filter": {"user": username}} | |
) | |
# Create system prompt for RAG | |
system_prompt = """You are a helpful safety assistant. Use the following pieces of retrieved context to answer the question. | |
If you don't know the answer based on the context, just say that you don't have enough information to answer that question. | |
Context: {context} | |
Chat History: {chat_history} | |
Question: {input} | |
Answer:""" | |
prompt = ChatPromptTemplate.from_messages([ | |
("system", system_prompt), | |
MessagesPlaceholder("chat_history"), | |
("human", "{input}"), | |
]) | |
# Create document processing chain | |
question_answer_chain = create_stuff_documents_chain(llm, prompt) | |
# Create retrieval chain | |
rag_chain = create_retrieval_chain(retriever, question_answer_chain) | |
return rag_chain | |
def response_generator(query: str, chat_history: list) -> str: | |
"""Ask the RAG chain to answer `query`, with JSONβerror fallback.""" | |
# log usage | |
add_usage(supabase, "chat", "prompt:" + query, {"model": model, "temperature": temperature}) | |
logger.info("Using HF model %s", model) | |
# Create the RAG chain | |
rag_chain = create_conversational_rag_chain() | |
# Format chat history for the chain | |
formatted_history = [] | |
for msg in chat_history: | |
if msg["role"] == "user": | |
formatted_history.append(HumanMessage(content=msg["content"])) | |
elif msg["role"] == "assistant": | |
formatted_history.append(AIMessage(content=msg["content"])) | |
try: | |
result = rag_chain.invoke({ | |
"input": query, | |
"chat_history": formatted_history | |
}) | |
answer = result.get("answer", "") | |
context = result.get("context", []) | |
if not context: | |
return ( | |
"I'm sorry, I don't have enough information to answer that. " | |
"If you have a public data source to add, please email copilot@securade.ai." | |
) | |
answer = clean_response(answer) | |
return answer | |
except JSONDecodeError as e: | |
logger.error("JSONDecodeError: %s", e) | |
return "Sorry, I had trouble processing your request. Please try again." | |
except Exception as e: | |
logger.error("Unexpected error: %s", e) | |
return "Sorry, I encountered an error while processing your request. Please try again." | |
# βββββββ Streamlit UI ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
st.set_page_config( | |
page_title="Securade.ai - Safety Copilot", | |
page_icon="https://securade.ai/favicon.ico", | |
layout="centered", | |
initial_sidebar_state="collapsed", | |
menu_items={ | |
"About": "# Securade.ai Safety Copilot v0.1\n[https://securade.ai](https://securade.ai)", | |
"Get Help": "https://securade.ai", | |
"Report a Bug": "mailto:hello@securade.ai", | |
}, | |
) | |
st.title("π·ββοΈ Safety Copilot π¦Ί") | |
stats = get_usage(supabase) | |
st.markdown(f"_{stats} queries answered!_") | |
st.markdown( | |
"Chat with your personal safety assistant about any health & safety related queries. " | |
"[[blog](https://securade.ai/blog/how-securade-ai-safety-copilot-transforms-worker-safety.html)" | |
"|[paper](https://securade.ai/assets/pdfs/Securade.ai-Safety-Copilot-Whitepaper.pdf)]" | |
) | |
# Initialize chat history | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = [] | |
# Display chat history | |
for msg in st.session_state.chat_history: | |
with st.chat_message(msg["role"]): | |
st.markdown(msg["content"]) | |
# Handle new user input | |
if prompt := st.chat_input("Ask a question"): | |
# Add user message to history | |
st.session_state.chat_history.append({"role": "user", "content": prompt}) | |
# Display user message | |
with st.chat_message("user"): | |
st.markdown(prompt) | |
# Generate and display response | |
with st.spinner("Safety briefing in progress..."): | |
answer = response_generator(prompt, st.session_state.chat_history[:-1]) # Exclude current message | |
with st.chat_message("assistant"): | |
st.markdown(answer) | |
# Add assistant response to history | |
st.session_state.chat_history.append({"role": "assistant", "content": answer}) |