import gradio as gr
from openai import OpenAI
import os
import base64
import time
import copy
import re
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

from agents import rag_decision
from agents import get_top_k
from agents import get_prescription_text
from prompts import bot_welcome_message, openai_opening_system_message

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

with gr.Blocks() as demo:

    ################################ AGENTS ###################################

    # Agent 1 - RAG Decision Agent (decides whether RAG is needed for the user's query)
    def agent1_rag_decision(query):
        decision = rag_decision(query)
        return decision

    # Agent 2 - RAG Retrieval Agent (retrieves the top k relevant documents)
    def agent2_use_rag(query, k=3):
        results = get_top_k(query, k=k)
        return results

    # Agent 3 - LLM Agent (gets the query response from the LLM)
    def agent3_llm_agent(messages):
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            temperature=0.7
        )
        return response.choices[0].message.content.strip()

    # Agent 4 - Prescription Agent (extracts prescription text from an uploaded image)
    def agent4_get_prescription_text(messages):
        """OpenAI agent to get prescription text."""
        prescription_text = get_prescription_text(messages)
        return prescription_text

    ###########################################################################

    def encode_image(image_path):
        with open(image_path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

    def load_welcome():
        history = []
        history.append({"role": "system", "content": openai_opening_system_message})
        history.append({"role": "assistant", "content": bot_welcome_message})
        return history

    def clear_and_load():
        # Reset the chat and return the welcome message
        history = []
        history.append({"role": "system", "content": openai_opening_system_message})
        history.append({"role": "assistant", "content": bot_welcome_message})
        return history, None

    def add_message(history, message):
        # Build an OpenAI-style user message; any uploaded images are sent to
        # agent4_get_prescription_text. Start with an empty content list so that
        # image-only messages (no text) are handled as well.
        messages = [{"role": "user", "content": []}]
        if message["text"] is not None:
            messages[0]["content"].append({"type": "text", "text": message["text"]})
        for x in message["files"]:
            encoded_content = encode_image(x)
            messages[0]["content"].append({
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{encoded_content}"}
            })
            history.append({"role": "user", "content": {"path": x}})

        # Call agent4_get_prescription_text if the message contains an image_url
        has_image_url = any("image_url" in item for item in messages[0]["content"])
        if has_image_url:
            prescription_text = agent4_get_prescription_text(messages)
            history.append({"role": "system", "content": prescription_text})

        if message["text"] is not None:
            history.append({"role": "user", "content": message["text"]})

        return history, gr.MultimodalTextbox(
            value=None,
            interactive=False,
            file_count="multiple",
            placeholder="Enter message or upload file..."
        )

    def respond(history):
        if len(history) == 2:
            history.insert(0, {"role": "system", "content": openai_opening_system_message})

        messages = copy.deepcopy(history)
        for i, msg in enumerate(messages):
            if isinstance(msg["content"], str):
                # Plain string content: wrap it in the OpenAI content-part format
                messages[i]["content"] = [{"type": "text", "text": msg["content"]}]
            if isinstance(msg["content"], tuple):
                # File-path content: replace it with a text placeholder
                messages[i]["content"] = [{"type": "text", "text": "User Image"}]

        # OpenAI does not accept metadata or options in messages
        clean_messages = []
        for msg in messages:
            clean_msg = {
                "role": msg["role"],
                "content": msg["content"]
            }
            clean_messages.append(clean_msg)

        def message_text(msg):
            # Return the plain text of a message whose content is a list of parts
            content = msg["content"]
            if isinstance(content, list) and content and "text" in content[0]:
                return content[0]["text"]
            return ""

        ########################### AGENTIC WORKFLOW ##########################

        # Call Agent 1 - the RAG Decision Agent
        rag_query = ""
        if clean_messages[-1]["role"] == "system" and "No prescription found" in message_text(clean_messages[-1]):
            # The last message is a system message reporting "No prescription found",
            # so skip the RAG decision
            needs_rag = False
        elif clean_messages[-2]["role"] == "system" and "No prescription found" in message_text(clean_messages[-2]):
            needs_rag = False
        else:
            # Build the decision query from the last 10 messages in "role: text" format
            last_10 = clean_messages[-10:] if len(clean_messages) > 10 else clean_messages
            rag_query = "\n".join(
                f"{msg['role']}: {message_text(msg)}"
                for msg in last_10
            )
            needs_rag = agent1_rag_decision(rag_query)

        if needs_rag == True:
            # Call Agent 2 - the RAG Retrieval Agent
            top_k_results = agent2_use_rag(message_text(clean_messages[-1]), k=5)
            # Append the top k results to the messages as extra system context
            for i, result in enumerate(top_k_results):
                clean_messages.append({
                    "role": "system",
                    "content": f"RAG Retrieved Result-{i+1}: " + result["content"]
                })

        # Call Agent 3 - the LLM Agent to get the query response
        response = agent3_llm_agent(clean_messages)

        #######################################################################

        # Stream the response back to the chat, one sentence at a time
        history.append({"role": "assistant", "content": ""})
        # Split on sentence boundaries (naive, but works for most cases)
        chunks = re.split(r'(?<=[.!?]) +', response)
        for chunk in chunks:
            history[-1]["content"] += chunk + " "
            time.sleep(0.3)
            yield history

    ##########################################################################

    gr.Markdown(
        """

MedScan Diagnostic Services Chatbot (Agentic AI framework powered by OpenAI)

        """
    )

    chatbot = gr.Chatbot(type="messages", render_markdown=True, height=380)

    chat_input = gr.MultimodalTextbox(
        interactive=True,
        file_count="multiple",
        placeholder="Enter message or upload file...",
        show_label=False
    )

    clear = gr.Button("New Chat")
    clear.click(
        clear_and_load,
        inputs=None,
        outputs=[chatbot, chat_input]
    )

    demo.load(load_welcome, None, chatbot, api_name="load_welcome")

    chat_msg = chat_input.submit(
        add_message,
        [chatbot, chat_input],
        [chatbot, chat_input]
    )
    bot_msg = chat_msg.then(respond, chatbot, chatbot, api_name="bot_response")
    bot_msg.then(lambda: gr.MultimodalTextbox(interactive=True), None, [chat_input])

if __name__ == "__main__":
    demo.launch()