#!/usr/bin/env python
# coding: utf-8

# ## Prior Authorization Review APP: PriorAuthAI

# **Import general libraries**

# In[1]:

import os
import glob
from dotenv import load_dotenv
import gradio as gr
import asyncio
import numpy as np
from sklearn.manifold import TSNE
import plotly.graph_objects as go

# **Langchain related imports**

# In[2]:

from langchain.document_loaders import DirectoryLoader, TextLoader, PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.schema import Document
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_chroma import Chroma
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain

# **LLM Info**

# In[3]:

load_dotenv()
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY')
MODEL = "gpt-4o-mini"
# **Load Data**

# In[4]:

# Each subfolder of MCS_Policydocs holds the policy PDFs for one procedure
folders = glob.glob("MCS_Policydocs/*")

documents = []
for folder in folders:
    doc_type = os.path.basename(folder)  # Extract the procedure name (folder name)
    # Load all PDFs inside the procedure folder
    loader = DirectoryLoader(folder, glob="**/*.pdf", loader_cls=PyPDFLoader)
    docs_folder = loader.load()
    # Tag each document with its procedure name
    for doc in docs_folder:
        doc.metadata["doc_type"] = doc_type
        documents.append(doc)

# In[5]:

len(documents)
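
# A quick sanity check (illustrative sketch, not in the original flow): PyPDFLoader yields one
# Document per PDF page, each carrying its source path plus the doc_type tag added above.

# In[ ]:

print(documents[0].metadata)
print(documents[0].page_content[:300])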

# **Split the text into chunks for manageability and token limits**

# In[6]:

# text_splitter = CharacterTextSplitter(chunk_size=2000, chunk_overlap=300)
# chunks = text_splitter.split_documents(documents)  # rechunk existing documents
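
# A diagnostic sketch (not part of the original flow): the splitter above is left commented out,
# so whole PDF pages are embedded as-is. This shows what the same 2000/300 settings would
# produce, without feeding the vector store below. The underscore-prefixed names are illustrative.

# In[ ]:

_splitter = CharacterTextSplitter(chunk_size=2000, chunk_overlap=300)
_chunks = _splitter.split_documents(documents)
print(f"{len(documents)} pages would become {len(_chunks)} chunks")
print("Procedures covered:", sorted({chunk.metadata['doc_type'] for chunk in _chunks}))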

# **Populate Vector store with embeddings for each document**

# In[7]:

embeddings = OpenAIEmbeddings()  # We will use OpenAI Embeddings

# Delete the collection if it already exists
db_name = "vector_db"
if os.path.exists(db_name):
    Chroma(persist_directory=db_name, embedding_function=embeddings).delete_collection()

# Create embedding vectorstore
vectorstore = Chroma.from_documents(documents=documents, embedding=embeddings, persist_directory=db_name)
print(f"Vectorstore created with {vectorstore._collection.count()} documents")

# In[8]:

collection = vectorstore._collection
sample_embedding = collection.get(limit=1, include=["embeddings"])["embeddings"][0]
dimensions = len(sample_embedding)
print(f"The vectors have {dimensions:,} dimensions")

# **RAG Implementation with Langchain**

# In[9]:

from langchain.prompts import PromptTemplate

prompt = PromptTemplate(
    input_variables=["question", "context"],
    template="""
You are a **Medicare Administrative Contractor (MAC) AI Assistant**, assisting nurses and medical reviewers in evaluating prior authorization requests.
Your responses must be **accurate, structured, and policy-backed**, ensuring compliance with Medicare LCD/NCD guidelines.

At the start of a conversation, greet the user, introduce yourself as a MAC AI Assistant, and state that you can currently answer policy questions about these four procedures:
1. Spinal Neurostimulation, 2. Cervical Fusion, 3. MRI for Chronic Back Pain, 4. Total Joint Arthroplasty

🔹 **How to Respond:**

✅ **Prioritize accuracy and relevance based on the question.**
- If the LLM has sufficient knowledge, answer directly.
- If the query requires policy-based details, retrieve information from the MCS_Policydocs knowledge base.
- If there's a conflict, always prioritize the retrieved policy data over general LLM knowledge.

✅ **Adapt your response format based on the nature of the query:**
- If the query requires structured data (e.g., approval criteria, required documentation, denials), use **clear bullet points**.
- If the query is conversational, **respond naturally** without forcing structure.
- If it's a follow-up, **infer context** from the previous question instead of asking for unnecessary clarification.

✅ **Retrieving Documents:**
- **Do not retrieve or mention sources unless necessary to answer the query.**
- **Offer** document references, but only show them if explicitly requested by the user.
- When retrieving sources, **ensure they are highly relevant** to the question asked.

✅ **Handling Follow-up Questions:**
- If the user asks a follow-up (e.g., "What are the denial reasons?"), assume it refers to the **previous question** unless context suggests otherwise.
- If the follow-up is vague, attempt to infer meaning before asking for clarification.
- Avoid resetting the conversation flow or greeting the user again in a follow-up response.

🔹 **Context (Retrieved LCD/NCD Policies, if applicable):**
{context}

**Question:** {question}

**📄 Answer:**
"""
)
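
# Quick sanity check (illustrative only, not part of the original flow): render the template with
# placeholder values to confirm both input variables are wired up before building the chain.

# In[ ]:

print(prompt.format(
    question="What documentation is required to approve a Cervical Fusion request?",
    context="(retrieved LCD/NCD policy excerpts would be inserted here)"
)[:600])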

# In[17]:

# Initialize LLM
llm = ChatOpenAI(temperature=0.7, model_name=MODEL)

# In[18]:

# Set up conversation memory for chat; output_key tells the memory which chain output to store,
# since the chain also returns source documents
memory = ConversationBufferMemory(memory_key='chat_history', return_messages=True, output_key="answer")

# The retriever is an abstraction over the VectorStore that will be used during RAG
retriever = vectorstore.as_retriever()
# retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 5})

# Set up the ConversationalRetrievalChain
conversation_chain = ConversationalRetrievalChain.from_llm(
    llm=llm,
    retriever=retriever,
    memory=memory,                    # keep conversation history across turns
    return_source_documents=True,     # keep retrieved documents alongside the answer
    combine_docs_chain_kwargs={"prompt": prompt}
)
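
# Optional smoke test (a sketch, not part of the original flow): invoke the chain once directly
# to confirm memory, retrieval, and the custom prompt work together before wiring up the UI.
# Note that this turn will also land in the chat memory.

# In[ ]:

result = conversation_chain.invoke({"question": "Which procedures can you answer policy questions about?"})
print(result["answer"])
print(f"{len(result['source_documents'])} source documents retrieved")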

# **Now Implement in Gradio UI**

# In[19]:

def chat(message, history):
    global retrieved_docs

    # Ensure retrieved_docs exists
    if "retrieved_docs" not in globals():
        retrieved_docs = {}

    # Handle requests for sources
    if message.lower() in ["show sources", "show documents", "provide references"]:
        if "last_retrieval" in retrieved_docs:
            docs = retrieved_docs["last_retrieval"]
            response = "**Retrieved Documents:**"
            for i, doc in enumerate(docs):
                response += f"\n📄 {i+1}. {doc.metadata.get('source', 'Unknown')}\nSnippet: {doc.page_content[:200]}...\n"
            return response if docs else "No relevant documents were retrieved."
        return "No sources available for the last query."

    # Use LLM to generate a response
    result = conversation_chain.invoke({"question": message})
    answer = result["answer"]
    documents = result["source_documents"]

    # Store retrieved documents for follow-ups if needed
    if documents:
        retrieved_docs["last_retrieval"] = documents

    # Offer documents but show only if requested
    return f"**Answer:** {answer}\n\n_(Type 'show sources' if you need document references.)_"

# In[20]:

# Launch Gradio UI
ui = gr.ChatInterface(fn=chat, title="PriorAuthAI: RAG based Prior Authorization Review Chatbot")
ui.launch()

# In[ ]: