import os
import time
import json
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import streamlit as st
import plotly.express as px
from streamlit_option_menu import option_menu
from faker import Faker

# =============================
# Page / Theme Configuration
# =============================
st.set_page_config(
    page_title="SAP S/4HANA Agentic AI Procurement Analytics",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
)

# --- CSS ---
# NOTE(review): the stylesheet body is blank in this copy of the file; the
# call is kept as-is so a <style> block can be restored here.
st.markdown(
    """ """,
    unsafe_allow_html=True,
)

# =============================
# Currency helpers (₹ with Indian grouping)
# =============================
CURRENCY = "₹"


def format_inr(x: float) -> str:
    """Format a number as rupees with Indian digit grouping and no decimals.

    The value is rounded to the nearest integer. The last three digits form
    one group and the remaining digits are grouped in pairs, e.g.
    ``1234567`` -> ``₹12,34,567``. Negative values keep their sign directly
    after the currency symbol (``₹-12,34,567``).

    Values that cannot be converted to an integer (non-numeric input, NaN,
    infinity) are returned as the currency symbol followed by ``x`` verbatim.
    """
    try:
        n = int(round(float(x)))
    except (TypeError, ValueError, OverflowError):
        return f"{CURRENCY}{x}"

    # Group the magnitude only; otherwise "-" is treated as a digit and
    # ends up comma-separated from the number.
    sign = "-" if n < 0 else ""
    digits = str(abs(n))
    if len(digits) <= 3:
        return f"{CURRENCY}{sign}{digits}"

    head, last3 = digits[:-3], digits[-3:]
    groups: List[str] = []
    while len(head) > 2:
        groups.insert(0, head[-2:])
        head = head[:-2]
    if head:
        groups.insert(0, head)
    return f"{CURRENCY}{sign}{','.join(groups + [last3])}"


def fmt_currency(x: float) -> str:
    """Format ``x`` in the app's display currency (delegates to ``format_inr``)."""
    return format_inr(x)


# =============================
# LLM client (resilient)
# =============================
@dataclass
class LLMConfig:
    """Connection settings for the chat-completions endpoint.

    Every default is read from the environment once, when this class is
    defined (i.e. at import time), not each time an instance is created.
    """

    base_url: Optional[str] = os.getenv("OPENAI_BASE_URL")
    api_key: Optional[str] = (
        os.getenv("OPENAI_API_KEY")
        or os.getenv("OPENAI_API_TOKEN")
        or os.getenv("OPENAI_KEY")
    )
    model: str = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    timeout: int = int(os.getenv("OPENAI_TIMEOUT", "45"))
    max_retries: int = int(os.getenv("OPENAI_MAX_RETRIES", "5"))
    temperature: float = float(os.getenv("OPENAI_TEMPERATURE", "0.5"))


def _post_json(url: str, headers: Dict[str, str], payload: Dict[str, Any], timeout: int):
    """POST ``payload`` as JSON and return the ``requests`` response object."""
    # Imported lazily so the module loads even when the LLM path is unused.
    import requests

    return requests.post(url, headers=headers, json=payload, timeout=timeout)


# HTTP statuses for which the same request is tried again after a back-off.
_RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 504})


class _NonRetryableAPIError(RuntimeError):
    """HTTP error response that repeating the same request will not fix."""


