# Scraped page banner ("Spaces: Running") - not part of the module source.
import pytesseract | |
import cv2 | |
import numpy as np | |
from transformers import BertTokenizer, BertForSequenceClassification | |
from PIL import Image | |
import platform | |
import torch | |
from disease_links import diseases | |
import spacy | |
from negspacy.negation import Negex | |
from fuzzywuzzy import fuzz | |
from spacy.util import filter_spans | |
from spacy.matcher import Matcher | |
import pandas as pd | |
import re | |
import difflib | |
from api_key import GEMINI_API_KEY | |
# OCR misreadings of "HbA1c". Not referenced anywhere else in the visible part
# of this file; the `synonyms` table carries the same aliases.
# NOTE(review): the trailing "" entry would match every string if this list is
# ever used for substring checks - confirm whether it is still needed.
hba1c = ["hbaic", "hdate", ""]
import google.generativeai as genai

# Gemini client used by classify_disease_and_severity().
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel('gemini-2.5-flash-lite')

# Module-level list; the functions visible in this file build their own local
# results and do not append to it.
non_negated_diseases = []
# Canonical measurement name -> every alias (including common OCR misreadings)
# that should normalize to it. normalize_term() returns the first key whose
# alias list contains the term, so each alias must appear under exactly one key.
synonyms = {
    # Glucose / diabetes
    "hba1c": ["hba1c", "hbaic", "hdate", "a1c", "hemoglobin a1c", "glycated hemoglobin", "hba", "hda", "hbic"],
    "fasting glucose": ["fasting glucose", "fasting-glucose", "fasting blood sugar", "fbs"],
    "ogtt": ["ogtt", "oral glucose tolerance test", "glucose tolerance test"],
    # Lipids
    "ldl": ["ldl", "ldl-c", "low density lipoprotein", "bad cholesterol"],
    "hdl": ["hdl", "hdl-c", "high density lipoprotein", "good cholesterol"],
    "triglycerides": ["triglycerides", "trigs", "tg"],
    "total cholesterol": ["total cholesterol", "cholesterol total", "chol", "tc"],
    "non-hdl": ["non-hdl", "non hdl", "nonhdl"],
    # Thyroid
    "tsh": ["tsh", "thyroid stimulating hormone"],
    "free t4": ["free t4", "free-t4", "ft4", "free thyroxine"],
    "free t3": ["free t3", "free-t3", "ft3", "free triiodothyronine"],
    # Inflammation
    "crp": ["crp", "c-reactive protein"],
    "esr": ["esr", "erythrocyte sedimentation rate"],
    # Vitamins
    "vitamin-b12": ["vitamin-b12", "vitamin b12", "b12", "vit b12", "cobalamin"],
    "vitamin-d": ["vitamin-d", "vitamin d", "vit d", "25-oh d", "25-hydroxy vitamin d"],
    "vitamin-a": ["vitamin-a", "vitamin a", "vit a"],
    "vitamin-e": ["vitamin-e", "vitamin e", "vit e"],
    # Electrolytes
    "sodium": ["sodium", "na"],
    "potassium": ["potassium", "k"],
    "calcium": ["calcium", "ca"],
    "magnesium": ["magnesium", "mg"],
    # Blood Pressure
    "systolic": ["systolic", "sbp"],
    "diastolic": ["diastolic", "dbp"],
    # CBC
    "wbc": ["wbc", "white blood cells", "white cell count"],
    "rbc": ["rbc", "red blood cells", "red cell count"],
    "hemoglobin": ["hemoglobin", "hb", "hgb"],
    "hematocrit": ["hematocrit", "hct"],
    "platelets": ["platelets", "plt"],
    # Iron
    "serum iron": ["serum iron", "iron"],
    "ferritin": ["ferritin"],
    "tibc": ["tibc", "total iron binding capacity"],
    "transferrin saturation": ["transferrin saturation", "tsat"],
    # Liver
    "alt": ["alt", "sgpt"],
    "ast": ["ast", "sgot"],
    "alp": ["alp", "alkaline phosphatase"],
    "bilirubin total": ["bilirubin total", "total bilirubin"],
    "albumin": ["albumin"],
    # Kidney
    "creatinine": ["creatinine"],
    "bun": ["bun", "blood urea nitrogen"],
    "egfr": ["egfr", "estimated gfr"],
    "urine protein": ["urine protein", "proteinuria"],
    "urine albumin": ["urine albumin", "microalbumin"],
    # Respiratory
    # "spo2" also owns "oxygen saturation" / "o2 sat"; a second key repeating
    # those aliases could never be returned by normalize_term().
    "spo2": ["spo2", "oxygen saturation", "o2 sat"],
    "pco2": ["pco2", "carbon dioxide partial pressure"],
    "po2": ["po2", "oxygen partial pressure"],
    # "fevi" is a common OCR misreading of "fev1"; listing it here (rather than
    # under its own key) makes it normalize to "fev1".
    "fev1": ["fev1", "fevi", "forced expiratory volume"],
    # Coagulation
    "inr": ["inr"],
    "pt": ["pt", "prothrombin time"],
    "aptt": ["aptt", "partial thromboplastin time"],
    "fibrinogen": ["fibrinogen"],
    # Hormones
    "cortisol": ["cortisol"],
    "testosterone": ["testosterone"],
    "estradiol": ["estradiol", "estrogen"],
    "progesterone": ["progesterone"],
    # Infection
    "procalcitonin": ["procalcitonin"],
    "lactate": ["lactate"],
    # Cardiac extras
    "troponin": ["troponin", "trop"],
    # Vitals
    "temperature": ["temperature", "temp", "body temp"],
    "heart rate": ["heart rate", "pulse", "hr"],
}
import shutil

