import sys sys.path.append('./LLAUS') from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig import torch from llava import LlavaLlamaForCausalLM from llava.conversation import conv_templates from llava.utils import disable_torch_init from transformers import CLIPVisionModel, CLIPImageProcessor, StoppingCriteria from PIL import Image from torch.cuda.amp import autocast import gradio as gr import spaces from peft import prepare_model_for_int8_training, LoraConfig, get_peft_model import os from transformers import AutoProcessor, AutoModel import torch.nn.functional as F #--------------------------------- #++++++++ Model ++++++++++ #--------------------------------- DEFAULT_IMAGE_TOKEN = "" DEFAULT_IMAGE_PATCH_TOKEN = "" DEFAULT_IM_START_TOKEN = "" DEFAULT_IM_END_TOKEN = "" def patch_config(config_path): """Applies necessary patches to the model config.""" patch_dict = { "use_mm_proj": True, "mm_vision_tower": "openai/clip-vit-large-patch14", "mm_hidden_size": 1024 } cfg = AutoConfig.from_pretrained(config_path) if not hasattr(cfg, "mm_vision_tower"): print(f'`mm_vision_tower` not found in `{config_path}`, applying patch and save to disk.') for k, v in patch_dict.items(): setattr(cfg, k, v) cfg.save_pretrained(config_path) def load_llava_model(): """Loads and initializes the LLaVA model.""" model_name = "Baron-GG/LLaVA-Med" # Change this to your model if you uploaded a new one disable_torch_init() tokenizer = AutoTokenizer.from_pretrained(model_name) patch_config(model_name) model = LlavaLlamaForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16).cuda() model.model.requires_grad_(False) image_processor = CLIPImageProcessor.from_pretrained(model.config.mm_vision_tower, torch_dtype=torch.float16) model.config.use_cache = False model.config.tune_mm_mlp_adapter = False model.config.freeze_mm_mlp_adapter = False model.config.mm_use_im_start_end = True mm_use_im_start_end = getattr(model.config, "mm_use_im_start_end", False) tokenizer.add_tokens([DEFAULT_IMAGE_PATCH_TOKEN], special_tokens=True) if mm_use_im_start_end: tokenizer.add_tokens([DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN], special_tokens=True) vision_tower = model.model.vision_tower[0] vision_tower.to(device='cuda', dtype=torch.float16) vision_config = vision_tower.config vision_config.im_patch_token = tokenizer.convert_tokens_to_ids([DEFAULT_IMAGE_PATCH_TOKEN])[0] vision_config.use_im_start_end = mm_use_im_start_end if mm_use_im_start_end: vision_config.im_start_token, vision_config.im_end_token = tokenizer.convert_tokens_to_ids([DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN]) image_token_len = (vision_config.image_size // vision_config.patch_size) ** 2 model = prepare_model_for_int8_training(model) lora_config = LoraConfig( r=64, lora_alpha=16, target_modules=["q_proj", "v_proj","k_proj","o_proj"], lora_dropout=0.05, bias="none", task_type="CAUSAL_LM", ) model = get_peft_model(model, lora_config).cuda() model.eval() return model, tokenizer, image_processor, image_token_len, mm_use_im_start_end def load_biomedclip_model(): """Loads the BiomedCLIP model and tokenizer.""" biomedclip_model_name = 'microsoft/BiomedCLIP-PubMedBERT_256-vit_base_patch16_224' processor = AutoProcessor.from_pretrained(biomedclip_model_name) model = AutoModel.from_pretrained(biomedclip_model_name).cuda().eval() return model, processor class KeywordsStoppingCriteria(StoppingCriteria): """Custom stopping criteria for generation.""" def __init__(self, keywords, tokenizer, input_ids): self.keywords = keywords self.tokenizer = tokenizer self.start_len = None self.input_ids = input_ids def __call__(self, output_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool: if self.start_len is None: self.start_len = self.input_ids.shape[1] else: outputs = self.tokenizer.batch_decode(output_ids[:, self.start_len:], skip_special_tokens=True)[0] for keyword in self.keywords: if keyword in outputs: return True return False def compute_similarity(image, text, biomedclip_model, biomedclip_processor): """Computes similarity scores using BiomedCLIP.""" with torch.no_grad(): inputs = biomedclip_processor(text=text, images=image, return_tensors="pt", padding=True).to(biomedclip_model.device) outputs = biomedclip_model(**inputs) image_embeds = outputs.image_embeds text_embeds = outputs.text_embeds image_embeds = F.normalize(image_embeds, dim=-1) text_embeds = F.normalize(text_embeds, dim=-1) similarity = (text_embeds @ image_embeds.transpose(-1, -2)).squeeze() return similarity @torch.no_grad() def eval_llava_model(llava_model, llava_tokenizer, llava_image_processor, image, question, image_token_len, mm_use_im_start_end, max_new_tokens, temperature): """Evaluates the LLaVA model for a given image and question.""" image_list = [] image_tensor = llava_image_processor.preprocess(image, return_tensors='pt')['pixel_values'][0] # 3, 224, 224 image_list.append(image_tensor) image_idx = 1 if mm_use_im_start_end: qs = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_PATCH_TOKEN * image_token_len * image_idx + DEFAULT_IM_END_TOKEN + question else: qs = DEFAULT_IMAGE_PATCH_TOKEN * image_token_len * image_idx + '\n' + question conv = conv_templates["simple"].copy() conv.append_message(conv.roles[0], qs) prompt = conv.get_prompt() inputs = llava_tokenizer([prompt]) image_tensor = torch.stack(image_list, dim=0).half().cuda() input_ids = torch.as_tensor(inputs.input_ids).cuda() keywords = ['###'] stopping_criteria = KeywordsStoppingCriteria(keywords, llava_tokenizer, input_ids) with autocast(): output_ids = llava_model.generate( input_ids=input_ids, images=image_tensor, do_sample=True, temperature=temperature, max_new_tokens=max_new_tokens, stopping_criteria=[stopping_criteria] ) input_token_len = input_ids.shape[1] n_diff_input_output = (input_ids != output_ids[:, :input_token_len]).sum().item() if n_diff_input_output > 0: print(f'[Warning] Sample: {n_diff_input_output} output_ids are not the same as the input_ids') outputs = llava_tokenizer.batch_decode(output_ids[:, input_token_len:], skip_special_tokens=True)[0] while True: cur_len = len(outputs) outputs = outputs.strip() for pattern in ['###', 'Assistant:', 'Response:']: if outputs.startswith(pattern): outputs = outputs[len(pattern):].strip() if len(outputs) == cur_len: break try: index = outputs.index(conv.sep) except ValueError: outputs += conv.sep index = outputs.index(conv.sep) outputs = outputs[:index].strip() print(outputs) return outputs #--------------------------------- #++++++++ Gradio ++++++++++ #--------------------------------- SHARED_UI_WARNING = f'''### [NOTE] It is possible that you are waiting in a lengthy queue. You can duplicate and use it with a paid private GPU. Duplicate Space Alternatively, you can also use the demo on our [project page](https://minigpt-4.github.io). ''' def gradio_reset(chat_state, img_list): """Resets the chat state and image list.""" if chat_state is not None: chat_state.messages = [] if img_list is not None: img_list = [] return None, gr.update(value=None, interactive=True), gr.update(placeholder='Please upload your medical image first', interactive=False), gr.update(value="Upload & Start Analysis", interactive=True), chat_state, img_list def upload_img(gr_img, text_input, chat_state): """Handles image upload.""" if gr_img is None: return None, None, gr.update(interactive=True), chat_state, None img_list = [gr_img] return gr.update(interactive=False), gr.update(interactive=True, placeholder='Type and press Enter'), gr.update(value="Start Analysis", interactive=False), chat_state, img_list def gradio_ask(user_message, chatbot, chat_state): """Handles user input.""" if not user_message: return gr.update(interactive=True, placeholder='Input should not be empty!'), chatbot, chat_state chatbot = chatbot + [[user_message, None]] return '', chatbot, chat_state @spaces.GPU def gradio_answer(chatbot, chat_state, img_list, llava_model, llava_tokenizer, llava_image_processor, image_token_len, mm_use_im_start_end, max_new_token, temperature, biomedclip_model, biomedclip_processor): """Generates and adds the bot's response to the chatbot using LLaVA""" if not img_list: return chatbot, chat_state, img_list # compute similarity using biomedclip similarity_score = compute_similarity(img_list[0],chatbot[-1][0], biomedclip_model, biomedclip_processor) print(f'Similarity Score is: {similarity_score}') # prepare the input for LLAVA llava_input_text = f"Based on the image and query provided the similarity score is {similarity_score:.3f}. " + chatbot[-1][0] llm_message = eval_llava_model(llava_model, llava_tokenizer, llava_image_processor, img_list[0], llava_input_text, image_token_len, mm_use_im_start_end, max_new_token, temperature) chatbot[-1][1] = llm_message return chatbot, chat_state, img_list title = """

Medical Image Analysis Tool

""" description = """

Upload medical images, ask questions, and receive analysis.

""" examples_list=[ ["./case1.png", "Analyze the X-ray for any abnormalities."], ["./case2.jpg", "What type of disease may be present?"], ["./case1.png","What is the anatomical structure shown here?"] ] # Load models and related resources outside of the Gradio block for loading on startup llava_model, llava_tokenizer, llava_image_processor, image_token_len, mm_use_im_start_end = load_llava_model() biomedclip_model, biomedclip_processor = load_biomedclip_model() with gr.Blocks() as demo: gr.Markdown(title) # gr.Markdown(SHARED_UI_WARNING) gr.Markdown(description) with gr.Row(): with gr.Column(scale=0.5): image = gr.Image(type="pil", label="Medical Image") upload_button = gr.Button(value="Upload & Start Analysis", interactive=True, variant="primary") clear = gr.Button("Restart") max_new_token = gr.Slider( minimum=1, maximum=512, value=128, step=1, interactive=True, label="Max new tokens" ) temperature = gr.Slider( minimum=0.1, maximum=2.0, value=0.3, step=0.1, interactive=True, label="Temperature", ) with gr.Column(): chat_state = gr.State() img_list = gr.State() chatbot = gr.Chatbot(label='Medical Analysis') text_input = gr.Textbox(label='Analysis Query', placeholder='Please upload your medical image first', interactive=False) gr.Examples(examples=examples_list, inputs=[image, text_input]) upload_button.click(upload_img, [image, text_input, chat_state], [image, text_input, upload_button, chat_state, img_list]) text_input.submit(gradio_ask, [text_input, chatbot, chat_state], [text_input, chatbot, chat_state]).then( gradio_answer, [chatbot, chat_state, img_list, llava_model, llava_tokenizer, llava_image_processor, image_token_len, mm_use_im_start_end, max_new_token, temperature, biomedclip_model, biomedclip_processor], [chatbot, chat_state, img_list] ) clear.click(gradio_reset, [chat_state, img_list], [chatbot, image, text_input, upload_button, chat_state, img_list], queue=False) demo.launch()