Spaces:
Running
Running
| import os | |
| import json | |
| import uuid | |
| from datetime import datetime | |
| import os | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| from langchain_openai import ChatOpenAI | |
# In-memory, process-local stores backing the mock services in this module.
# Parsed JSON fixtures, memoised by cache key so each file is read once.
_FIXTURE_CACHE: Dict[str, Any] = {}
# Dispute cases keyed by the caller-supplied idempotency key.
_DISPUTES_DB: Dict[str, Dict[str, Any]] = {}
# Banking authentication session state keyed by session_id.
_SESSIONS: Dict[str, Dict[str, Any]] = {}
# Most recently generated OTP record per customer_id.
_OTP_DB: Dict[str, Dict[str, Any]] = {}
# Wire quotes keyed by quote_id.
_QUOTES: Dict[str, Dict[str, Any]] = {}
# Saved beneficiaries, one list per customer_id.
_BENEFICIARIES_DB: Dict[str, List[Dict[str, Any]]] = {}
| def _fixtures_dir() -> Path: | |
| return Path(__file__).parent / "mock_data" | |
def _load_fixture(name: str) -> Any:
    """Load a JSON fixture by file name, memoising the parsed result.

    The file is read from the directory returned by ``_fixtures_dir`` the
    first time ``name`` is requested; later calls return the cached object.
    """
    try:
        return _FIXTURE_CACHE[name]
    except KeyError:
        pass
    fixture_path = _fixtures_dir() / name
    with fixture_path.open("r", encoding="utf-8") as handle:
        parsed = json.load(handle)
    _FIXTURE_CACHE[name] = parsed
    return parsed
| def _parse_iso_date(text: Optional[str]) -> Optional[datetime]: | |
| if not text: | |
| return None | |
| try: | |
| return datetime.strptime(text, "%Y-%m-%d") | |
| except Exception: | |
| return None | |
def _get_customer_blob(customer_id: str) -> Dict[str, Any]:
    """Return a shallow copy of the ``accounts.json`` entry for ``customer_id``.

    An unknown customer yields an empty dict.
    """
    all_customers = _load_fixture("accounts.json").get("customers", {})
    record = all_customers.get(customer_id, {})
    return dict(record)
def get_accounts(customer_id: str) -> List[Dict[str, Any]]:
    """Return the list of accounts stored for ``customer_id``.

    Two fixture layouts are accepted: the current one, where the customer
    entry is a dict with an ``"accounts"`` list, and the legacy one, where the
    entry itself is the list of accounts. The raw entry is read directly
    because coercing a legacy list through ``dict()`` would raise before the
    legacy branch could ever run.

    Unknown customers, or entries of any other type, yield an empty list.
    """
    customers = _load_fixture("accounts.json").get("customers", {})
    record = customers.get(customer_id, {})
    if isinstance(record, list):
        # backward-compat: old format was a list of accounts
        return list(record)
    if isinstance(record, dict):
        return list(record.get("accounts", []) or [])
    return []
def get_profile(customer_id: str) -> Dict[str, Any]:
    """Return a copy of the customer's ``profile`` mapping, or ``{}`` if absent."""
    record = _get_customer_blob(customer_id)
    if not isinstance(record, dict):
        return {}
    return dict(record.get("profile", {}))
def find_customer_by_name(first_name: str, last_name: str) -> Dict[str, Any]:
    """Find the first customer whose profile first and last name both match.

    Matching ignores case and surrounding whitespace. Returns
    ``{"customer_id": ..., "profile": ...}`` for the first match, or ``{}``
    when nothing matches.

    A query with both names empty returns ``{}``. Without that guard it would
    match any profile that has no name fields, handing back an arbitrary
    customer; the full-name lookup already refuses empty queries.
    """
    customers = _load_fixture("accounts.json").get("customers", {})
    wanted_first = (first_name or "").strip().lower()
    wanted_last = (last_name or "").strip().lower()
    if not (wanted_first or wanted_last):
        return {}
    for cid, blob in customers.items():
        prof = blob.get("profile") if isinstance(blob, dict) else None
        if not isinstance(prof, dict):
            continue
        have_first = str(prof.get("first_name") or "").strip().lower()
        have_last = str(prof.get("last_name") or "").strip().lower()
        if wanted_first == have_first and wanted_last == have_last:
            return {"customer_id": cid, "profile": prof}
    return {}
def find_customer_by_full_name(full_name: str) -> Dict[str, Any]:
    """Find the first customer whose full name equals ``full_name``.

    The query is compared, case-insensitively and trimmed, against both
    ``"<first_name> <last_name>"`` and the profile's ``full_name`` field.
    Returns ``{"customer_id": ..., "profile": ...}`` or ``{}``; an empty query
    never matches.
    """
    customers = _load_fixture("accounts.json").get("customers", {})
    wanted = (full_name or "").strip().lower()
    if not wanted:
        return {}
    for customer_id, record in customers.items():
        profile = record.get("profile") if isinstance(record, dict) else None
        if not isinstance(profile, dict):
            continue
        first = str(profile.get("first_name") or "").strip()
        last = str(profile.get("last_name") or "").strip()
        combined = f"{first} {last}".strip().lower()
        declared = str(profile.get("full_name") or "").strip().lower()
        if wanted in (combined, declared):
            return {"customer_id": customer_id, "profile": profile}
    return {}
| def _normalize_dob(text: Optional[str]) -> Optional[str]: | |
| if not isinstance(text, str) or not text.strip(): | |
| return None | |
| t = text.strip().lower() | |
| # YYYY-MM-DD | |
| try: | |
| if len(t) >= 10 and t[4] == '-' and t[7] == '-': | |
| d = datetime.strptime(t[:10], "%Y-%m-%d") | |
| return d.strftime("%Y-%m-%d") | |
| except Exception: | |
| pass | |
| # YYYY MM DD or YYYY/MM/DD or YYYY.MM.DD (loosely) | |
| try: | |
| import re as _re | |
| parts = _re.findall(r"\d+", t) | |
| if len(parts) >= 3 and len(parts[0]) == 4: | |
| y, m, d = int(parts[0]), int(parts[1]), int(parts[2]) | |
| if 1900 <= y <= 2100 and 1 <= m <= 12 and 1 <= d <= 31: | |
| dt = datetime(y, m, d) | |
| return dt.strftime("%Y-%m-%d") | |
| except Exception: | |
| pass | |
| # Month name DD YYYY | |
| MONTHS = { | |
| "jan": 1, "january": 1, "feb": 2, "february": 2, "mar": 3, "march": 3, | |
| "apr": 4, "april": 4, "may": 5, "jun": 6, "june": 6, "jul": 7, "july": 7, | |
| "aug": 8, "august": 8, "sep": 9, "sept": 9, "september": 9, | |
| "oct": 10, "october": 10, "nov": 11, "november": 11, "dec": 12, "december": 12, | |
| } | |
| try: | |
| parts = t.replace(',', ' ').split() | |
| if len(parts) >= 3 and parts[0] in MONTHS: | |
| m = MONTHS[parts[0]] | |
| day = int(''.join(ch for ch in parts[1] if ch.isdigit())) | |
| year = int(parts[2]) | |
| d = datetime(year, m, day) | |
| return d.strftime("%Y-%m-%d") | |
| except Exception: | |
| pass | |
| # DD/MM/YYYY or MM/DD/YYYY | |
| try: | |
| for sep in ('/', '-'): | |
| if sep in t and t.count(sep) == 2: | |
| a, b, c = t.split(sep)[:3] | |
| if len(c) == 4 and a.isdigit() and b.isdigit() and c.isdigit(): | |
| da, db, dy = int(a), int(b), int(c) | |
| # If first looks like month, assume MM/DD | |
| if 1 <= da <= 12 and 1 <= db <= 31: | |
| d = datetime(dy, da, db) | |
| else: | |
| # assume DD/MM | |
| d = datetime(dy, db, da) | |
| return d.strftime("%Y-%m-%d") | |
| except Exception: | |
| pass | |
| return None | |
def _find_account_by_id(account_id: str) -> Optional[Dict[str, Any]]:
    """Scan every customer in ``accounts.json`` for the account with this id.

    Returns the fixture's own account dict (not a copy) for the first match,
    or ``None`` when no customer owns such an account.
    """
    customers = _load_fixture("accounts.json").get("customers", {})
    for record in customers.values():
        owned = (record or {}).get("accounts", []) or []
        for account in owned:
            if str(account.get("account_id")) == account_id:
                return account
    return None
def get_account_balance(account_id: str) -> Dict[str, Any]:
    """Summarise balance and wire settings for an account.

    An unknown account produces the same shape with ``currency`` set to
    ``None``, zero amounts and ``wire_enabled`` False.
    """
    account = _find_account_by_id(account_id)
    if not account:
        account = {}
    summary: Dict[str, Any] = {"account_id": account_id}
    summary["currency"] = account.get("currency")
    summary["balance"] = float(account.get("balance", 0.0))
    summary["daily_wire_limit"] = float(account.get("daily_wire_limit", 0.0))
    summary["wire_enabled"] = bool(account.get("wire_enabled", False))
    return summary
def get_exchange_rate(from_currency: str, to_currency: str, amount: float) -> Dict[str, Any]:
    """Quote an FX conversion of ``amount`` from one currency to another.

    Lookup order in ``exchange_rates.json``: the direct pair, then the inverse
    pair (using ``1 / mid_rate``). The applied rate is the mid rate reduced by
    the pair's margin in basis points (150 bps when the pair does not set one).

    Returns a dict with ``from``, ``to``, ``mid_rate``, ``applied_rate``,
    ``margin_bps``, ``converted_amount`` and ``rate_found``.

    ``rate_found`` is False when neither a direct nor an inverse pair exists.
    In that case the function still falls back to a mid rate of 1.0, as it
    always has, but the flag lets callers detect that the figure is a
    placeholder instead of silently treating two currencies as at parity.
    """
    src = from_currency.upper()
    dst = to_currency.upper()
    if src == dst:
        return {
            "from": src,
            "to": dst,
            "mid_rate": 1.0,
            "applied_rate": 1.0,
            "margin_bps": 0,
            "converted_amount": round(float(amount), 2),
            "rate_found": True,
        }

    pairs = _load_fixture("exchange_rates.json").get("pairs", [])
    mid: Optional[float] = None
    bps = 150
    for p in pairs:
        if str(p.get("from")).upper() == src and str(p.get("to")).upper() == dst:
            mid = float(p.get("mid_rate"))
            bps = int(p.get("margin_bps", bps))
            break
    if mid is None:
        # naive inverse lookup
        for p in pairs:
            if str(p.get("from")).upper() == dst and str(p.get("to")).upper() == src:
                inv = float(p.get("mid_rate"))
                # A zero rate cannot be inverted; leave mid unset so the
                # fallback below applies.
                mid = 1.0 / inv if inv else None
                bps = int(p.get("margin_bps", bps))
                break

    rate_found = mid is not None
    if mid is None:
        mid = 1.0

    applied = mid * (1.0 - bps / 10000.0)
    converted = float(amount) * applied
    return {
        "from": src,
        "to": dst,
        "mid_rate": round(mid, 6),
        "applied_rate": round(applied, 6),
        "margin_bps": bps,
        "converted_amount": round(converted, 2),
        "rate_found": rate_found,
    }
def calculate_wire_fee(kind: str, amount: float, from_currency: str, to_currency: str, payer: str) -> Dict[str, Any]:
    """Compute the fee breakdown for a wire and split it between the parties.

    ``kind`` must be DOMESTIC or INTERNATIONAL and ``payer`` one of OUR, SHA
    or BEN (defaulting to SHA when empty); otherwise an ``{"error", "message"}``
    dict is returned. Fee amounts come from ``fee_schedules.json`` with
    built-in defaults.

    Split rules: OUR charges everything to the initiator, BEN charges
    everything to the recipient, and SHA charges base and SWIFT fees to the
    initiator and the remaining components to the recipient.
    """
    schedule = _load_fixture("fee_schedules.json")
    wire_type = (kind or "").strip().upper()
    payer_code = (payer or "SHA").strip().upper()
    if wire_type not in ("DOMESTIC", "INTERNATIONAL"):
        return {"error": "invalid_type", "message": "type must be DOMESTIC or INTERNATIONAL"}
    if payer_code not in ("OUR", "SHA", "BEN"):
        return {"error": "invalid_payer", "message": "payer must be OUR, SHA, or BEN"}

    components: Dict[str, float] = {}
    if wire_type == "DOMESTIC":
        components["DOMESTIC_BASE"] = float(schedule.get("DOMESTIC", {}).get("base_fee", 15.0))
    else:
        intl_cfg = schedule.get("INTERNATIONAL", {})
        components["INTERNATIONAL_BASE"] = float(intl_cfg.get("base_fee", 25.0))
        components["SWIFT"] = float(intl_cfg.get("swift_network_fee", 5.0))
        components["CORRESPONDENT"] = float(intl_cfg.get("correspondent_fee", 10.0))
        components["LIFTING"] = float(intl_cfg.get("lifting_fee", 5.0))

    # Fee codes the sender bears under SHA; every other code goes to the recipient.
    sender_side = ("DOMESTIC_BASE", "INTERNATIONAL_BASE", "SWIFT")
    if payer_code == "OUR":
        initiator_total = sum(components.values(), 0.0)
        recipient_total = 0.0
    elif payer_code == "SHA":
        initiator_total = sum((fee for code, fee in components.items() if code in sender_side), 0.0)
        recipient_total = sum((fee for code, fee in components.items() if code not in sender_side), 0.0)
    else:  # BEN
        initiator_total = 0.0
        recipient_total = sum(components.values(), 0.0)

    rounded_breakdown = {code: round(fee, 2) for code, fee in components.items()}
    return {
        "type": wire_type,
        "payer": payer_code,
        "from_currency": from_currency.upper(),
        "to_currency": to_currency.upper(),
        "amount": float(amount),
        "initiator_fees_total": round(initiator_total, 2),
        "recipient_fees_total": round(recipient_total, 2),
        "breakdown": rounded_breakdown,
    }
def screen_sanctions(name: str, country: str) -> Dict[str, Any]:
    """Check a name/country pair against the blocked list in ``sanctions_list.json``.

    A hit requires an exact match on both the trimmed, lower-cased name and
    the trimmed, upper-cased country. Returns ``{"cleared": True}`` or
    ``{"cleared": False, "reason": "Sanctions match"}``.
    """
    blocked_entries = _load_fixture("sanctions_list.json").get("blocked", [])
    wanted_name = (name or "").strip().lower()
    wanted_country = (country or "").strip().upper()
    is_blocked = any(
        str(entry.get("name", "")).strip().lower() == wanted_name
        and str(entry.get("country", "")).strip().upper() == wanted_country
        for entry in blocked_entries
    )
    if is_blocked:
        return {"cleared": False, "reason": "Sanctions match"}
    return {"cleared": True}
def check_wire_limits(account_id: str, amount: float) -> Dict[str, Any]:
    """Check whether ``amount`` may be wired from ``account_id``.

    Returns ``{"ok": True, "balance", "limit"}`` on success, otherwise
    ``{"ok": False, "reason": ...}`` where the reason is one of
    ``invalid_amount``, ``account_not_found``, ``wire_not_enabled``,
    ``exceeds_daily_limit`` (with ``limit``) or ``insufficient_funds``
    (with ``balance``).

    The amount is validated first: zero, negative, NaN/infinite or
    non-numeric amounts would otherwise slip through both the limit and the
    balance comparison and be reported as ok.
    """
    import math

    try:
        amt = float(amount)
    except (TypeError, ValueError):
        return {"ok": False, "reason": "invalid_amount"}
    if not math.isfinite(amt) or amt <= 0:
        return {"ok": False, "reason": "invalid_amount"}

    acc = _find_account_by_id(account_id) or {}
    if not acc:
        return {"ok": False, "reason": "account_not_found"}
    bal = float(acc.get("balance", 0.0))
    lim = float(acc.get("daily_wire_limit", 0.0))
    if not bool(acc.get("wire_enabled", False)):
        return {"ok": False, "reason": "wire_not_enabled"}
    if amt > lim:
        return {"ok": False, "reason": "exceeds_daily_limit", "limit": lim}
    if amt > bal:
        return {"ok": False, "reason": "insufficient_funds", "balance": bal}
    return {"ok": True, "balance": bal, "limit": lim}
def get_cutoff_and_eta(kind: str, country: str) -> Dict[str, Any]:
    """Return the cutoff time and ETA window for a wire type.

    Anything other than DOMESTIC (case-insensitive, trimmed) is treated as
    INTERNATIONAL. Missing config falls back to a 17:00 cutoff and a
    ``[24, 72]`` hour window. ``country`` is echoed back unchanged.
    """
    config = _load_fixture("cutoff_times.json")
    normalized_kind = (kind or "").strip().upper()
    section_name = normalized_kind if normalized_kind == "DOMESTIC" else "INTERNATIONAL"
    section = config.get(section_name, {})
    cutoff = section.get("cutoff_local", "17:00")
    eta_window = list(section.get("eta_hours", [24, 72]))
    return {"cutoff_local": cutoff, "eta_hours": eta_window, "country": country}
def get_country_requirements(code: str) -> List[str]:
    """Return the required beneficiary fields for a country code (upper-cased)."""
    requirements_by_country = _load_fixture("country_requirements.json")
    country_key = code.upper()
    return list(requirements_by_country.get(country_key, []))
def validate_beneficiary(country_code: str, beneficiary: Dict[str, Any]) -> Dict[str, Any]:
    """Report which country-required fields the beneficiary is missing.

    A field counts as present only when it is a string with non-whitespace
    content. Returns ``{"ok": bool, "missing": [field, ...]}``.
    """

    def _is_filled(value: Any) -> bool:
        return isinstance(value, str) and bool(str(value).strip())

    absent: List[str] = [
        field
        for field in get_country_requirements(country_code)
        if not _is_filled(beneficiary.get(field))
    ]
    return {"ok": not absent, "missing": absent}
def save_beneficiary(customer_id: str, beneficiary: Dict[str, Any]) -> Dict[str, Any]:
    """Append a copy of ``beneficiary`` to the customer's saved list.

    A ``beneficiary_id`` of the form ``B-xxxxxx`` is generated when the input
    does not carry one. Returns ``{"beneficiary_id": ...}``.
    """
    saved = _BENEFICIARIES_DB.setdefault(customer_id, [])
    assigned_id = beneficiary.get("beneficiary_id")
    if not assigned_id:
        assigned_id = f"B-{uuid.uuid4().hex[:6]}"
    record = {**beneficiary, "beneficiary_id": assigned_id}
    saved.append(record)
    return {"beneficiary_id": assigned_id}
def generate_otp(customer_id: str) -> Dict[str, Any]:
    """Create and store a 6-digit OTP for ``customer_id``.

    A static code from ``otps.json`` (``by_customer[customer_id]``, then
    ``default``) is preferred for predictable testing. When the fixture is
    missing, unreadable or has no usable entry, a random code is drawn with
    ``secrets``, the standard-library module intended for security tokens.

    The record in ``_OTP_DB`` replaces any previous OTP for the customer.
    The response never includes the code unless the ``WIRE_DEBUG_OTP``
    environment variable is set to something other than ``""``, ``"0"`` or
    ``"false"``.
    """
    import secrets

    # Prefer static OTP from fixture for predictable testing
    static = None
    try:
        data = _load_fixture("otps.json")
    except (OSError, ValueError):
        # Missing or malformed fixture: fall back to a random code.
        data = None
    if isinstance(data, dict):
        by_customer = data.get("by_customer")
        if not isinstance(by_customer, dict):
            by_customer = {}
        static = by_customer.get(customer_id) or data.get("default")

    code = str(static or f"{secrets.randbelow(1_000_000):06d}").zfill(6)
    _OTP_DB[customer_id] = {"otp": code, "created_at": datetime.utcnow().isoformat() + "Z"}

    # In real world, send to phone/email; here we mask
    resp: Dict[str, Any] = {"sent": True, "destination": "on-file", "masked": "***-***-****"}
    if os.getenv("WIRE_DEBUG_OTP", "0").lower() not in ("", "0", "false"):  # dev convenience
        resp["debug_code"] = code
    return resp
def verify_otp(customer_id: str, otp: str) -> Dict[str, Any]:
    """Check ``otp`` against the code last generated for ``customer_id``.

    Returns ``{"verified": bool}``. Verification fails when no OTP is on file
    for the customer or when no code is supplied. Both sides used to be
    stringified before comparing, so a missing record ("None") matched a
    supplied ``None`` or ``"None"``.

    On success the record is stamped with ``used_at``.
    NOTE(review): ``used_at`` is only recorded, never checked, so a code can
    still be verified more than once, and there is no expiry. Confirm whether
    single-use or expiry enforcement is wanted.
    """
    import hmac

    rec = _OTP_DB.get(customer_id) or {}
    expected = rec.get("otp")
    if expected is None or otp is None:
        return {"verified": False}

    # Constant-time comparison; bytes avoid compare_digest's ASCII-only str rule.
    ok = hmac.compare_digest(str(expected).encode("utf-8"), str(otp).encode("utf-8"))
    if ok:
        rec["used_at"] = datetime.utcnow().isoformat() + "Z"
        _OTP_DB[customer_id] = rec
    return {"verified": ok}
def authenticate_user_wire(session_id: str, customer_id: Optional[str], full_name: Optional[str], dob_yyyy_mm_dd: Optional[str], ssn_last4: Optional[str], secret_answer: Optional[str]) -> Dict[str, Any]:
    """Verify a wire customer's identity, accumulating details per session.

    Non-empty string arguments are merged into the session stored under
    ``session_id``, so details can be supplied over several calls. The
    session is verified when the date of birth matches the profile AND at
    least one second factor matches: SSN last four, or the secret answer
    (case-insensitive, trimmed).

    A second factor only counts when the caller actually supplied a value.
    Comparing an empty answer with an empty profile field used to count as a
    match, so a profile lacking either field could be verified with the date
    of birth alone.

    Returns ``{"session_id", "verified", "needs", "profile": {"name"}}`` plus
    ``"question"`` when the profile defines a secret question. ``needs``
    lists what is still missing: ``dob``, ``ssn_last4_or_secret``,
    ``customer``.
    """

    def _norm(value: Any) -> str:
        return str(value or "").strip().lower()

    session = _SESSIONS.get(session_id) or {"verified": False, "customer_id": customer_id, "name": full_name}
    supplied = (
        ("customer_id", customer_id),
        ("name", full_name),
        ("dob", dob_yyyy_mm_dd),
        ("ssn_last4", ssn_last4),
        ("secret", secret_answer),
    )
    for key, value in supplied:
        if isinstance(value, str) and value:
            session[key] = value

    ok = False
    prof: Dict[str, Any] = {}
    cid = session.get("customer_id")
    if isinstance(cid, str):
        prof = get_profile(cid)
        user_dob_norm = _normalize_dob(session.get("dob"))
        dob_ok = (user_dob_norm is not None) and (user_dob_norm == _normalize_dob(prof.get("dob")))
        given_ssn = str(session.get("ssn_last4") or "")
        ssn_ok = bool(given_ssn) and given_ssn == str(prof.get("ssn_last4") or "")
        given_secret = _norm(session.get("secret"))
        secret_ok = bool(given_secret) and given_secret == _norm(prof.get("secret_answer"))
        ok = dob_ok and (ssn_ok or secret_ok)

    session["verified"] = ok
    _SESSIONS[session_id] = session

    need: List[str] = []
    if _normalize_dob(session.get("dob")) is None:
        need.append("dob")
    if not session.get("ssn_last4") and not session.get("secret"):
        need.append("ssn_last4_or_secret")
    if not session.get("customer_id"):
        need.append("customer")

    resp: Dict[str, Any] = {"session_id": session_id, "verified": ok, "needs": need, "profile": {"name": session.get("name")}}
    if isinstance(prof, dict) and prof.get("secret_question"):
        resp["question"] = prof.get("secret_question")
    return resp
def quote_wire(kind: str, from_account_id: str, beneficiary: Dict[str, Any], amount: float, from_currency: str, to_currency: str, payer: str) -> Dict[str, Any]:
    """Build and store a wire quote (FX, fees, limits, sanctions, ETA).

    Returns the stored quote dict on success. On failure returns a dict with
    an ``"error"`` key: the fee validation error as produced by
    ``calculate_wire_fee`` (``invalid_type`` / ``invalid_payer``),
    ``limit_or_balance`` with ``details``, or ``sanctions`` with ``details``.

    Fixes relative to the earlier behavior:
    - fee validation errors are returned instead of being quoted with zero fees;
    - the stored ``type`` and ``payer`` are the normalised values from the fee
      calculation, so a padded ``kind`` no longer yields a quote the transfer
      functions reject;
    - the balance must cover the amount plus the initiator's fees;
    - ``expires_at`` is later than ``created_at`` (it used to be identical).
    """
    from datetime import timedelta

    # NOTE(review): the lifetime is an assumed placeholder. Confirm the
    # intended quote validity window.
    quote_ttl = timedelta(minutes=15)

    # FX
    fx = get_exchange_rate(from_currency, to_currency, amount)
    converted_amount = fx["converted_amount"]

    # Fees
    fee = calculate_wire_fee(kind, amount, from_currency, to_currency, payer)
    if fee.get("error"):
        return fee
    wire_type = fee.get("type") or (kind or "").strip().upper()
    payer_opt = fee.get("payer") or (payer or "SHA").strip().upper()

    # Limits and balance
    limits = check_wire_limits(from_account_id, amount)
    if not limits.get("ok"):
        return {"error": "limit_or_balance", "details": limits}

    # Sanctions
    sanc = screen_sanctions(
        str(beneficiary.get("account_name") or beneficiary.get("name") or ""),
        str(beneficiary.get("country") or ""),
    )
    if not sanc.get("cleared"):
        return {"error": "sanctions", "details": sanc}

    # ETA
    eta = get_cutoff_and_eta(kind, str(beneficiary.get("country") or ""))

    initiator_fees = float(fee.get("initiator_fees_total", 0.0))
    recipient_fees = float(fee.get("recipient_fees_total", 0.0))
    net_sent = float(amount) + (initiator_fees if payer_opt in ("OUR", "SHA") else 0.0)

    # The limit check above only looked at the bare amount; the debit also
    # includes the initiator's fees.
    balance = limits.get("balance")
    if balance is not None and net_sent > float(balance):
        return {
            "error": "limit_or_balance",
            "details": {"ok": False, "reason": "insufficient_funds", "balance": float(balance)},
        }

    # recipient side fees reduce the amount received when SHA/BEN
    # NOTE(review): recipient fees come from the fee schedule while
    # converted_amount is in to_currency. Confirm the units agree.
    net_received = float(converted_amount)
    if payer_opt in ("SHA", "BEN"):
        net_received = max(0.0, net_received - recipient_fees)

    created = datetime.utcnow()
    qid = f"Q-{uuid.uuid4().hex[:8]}"
    quote = {
        "quote_id": qid,
        "type": wire_type,
        "from_account_id": from_account_id,
        "amount": float(amount),
        "from_currency": from_currency.upper(),
        "to_currency": to_currency.upper(),
        "payer": payer_opt,
        "fx": fx,
        "fees": fee,
        "net_sent": round(net_sent, 2),
        "net_received": round(net_received, 2),
        "eta": eta,
        "created_at": created.isoformat() + "Z",
        "expires_at": (created + quote_ttl).isoformat() + "Z",
    }
    _QUOTES[qid] = quote
    return quote
def wire_transfer_domestic(quote_id: str, otp: str) -> Dict[str, Any]:
    """Submit a domestic wire for a previously stored DOMESTIC quote.

    Returns ``{"confirmation_id": "WD-…", "status": "submitted"}``, or
    ``{"error": "invalid_quote"}`` when the quote is unknown or not DOMESTIC.
    """
    quote = _QUOTES.get(quote_id)
    is_domestic_quote = bool(quote) and quote.get("type") == "DOMESTIC"
    if not is_domestic_quote:
        return {"error": "invalid_quote"}
    # OTP expected: there is no customer_id context here, so the OTP is
    # assumed to have been verified externally.
    # NOTE(review): ``otp`` is currently unused, and the quote is neither
    # expired nor consumed. Confirm this is acceptable.
    confirmation_id = f"WD-{uuid.uuid4().hex[:8]}"
    return {"confirmation_id": confirmation_id, "status": "submitted"}
def wire_transfer_international(quote_id: str, otp: str) -> Dict[str, Any]:
    """Submit an international wire for a previously stored INTERNATIONAL quote.

    Returns ``{"confirmation_id": "WI-…", "status": "submitted"}``, or
    ``{"error": "invalid_quote"}`` when the quote is unknown or of another type.
    """
    quote = _QUOTES.get(quote_id)
    is_international_quote = bool(quote) and quote.get("type") == "INTERNATIONAL"
    if not is_international_quote:
        return {"error": "invalid_quote"}
    # NOTE(review): ``otp`` is accepted but not checked in this function.
    # Confirm that it is verified before this call.
    confirmation_id = f"WI-{uuid.uuid4().hex[:8]}"
    return {"confirmation_id": confirmation_id, "status": "submitted"}
def list_transactions(account_id: str, start: Optional[str], end: Optional[str]) -> List[Dict[str, Any]]:
    """List an account's transactions, optionally filtered by date range.

    ``start`` and ``end`` are ``YYYY-MM-DD`` strings and both bounds are
    inclusive. A bound that is missing or unparseable is treated as open.
    When a filter is active, transactions whose own date cannot be parsed
    are dropped. With no bounds, every transaction is returned.
    """
    transactions = list(_load_fixture("transactions.json").get(account_id, []))
    if not (start or end):
        return transactions

    lower = _parse_iso_date(start) or datetime.min
    upper = _parse_iso_date(end) or datetime.max
    selected: List[Dict[str, Any]] = []
    for txn in transactions:
        posted = _parse_iso_date(txn.get("date"))
        if not posted:
            continue
        if lower <= posted <= upper:
            selected.append(txn)
    return selected
def get_fee_schedule(product_type: str) -> Dict[str, Any]:
    """Return a copy of the fee schedule for a product type (upper-cased key)."""
    schedules = _load_fixture("fee_schedules.json")
    product_key = product_type.upper()
    return dict(schedules.get(product_key, {}))
def detect_fees(transactions: List[Dict[str, Any]], schedule: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract fee events from transactions and attach their schedule entry.

    A transaction is a fee when its ``entry_type`` is ``FEE``
    (case-insensitive). Each event carries ``id``, ``posted_date``,
    ``amount``, ``description``, ``fee_code`` and ``schedule``, where
    ``schedule`` is the first entry in ``schedule["fees"]`` with the same
    code, or ``None``. Events are sorted by posted date where the dates are
    comparable.
    """

    def _schedule_entry_for(code: str) -> Optional[Dict[str, Any]]:
        for candidate in schedule.get("fees", []) or []:
            if str(candidate.get("code", "")).upper() == code:
                return candidate
        return None

    def _posted_key(event: Dict[str, Any]) -> Any:
        return event.get("posted_date") or ""

    events: List[Dict[str, Any]] = []
    for txn in transactions:
        if str(txn.get("entry_type")).upper() != "FEE":
            continue
        code = (txn.get("fee_code") or "").upper()
        matched = _schedule_entry_for(code)
        events.append(
            {
                "id": txn.get("id") or str(uuid.uuid4()),
                "posted_date": txn.get("date"),
                "amount": float(txn.get("amount", 0)),
                "description": txn.get("description") or code,
                "fee_code": code,
                "schedule": matched or None,
            }
        )
    try:
        events.sort(key=_posted_key)
    except Exception:
        # Best effort: leave the order as-is when dates are not comparable.
        pass
    return events
def explain_fee(fee_event: Dict[str, Any]) -> str:
    """Produce a customer-facing explanation for a fee event.

    Without ``OPENAI_API_KEY`` in the environment a canned sentence is
    returned, with extra wording for NSF, MAINTENANCE and ATM fees. With a
    key, the explanation is generated through ``EXPLAIN_FEE_PROMPT`` and
    ``ChatOpenAI`` (model from ``EXPLAIN_MODEL``, default ``gpt-4o``); if
    that yields no text, the base sentence is returned.

    ``fee_event["schedule"]`` may be missing or ``None``. A ``None`` schedule
    used to raise AttributeError, because ``.get("schedule", {})`` only
    substitutes the default when the key is absent.
    """
    openai_api_key = os.getenv("OPENAI_API_KEY")
    code = (fee_event.get("fee_code") or "").upper()
    schedule = fee_event.get("schedule") or {}
    name = schedule.get("name") or code.title()
    posted = fee_event.get("posted_date") or ""
    amount = float(fee_event.get("amount") or 0)
    policy = schedule.get("policy") or ""
    base = f"You were charged {name} on {posted} for CAD {amount:.2f}."

    if not openai_api_key:
        if code == "NSF":
            return base + " This is applied when a payment is attempted but the account balance was insufficient."
        if code == "MAINTENANCE":
            return base + " This is the monthly account fee as per your account plan."
        if code == "ATM":
            return base + " This fee applies to certain ATM withdrawals."
        return base + " This fee was identified based on your recent transactions."

    llm = ChatOpenAI(model=os.getenv("EXPLAIN_MODEL", "gpt-4o"), api_key=openai_api_key)
    chain = EXPLAIN_FEE_PROMPT | llm
    out = chain.invoke(
        {
            "fee_code": code,
            "posted_date": posted,
            "amount": f"{amount:.2f}",
            "schedule_name": name,
            "schedule_policy": policy,
        }
    )
    text = getattr(out, "content", None)
    return text if isinstance(text, str) and text.strip() else base
def check_dispute_eligibility(fee_event: Dict[str, Any]) -> Dict[str, Any]:
    """Decide whether a fee qualifies for a dispute.

    Eligible only when the fee code is NSF, ATM, MAINTENANCE or WITHDRAWAL,
    the amount is at most 20.0, and ``first_time_90d`` is truthy. Returns
    ``{"eligible": bool, "reason": str}``; the reason is empty when not
    eligible.
    """
    waivable_codes = {"NSF", "ATM", "MAINTENANCE", "WITHDRAWAL"}
    fee_code = (fee_event.get("fee_code") or "").upper()
    fee_amount = float(fee_event.get("amount", 0))
    is_first_occurrence = bool(fee_event.get("first_time_90d", False))

    if fee_code in waivable_codes and fee_amount <= 20.0 and is_first_occurrence:
        return {"eligible": True, "reason": "First occurrence in 90 days and small amount"}
    return {"eligible": False, "reason": ""}
def create_dispute_case(fee_event: Dict[str, Any], idempotency_key: str) -> Dict[str, Any]:
    """Open a dispute case for a fee, at most once per idempotency key.

    A repeated call with the same key returns the stored case instead of
    creating a new one.
    """
    try:
        return _DISPUTES_DB[idempotency_key]
    except KeyError:
        pass
    new_case = dict(
        case_id=str(uuid.uuid4()),
        status="submitted",
        fee_id=fee_event.get("id"),
        created_at=datetime.utcnow().isoformat() + "Z",
    )
    _DISPUTES_DB[idempotency_key] = new_case
    return new_case
def authenticate_user(session_id: str, name: Optional[str], dob_yyyy_mm_dd: Optional[str], last4: Optional[str], secret_answer: Optional[str], customer_id: Optional[str] = None) -> Dict[str, Any]:
    """Mock identity verification for the fees flow, persisted per session_id.

    Non-empty string arguments are merged into the stored session, so
    details can arrive over several calls.

    Rules:
    - With a customer in context: the date of birth must parse and equal the
      profile's, AND either ``last4`` equals the last four characters of one
      of the customer's account numbers or the secret answer matches
      (case-insensitive, trimmed).
    - Without a customer: verification only succeeds through the demo
      fallback (dob 1990-01-01 and last4 6001 or secret "blue"), which is
      enabled by the ``RBC_FEES_ALLOW_GLOBAL_FALLBACK`` environment variable.

    Two bypasses are closed here: an unparseable date of birth no longer
    "matches" a profile that has no date of birth, and an empty secret no
    longer matches a profile without a secret answer.

    Returns ``{"session_id", "verified", "needs", "profile": {"name"}}`` plus
    ``"question"`` when the profile defines a secret question.
    """

    def _norm_secret(value: Any) -> str:
        return str(value or "").strip().lower()

    session = _SESSIONS.get(session_id) or {"verified": False, "name": name, "customer_id": customer_id}
    if isinstance(name, str) and name:
        session["name"] = name
    if isinstance(customer_id, str) and customer_id:
        session["customer_id"] = customer_id
    if isinstance(dob_yyyy_mm_dd, str) and dob_yyyy_mm_dd:
        # Normalize DOB to YYYY-MM-DD; keep the raw text if it cannot be parsed.
        session["dob"] = _normalize_dob(dob_yyyy_mm_dd) or dob_yyyy_mm_dd
    if isinstance(last4, str) and last4:
        session["last4"] = last4
    if isinstance(secret_answer, str) and secret_answer:
        session["secret"] = secret_answer

    ok = False
    prof: Dict[str, Any] = {}
    cid = session.get("customer_id")
    if isinstance(cid, str):
        # A specific customer is in context: validate against profile and accounts.
        prof = get_profile(cid)
        accts = get_accounts(cid)
        user_dob_norm = _normalize_dob(session.get("dob"))
        dob_ok = (user_dob_norm is not None) and (user_dob_norm == _normalize_dob(prof.get("dob")))
        last4s = {str(a.get("account_number"))[-4:] for a in accts if a.get("account_number")}
        last4_ok = isinstance(session.get("last4"), str) and session.get("last4") in last4s
        given_secret = _norm_secret(session.get("secret"))
        secret_ok = bool(given_secret) and given_secret == _norm_secret(prof.get("secret_answer"))
        ok = dob_ok and (last4_ok or secret_ok)
    else:
        # Optional demo fallback (disabled by default)
        allow_fallback = os.getenv("RBC_FEES_ALLOW_GLOBAL_FALLBACK", "0") not in ("", "0", "false", "False")
        if allow_fallback and session.get("dob") == "1990-01-01" and (session.get("last4") == "6001" or (session.get("secret") or "").strip().lower() == "blue"):
            ok = True

    session["verified"] = ok
    _SESSIONS[session_id] = session

    need: List[str] = []
    if not session.get("dob"):
        need.append("dob")
    if not session.get("last4") and not session.get("secret"):
        need.append("last4_or_secret")
    if not session.get("customer_id"):
        need.append("customer")

    resp: Dict[str, Any] = {"session_id": session_id, "verified": ok, "needs": need, "profile": {"name": session.get("name")}}
    if isinstance(prof, dict) and prof.get("secret_question"):
        resp["question"] = prof.get("secret_question")
    return resp
| # --- Healthcare demo logic (patients, triage, providers, pharmacies) --- | |
# Patient authentication session state keyed by session_id.
_HC_SESSIONS: Dict[str, Dict[str, Any]] = {}
# Appointments booked during this process, in creation order.
_HC_APPOINTMENTS: List[Dict[str, Any]] = []
# Call log entries recorded during this process, in creation order.
_HC_CALL_LOG: List[Dict[str, Any]] = []
| def _hc_fixtures_dir() -> Path: | |
| return Path(__file__).parent / "mock_data" | |
def _hc_load_fixture(name: str) -> Any:
    """Load a healthcare JSON fixture, memoised in the shared fixture cache.

    Entries are stored under an ``hc::``-prefixed key so they cannot collide
    with other fixtures of the same file name.
    """
    cache_key = f"hc::{name}"
    try:
        return _FIXTURE_CACHE[cache_key]
    except KeyError:
        pass
    fixture_path = _hc_fixtures_dir() / name
    with fixture_path.open("r", encoding="utf-8") as handle:
        parsed = json.load(handle)
    _FIXTURE_CACHE[cache_key] = parsed
    return parsed
def _hc_get_patient_blob(patient_id: str) -> Dict[str, Any]:
    """Return a shallow copy of the ``patients.json`` entry for ``patient_id``.

    An unknown patient yields an empty dict.
    """
    all_patients = _hc_load_fixture("patients.json").get("patients") or {}
    record = all_patients.get(patient_id, {})
    return dict(record)
def find_patient_by_name(first_name: str, last_name: str) -> Dict[str, Any]:
    """Find the first patient whose profile first and last name both match.

    Matching ignores case and surrounding whitespace. Returns
    ``{"patient_id": ..., "profile": ...}`` for the first match, or ``{}``.

    A query with both names empty returns ``{}``. Without that guard it would
    match any profile lacking name fields and expose an arbitrary patient;
    the full-name lookup already refuses empty queries.
    """
    patients = _hc_load_fixture("patients.json").get("patients", {})
    wanted_first = (first_name or "").strip().lower()
    wanted_last = (last_name or "").strip().lower()
    if not (wanted_first or wanted_last):
        return {}
    for pid, blob in patients.items():
        prof = blob.get("profile") if isinstance(blob, dict) else None
        if not isinstance(prof, dict):
            continue
        have_first = str(prof.get("first_name") or "").strip().lower()
        have_last = str(prof.get("last_name") or "").strip().lower()
        if wanted_first == have_first and wanted_last == have_last:
            return {"patient_id": pid, "profile": prof}
    return {}
def find_patient_by_full_name(full_name: str) -> Dict[str, Any]:
    """Find the first patient whose full name equals ``full_name``.

    The query is compared, case-insensitively and trimmed, against both
    ``"<first_name> <last_name>"`` and the profile's ``full_name`` field.
    Returns ``{"patient_id": ..., "profile": ...}`` or ``{}``; an empty query
    never matches.
    """
    patients = _hc_load_fixture("patients.json").get("patients", {})
    wanted = (full_name or "").strip().lower()
    if not wanted:
        return {}
    for patient_id, record in patients.items():
        profile = record.get("profile") if isinstance(record, dict) else None
        if not isinstance(profile, dict):
            continue
        first = str(profile.get("first_name") or "").strip()
        last = str(profile.get("last_name") or "").strip()
        combined = f"{first} {last}".strip().lower()
        declared = str(profile.get("full_name") or "").strip().lower()
        if wanted in (combined, declared):
            return {"patient_id": patient_id, "profile": profile}
    return {}
def get_patient_profile(patient_id: str) -> Dict[str, Any]:
    """Return a copied view of a patient's record, or ``{}`` if unknown.

    The result has ``profile`` and ``vitals`` as dicts and ``allergies``,
    ``medications``, ``conditions`` and ``recent_visits`` as lists, each
    defaulting to empty.
    """
    record = _hc_get_patient_blob(patient_id)
    if not record:
        return {}
    view: Dict[str, Any] = {"profile": dict(record.get("profile", {}))}
    for list_field in ("allergies", "medications", "conditions", "recent_visits"):
        view[list_field] = list(record.get(list_field, []))
    view["vitals"] = dict(record.get("vitals", {}))
    return view
def authenticate_patient(session_id: str, patient_id: Optional[str], full_name: Optional[str], dob_yyyy_mm_dd: Optional[str], mrn_last4: Optional[str], secret_answer: Optional[str]) -> Dict[str, Any]:
    """Verify a patient's identity, accumulating details per session.

    Non-empty string arguments are merged into the session stored under
    ``session_id``. The session is verified when the date of birth matches
    the profile AND at least one second factor matches: MRN last four, or
    the secret answer (case-insensitive, trimmed).

    A second factor only counts when the caller actually supplied a value.
    Comparing an empty value with an empty profile field used to count as a
    match, so a profile lacking either field could be verified with the date
    of birth alone.

    Returns ``{"session_id", "verified", "needs", "profile": {"name"}}`` plus
    ``"question"`` when the profile defines a secret question. ``needs``
    lists what is still missing: ``dob``, ``mrn_last4_or_secret``, ``patient``.
    """

    def _norm(value: Any) -> str:
        return str(value or "").strip().lower()

    session = _HC_SESSIONS.get(session_id) or {"verified": False, "patient_id": patient_id, "name": full_name}
    if isinstance(patient_id, str) and patient_id:
        session["patient_id"] = patient_id
    if isinstance(full_name, str) and full_name:
        session["name"] = full_name
    if isinstance(dob_yyyy_mm_dd, str) and dob_yyyy_mm_dd:
        # Store the normalised form; keep the raw text if it cannot be parsed.
        session["dob"] = _normalize_dob(dob_yyyy_mm_dd) or dob_yyyy_mm_dd
    if isinstance(mrn_last4, str) and mrn_last4:
        session["mrn_last4"] = mrn_last4
    if isinstance(secret_answer, str) and secret_answer:
        session["secret"] = secret_answer

    ok = False
    prof: Dict[str, Any] = {}
    pid = session.get("patient_id")
    if isinstance(pid, str):
        prof = get_patient_profile(pid).get("profile", {})
        user_dob_norm = _normalize_dob(session.get("dob"))
        dob_ok = (user_dob_norm is not None) and (user_dob_norm == _normalize_dob(prof.get("dob")))
        given_mrn = str(session.get("mrn_last4") or "")
        mrn_ok = bool(given_mrn) and given_mrn == str(prof.get("mrn_last4") or "")
        given_secret = _norm(session.get("secret"))
        secret_ok = bool(given_secret) and given_secret == _norm(prof.get("secret_answer"))
        ok = dob_ok and (mrn_ok or secret_ok)

    session["verified"] = ok
    _HC_SESSIONS[session_id] = session

    need: List[str] = []
    if _normalize_dob(session.get("dob")) is None:
        need.append("dob")
    if not session.get("mrn_last4") and not session.get("secret"):
        need.append("mrn_last4_or_secret")
    if not session.get("patient_id"):
        need.append("patient")

    resp: Dict[str, Any] = {"session_id": session_id, "verified": ok, "needs": need, "profile": {"name": session.get("name")}}
    if isinstance(prof, dict) and prof.get("secret_question"):
        resp["question"] = prof.get("secret_question")
    return resp
def get_preferred_pharmacy(patient_id: str) -> Dict[str, Any]:
    """Return the patient's preferred pharmacy record, or ``{}`` if none is set.

    The result is ``{"pharmacy_id": ...}`` merged with whatever
    ``pharmacies.json`` holds for that id.
    """
    profile = get_patient_profile(patient_id).get("profile", {})
    pharmacy_id = profile.get("preferred_pharmacy_id")
    if not pharmacy_id:
        return {}
    catalog = _hc_load_fixture("pharmacies.json").get("pharmacies") or {}
    details = catalog.get(pharmacy_id) or {}
    merged: Dict[str, Any] = {"pharmacy_id": pharmacy_id}
    merged.update(details)
    return merged
def list_providers(specialty: Optional[str] = None) -> List[Dict[str, Any]]:
    """List providers from ``providers.json``, optionally filtered by specialty.

    The specialty comparison is case-insensitive. Each result is the
    provider's record with ``provider_id`` added.
    """
    providers = _hc_load_fixture("providers.json").get("providers", {})

    def _is_wanted(record: Dict[str, Any]) -> bool:
        return not specialty or str(record.get("specialty", "")).lower() == specialty.strip().lower()

    return [
        {"provider_id": provider_id, **record}
        for provider_id, record in providers.items()
        if _is_wanted(record)
    ]
def get_provider_slots(provider_id: str, count: int = 3) -> List[str]:
    """Return up to ``count`` of a provider's ``next_available`` slots.

    An unknown provider, or one without slots, yields an empty list.
    """
    providers = _hc_load_fixture("providers.json").get("providers", {})
    record = providers.get(provider_id) or {}
    upcoming = record.get("next_available") or []
    return list(upcoming[:count])
def schedule_appointment(provider_id: str, slot_iso: str, patient_id: Optional[str]) -> Dict[str, Any]:
    """Record a booked appointment in the in-memory list and return it.

    The appointment gets an id of the form ``A-xxxxxxxx``, a ``created_at``
    timestamp and status ``booked``.
    """
    booking: Dict[str, Any] = dict(
        appointment_id=f"A-{uuid.uuid4().hex[:8]}",
        provider_id=provider_id,
        slot=slot_iso,
        patient_id=patient_id,
        created_at=datetime.utcnow().isoformat() + "Z",
        status="booked",
    )
    _HC_APPOINTMENTS.append(booking)
    return booking
def _patient_age_years(patient_id: Optional[str]) -> Optional[int]:
    """Return the patient's age in whole years as of today (UTC).

    Returns ``None`` when there is no patient id, no parseable date of birth,
    or any error occurs along the way.
    """
    try:
        if not patient_id:
            return None
        profile = get_patient_profile(patient_id).get("profile", {})
        dob = _normalize_dob(profile.get("dob"))
        if not dob:
            return None
        birth_year, birth_month, birth_day = (int(part) for part in dob.split("-"))
        today = datetime.utcnow().date()
        # Subtract one year when this year's birthday has not happened yet.
        birthday_pending = (today.month, today.day) < (birth_month, birth_day)
        return today.year - birth_year - (1 if birthday_pending else 0)
    except Exception:
        return None
def triage_symptoms(patient_id: Optional[str], symptoms_text: str) -> Dict[str, Any]:
    """Pick a triage rule for free-text symptoms and return its advice.

    Rules from ``triage_rules.json`` are scanned in order. A rule whose
    ``match`` keywords are all absent from the text is skipped. The scan
    stops at the first rule with a red flag present in the text, or whose
    criteria include ``age_over_50`` while the patient is over 50. A rule
    with neither red flags nor criteria is remembered as a candidate, and
    later candidates replace earlier ones. If nothing is selected, the last
    rule in the file is used.

    Returns ``{"risk", "advice", "red_flags", "rule"}``; with no rules at all
    it returns a generic self-care answer without the ``rule`` key.
    """
    text = (symptoms_text or "").lower()
    rule_set = _hc_load_fixture("triage_rules.json").get("rules", [])
    patient_age = _patient_age_years(patient_id) or 0

    def _mentions(term: str) -> bool:
        return term.lower() in text

    selected: Optional[Dict[str, Any]] = None
    flagged: List[str] = []
    for rule in rule_set:
        keywords = rule.get("match", [])
        if keywords and not any(_mentions(keyword) for keyword in keywords):
            continue

        rule_flags = rule.get("red_flags", [])
        if rule_flags:
            flagged = [flag for flag in rule_flags if _mentions(flag)]
            if flagged:
                selected = rule
                break

        criteria = rule.get("criteria", [])
        if criteria and "age_over_50" in criteria and patient_age > 50:
            selected = rule
            break

        if not rule_flags and not criteria:
            # do not break; prefer a more specific rule if later
            selected = rule

    if not selected and rule_set:
        selected = rule_set[-1]
    if not selected:
        return {"risk": "self_care", "advice": "If symptoms persist or worsen, contact us or seek care.", "red_flags": []}

    return {
        "risk": selected.get("escalate", "self_care"),
        "advice": selected.get("advice", ""),
        "red_flags": flagged,
        "rule": selected.get("name", ""),
    }
def log_call(session_id: str, patient_id: Optional[str], notes: Optional[str], triage: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Append a call record to the in-memory call log and mirror it to logging.

    Returns ``{"logged": True, "log_id": ...}``.

    ``logging`` is imported locally because it is not among the imports
    visible at the top of this module. The resulting NameError was swallowed
    by a blanket ``except``, so the mirror silently never happened.
    """
    import logging

    entry = {
        "log_id": f"L-{uuid.uuid4().hex[:8]}",
        "session_id": session_id,
        "patient_id": patient_id,
        "notes": notes or "",
        "triage": triage or {},
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    _HC_CALL_LOG.append(entry)

    # Also mirror to the application log for visibility (truncated to 500 chars).
    try:
        rendered = json.dumps(entry)[:500]
    except (TypeError, ValueError):
        # The triage payload may hold values that are not JSON-serialisable.
        rendered = repr(entry)[:500]
    logging.getLogger("HealthcareAgent").info("call_log: %s", rendered)

    return {"logged": True, "log_id": entry["log_id"]}