# Point pytesseract at the Tesseract binary on macOS and Windows. Prefer
# whatever `tesseract` is on PATH (Homebrew on Apple Silicon installs under
# /opt/homebrew/bin, not /usr/local/bin) and fall back to the historical
# hard-coded locations. Other platforms keep pytesseract's default.
_tesseract_on_path = shutil.which("tesseract")
if platform.system() == "Darwin":
    pytesseract.pytesseract.tesseract_cmd = _tesseract_on_path or '/usr/local/bin/tesseract'
elif platform.system() == "Windows":
    pytesseract.pytesseract.tesseract_cmd = (
        _tesseract_on_path or r'C:\Program Files\Tesseract-OCR\tesseract.exe'
    )

# Reference table consumed by analyze_measurements(). Column names and the
# `measurement` values are lower-cased so lookups are case-insensitive.
# The path is relative, so it is resolved against the current working directory.
df = pd.read_csv("measurement.csv")
df.columns = df.columns.str.lower()
df['measurement'] = df['measurement'].str.lower()
def normalize_term(term: str) -> str:
    """Map a measurement name (or an OCR-mangled variant) to its canonical key.

    The term is lower-cased and stripped, then looked up among the aliases in
    the module-level ``synonyms`` table. If there is no exact alias, the
    closest alias according to ``difflib.get_close_matches`` (cutoff 0.75) is
    used. When nothing matches, the cleaned term itself is returned.

    Args:
        term: Raw measurement name.

    Returns:
        The canonical key from ``synonyms``, or the cleaned term if unknown.
    """
    term = term.lower().strip()

    # alias -> canonical key; setdefault keeps the first key that lists an
    # alias, which is the key a linear scan of `synonyms` would return.
    alias_to_key = {}
    for key, values in synonyms.items():
        for alias in values:
            alias_to_key.setdefault(alias, key)

    if term in alias_to_key:
        return alias_to_key[term]

    # Fuzzy matching for OCR typos
    closest = difflib.get_close_matches(term, list(alias_to_key), n=1, cutoff=0.75)
    if closest:
        return alias_to_key[closest[0]]
    return term
def extract_number(text): | |
match = re.search(r'(\d+\.?\d*)', text) | |
return float(match.group(1)) if match else None | |
def analyze_measurements(text, df):
    """Find measurement values in *text* and report the reference rows they fall in.

    For every distinct name in ``df["measurement"]`` the text is searched
    (case-insensitively) for that name followed by a number. Each number is
    compared with the ``low``/``high`` bounds of every row of *df* for that
    measurement; rows whose inclusive range contains the value are reported.

    Args:
        text: Free text to scan, e.g. OCR output of a lab report.
        df: DataFrame with the columns ``measurement``, ``condition``, ``low``,
            ``high``, ``unit`` and ``severity``.

    Returns:
        A list of ``[condition, normalized measurement, unit, severity, value,
        range]`` lists, where ``range`` is ``"<low> to <high> <unit>"``.
    """
    final_numbers = []
    for measurement in df["measurement"].unique():
        # Missing names come through as NaN (a float); skip them and blanks.
        if not isinstance(measurement, str) or not measurement:
            continue

        # re.escape: names may contain regex metacharacters.
        # The capture takes digits with at most one decimal part, so a
        # sentence-ending period ("... 7.2.") is not swallowed into the number.
        # NOTE(review): `[^0-9]*` has no length limit, so the number may be
        # far from the measurement name - consider bounding it.
        pattern = rf"{re.escape(measurement)}[^0-9]*(\d+(?:\.\d+)?)"
        matches = re.findall(pattern, text, re.IGNORECASE)
        if not matches:
            continue

        normalized = normalize_term(measurement)
        rows = df[df["measurement"].str.lower() == measurement.lower()]
        for match in matches:
            value = float(match)
            for _, row in rows.iterrows():
                if row['low'] <= value <= row['high']:
                    final_numbers.append([
                        row['condition'],
                        normalized,
                        row['unit'],
                        row["severity"],
                        value,
                        f"{row['low']} to {row['high']} {row['unit']}",
                    ])
    return final_numbers
