# Author: Tafazzul-Nadeeem
# Chatbot ready and deployment checked (commit cb42c88)
import gradio as gr
from openai import OpenAI
import os
import base64
import time
import copy
import re
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
from agents import rag_decision
from agents import get_top_k
from agents import get_prescription_text
from prompts import bot_welcome_message, openai_opening_system_message
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
with gr.Blocks() as demo:
################################ AGENTS ###################################
# Agent1 - RAG Decision Agent (whether RAG is needed for the user's query)
def agent1_rag_decision(query):
decision = rag_decision(query)
return decision
# Agent2 - RAG Retrieval Agent (retrieve top k relevant documents)
def agent2_use_rag(query, k=3):
results = get_top_k(query, k=k)
return results
# Agent3 - LLM Agent (get query response from LLM)
def agent3_llm_agent(messages):
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
temperature=0.7
)
return response.choices[0].message.content.strip()
def agent4_get_prescription_text(messages):
"""
Openai agent to get prescription text.
"""
prescription_text = get_prescription_text(messages)
return prescription_text
###########################################################################
def encode_image(image_path):
with open(image_path, "rb") as f:
return base64.b64encode(f.read()).decode("utf-8")
def load_welcome():
history = []
history.append({"role": "system", "content": openai_opening_system_message})
history.append({"role": "assistant", "content": bot_welcome_message})
return history
def clear_and_load():
# Return the welcome message
history = []
history.append({"role": "system", "content": openai_opening_system_message})
history.append({"role": "assistant", "content": bot_welcome_message})
return history, None
def add_message(history, message):
# Send the image to the agent4_get_prescription_text
messages = []
if message["text"] is not None:
messages.append({
"role": "user",
"content":[{"type": "text", "text": message["text"]}]
})
for x in message["files"]:
encoded_content = encode_image(x)
messages[0]["content"].append({
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{encoded_content}"}
})
history.append({"role": "user", "content": {"path": x}})
# call agent4_get_prescription_text if there is an image_url in the message
has_image_url = any("image_url" in item for item in messages[0]["content"])
if has_image_url:
prescription_text = agent4_get_prescription_text(messages)
history.append({"role": "system", "content": prescription_text})
if message["text"] is not None:
history.append({"role": "user", "content": message["text"]})
return history, gr.MultimodalTextbox(value=None, interactive=False, file_count="multiple", placeholder="Enter message or upload file...")
def respond(history):
if len(history) == 2:
history.insert(0,{"role": "system", "content": openai_opening_system_message})
messages = copy.deepcopy(history)
for i, msg in enumerate(messages):
if isinstance(msg["content"], str):
# If the content is a string, encode it
messages[i]["content"] = [{
"type": "text",
"text": msg["content"]
}]
if isinstance(msg["content"],tuple):
# If the content is a file path, encode it
# file_path = msg["content"][0]
# encoded_content = encode_image(file_path)
messages[i]["content"] = [{
"type": "text",
"text": "User Image"}]
clean_messages = [] # OpenAI doesnot accept metadata or options in messages
for msg in messages:
clean_msg = {
"role": msg["role"],
"content": msg["content"]
}
clean_messages.append(clean_msg)
########################### AGENTIC WORKFLOW ##########################
# Call Agent1- the RAG Decision Agent
rag_query = ""
if clean_messages[-1]["role"] == "system" and "No prescription found" in clean_messages[-1]["content"]:
# If the last message is a system message with "No prescription found", skip RAG decision
rag_decision = False
elif clean_messages[-2]["role"] == "system" and "No prescription found" in clean_messages[-2]["content"]:
rag_decision = False
else:
# Get the last 10 messages in the format "role: <message>"
last_10 = clean_messages[-10:] if len(clean_messages) > 10 else clean_messages
rag_query = "\n".join(
f"{msg['role']}: {msg['content'][0]['text'] if isinstance(msg['content'], list) and msg['content'] and 'text' in msg['content'][0] else ''}"
for msg in last_10
)
rag_decision = agent1_rag_decision(rag_query)
if rag_decision == True:
#Call Agent2 - the RAG Retrieval Agent
top_k_results = agent2_use_rag(clean_messages[-1]["content"][0]["text"], k=5)
# Append the top k results to the messages
for i, result in enumerate(top_k_results):
clean_messages.append({
"role": "system",
"content": f"RAG Retrieved Result-{i+1}: " + result["content"]
})
# Call Agent3 - the LLM Agent to get query response
response = agent3_llm_agent(clean_messages)
else:
# Call Agent3 - the LLM Agent to get query response
response = agent3_llm_agent(clean_messages)
#######################################################################
# history.append({"role": "assistant", "content": response})
# return history
history.append({"role": "assistant", "content": ""})
# Split by sentence boundaries (naive but works for most cases)
chunks = re.split(r'(?<=[.!?]) +', response)
for chunk in chunks:
history[-1]["content"] += chunk + " "
time.sleep(0.3)
yield history
##########################################################################
gr.Markdown(
"""
<h1 style='text-align: center; font-size: 1.5em; color: #2c3e50; margin-bottom: 0.2em;'>
MedScan Diagnostic Services Chatbot (Agentic AI framework powered by OpenAI)
</h1>
"""
)
chatbot = gr.Chatbot(type="messages",
render_markdown=True,
height=380)
chat_input = gr.MultimodalTextbox(
interactive=True,
file_count="multiple",
placeholder="Enter message or upload file...",
show_label=False
)
clear = gr.Button("New Chat")
clear.click(
clear_and_load,
inputs=None,
outputs=[chatbot, chat_input]
)
demo.load(load_welcome, None, chatbot, api_name="load_welcome")
chat_msg = chat_input.submit(
add_message, [chatbot, chat_input], [chatbot, chat_input]
)
bot_msg = chat_msg.then(respond, chatbot, chatbot, api_name="bot_response")
bot_msg.then(lambda: gr.MultimodalTextbox(interactive=True), None, [chat_input])
if __name__ == "__main__":
demo.launch()