import gradio as gr import os import json import asyncio import concurrent.futures from openai import AzureOpenAI from trialgpt_retrieval.retriever import retrieve_trials_gpu, create_retriever_session from nltk.tokenize import sent_tokenize import nltk import hashlib import time from functools import lru_cache from threading import Lock # Download required NLTK data try: nltk.download('punkt') nltk.download('punkt_tab') except Exception as e: print(f"Warning: Could not download NLTK data: {e}") #set OPENAI_ENDPOINT and OPENAI_API_KEY os.environ["OPENAI_ENDPOINT"] = "https://salma-mc65g33q-eastus2.services.ai.azure.com/" os.environ["OPENAI_API_KEY"] = "1rGhyBRu7xrTwIUwySekgzJcFnURGiYlqlfVyjaTDebD22N2drDWJQQJ99BFACHYHv6XJ3w3AAAAACOGu5LU" # Check environment variables if not os.getenv("OPENAI_ENDPOINT") or not os.getenv("OPENAI_API_KEY"): print("Warning: OPENAI_ENDPOINT and OPENAI_API_KEY environment variables are required.") # Initialize the Azure OpenAI client client = AzureOpenAI( api_version="2023-09-01-preview", azure_endpoint=os.getenv("OPENAI_ENDPOINT"), api_key=os.getenv("OPENAI_API_KEY"), ) # Global cache and locks for thread safety cache = {} cache_lock = Lock() MAX_CACHE_SIZE = 1000 # Initialize persistent retriever session (major speedup) print("Initializing GPU retriever session...") retriever_session = create_retriever_session(use_gpu=True) print("Retriever session ready!") def get_cache_key(text): """Generate cache key from text.""" return hashlib.md5(text.encode()).hexdigest() def get_from_cache(key): """Thread-safe cache retrieval.""" with cache_lock: return cache.get(key) def set_cache(key, value): """Thread-safe cache storage with size limit.""" with cache_lock: if len(cache) >= MAX_CACHE_SIZE: # Remove oldest entries (simple FIFO) oldest_keys = list(cache.keys())[:len(cache)//2] for old_key in oldest_keys: del cache[old_key] cache[key] = value @lru_cache(maxsize=100) def generate_keywords_cached(note_hash, note): """Cached keyword generation.""" system = 'You are a helpful assistant and your task is to help search relevant clinical trials for a given patient description. Please first summarize the main medical problems of the patient. Then generate up to 32 key conditions for searching relevant clinical trials for this patient. The key condition list should be ranked by priority. Please output only a JSON dict formatted as Dict{{"summary": Str(summary), "conditions": List[Str(condition)]}}.' prompt = f"Here is the patient description: \\n{note}\\n\\nJSON output:" messages = [ {"role": "system", "content": system}, {"role": "user", "content": prompt} ] try: response = client.chat.completions.create( model="gpt-4", messages=messages, temperature=0, max_tokens=2048, ) output = response.choices[0].message.content if output is None: return "Error: Received empty response from the model.", [] try: start = output.find('{') end = output.rfind('}') if start != -1 and end > start: json_output = output[start:end+1] data = json.loads(json_output) return data["summary"], data["conditions"] else: return "Error: No JSON found in model response.", [] except (json.JSONDecodeError, KeyError) as e: return f"Error parsing model output: {e}", [] except Exception as e: return f"Error calling OpenAI API: {e}", [] def generate_keywords(note): """Wrapper for cached keyword generation.""" note_hash = get_cache_key(note) return generate_keywords_cached(note_hash, note) def parse_criteria(criteria): output = "" criteria = criteria.split("\\n\\n") idx = 0 for criterion in criteria: criterion = criterion.strip() if "inclusion criteria" in criterion.lower() or "exclusion criteria" in criterion.lower(): continue if len(criterion) < 5: continue output += f"{idx}. {criterion}\\n" idx += 1 return output def print_trial(trial_info: dict, inc_exc: str) -> str: trial = f"Title: {trial_info.get('title', 'N/A')}\\n" metadata = trial_info.get('metadata', {}) diseases_list = metadata.get('diseases_list', []) drugs_list = metadata.get('drugs_list', []) trial += f"Target diseases: {', '.join(diseases_list) if diseases_list else 'N/A'}\\n" trial += f"Interventions: {', '.join(drugs_list) if drugs_list else 'N/A'}\\n" trial += f"Summary: {trial_info.get('text', 'N/A')}\\n" if inc_exc == "inclusion": inclusion_criteria = metadata.get('inclusion_criteria', 'No inclusion criteria available') trial += "Inclusion criteria:\\n %s\\n" % parse_criteria(inclusion_criteria) elif inc_exc == "exclusion": exclusion_criteria = metadata.get('exclusion_criteria', 'No exclusion criteria available') trial += "Exclusion criteria:\\n %s\\n" % parse_criteria(exclusion_criteria) return trial def get_matching_prompt(trial_info: dict, inc_exc: str, patient: str) -> tuple[str, str]: """Output the prompt.""" prompt = f"You are a helpful assistant for clinical trial recruitment. Your task is to compare a given patient note and the {inc_exc} criteria of a clinical trial to determine the patient's eligibility at the criterion level.\\n" if inc_exc == "inclusion": prompt += "The factors that allow someone to participate in a clinical study are called inclusion criteria. They are based on characteristics such as age, gender, the type and stage of a disease, previous treatment history, and other medical conditions.\\n" elif inc_exc == "exclusion": prompt += "The factors that disqualify someone from participating are called exclusion criteria. They are based on characteristics such as age, gender, the type and stage of a disease, previous treatment history, and other medical conditions.\\n" prompt += f"You should check the {inc_exc} criteria one-by-one, and output the following three elements for each criterion:\\n" prompt += f"\\tElement 1. For each {inc_exc} criterion, briefly generate your reasoning process: First, judge whether the criterion is not applicable (not very common), where the patient does not meet the premise of the criterion. Then, check if the patient note contains direct evidence. If so, judge whether the patient meets or does not meet the criterion. If there is no direct evidence, try to infer from existing evidence, and answer one question: If the criterion is true, is it possible that a good patient note will miss such information? If impossible, then you can assume that the criterion is not true. Otherwise, there is not enough information.\\n" prompt += f"\\tElement 2. If there is relevant information, you must generate a list of relevant sentence IDs in the patient note. If there is no relevant information, you must annotate an empty list.\\n" prompt += f"\\tElement 3. Classify the patient eligibility for this specific {inc_exc} criterion: " if inc_exc == "inclusion": prompt += 'the label must be chosen from {"not applicable", "not enough information", "included", "not included"}. "not applicable" should only be used for criteria that are not applicable to the patient. "not enough information" should be used where the patient note does not contain sufficient information for making the classification. Try to use as less "not enough information" as possible because if the note does not mention a medically important fact, you can assume that the fact is not true for the patient. "included" denotes that the patient meets the inclusion criterion, while "not included" means the reverse.\\n' elif inc_exc == "exclusion": prompt += 'the label must be chosen from {"not applicable", "not enough information", "excluded", "not excluded"}. "not applicable" should only be used for criteria that are not applicable to the patient. "not enough information" should be used where the patient note does not contain sufficient information for making the classification. Try to use as less "not enough information" as possible because if the note does not mention a medically important fact, you can assume that the fact is not true for the patient. "excluded" denotes that the patient meets the exclusion criterion and should be excluded in the trial, while "not excluded" means the reverse.\\n' prompt += "You should output only a JSON dict exactly formatted as: dict{str(criterion_number): list[str(element_1_brief_reasoning), list[int(element_2_sentence_id)], str(element_3_eligibility_label)]}." user_prompt = f"Here is the patient note, each sentence is led by a sentence_id:\\n{patient}\\n\\n" user_prompt += f"Here is the clinical trial:\\n{print_trial(trial_info, inc_exc)}\\n\\n" user_prompt += f"Plain JSON output:" return prompt, user_prompt def trialgpt_matching_single(trial: dict, patient: str, inc_exc: str, model: str): """Single matching call for parallel processing.""" cache_key = get_cache_key(f"{trial.get('_id', '')}-{inc_exc}-{get_cache_key(patient)}") cached_result = get_from_cache(cache_key) if cached_result: return inc_exc, cached_result system_prompt, user_prompt = get_matching_prompt(trial, inc_exc, patient) messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] try: response = client.chat.completions.create( model=model, messages=messages, temperature=0, ) message = response.choices[0].message.content.strip() start = message.find('{') end = message.rfind('}') if start != -1 and end > start: message = message[start:end+1] result = json.loads(message) set_cache(cache_key, result) return inc_exc, result except (json.JSONDecodeError, TypeError, Exception) as e: result = {"error": f"Failed to parse model output: {e}", "raw": message if 'message' in locals() else "No response"} return inc_exc, result def trialgpt_matching_parallel(trial: dict, patient: str, model: str): """Parallel processing of inclusion and exclusion criteria.""" with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: futures = [ executor.submit(trialgpt_matching_single, trial, patient, "inclusion", model), executor.submit(trialgpt_matching_single, trial, patient, "exclusion", model) ] results = {} for future in concurrent.futures.as_completed(futures): try: inc_exc, result = future.result(timeout=30) # 30 second timeout results[inc_exc] = result except concurrent.futures.TimeoutError: print(f"Timeout for trial matching") results[inc_exc] = {"error": "Timeout"} except Exception as e: print(f"Error in parallel matching: {e}") results[inc_exc] = {"error": str(e)} return results # Load corpus once at startup print("Loading corpus data...") def load_corpus(corpus="sigir"): corpus_path = f"dataset/{corpus}/corpus.jsonl" corpus_map = {} with open(corpus_path, "r") as f: for line in f: entry = json.loads(line) corpus_map[entry["_id"]] = entry return corpus_map corpus_data = load_corpus() print(f"Corpus loaded with {len(corpus_data)} trials") # --- Optimized Trial Ranking --- eps = 1e-9 def get_matching_score(matching): included, not_inc, na_inc, no_info_inc = 0, 0, 0, 0 excluded, not_exc, na_exc, no_info_exc = 0, 0, 0, 0 if "inclusion" in matching: for criteria, info in matching["inclusion"].items(): if len(info) == 3: if info[2] == "included": included += 1 elif info[2] == "not included": not_inc += 1 elif info[2] == "not applicable": na_inc += 1 elif info[2] == "not enough information": no_info_inc += 1 if "exclusion" in matching: for criteria, info in matching["exclusion"].items(): if len(info) == 3: if info[2] == "excluded": excluded += 1 elif info[2] == "not excluded": not_exc += 1 elif info[2] == "not applicable": na_exc += 1 elif info[2] == "not enough information": no_info_exc += 1 score = 0 if (included + not_inc + no_info_inc) > 0: score += included / (included + not_inc + no_info_inc + eps) if not_inc > 0: score -= 1 if excluded > 0: score -= 1 return score def get_clinical_trials(patient_details): start_time = time.time() # Check environment variables if not os.getenv("OPENAI_ENDPOINT") or not os.getenv("OPENAI_API_KEY"): return "**Error:** Please set OPENAI_ENDPOINT and OPENAI_API_KEY environment variables before using this app." if not patient_details or patient_details.strip() == "": return "**Error:** Please enter patient details." try: # Step 1: Generate keywords (cached) keyword_start = time.time() summary, conditions = generate_keywords(patient_details) keyword_time = time.time() - keyword_start if isinstance(summary, str) and summary.startswith("Error"): return f"**Error:** {summary}" if not conditions: return "## No Conditions Identified\n\nUnable to identify medical conditions from the patient description." # Step 2: Fast GPU-accelerated trial retrieval retrieval_start = time.time() retrieved_nctids = retrieve_trials_gpu( conditions, corpus="sigir", top_k=5, use_gpu=True, retriever=retriever_session # Use persistent session ) retrieval_time = time.time() - retrieval_start if not retrieved_nctids: return "## No Matching Clinical Trials Found\n\nNo clinical trials were found matching the patient's conditions." # Step 3: Preprocess patient data once processing_start = time.time() sents = sent_tokenize(patient_details) sents.append("The patient will provide informed consent, and will comply with the trial protocol without any practical issues.") patient_processed = "\\n".join([f"{idx}. {sent}" for idx, sent in enumerate(sents)]) processing_time = time.time() - processing_start # Step 4: Parallel trial matching matching_start = time.time() results = [] # Process trials in parallel batches with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: future_to_nctid = { executor.submit( trialgpt_matching_parallel, corpus_data[nctid], patient_processed, "gpt-4" ): nctid for nctid in retrieved_nctids[:5] if nctid in corpus_data } for future in concurrent.futures.as_completed(future_to_nctid): nctid = future_to_nctid[future] try: matching_result = future.result(timeout=60) # 60 second timeout per trial score = get_matching_score(matching_result) title = corpus_data[nctid].get('title', 'Unknown Title') results.append({"title": title, "nctid": nctid, "score": score}) except Exception as e: print(f"Warning: Error processing trial {nctid}: {e}") continue matching_time = time.time() - matching_start # Step 5: Sort and format results results.sort(key=lambda x: x["score"], reverse=True) total_time = time.time() - start_time # Format output output_str = f"## Matching Clinical Trials\n\n" output_str += f"*Processing time: {total_time:.1f}s (Keywords: {keyword_time:.1f}s, Retrieval: {retrieval_time:.1f}s, Matching: {matching_time:.1f}s)*\n\n" output_str += "| Rank | Trial | NCT ID | Summary | Score | Link |\n" output_str += "|------|-------|--------|---------|-------|------|\n" for rank, res in enumerate(results): trial_info = corpus_data.get(res['nctid'], {}) metadata = trial_info.get('metadata', {}) brief_summary = metadata.get('brief_summary', 'No summary available') if len(brief_summary) > 100: brief_summary = brief_summary[:97] + "..." colors = ["🟢", "🟡", "🟠", "🔴", "⚫"] color = colors[min(rank, len(colors)-1)] ct_link = f"https://clinicaltrials.gov/study/{res['nctid']}" output_str += f"| {color} {rank+1} | {res['title']} | {res['nctid']} | {brief_summary} | {res['score']:.2f} | [View Trial]({ct_link}) |\n" return output_str except Exception as e: return f"**Error:** An unexpected error occurred: {str(e)}" if __name__ == "__main__": def chat_function(message, history): return get_clinical_trials(message) # Create optimized chatbot with simple, reliable CSS with gr.Blocks(theme=gr.themes.Soft(), fill_height=False) as demo: # Add simple CSS for styling with black text gr.HTML("""