# spaCy pipeline with negspacy's "negex" component restricted to DISEASE entities.
nlp = spacy.load("en_core_web_sm")
nlp.add_pipe("negex", config={"ent_types": ["DISEASE"]}, last=True)
matcher = Matcher(nlp.vocab)

# Loaded at import time; the only code in this file that referenced them is
# no longer active.
clinical_bert_model = BertForSequenceClassification.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")
clinical_bert_tokenizer = BertTokenizer.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")

# Token patterns that signal a past / historical context.
# detect_past_diseases() reads the first token's "LOWER" value of each pattern,
# so every pattern must start with a {"LOWER": ...} token.
past_patterns = [
    # The tokenizer splits a trailing colon into its own token, so "history:"
    # has to be matched as two tokens.
    [{"LOWER": "clinical"}, {"LOWER": "history"}, {"ORTH": ":"}],
    [{"LOWER": "past"}, {"LOWER": "medical"}, {"ORTH": ":"}],
    [{"LOWER": "medical"}, {"LOWER": "history"}, {"ORTH": ":"}],
    [{"LOWER": "history"}, {"LOWER": "of"}],
    [{"LOWER": "prior"}],
    [{"LOWER": "previous"}],
    [{"LOWER": "formerly"}],
    [{"LOWER": "resolved"}],
    [{"LOWER": "used"}, {"LOWER": "to"}, {"LOWER": "have"}],
    # Match the surface form: the lemma of "diagnosed" is "diagnose", so a
    # LEMMA test against "diagnosed" never fires.
    [{"LOWER": "was"}, {"LOWER": "diagnosed"}],
    [{"LOWER": "history"}],
]
def analyze_with_clinicalBert(extracted_text: str) -> tuple:
    """Summarize the diseases and measurements found in *extracted_text*.

    Combines analyze_text_and_describe(), extract_non_negated_keywords(),
    analyze_measurements() (against the module-level ``df``) and, when any
    content is found, classify_disease_and_severity().

    Args:
        extracted_text: Text of the medical document.

    Returns:
        A ``(response, detected_measurements)`` tuple of strings.
        ``detected_measurements`` is the empty string when no measurement
        matched a reference range.
    """
    _, _, description, _, _ = analyze_text_and_describe(extracted_text)
    diseases_found = extract_non_negated_keywords(extracted_text)
    detected_measures = analyze_measurements(extracted_text, df)

    # Always bound, so the return below cannot hit an unassigned local.
    detected_measurements = (
        f"Detected measurements: {detected_measures}" if detected_measures else ""
    )

    # Each measurement hit is a list whose first element is the condition it
    # points to; fold those names in with the disease keywords (order kept,
    # duplicates dropped) so the join below only ever sees strings.
    condition_names = [str(hit[0]) for hit in detected_measures]
    reported = list(dict.fromkeys(list(diseases_found) + condition_names))

    if not reported:
        return "No significant medical content detected.", detected_measurements

    # classify_disease_and_severity() returns a single label string.
    severity_label = classify_disease_and_severity(extracted_text)

    response = f"Detected medical content: {description}. "
    response += f"Severity: {severity_label}. "
    response += "Detected diseases (non-negated): " + ", ".join(reported) + ". "
    return response, detected_measurements
