File size: 6,926 Bytes
6b29344
 
 
 
 
 
65aa5a5
 
 
 
fe61d2d
 
65aa5a5
d0726f5
65aa5a5
6b29344
 
 
6555fdc
 
dfc02f9
 
 
 
 
6b29344
d0726f5
 
 
6555fdc
 
d0726f5
6b29344
fe61d2d
6b29344
 
d0726f5
6b29344
d0726f5
 
 
5d7bd20
d0726f5
 
 
 
 
 
fe61d2d
6b29344
 
d0726f5
6b29344
 
 
 
dfc02f9
 
 
 
 
 
6b29344
 
 
 
2ebe745
678ce59
 
6b29344
 
 
 
 
 
 
 
d1e0ac0
 
dff880e
d1e0ac0
6b29344
 
 
 
d1e0ac0
6b29344
d1e0ac0
6b29344
fe61d2d
d25cc99
fe61d2d
d25cc99
 
 
 
 
 
 
 
 
 
 
6b29344
fe61d2d
6b29344
 
 
 
d1e0ac0
 
 
6b29344
 
d1e0ac0
 
 
01e6a62
 
d1e0ac0
 
 
6b29344
 
d1e0ac0
 
 
6b29344
 
d25cc99
 
 
 
 
 
 
 
 
 
 
 
 
6b29344
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01e6a62
6b29344
 
 
01e6a62
6b29344
 
 
 
01e6a62
6b29344
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import os
import torch
from huggingface_hub import snapshot_download
from transformers import AutoTokenizer, AutoModelForCausalLM
import gradio as gr

# ——— CONFIG ———
REPO_ID = "CodCodingCode/llama-3.1-8b-clinical"
SUBFOLDER = "checkpoint-45000"
HF_TOKEN = os.getenv("HUGGINGFACE_HUB_TOKEN")
if not HF_TOKEN:
    raise RuntimeError("Missing HUGGINGFACE_HUB_TOKEN in env")

# ——— 1) Download the full repo ———
local_cache = snapshot_download(
    repo_id=REPO_ID,
    token=HF_TOKEN,
)
print("[DEBUG] snapshot_download → local_cache:", local_cache)
import pathlib

print(
    "[DEBUG] MODEL root contents:",
    list(pathlib.Path(local_cache).glob(f"{SUBFOLDER}/*")),
)

# ——— 2) Repo root contains tokenizer.json; model shards live in the checkpoint subfolder ———
MODEL_DIR = local_cache
MODEL_SUBFOLDER = SUBFOLDER
print("[DEBUG] MODEL_DIR:", MODEL_DIR)
print("[DEBUG] MODEL_DIR files:", os.listdir(MODEL_DIR))
print("[DEBUG] Checkpoint files:", os.listdir(os.path.join(MODEL_DIR, MODEL_SUBFOLDER)))

# ——— 3) Load tokenizer & model from disk ———
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_DIR,
    use_fast=True,
)
print("[DEBUG] Loaded fast tokenizer object:", tokenizer, "type:", type(tokenizer))
# Confirm tokenizer files are present
import os

print("[DEBUG] Files in MODEL_DIR for tokenizer:", os.listdir(MODEL_DIR))
# Inspect tokenizer's initialization arguments
try:
    print("[DEBUG] Tokenizer init_kwargs:", tokenizer.init_kwargs)
except AttributeError:
    print("[DEBUG] No init_kwargs attribute on tokenizer.")

model = AutoModelForCausalLM.from_pretrained(
    MODEL_DIR,
    subfolder=MODEL_SUBFOLDER,
    device_map="auto",
    torch_dtype=torch.float16,
)
model.eval()
print(
    "[DEBUG] Loaded model object:",
    model.__class__.__name__,
    "device:",
    next(model.parameters()).device,
)


