import torch
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import tqdm
from sentence_transformers import SentenceTransformer, util
import re
from datetime import datetime, date
import time
from openai import OpenAI
import json
import os
from typing import Dict, Any, List
import textwrap
from flask import Flask, request, jsonify
import gradio as gr
DESCRIPTION = '''
<div>
<h1 style="text-align: center;">Phobos 🪐</h1>
<p>This is an open, fine-tuned model fitted onto a RAG pipeline using <a href="https://huggingface.co/sentence-transformers/all-mpnet-base-v2"><b>all-mpnet-base-v2</b></a>.</p>
<h3 style="text-align: center;">To chat, say 'gen phobos' for general questions on any topic, or 'phobos' for specifically medical questions.</h3>
</div>
'''
# API keys
api_key = os.getenv('OPEN_AI_API_KEY')

# Pick the device once, so the rest of the script can fall back to CPU when no GPU is present
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the pre-computed chunk embeddings
df_embeds = pd.read_csv("chunks_tokenized.csv")
df_embeds["embeddings"] = df_embeds["embeddings"].apply(lambda x: np.fromstring(x.strip("[]"), sep=" "))
embeds_dict = df_embeds.to_dict(orient="records")
# Convert the embeddings into a single tensor on the chosen device
embeddings = torch.tensor(np.array(df_embeds["embeddings"].to_list()), dtype=torch.float32).to(device)
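# The CSV is expected to contain at least the columns used below: 'sentence_chunk',
# 'link_or_page_number' and 'embeddings', with each embedding serialized as a
# whitespace-separated string. An illustrative (made-up) row:
#   sentence_chunk="Vitamin D supports calcium absorption ...", link_or_page_number=42,
#   embeddings="[ 0.013 -0.027 0.101 ... ]"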
# Make a text wrapper
def text_wrapper(text):
    """
    Wraps the given text to 80 characters per line and returns it.
    """
    clean_text = textwrap.fill(text, 80)
    return clean_text
# Let's first get the embedding model
embedding_model = SentenceTransformer(model_name_or_path="all-mpnet-base-v2",
                                      device=device)
# Functionize the RAG pipeline
def rag_pipeline(query,
                 embedding_model,
                 embeddings,
                 device: str,
                 chunk_min_token: list):
    """
    Takes a query, embeds it, scores it against the pre-computed passage
    embeddings with dot product, and prints the top 5 most relevant results.
    """
    # Retrieval
    query_embeddings = embedding_model.encode(query, convert_to_tensor=True).to(device)
    # Augmentation
    dot_scores = util.dot_score(a=query_embeddings, b=embeddings)[0]
    # Output
    scores, indices = torch.topk(dot_scores, k=5)
    for counting, (score, idx) in enumerate(zip(scores, indices), start=1):
        clean_score = score.item() * 100
        print(f"Result ({counting}) has a score of {round(clean_score, 2)}%")
        print(f"On index: {idx}")
        print("Relevant Text:\n")
        print(f"{text_wrapper(chunk_min_token[idx]['sentence_chunk'])}\n")
# Message request to GPT
def message_request_to_model(input_text: str):
    """
    Builds the message list to pass to the chat completions API.
    """
    message_to_model = [
        {"role": "system", "content": "You are a helpful assistant called 'Phobos'."},
        {"role": "user", "content": input_text},  # This must be a string or the request won't succeed
    ]
    return message_to_model
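# For example (illustrative), message_request_to_model("What is RAG?") returns:
# [{"role": "system", "content": "You are a helpful assistant called 'Phobos'."},
#  {"role": "user", "content": "What is RAG?"}]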
# Functionize the API request, as if calling GPT for the first time
def request_gpt_model(input_text,
                      temperature,
                      message_to_model_api,
                      model: str="gpt-3.5-turbo"):
    """
    Sends a request to the OpenAI chat API with the given messages. The full
    generated prompt is passed as instructions to the model, and the model's
    reply is returned together with the raw JSON response.
    """
    # Create client
    client = OpenAI(api_key=api_key)
    # Make a request for the input prompt
    response = client.chat.completions.create(
        model=model,
        messages=message_to_model_api,
        temperature=temperature,
    )
    # Output the message in readable format
    output = response.choices[0].message.content
    json_response = json.dumps(json.loads(response.model_dump_json()), indent=4)
    # print(f"{text_wrapper(output)}")
    print(output)
    return output, json_response
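# Example usage (a sketch, not run here; it assumes OPEN_AI_API_KEY is set and the
# question is illustrative):
# messages = message_request_to_model("Summarize what a RAG pipeline does.")
# answer, raw_json = request_gpt_model("Summarize what a RAG pipeline does.",
#                                      temperature=0,
#                                      message_to_model_api=messages)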
# Functionize saving output to file
def save_log_models_activity(query, prompt, continue_question, output, cont_output, embeds_dict, json_response,
                             model, rag_pipeline, message_request_to_model, indices, embedding_model, source_directed: str):
    """
    Saves the model's input/output interaction to a .txt file for each request,
    labeling the model that was used, the embedding process and pipeline used,
    and the date and time it was run.
    """
    # If there is a follow-up question, name the log file after it instead of the original query
    input_query = ""
    if continue_question != "":
        input_query += continue_question
    else:
        input_query += query
    clean_query = re.sub(r'[^\w\s]', '', input_query).replace(' ', '_')
    log_dir = "./logfiles/may-2024/"
    os.makedirs(log_dir, exist_ok=True)  # make sure the log folder exists
    file_path = os.path.join(log_dir, f"{clean_query}.txt")
    # Open the file in write mode
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(f"Original Query: {query}\n\n")
        if prompt != "":
            file.write(f"Base Prompt: {prompt}\n\n")
        if continue_question != "":
            file.write(f"Follow up question:\n\n{continue_question}\n\n")
            file.write(f"Output:\n\n {cont_output}")
        else:
            file.write(f"Output:\n\n{output}\n\n")
        # Json response
        file.write(f"\n\nJson format response: {json_response}\n\n")
        for idx in indices:
            # Log the model's activity in the txt file (only the top retrieved chunk)
            if rag_pipeline:
                file.write(f"{source_directed}")
                file.write("\n\nPipeline Used: RAG\n")
                file.write(f"Embedding Model used on tokenizing pipeline:\n\n{embedding_model}\n")
                file.write(f"\nRelevant Passages: {embeds_dict[idx]['sentence_chunk']}\n\n")
            break
        file.write(f"Model used: {model}\n")
        # file.write(f"{message_request_to_model}")
        today = date.today()
        current_time = datetime.now().time()
        file.write(f"Date: {today.strftime('%B %d, %Y')}\nTime: {current_time.strftime('%H:%M:%S')}\n\n")
# Retrieve RAG resources such as scores and indices
def rag_resources(query: str,
                  device: str=device):  # defaults to the device chosen above
    """
    Extracts only the scores and indices of the top 5 results,
    ranked by dot score against the query.
    """
    # Retrieval
    query_embeddings = embedding_model.encode(query, convert_to_tensor=True).to(device)
    # Augmentation
    dot_scores = util.dot_score(a=query_embeddings, b=embeddings)[0]
    # Output
    scores, indices = torch.topk(dot_scores, k=5)
    return scores, indices
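# Example (illustrative, not executed here):
# scores, indices = rag_resources("What does vitamin C do?")
# top_idx = indices[0].item()  # index of the best-matching chunk in embeds_dict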
# Format the prompt
def rag_prompt_formatter(prompt: str,
                         prev_quest: list,
                         context_items: List[Dict[str, Any]]):
    """
    Formats the base prompt with the context items, the previous questions
    and the user query.
    """
    # Convert the list into a string so it can be formatted into the base prompt
    prev_questions_str = '\n'.join(prev_quest)
    context = "- " + "\n- ".join(i["sentence_chunk"] for i in context_items)
    base_prompt = """In this text, you will act as a supportive medical assistant.
Give yourself room to think.
Explain each topic with facts and also suggestions based on the user's needs.
Keep your answers thorough but practical.
\nHere are the past questions and answers you gave to the user, to serve as your memory:
{previous_questions}
\nYou as the assistant will receive context items for retrieving information.
\nNow use the following context items to answer the user query. Be advised: if the user does not give you
any query that seems medical, DO NOT extract the relevant passages:
{context}
\nRelevant passages: Please extract the context items that helped you answer the user's question
<extract relevant passages from the context here>
User query: {query}
Answer:"""
    prompt = base_prompt.format(previous_questions=prev_questions_str, context=context, query=prompt)
    return prompt
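# Example usage (illustrative, not executed here):
# scores, indices = rag_resources("What foods are high in iron?")
# context_items = [embeds_dict[idx] for idx in indices]
# full_prompt = rag_prompt_formatter(prompt="What foods are high in iron?",
#                                    prev_quest=[],
#                                    context_items=context_items)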
# Format a general prompt for any question
def general_prompt_formatter(prompt: str,
                             prev_quest: list):
    """
    Formats the prompt with just the previous questions, without RAG.
    """
    # Convert the list into a string; .format() expects a string to substitute, not a list
    prev_questions_str = '\n'.join(prev_quest)
    base_prompt = """In this text, you will act as a supportive assistant.
Give yourself room to think.
Explain each topic with facts and also suggestions based on the user's needs.
Keep your answers thorough but practical.
\nHere are the past questions and answers you gave to the user, to serve as your memory:
{previous_questions}
\nAnswer the user query regardless of whether there were past questions or not.
\nUser query: {query}
Answer:"""
    prompt = base_prompt.format(previous_questions=prev_questions_str, query=prompt)
    return prompt
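# For example (illustrative):
# general_prompt_formatter("Explain how transformers work.", prev_quest=[])
# returns the base prompt with an empty memory section and the query filled in.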
# Saving previous questions and answers
def prev_recent_questions(input_text: str,
                          ai_output: list):
    """
    Saves the question asked by the user and the answer given by the AI into a
    .txt file and returns the file path. The caller keeps those paths in a
    list and resets the list once it reaches its limit, so only the most
    recent questions and answers are kept as memory.
    """
    formatted_response = f"Current Question: {input_text}\n\n"
    # Convert the tuple elements to strings and concatenate them with the formatted_response
    formatted_response += "".join(str(elem) for elem in ai_output)
    # Clean the query (input_text) so it can be used as a file name
    clean_query = re.sub(r'[^\w\s]', '', input_text).replace(' ', '_')
    memory_dir = "./memory/may-2024"
    os.makedirs(memory_dir, exist_ok=True)  # make sure the memory folder exists
    file_path = os.path.join(memory_dir, f"{clean_query}.txt")
    # Save the content to the .txt file
    try:
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(formatted_response)
            today = date.today()
            current_time = datetime.now().time()
            file.write(f"\n\nDate: {today.strftime('%B %d, %Y')}\nTime: {current_time.strftime('%H:%M:%S')}\n\n")
    except Exception as e:
        print(f"Error writing file: {e}")
    # Return the path so the caller can track it as memory
    return file_path
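# Example (illustrative, not executed here; rag_gpt is defined below):
# answer = rag_gpt("What is creatine?", previous_quest=[], rag_pipeline=False)
# path = prev_recent_questions(input_text="What is creatine?", ai_output=answer)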
# Function RAG-GPT
def rag_gpt(query: str,
            previous_quest: list,
            continue_question: str="",
            rag_pipeline: bool=True,
            temperature: float=0,
            model: str="gpt-3.5-turbo",
            embeds_dict=embeds_dict):
    """
    This contains the RAG system implemented with OpenAI models. The query is
    processed through RAG, then formatted into an instructive prompt for the
    model, filled with context items and the query. That prompt is passed to
    the model's API endpoint and the response is cleanly returned.
    """
    if continue_question == "":
        print(f"Your question: {query}\n")
    else:
        print(f"Your Question: {continue_question}\n")
    # Show query
    query_back = f"Your question: {query}\n"
    cont_query_back = f"Your Question: {continue_question}\n"
    top_score_back = ""
    # RAG resources
    # scores, indices = rag_resources(query)
    if rag_pipeline:
        scores, indices = rag_resources(query)
        # Get context items for prompt generation
        context_items = [embeds_dict[idx] for idx in indices]
        # Augment the context items with the base prompt and user query
        prompt = rag_prompt_formatter(prompt=query, prev_quest=previous_quest, context_items=context_items)
        # Show analytics on the retrieval scores
        top_score = [score.item() for score in scores]
        print(f"Highest Result: {round(top_score[0] * 100, 2)}%\n")
        top_score_back += f"Highest Result: {round(top_score[0] * 100, 2)}%\n"
    else:
        prompt = general_prompt_formatter(prompt=query, prev_quest=previous_quest)
    print(f"Here are the previous questions: {previous_quest}")
    print(f"This is the prompt: {prompt}")
    print("\nEnd of prompt")
    # All variables to return back as JSON on the API endpoint for Gradio
    cont_output_back = ""
    output_back = ""
    source_grabbed_back = ""
    url_source_back = ""
    pdf_source_back = ""
    link_or_pagnum_back = ""
    # LLM input prompt
    # If there is a follow-up question, log the model's activity in a txt file
    if continue_question != "":
        message_request = message_request_to_model(input_text=continue_question)
        cont_output, json_response = request_gpt_model(continue_question, temperature=temperature, message_to_model_api=message_request, model=model)
        cont_output_back += cont_output
        output = ""
        if rag_pipeline:
            index = embeds_dict[indices[0]]
            # Get the link or page number of the retrieved chunk
            link_or_pagnum = str(index["link_or_page_number"])
            if link_or_pagnum.isdigit():
                link_or_pagnum_back += link_or_pagnum
                source = "The source originates from a PDF"
                save_log_models_activity(query=query,
                                         prompt=prompt,
                                         continue_question=continue_question,
                                         output=output,
                                         cont_output=cont_output,
                                         embeds_dict=embeds_dict,
                                         json_response=json_response,
                                         model=model,
                                         rag_pipeline=rag_pipeline,
                                         message_request_to_model=continue_question,
                                         indices=indices,
                                         embedding_model=embedding_model,
                                         source_directed=source)
            else:
                link = f"Source Directed : {index['link_or_page_number']}"
                save_log_models_activity(query=query,
                                         prompt=prompt,
                                         continue_question=continue_question,
                                         output=output,
                                         cont_output=cont_output,
                                         embeds_dict=embeds_dict,
                                         json_response=json_response,
                                         model=model,
                                         rag_pipeline=rag_pipeline,
                                         message_request_to_model=continue_question,
                                         indices=indices,
                                         embedding_model=embedding_model,
                                         source_directed=link)
        else:
            # Without RAG there are no retrieval indices or sources to log
            save_log_models_activity(query=query,
                                     prompt=prompt,
                                     continue_question=continue_question,
                                     output=output,
                                     cont_output=cont_output,
                                     embeds_dict=embeds_dict,
                                     json_response=json_response,
                                     model=model,
                                     rag_pipeline=rag_pipeline,
                                     message_request_to_model=continue_question,
                                     indices="",
                                     embedding_model=embedding_model,
                                     source_directed="")
    # If there is no follow-up question
    else:
        message_request = message_request_to_model(input_text=prompt)
        output, json_response = request_gpt_model(prompt, temperature=temperature, message_to_model_api=message_request, model=model)
        output_back += output
        cont_output = ""
        if rag_pipeline:
            index = embeds_dict[indices[0]]
            # Get the link or page number of the retrieved chunk
            link_or_pagnum = str(index["link_or_page_number"])
            if link_or_pagnum.isdigit():
                link_or_pagnum_back += link_or_pagnum
                print("is digit\n")
                source = "The source originates from a PDF"
                save_log_models_activity(query=query,
                                         prompt=prompt,
                                         continue_question=continue_question,
                                         output=output,
                                         cont_output=cont_output,
                                         embeds_dict=embeds_dict,
                                         json_response=json_response,
                                         model=model,
                                         rag_pipeline=rag_pipeline,
                                         message_request_to_model=query,
                                         indices=indices,
                                         embedding_model=embedding_model,
                                         source_directed=source)
            else:
                link = f"Source Directed : {index['link_or_page_number']}"
                save_log_models_activity(query=query,
                                         prompt=prompt,
                                         continue_question=continue_question,
                                         output=output,
                                         cont_output=cont_output,
                                         embeds_dict=embeds_dict,
                                         json_response=json_response,
                                         model=model,
                                         rag_pipeline=rag_pipeline,
                                         message_request_to_model=query,
                                         indices=indices,
                                         embedding_model=embedding_model,
                                         source_directed=link)
        else:
            save_log_models_activity(query=query,
                                     prompt=prompt,
                                     continue_question="",
                                     output=output,
                                     cont_output="",
                                     embeds_dict=embeds_dict,
                                     json_response=json_response,
                                     model=model,
                                     rag_pipeline=rag_pipeline,
                                     message_request_to_model="",
                                     indices="",
                                     embedding_model=embedding_model,
                                     source_directed="")
    # Report the top retrieved source back to the caller
    if rag_pipeline:
        for idx in indices:
            print(f"\n\nOriginated Source:\n\n {embeds_dict[idx]['sentence_chunk']}\n")
            source_grabbed_back += f"\n\nOriginated Source:\n\n {embeds_dict[idx]['sentence_chunk']}\n"
            link_or_pagnum = str(embeds_dict[idx]['link_or_page_number'])
            if link_or_pagnum.isdigit():
                print("The source originates from a PDF")
                pdf_source_back += "The source originates from a PDF"
            else:
                print(f"Source Directed : {embeds_dict[idx]['link_or_page_number']}")
                url_source_back += f"Source Directed : {embeds_dict[idx]['link_or_page_number']}"
            break
    if continue_question != "":
        return cont_output_back, source_grabbed_back, pdf_source_back, url_source_back
    else:
        return output_back, source_grabbed_back, pdf_source_back, url_source_back
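# End-to-end example (a sketch, not executed here; it assumes the OpenAI key,
# the chunk CSV and the sentence-transformers model are all available; the query is illustrative):
# answer, source, pdf_note, url_note = rag_gpt(query="How much protein should I eat per day?",
#                                              previous_quest=[],
#                                              rag_pipeline=True)
# print(answer)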
# Mode of the LLM
llm_mode = ""
# List of file paths for memory
memory_file_paths = []
# First-time condition
first_time = True
# Previous questions stored in a list as the memory of the LLM
prev_5_questions_list = []

def check_cuda_and_gpu_type():
    # Check CUDA availability and GPU type
    if torch.cuda.is_available():
        gpu_info = torch.cuda.get_device_name(0)  # Get info about the first GPU
        return f"CUDA is Available! GPU Info: {gpu_info}"
    else:
        return "CUDA is Not Available."
def bot_comms(input, history):
    """
    Communication between the Gradio UI and the rag_gpt model.
    """
    global llm_mode
    global memory_file_paths
    global prev_5_questions_list
    global first_time
    if input == "cuda info":
        output = check_cuda_and_gpu_type()
        return output
    state_mode = True
    # Input as 'gen phobos'
    if input == "gen phobos":
        output_text = "Great! Ask me any question. 🦧"
        llm_mode = input
        return output_text
    if input == "phobos":
        output_text = "Okay! What are your medical questions? ⚕️"
        llm_mode = input
        return output_text
    # Reset memory with a command
    if input == "reset memory":
        memory_file_paths = []
        output_text = "Manually Reset Memory! 🧠"
        return output_text
if llm_mode == "gen phobos": | |
# Get the 10 previous file paths | |
for path in memory_file_paths: | |
with open(path, 'r', encoding='utf-8') as file: | |
q_a = file.read() | |
# Now we have the q/a in string format | |
q_a = str(q_a) | |
# Make keys and values for prev dict | |
prev_5_questions_list.append(q_a) | |
if first_time: | |
state_mode = False | |
# Get the previous questions and answers list to pass to rag_gpt to place on base prompt | |
gen_gpt_output = rag_gpt(input, previous_quest=[], rag_pipeline=state_mode) | |
first_time = False | |
else: | |
state_mode = False | |
gen_gpt_output = rag_gpt(input, previous_quest=prev_5_questions_list, rag_pipeline=state_mode) | |
# reset the memory file_paths | |
if len(memory_file_paths) == 5: | |
memory_file_paths = [] | |
file_path = prev_recent_questions(input_text=input, ai_output=gen_gpt_output) | |
memory_file_paths.append(file_path) | |
if llm_mode == "phobos": | |
for path in memory_file_paths: | |
with open(path, 'r', encoding='utf-8') as file: | |
q_a = file.read() | |
# Now we have the q/a in string format | |
q_a = str(q_a) | |
# Make keys and values for prev dict | |
prev_5_questions_list.append(q_a) | |
if first_time: | |
# Get the previous questions and answers list to pass to rag_gpt to place on base prompt | |
rag_output_text = rag_gpt(input, previous_quest=[], rag_pipeline=state_mode) | |
first_time = False | |
# return jsonify({'output': rag_output_text}) | |
else: | |
rag_output_text = rag_gpt(input, previous_quest=prev_5_questions_list, rag_pipeline=state_mode) | |
# return jsonify({'output': rag_output_text}) | |
# reset the memory file_paths | |
if len(memory_file_paths) == 5: | |
memory_file_paths = [] | |
file_path = prev_recent_questions(input_text=input, ai_output=rag_output_text) | |
memory_file_paths.append(file_path) | |
output = rag_gpt(query=input, | |
previous_quest=[], | |
rag_pipeline=False) | |
formatted_response = "\n".join(output[0].split("\n")) | |
return formatted_response | |
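# Typical interaction flow (illustrative): the user first types "phobos" to switch
# to the medical RAG mode, then asks e.g. "What are good sources of omega-3?".
# bot_comms rebuilds the memory from the saved files, calls rag_gpt with
# rag_pipeline=True, saves the new Q/A for memory, and returns the answer.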
# Gradio block
chatbot = gr.Chatbot(height=725, label='Gradio ChatInterface')
with gr.Blocks(fill_height=True) as demo:
    gr.Markdown(DESCRIPTION)
    gr.ChatInterface(
        fn=bot_comms,
        chatbot=chatbot,
        fill_height=True,
        examples=["gen phobos", "phobos", "reset memory", "cuda info"],
        cache_examples=False
    )

if __name__ == "__main__":
    demo.launch()