# WellBePlusAssistant / wellbe_agent.py
# natasha1704's picture
# Update wellbe_agent.py
# e36ff4f verified
"""
Core logic for the WellBe+ multi‑context AI assistant.
This module handles MCP server instantiation, model selection, and single‑shot
question answering. It is intentionally agnostic of any UI layer so that it can
be re‑used from a command‑line script, a notebook, or the Gradio front‑end.
"""
from __future__ import annotations
import asyncio
from copy import deepcopy
from typing import Tuple, Union, Optional
import os
import openai
from agents import Agent, Runner
from agents.extensions.models.litellm_model import LitellmModel
from agents.mcp.server import MCPServerStreamableHttp, MCPServerSse
from config import PROMPT_TEMPLATE, MCP_CONFIGS
# Public API of this module: the only names exported by a star-import.
# The underscore-prefixed server globals below are deliberately not listed.
__all__ = [
"initialize_mcp_servers",
"select_model",
"answer_question",
"answer_sync",
]
# Global variables to store initialized servers
# Both start as None and are assigned MCPServerSse instances by
# initialize_mcp_servers(); answer_question() reads them afterwards.
# NOTE(review): the annotation also admits ``str``, but nothing in this module
# ever assigns a string here -- confirm whether that arm is still needed.
_healthcare_server: Optional[Union[MCPServerSse, str]] = None
_whoop_server: Optional[Union[MCPServerSse, str]] = None
def initialize_mcp_servers(whoop_email: str, whoop_password: str) -> None:
    """Export the Whoop credentials and create the cached MCP servers.

    The two SSE server objects are constructed at most once per process and
    kept in the module-level ``_healthcare_server`` / ``_whoop_server``
    globals. The credentials, by contrast, are written to the environment on
    every call so that values supplied on a later call are not silently
    ignored just because the servers already exist.

    Args:
        whoop_email: Stored in the ``WHOOP_EMAIL`` environment variable.
        whoop_password: Stored in the ``WHOOP_PASSWORD`` environment variable.
    """
    global _healthcare_server, _whoop_server

    # Always refresh the credentials; only the server objects are cached.
    os.environ["WHOOP_EMAIL"] = whoop_email
    os.environ["WHOOP_PASSWORD"] = whoop_password

    # Each server is guarded on its own so a partially initialised state
    # (one global set, the other still None) is repaired on the next call.
    if _healthcare_server is None:
        _healthcare_server = MCPServerSse(
            params={
                "url": "https://agents-mcp-hackathon-healthcare-mcp-public.hf.space/gradio_api/mcp/sse"
            },
            name="Healthcare MCP Server",
        )
    if _whoop_server is None:
        _whoop_server = MCPServerSse(
            params={
                "url": "https://agents-mcp-hackathon-whoop-mcp-server.hf.space/gradio_api/mcp/sse"
            },
            name="Whoop MCP Server",
        )
def select_model() -> Union[str, LitellmModel]:
    """Return the model the WellBe+ agent should run on.

    At present this is always the plain model-name string ``"o3-mini"``; the
    wider return annotation leaves room for handing back a ``LitellmModel``.
    """
    model_name = "o3-mini"
    return model_name
async def answer_question(
    question: str,
    openai_key: str,
    whoop_email: str,
    whoop_password: str,
) -> str:
    """Run the WellBe+ agent on a single question and return the assistant reply.

    Args:
        question: The user prompt handed to ``Runner.run``.
        openai_key: Not read by this function; kept so the signature stays
            stable for existing callers.
        whoop_email: Forwarded to :func:`initialize_mcp_servers`.
        whoop_password: Forwarded to :func:`initialize_mcp_servers`.

    Returns:
        The ``final_output`` of the agent run, or a Markdown-formatted error
        string when the cached MCP servers are not available.
    """
    initialize_mcp_servers(whoop_email, whoop_password)

    # Both servers are entered by the ``async with`` below, so both must be
    # checked; guarding only one would let ``None`` reach the context manager.
    if _healthcare_server is None or _whoop_server is None:
        return "**Error:** MCP servers not initialized."

    # The servers are entered for the duration of the run and exited afterwards.
    async with _healthcare_server as hserver, _whoop_server as wserver:
        agent = Agent(
            name="WellBe+ Assistant",
            instructions=PROMPT_TEMPLATE,
            model=select_model(),
            mcp_servers=[
                hserver,
                wserver,
            ],
        )
        result = await Runner.run(agent, question)
        return result.final_output
def answer_sync(question: str, openai_key: str, email: str, password: str) -> str:
    """Blocking wrapper around :func:`answer_question`.

    Args:
        question: The user prompt. Blank or missing input is rejected before
            any global state is touched.
        openai_key: OpenAI API key. When blank or missing, the
            ``OPENAI_API_KEY`` environment variable is used instead.
        email: Whoop account e-mail; surrounding whitespace is stripped.
        password: Whoop account password; surrounding whitespace is stripped.

    Returns:
        The assistant reply, or a user-facing message: a prompt to enter a
        question, or a string starting with ``**Error:**``.
    """
    # Validate first so an empty submission never mutates the API-key globals.
    if not question or not question.strip():
        return "Please enter a question."

    # Prefer the explicit key, fall back to the environment; strip both so a
    # pasted key with stray whitespace still authenticates.
    api_key = (openai_key or "").strip() or (os.getenv("OPENAI_API_KEY") or "").strip()
    if not api_key:
        # Assigning None into os.environ would raise TypeError; report instead.
        return "**Error:** No OpenAI API key provided."

    openai.api_key = api_key
    os.environ["OPENAI_API_KEY"] = api_key

    try:
        # Forward the resolved key (not the raw argument) so the environment
        # fallback also works when ``openai_key`` is empty or None.
        return asyncio.run(
            answer_question(
                question,
                api_key,
                (email or "").strip(),
                (password or "").strip(),
            )
        )
    except Exception as exc:  # noqa: BLE001 - UI boundary: show the error, don't crash
        return f"**Error:** {exc}"