class UniversalLLMClient:
    """Minimal chat-completions client with retry and back-off.

    Attributes:
        cfg: The configuration passed to the constructor.
        available: ``True`` when an API key is configured and the start-up
            smoke test succeeded.
        last_error: Text of the most recent failure recorded by the client,
            or ``None`` if nothing has failed.
    """

    def __init__(self, cfg: LLMConfig):
        self.cfg = cfg
        self.available = bool(cfg.api_key)
        self.last_error: Optional[str] = None
        if self.available:
            self._smoke_test()

    def _headers(self) -> Dict[str, str]:
        """Return the bearer-token and JSON content-type headers."""
        return {
            "Authorization": f"Bearer {self.cfg.api_key}",
            "Content-Type": "application/json",
        }

    def _base_url(self) -> str:
        """Return the configured base URL (or the OpenAI default) without a trailing slash."""
        return (self.cfg.base_url or "https://api.openai.com/v1").rstrip("/")

    def _smoke_test(self) -> None:
        """Send a tiny request; on any failure mark the client unavailable."""
        try:
            _ = self.chat([{"role": "user", "content": "ping"}], max_tokens=4)
        except Exception as e:
            self.available = False
            self.last_error = str(e)

    @staticmethod
    def _error_message(resp) -> str:
        """Extract ``error.message`` from a JSON error body, else the raw text."""
        try:
            body = resp.json()
            return body.get("error", {}).get("message", str(body))
        except Exception:
            return resp.text

    def chat(self, messages: List[Dict[str, str]], max_tokens: int = 400) -> str:
        """Send ``messages`` to the chat endpoint and return the reply text.

        Retryable HTTP statuses (429/5xx) and exceptions raised while sending
        or parsing are retried up to ``cfg.max_retries`` times with
        exponential back-off (1s doubling, capped at 8s). Any other HTTP
        status fails immediately, because repeating the request cannot help.

        Raises:
            RuntimeError: if the client is unavailable, the API returns a
                non-retryable error, or retries are exhausted on a retryable
                status.
            Exception: whatever the final attempt raised, if it failed with
                an exception rather than an HTTP status.
        """
        if not self.available:
            # Report the real cause when the smoke test disabled the client.
            raise RuntimeError(self.last_error or "No API key configured")

        headers = self._headers()
        chat_url = f"{self._base_url()}/chat/completions"
        payload = {
            "model": self.cfg.model,
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": self.cfg.temperature,
        }

        delay = 1.0
        last_failure = "Exhausted retries"
        for attempt in range(self.cfg.max_retries):
            final_attempt = attempt == self.cfg.max_retries - 1
            try:
                resp = _post_json(chat_url, headers, payload, self.cfg.timeout)
                if resp.status_code == 200:
                    data = resp.json()
                    return data["choices"][0]["message"]["content"].strip()
                if resp.status_code not in _RETRYABLE_STATUS:
                    raise _NonRetryableAPIError(
                        f"API error {resp.status_code}: {self._error_message(resp)}"
                    )
                last_failure = f"Exhausted retries (last response: HTTP {resp.status_code})"
            except _NonRetryableAPIError as e:
                # Must be handled before the generic clause below, otherwise
                # auth/validation errors would be retried with back-off.
                self.last_error = str(e)
                raise
            except Exception as e:
                if final_attempt:
                    self.last_error = str(e)
                    raise
                last_failure = str(e)

            # No point sleeping after the last attempt.
            if not final_attempt:
                time.sleep(delay)
                delay = min(delay * 2, 8.0)

        self.last_error = last_failure
        raise RuntimeError(last_failure)