def extract_text_from_image(image):
    """Run Tesseract OCR on an image array and return the recognized text.

    Args:
        image: Array-like with a ``shape`` attribute. Accepted layouts are
            2-D (already grayscale) and 3-D with the channel count on the last
            axis. 3-D input is converted with OpenCV assuming BGR (or BGRA
            for four channels) channel order.

    Returns:
        The text produced by ``pytesseract.image_to_string``.

    Raises:
        ValueError: If the array is neither 2-D nor 3-D.
    """
    dims = len(image.shape)
    if dims == 2:
        gray_img = image
    elif dims == 3:
        channels = image.shape[2]
        if channels == 1:
            # Single-channel image stored as (H, W, 1): already grayscale.
            gray_img = image[:, :, 0]
        elif channels == 4:
            # BGR2GRAY rejects 4-channel input; use the alpha-aware conversion.
            gray_img = cv2.cvtColor(image, cv2.COLOR_BGRA2GRAY)
        else:
            gray_img = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        raise ValueError("Unsupported image format. Please provide a valid image.")
    return pytesseract.image_to_string(gray_img)
# Module-level list; not written to by any function visible in this file.
past_disease_terms = []

# Register the past-context patterns under one rule name for detect_past_diseases().
matcher.add("PAST_CONTEXT", past_patterns)
def extract_non_negated_keywords(text, threshold=80):
    """Return the disease terms mentioned in *text* that negex does not mark as negated.

    Every occurrence of a term from ``diseases`` is tagged as a ``DISEASE``
    entity (expanded to whole tokens), overlapping spans are resolved with
    ``filter_spans``, and the pipeline's negex component is re-run over the
    new entities. A term is reported when a non-negated entity's text has a
    ``fuzz.ratio`` of at least *threshold* with it.

    Args:
        text: Document text.
        threshold: Minimum ``fuzz.ratio`` score (0-100) between an entity's
            text and a disease term.

    Returns:
        Sorted list of matching terms, spelled as they appear in ``diseases``.
    """
    doc = nlp(text)
    found_diseases = set()
    new_ents = []
    lowered_terms = [term.lower() for term in diseases]

    print("Running spaCy sentence segmentation...")
    for sent in doc.sents:
        sent_text = sent.text.lower()
        for term_lower in lowered_terms:
            if not term_lower:
                continue  # an empty term would "match" at every offset
            # Tag every occurrence, not just the first: one mention in a
            # sentence may be negated while a later one is not.
            start = sent_text.find(term_lower)
            while start != -1:
                start_char = sent.start_char + start
                end_char = start_char + len(term_lower)
                span = doc.char_span(start_char, end_char, label="DISEASE", alignment_mode="expand")
                if span:
                    new_ents.append(span)
                start = sent_text.find(term_lower, start + len(term_lower))

    # Clean up overlapping spans
    doc.set_ents(filter_spans(new_ents))
    # negex already ran inside nlp(text), before these entities existed;
    # run it again so the new entities get their negation flag.
    nlp.get_pipe("negex")(doc)

    for ent in doc.ents:
        if ent.label_ != "DISEASE" or ent._.negex:
            continue
        ent_text = ent.text.strip().lower()
        for disease_term in diseases:
            if fuzz.ratio(ent_text, disease_term.lower()) >= threshold:
                found_diseases.add(disease_term)
    return sorted(found_diseases)
def detect_past_diseases(text, threshold=90):
    """Return disease terms that appear shortly after a past-context trigger.

    A sentence is examined when the module-level ``matcher`` fires in it. In
    such a sentence, for every token equal to the first word of one of
    ``past_patterns``, the text of the following five tokens is compared with
    each term in ``diseases``.

    Args:
        text: Document text.
        threshold: Minimum ``fuzz.partial_ratio`` score (0-100) between a
            disease term and the window of text after a trigger token.

    Returns:
        Sorted list of matching terms, without duplicates.
    """
    doc = nlp(text)
    trigger_words = {
        pattern[0]["LOWER"]
        for pattern in past_patterns
        if isinstance(pattern, list) and pattern and "LOWER" in pattern[0]
    }
    terms = [(term, term.lower()) for term in diseases]

    past_diseases = set()
    scanned = set()
    for _match_id, start, end in matcher(doc):
        sentence = doc[start:end].sent
        bounds = (sentence.start, sentence.end)
        if bounds in scanned:
            continue  # several matches can fall in the same sentence
        scanned.add(bounds)

        sent_tokens = list(sentence)
        for i, token in enumerate(sent_tokens):
            if token.lower_ not in trigger_words:
                continue
            window = "".join(t.text_with_ws for t in sent_tokens[i + 1:i + 6]).lower().strip()
            if not window:
                continue
            for term, term_lower in terms:
                # partial_ratio aligns the shorter string inside the longer
                # one. Requiring the term to fit in the window keeps a short
                # fragment such as "a" or "is" from scoring 100 against a
                # long disease name.
                if not term_lower or len(term_lower) > len(window):
                    continue
                if fuzz.partial_ratio(term_lower, window) >= threshold:
                    past_diseases.add(term)
    return sorted(past_diseases)
