|
""" |
|
Core logic for the WellBe+ multi‑context AI assistant.

This module handles MCP server instantiation, model selection, and single‑shot
question answering. It is intentionally agnostic of any UI layer so that it can
be re‑used from a command‑line script, a notebook, or the Gradio front‑end.
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import asyncio |
|
from copy import deepcopy |
|
from typing import Tuple, Union, Optional |
|
import os |
|
import openai |
|
from agents import Agent, Runner |
|
from agents.extensions.models.litellm_model import LitellmModel |
|
from agents.mcp.server import MCPServerStreamableHttp, MCPServerSse |
|
|
|
from config import PROMPT_TEMPLATE, MCP_CONFIGS |
|
|
|
# Names exported by a star-import of this module.
__all__ = ["initialize_mcp_servers", "select_model", "answer_question", "answer_sync"]
|
|
|
|
|
# Module-level cache of the MCP server clients. Both stay ``None`` until
# ``initialize_mcp_servers`` has run; afterwards they hold ``MCPServerSse``
# instances that ``answer_question`` enters as async context managers.
_healthcare_server: Optional[MCPServerSse] = None
_whoop_server: Optional[MCPServerSse] = None
|
|
|
|
|
def initialize_mcp_servers(whoop_email: str, whoop_password: str) -> None:
    """Create the healthcare and WHOOP MCP server clients and cache them.

    The clients are stored in the module-level ``_healthcare_server`` and
    ``_whoop_server`` globals. Once both are cached, later calls return
    immediately without touching the cache or the process environment.

    Args:
        whoop_email: Exported as the ``WHOOP_EMAIL`` environment variable.
        whoop_password: Exported as the ``WHOOP_PASSWORD`` environment
            variable.
    """
    global _healthcare_server, _whoop_server

    # Guard on BOTH globals so a half-initialized cache gets rebuilt instead
    # of being treated as ready.
    if _healthcare_server is not None and _whoop_server is not None:
        return

    # NOTE(review): the credentials are only exported on the (first)
    # initializing call; which consumer reads them is not visible here.
    os.environ["WHOOP_EMAIL"] = whoop_email
    os.environ["WHOOP_PASSWORD"] = whoop_password

    healthcare_url = (
        "https://agents-mcp-hackathon-healthcare-mcp-public.hf.space/gradio_api/mcp/sse"
    )
    whoop_url = (
        "https://agents-mcp-hackathon-whoop-mcp-server.hf.space/gradio_api/mcp/sse"
    )

    # Build into locals first: if either constructor raises, the module cache
    # is left untouched rather than holding only one of the two servers.
    healthcare = MCPServerSse(
        params={"url": healthcare_url},
        name="Healthcare MCP Server",
    )
    whoop = MCPServerSse(
        params={"url": whoop_url},
        name="Whoop MCP Server",
    )

    _healthcare_server, _whoop_server = healthcare, whoop
|
|
|
|
|
def select_model() -> Union[str, LitellmModel]:
    """Return the model the agent is configured to run on.

    At present this is always the model identifier ``"o3-mini"``.
    """
    model_name = "o3-mini"
    return model_name
|
|
|
|
|
async def answer_question(
    question: str,
    openai_key: str,
    whoop_email: str,
    whoop_password: str,
) -> str:
    """Run the WellBe+ agent on a single question and return the assistant reply.

    Args:
        question: The user question passed to the agent run.
        openai_key: Not used by this coroutine; kept so the signature stays
            stable for existing callers.
        whoop_email: Forwarded to :func:`initialize_mcp_servers`.
        whoop_password: Forwarded to :func:`initialize_mcp_servers`.

    Returns:
        The ``final_output`` of the agent run, or a ``"**Error:** ..."``
        message when the MCP servers are not available.
    """
    initialize_mcp_servers(whoop_email, whoop_password)

    # Both servers are entered below, so both must be present; checking only
    # one would let a ``None`` reach the ``async with``.
    if _healthcare_server is None or _whoop_server is None:
        return "**Error:** MCP servers not initialized."

    async with _healthcare_server as hserver, _whoop_server as wserver:
        agent = Agent(
            name="WellBe+ Assistant",
            instructions=PROMPT_TEMPLATE,
            model=select_model(),
            mcp_servers=[hserver, wserver],
        )
        result = await Runner.run(agent, question)
        return result.final_output
|
|
|
|
|
def answer_sync(question: str, openai_key: str, email: str, password: str) -> str:
    """Blocking wrapper around :func:`answer_question`.

    The OpenAI key is taken from ``openai_key`` or, when that is empty, from
    the ``OPENAI_API_KEY`` environment variable. The resolved key is set on
    ``openai.api_key`` and exported as ``OPENAI_API_KEY`` before the agent
    runs.

    Args:
        question: The user question; blank input is rejected.
        openai_key: OpenAI API key; may be empty to fall back to the
            environment.
        email: WHOOP e-mail, stripped and forwarded to the agent.
        password: WHOOP password, stripped and forwarded to the agent.

    Returns:
        The assistant reply, ``"Please enter a question."`` for blank input,
        or a ``"**Error:** ..."`` message. Exceptions raised while running
        the agent are returned as such a message rather than propagated;
        this includes the ``RuntimeError`` that ``asyncio.run`` raises when
        an event loop is already running.
    """
    # Validate the question first so invalid input never mutates global state.
    if not (question or "").strip():
        return "Please enter a question."

    # Strip before use so the same clean key reaches openai, the environment
    # and the agent; a whitespace-only key falls back to the environment.
    api_key = (openai_key or "").strip() or os.getenv("OPENAI_API_KEY", "").strip()
    if not api_key:
        # Assigning ``None`` to ``os.environ`` would raise ``TypeError``.
        return "**Error:** OpenAI API key is missing."

    openai.api_key = api_key
    os.environ["OPENAI_API_KEY"] = api_key

    try:
        return asyncio.run(
            answer_question(
                question,
                api_key,
                (email or "").strip(),
                (password or "").strip(),
            )
        )
    except Exception as exc:
        return f"**Error:** {exc}"
|
|