# === Role Agent with instruction/input/output format ===
class RoleAgent:
    def __init__(self, role_instruction, tokenizer, model):
        self.tokenizer = tokenizer
        self.model = model
        self.role_instruction = role_instruction

    def act(self, input_text):
        prompt = (
            f"Instruction: {self.role_instruction}\n"
            f"Input: {input_text}\n"
            f"Output:"
        )
        encoding = self.tokenizer(prompt, return_tensors="pt")
        inputs = {k: v.to(self.model.device) for k, v in encoding.items()}

        outputs = self.model.generate(
            **inputs,
            max_new_tokens=256,
            do_sample=True,
            temperature=0.7,
            pad_token_id=self.tokenizer.eos_token_id,
        )
        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        thinking = ""
        print(f"[RESPONSE]: {response}")
        answer = response
        if "Output:" in response:
            # Split on the last occurrence of 'Output:' in case it's repeated
            answer = response.rsplit("Output:", 1)[-1].strip()
        else:
            # Fallback: if thinking/answer/end tags exist, use previous logic
            tags = ("THINKING:", "ANSWER:", "END")
            if all(tag in response for tag in tags):
                print("[FIX] tagged response detected:", response)
                block = response.split("THINKING:", 1)[1].split("END", 1)[0]
                thinking = block.split("ANSWER:", 1)[0].strip()
                answer = block.split("ANSWER:", 1)[1].strip()

        return {"thinking": thinking, "output": answer}


# === Agents ===
summarizer = RoleAgent(
    role_instruction="You are a clinical summarizer trained to extract structured vignettes from doctor–patient dialogues.",
    tokenizer=tokenizer,
    model=model,
)
diagnoser = RoleAgent(
    role_instruction="You are a board-certified diagnostician that diagnoses patients.",
    tokenizer=tokenizer,
    model=model,
)
questioner = RoleAgent(
    role_instruction="You are a physician asking questions to diagnose a patient.",
    tokenizer=tokenizer,
    model=model,
)
treatment_agent = RoleAgent(
    role_instruction="You are a board-certified clinician. Based on the diagnosis and patient vignette provided below, suggest a concise treatment plan that could realistically be initiated by a primary care physician or psychiatrist.",
    tokenizer=tokenizer,
    model=model,
)

"""[DEBUG] prompt: Instruction: You are a clinical summarizer trained to extract structured vignettes from doctor–patient dialogues.
Input: Doctor: What brings you in today?
Patient: I am a male. I am 15. My knee hurts. What may be the issue with my knee?

Previous Vignette: 
Output:
Instruction: You are a clinical summarizer trained to extract structured vignettes from doctor–patient dialogues.
Input: Doctor: What brings you in today?
Patient: I am a male. I am 15. My knee hurts. What may be the issue with my knee?

Previous Vignette: 
Output: The patient is a 15-year-old male presenting with knee pain."""


# === Inference State ===
conversation_history = []
summary = ""
diagnosis = ""


# === Gradio Inference ===
def simulate_interaction(user_input, iterations=1):
    history = [f"Doctor: What brings you in today?", f"Patient: {user_input}"]
    summary, diagnosis = "", ""

    for i in range(iterations):
        # Summarize
        sum_in = "\n".join(history) + f"\nPrevious Vignette: {summary}"
        sum_out = summarizer.act(sum_in)
        summary = sum_out["output"]

        # Diagnose
        diag_out = diagnoser.act(summary)
        diagnosis = diag_out["output"]

        # Question
        q_in = f"Vignette: {summary}\nCurrent Estimated Diagnosis: {diag_out['thinking']} {diagnosis}"
        q_out = questioner.act(q_in)
        history.append(f"Doctor: {q_out['output']}")
        history.append("Patient: (awaiting response)")

        # Treatment
        treatment_out = treatment_agent.act(
            f"Diagnosis: {diagnosis}\nVignette: {summary}"
        )

        return {
            "summary": sum_out,
            "diagnosis": diag_out,
            "question": q_out,
            "treatment": treatment_out,
            "conversation": "\n".join(history),
        }


# === Gradio UI ===
def ui_fn(user_input):
    res = simulate_interaction(user_input)
    return f"""📋 Vignette Summary:
💭 THINKING: {res['summary']['thinking']}
ANSWER: {res['summary']['output']}

🩺 Diagnosis:
💭 THINKING: {res['diagnosis']['thinking']}
ANSWER: {res['diagnosis']['output']}
T
❓ Follow-up Question:
💭 THINKING: {res['question']['thinking']}
ANSWER: {res['question']['output']}

💊 Treatment Plan:
{res['treatment']['output']}

💬 Conversation:
{res['conversation']}
"""


demo = gr.Interface(
    fn=ui_fn,
    inputs=gr.Textbox(label="Patient Response"),
    outputs=gr.Textbox(label="Doctor Simulation Output"),
    title="🧠 AI Doctor Multi-Agent Reasoning",
)

if __name__ == "__main__":
    demo.launch(share=True)