# =============================
# Data Generation
# =============================
@st.cache_data(show_spinner=False)
def generate_synthetic_procurement_data(seed: int = 42) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Build synthetic purchase-order and vendor/category spend tables.

    Args:
        seed: Seed applied to ``random``, ``numpy.random`` and the Faker
            instance so repeated calls produce the same names and values
            (order dates remain relative to the current day).

    Returns:
        ``(po_df, spend_df)``: 900 purchase orders, and one spend row per
        vendor/category pair.
    """
    fake = Faker()
    # Faker draws from its own generator, so it needs its own seed.
    fake.seed_instance(seed)
    np.random.seed(seed)
    random.seed(seed)

    vendors = [
        "Siemens AG", "BASF SE", "BMW Group", "Mercedes-Benz", "Bosch GmbH",
        "ThyssenKrupp", "Bayer AG", "Continental AG", "Henkel AG", "SAP SE",
    ]
    categories = [
        "Raw Materials", "Components", "Packaging", "Services",
        "IT Equipment", "Office Supplies", "Machinery", "Chemicals",
    ]

    purchase_orders: List[Dict[str, Any]] = []
    for i in range(900):
        # Two years of history. Expressed in years because Faker's relative
        # date syntax treats a lowercase "m" as minutes (months are "M").
        order_date = fake.date_between(start_date="-2y", end_date="today")
        promised_days = random.randint(3, 30)
        promised_date = order_date + timedelta(days=promised_days)
        # Actual lead time is scattered around the promise, at least 1 day.
        actual_lag = max(1, int(np.random.normal(promised_days, 5)))
        delivery_date = order_date + timedelta(days=actual_lag)
        late = delivery_date > promised_date

        unit_price = round(random.uniform(10, 500), 2)
        qty = random.randint(1, 1200)
        order_value = round(unit_price * qty, 2)

        po = {
            "po_number": f"PO{str(i+1).zfill(6)}",
            "vendor": random.choice(vendors),
            "material_category": random.choice(categories),
            "order_date": order_date,
            "promised_date": promised_date,
            "delivery_date": delivery_date,
            "late_delivery": late,
            "order_value": order_value,
            "quantity": qty,
            "unit_price": unit_price,
            "status": random.choice(["Open", "Delivered", "Invoiced", "Paid"]),
            "plant": random.choice(["Plant_001", "Plant_002", "Plant_003"]),
            "buyer": fake.name(),
            "currency": "INR",
            "payment_terms": random.choice(["30 Days", "45 Days", "60 Days", "90 Days"]),
            "quality_score": round(np.clip(np.random.normal(8.5, 0.8), 5.0, 10.0), 1),
        }
        purchase_orders.append(po)

    spend_rows = [
        {
            "vendor": v,
            "category": c,
            "total_spend": round(random.uniform(10000, 700000), 2),
            "contract_compliance": round(random.uniform(78, 100), 1),
            "risk_score": round(random.uniform(1, 10), 1),
            "savings_potential": round(random.uniform(5, 25), 1),
        }
        for v in vendors
        for c in categories
    ]

    po_df = pd.DataFrame(purchase_orders)
    spend_df = pd.DataFrame(spend_rows)
    return po_df, spend_df


# =============================
# Analytics Engine
# =============================
class ProcurementAnalytics:
    """Aggregations over a purchase-order frame.

    Works on a private copy of the frame with ``order_date`` converted to
    datetimes and a ``month`` column holding the first day of each order's
    month.
    """

    def __init__(self, po_df: pd.DataFrame):
        self.df = po_df.copy()
        self.df["order_date"] = pd.to_datetime(self.df["order_date"])
        self.df["month"] = self.df["order_date"].dt.to_period("M").dt.to_timestamp()

    def kpis(self) -> Dict[str, Any]:
        """Return headline metrics; ``on_time_rate`` is a 0-1 fraction."""
        df = self.df
        return {
            "total_spend": float(df["order_value"].sum()),
            "avg_order_value": float(df["order_value"].mean()),
            "active_vendors": int(df["vendor"].nunique()),
            "on_time_rate": float((~df["late_delivery"]).mean()),
            "quality_avg": float(df["quality_score"].mean()),
        }

    def category_spend(self) -> pd.DataFrame:
        """Return total order value per material category, largest first."""
        return (
            self.df.groupby("material_category", as_index=False)["order_value"]
            .sum()
            .sort_values("order_value", ascending=False)
        )

    def vendor_spend(self, top_n: int = 8) -> pd.DataFrame:
        """Return the ``top_n`` vendors by total order value, largest first."""
        return (
            self.df.groupby("vendor", as_index=False)["order_value"]
            .sum()
            .sort_values("order_value", ascending=False)
            .head(top_n)
        )

    def monthly_spend(self) -> pd.DataFrame:
        """Return total order value per month in chronological order."""
        return (
            self.df.groupby("month", as_index=False)["order_value"]
            .sum()
            .sort_values("month")
        )

    def vendor_performance(self) -> pd.DataFrame:
        """Return per-vendor spend, on-time %, mean quality and order count.

        The result is indexed by vendor and sorted by spend, largest first.
        """
        g = self.df.groupby("vendor").agg(
            total_spend=("order_value", "sum"),
            on_time=("late_delivery", lambda s: 1 - s.mean()),
            quality=("quality_score", "mean"),
            orders=("po_number", "count"),
        )
        g["on_time"] = (g["on_time"] * 100).round(1)
        g["quality"] = g["quality"].round(2)
        g["total_spend"] = g["total_spend"].round(2)
        return g.sort_values("total_spend", ascending=False)

    @staticmethod
    def _top_shares(totals: pd.DataFrame, label_col: str, n: int) -> List[Tuple[str, float]]:
        """Return ``(label, percent of grand total)`` for the first ``n`` rows.

        ``totals`` must already be sorted and hold an ``order_value`` column.
        """
        # Fall back to 1.0 so a zero grand total yields 0% instead of a
        # division by zero.
        grand_total = float(totals["order_value"].sum()) or 1.0
        top = totals.head(n)
        return [
            (label, float(value) / grand_total * 100)
            for label, value in zip(top[label_col], top["order_value"])
        ]

    def top_n_categories(self, n: int = 3) -> List[Tuple[str, float]]:
        """Return the ``n`` largest categories with their share of spend in %."""
        return self._top_shares(self.category_spend(), "material_category", n)

    def top_n_vendors(self, n: int = 3) -> List[Tuple[str, float]]:
        """Return the ``n`` largest vendors with their share of spend in %."""
        ven = (
            self.df.groupby("vendor", as_index=False)["order_value"]
            .sum()
            .sort_values("order_value", ascending=False)
        )
        return self._top_shares(ven, "vendor", n)


# =============================
# Agent with tighter prompts & INR formatting
# =============================
class UniversalProcurementAgent:
    """Answers procurement questions via the LLM, with rule-based fallbacks.

    Every public method returns Markdown text. When the LLM client is
    unavailable, or a call to it raises, the rule-based answer is returned
    instead (with the failure reason appended in the latter case).
    """

    def __init__(self, po_df: pd.DataFrame, spend_df: pd.DataFrame, client: UniversalLLMClient):
        self.po_data = po_df
        self.spend_data = spend_df
        self.llm = client
        self.analytics = ProcurementAnalytics(po_df)

    def executive_summary(self) -> str:
        """Return an executive summary of the purchase-order data."""
        if not self.llm.available:
            return self._rule_summary()

        k = self.analytics.kpis()
        top_cats = self.analytics.top_n_categories(3)
        top_vens = self.analytics.top_n_vendors(3)
        data_summary = {
            "total_spend": k["total_spend"],
            "total_orders": int(len(self.po_data)),
            "vendor_count": int(self.po_data["vendor"].nunique()),
            "avg_order_value": k["avg_order_value"],
            "on_time_delivery": k["on_time_rate"],
            "avg_quality": k["quality_avg"],
            "top_categories": top_cats,
            "top_vendors": top_vens,
        }
        messages = [
            {
                "role": "system",
                "content": (
                    "You are a senior procurement analyst. Use bullet points, be concise, "
                    "and always use the ₹ symbol. When summarizing, include top categories "
                    "and vendors with percentages, then 2-3 quantified actions."
                ),
            },
            {
                "role": "user",
                "content": (
                    "Executive summary. Format amounts with Indian commas (e.g., ₹12,34,567).\n\n"
                    f"Data: {json.dumps(data_summary)}"
                ),
            },
        ]
        try:
            return (
                "🧠 **[AI-Powered Analysis]**\n\n"
                + self.llm.chat(messages, max_tokens=550)
            )
        except Exception as e:
            return self._rule_summary() + f"\n\n*AI fallback due to: {e}*"

    def _rule_summary(self) -> str:
        """Return the summary computed locally, without the LLM."""
        k = self.analytics.kpis()
        top_c = self.analytics.top_n_categories(3)
        top_v = self.analytics.top_n_vendors(3)
        topc_str = ", ".join([f"{n} — {s:.0f}%" for n, s in top_c])
        topv_str = ", ".join([f"{n} — {s:.0f}%" for n, s in top_v])
        return (
            "🤖 **[Rule-Based Summary]**\n"
            + f"• Total spend: {fmt_currency(k['total_spend'])} across {len(self.po_data):,} POs\n"
            + f"• On-time delivery: {k['on_time_rate']*100:.1f}% | Avg quality: {k['quality_avg']:.1f}/10\n"
            + f"• Top categories: {topc_str}\n"
            + f"• Top vendors: {topv_str}\n"
            + "Actions: Consolidate long tail; multi-year terms with top vendors; auto-approve low-value POs."
        )

    def chat_with_data(self, question: str) -> str:
        """Answer a free-form ``question`` about the purchase-order data."""
        if not self.llm.available:
            return self._rule_answer(question)

        k = self.analytics.kpis()
        top_c = self.analytics.top_n_categories(3)
        top_v = self.analytics.top_n_vendors(3)
        context = {
            "total_spend": k["total_spend"],
            "orders": int(len(self.po_data)),
            "vendors": int(self.po_data["vendor"].nunique()),
            "on_time": k["on_time_rate"],
            "quality": k["quality_avg"],
            "top_categories": top_c,
            "top_vendors": top_v,
        }
        style_rules = (
            "Rules: Answer in ≤6 bullet points, use ₹, no generic how-to steps. "
            "If question mentions spend, list top 3 categories and top 3 vendors with shares. "
            "If vendors, show best & worst by on-time and spend. If risk, show late % and actions."
        )
        messages = [
            {"role": "system", "content": "You are a precise procurement co-pilot. Be direct, metric-first, and action-oriented."},
            {"role": "user", "content": f"Q: {question}\n\nContext: {json.dumps(context)}\n\n{style_rules}"},
        ]
        try:
            return (
                "🧠 **[AI Response]**\n\n"
                + self.llm.chat(messages, max_tokens=450)
            )
        except Exception as e:
            return self._rule_answer(question) + f"\n\n*AI fallback due to: {e}*"

    def _rule_answer(self, question: str) -> str:
        """Answer ``question`` by keyword matching, without the LLM.

        Topics are checked in order: spend, vendor, risk. The first keyword
        group found in the lower-cased question decides the answer; if none
        matches, a capabilities note with a data snapshot is returned.
        """
        q = question.lower()
        k = self.analytics.kpis()
        top_c = self.analytics.top_n_categories(3)
        top_v = self.analytics.top_n_vendors(3)

        if ("spend" in q) or ("spending" in q) or ("cost" in q):
            lines = [
                f"• Total spend: {fmt_currency(k['total_spend'])}",
                "• Top categories: " + ", ".join([f"{n} — {s:.0f}%" for n, s in top_c]),
                "• Top vendors: " + ", ".join([f"{n} — {s:.0f}%" for n, s in top_v]),
                "• Action: Run sourcing events for top 2 categories; target 8–12% savings via volume tiers.",
            ]
            return "🤖 **[Rule-Based Spend]**\n" + "\n".join(lines)

        if ("vendor" in q) or ("supplier" in q) or ("partner" in q):
            vp = self.po_data.groupby("vendor").agg(
                spend=("order_value", "sum"),
                late_rate=("late_delivery", "mean"),
                quality=("quality_score", "mean"),
            ).sort_values("spend", ascending=False)
            # "Best" is the highest-spend vendor; "worst" has the highest late rate.
            best = vp.head(1)
            worst = vp.sort_values("late_rate", ascending=False).head(1)
            bname, wname = best.index[0], worst.index[0]
            blate = float(best.iloc[0]["late_rate"]) * 100
            wlate = float(worst.iloc[0]["late_rate"]) * 100
            lines = [
                f"• Best by spend: {bname} (late {blate:.1f}%)",
                f"• Worst by late deliveries: {wname} (late {wlate:.1f}%)",
                "• Action: Extend terms with best performer; corrective plan and SLA penalties for the worst.",
            ]
            return "🤖 **[Rule-Based Vendor]**\n" + "\n".join(lines)

        if ("risk" in q) or ("late" in q) or ("delay" in q):
            late = float(self.po_data["late_delivery"].mean()) * 100
            lines = [
                f"• Late delivery rate: {late:.1f}%",
                "• Action: Add 5–10 day buffers; fast-track chronic offenders; add service credits for misses.",
            ]
            return "🤖 **[Rule-Based Risk]**\n" + "\n".join(lines)

        return (
            "🤖 **[Rule-Based]**\n"
            + "• I can analyze spend (top categories/vendors), vendor performance (best/worst), risk (late %), and trends.\n"
            + f"• Snapshot: {fmt_currency(k['total_spend'])}, {len(self.po_data):,} POs, {self.po_data['vendor'].nunique()} vendors, on-time {k['on_time_rate']*100:.1f}%"
        )


# =============================
# App State & Initialization
# =============================
if "data_loaded" not in st.session_state:
    # NOTE(review): the spinner's leading glyph was unrecoverable mojibake;
    # the refresh emoji is a best guess - confirm against the original.
    with st.spinner("🔄 Generating synthetic SAP S/4HANA procurement data..."):
        st.session_state.po_df, st.session_state.spend_df = generate_synthetic_procurement_data()
        st.session_state.data_loaded = True


@st.cache_resource(show_spinner=False)
def get_llm_client() -> UniversalLLMClient:
    """Build the LLM client once and share it across Streamlit reruns."""
    return UniversalLLMClient(LLMConfig())


client = get_llm_client()
agent = UniversalProcurementAgent(st.session_state.po_df, st.session_state.spend_df, client)
analytics = ProcurementAnalytics(st.session_state.po_df)

status = {
    "available": client.available,
    "last_error": client.last_error or "OK",
    "model": client.cfg.model,
}
api_status = "🟢 Connected" if status["available"] else "🔴 Not Connected"

# =============================
# Header
# =============================
st.markdown( ( "
Autonomous Intelligence for Procurement Excellence
" + f"LLM: {api_status} Β· Data: {len(st.session_state.po_df):,} POs" "π Active Portfolio
" "π Order Efficiency
" "π€ Strategic Partners
" "β± Performance
" "Ask me anything about your procurement data. I will answer with crisp bullets and actual metrics.
" + f"Status: {api_status} | Model: {status['model']}
" "{rec}
" + "π€ Universal AI Procurement Analytics | Crisp, metric-first answers in βΉ
" + f"Demo with synthetic data β’ {len(st.session_state.po_df):,} orders β’ LLM {api_status}
" "