def analyze_text_and_describe(text):
    """Describe which known diseases are mentioned in *text*.

    A disease counts as mentioned when its lower-cased name is a substring of
    the lower-cased text.

    Args:
        text: Document text.

    Returns:
        A tuple ``(num_chars, num_words, description, medical_content_found,
        detected_diseases)``. ``description`` starts with
        ``"The text contains: "`` followed by the comma-separated values from
        ``diseases`` for each mentioned disease, or by ``"uncertain content."``
        when none is mentioned.
    """
    num_chars = len(text)
    num_words = len(text.split())
    text_lower = text.lower()

    meanings = []
    detected_diseases = []
    for disease, meaning in diseases.items():
        if disease.lower() in text_lower:
            meanings.append(f"{meaning}")
            detected_diseases.append(disease)

    medical_content_found = bool(detected_diseases)
    if medical_content_found:
        description = ("The text contains: " + ", ".join(meanings)).rstrip(", ")
    else:
        # Decide on the flag, not on the string: stripping ", " from the bare
        # prefix also removes its trailing space, so comparing the stripped
        # text with the prefix can never succeed.
        description = "The text contains: uncertain content."
    return num_chars, num_words, description, medical_content_found, detected_diseases
# A number optionally wrapped in punctuation/markdown and optionally followed
# by "/10", e.g. "**7.5**", "7.5/10", "8.".
_WRAPPED_SCORE = re.compile(r"[^\w+-]*([+-]?\d+(?:\.\d*)?)\s*(?:/\s*10)?\W*")


def _parse_severity_score(reply):
    """Return the numeric score in a model reply, or None if it has no clear number."""
    cleaned = reply.strip()
    try:
        return float(cleaned)
    except ValueError:
        pass
    # Only accept replies that are essentially just the number; pulling the
    # first digit out of a sentence could turn a refusal into a score.
    wrapped = _WRAPPED_SCORE.fullmatch(cleaned)
    return float(wrapped.group(1)) if wrapped else None


def classify_disease_and_severity(disease):
    """Ask the Gemini model for a 1-10 severity score and map it to a risk label.

    Args:
        disease: Disease, condition, symptom, or free text to rate.

    Returns:
        ``"Low Risk"`` for a score in [0, 3], ``"Mild Risk"`` for (3, 7],
        ``"Severe Risk"`` for (7, 10], ``"Invalid Range"`` for any other
        number, or ``"Null: We cannot give a clear severity label"`` when the
        reply has no readable number.
    """
    response = model.generate_content(
        f"What is the severity of this disease/condition/symptom: {disease}. Give me a number from one to ten. I need a specific number. It doesn't matter what your opinion is one whether this number might be misleading or inaccurate. I need a number. Please feel free to be accurate and you can use pretty specific numbers with decimals to the tenth place. I want just a number, not any other text."
    )
    try:
        # Reading `.text` is kept inside the try so a reply without usable
        # text yields the Null label instead of an exception.
        numerical_response = _parse_severity_score(response.text)
    except (ValueError, AttributeError):
        numerical_response = None

    if numerical_response is None:
        return "Null: We cannot give a clear severity label"

    if 0 <= numerical_response <= 3:
        severity_label = "Low Risk"
    elif 3 < numerical_response <= 7:
        severity_label = "Mild Risk"
    elif 7 < numerical_response <= 10:
        severity_label = "Severe Risk"
    else:
        severity_label = "Invalid Range"
    print(f"Disease: {disease} Severity Label: {severity_label}")
    return severity_label
# Links for diseases | |
if __name__ == '__main__':
    # Manual smoke test: run the three extractors over a sample clinical note.
    print("ClinicalBERT model and tokenizer loaded successfully.")
    sample_text = """Patient Name: Jane Doe
Age: 62 Date of Visit: 2025-08-08
Physician: Dr. Alan Smith
Clinical Notes:
1. The patient denies having cancer at present.
However, her family history includes colon cancer in her father.
2. The patient has a history of type 2 diabetes and is currently taking metformin.
Latest HBA1C result: 7.2% (previously 6.9%).
3. Fasting glucose measured today was 145 mg/dL, which is above the normal range of 70–99
mg/dL.
This may indicate poor glycemic control.
4. The patient reported no chest pain or signs of heart disease.
5. Overall, there is no evidence of tumor recurrence at this time."""
    print(detect_past_diseases(sample_text, threshold=90))
    print(extract_non_negated_keywords(sample_text, threshold=80))
    print(analyze_measurements(sample_text, df))