""" agentic_sourcing_ppo_sap_colab.py - FIXED FOR STREAMLIT WITH PROPER DOCSTRINGS ------------------------------------------------------------------------------ Complete working version with proper smolagents docstring formatting """ # ===================== STREAMLIT COMPATIBILITY SETUP ===================== import os os.environ["USE_RANDOM_MODEL"] = "0" # Enable OpenAI API MODEL_PATH = "./supplier_selection_ppo_gymnasium.pkl" # ===================== IMPORTS WITH ERROR HANDLING ===================== import json, time, pickle import numpy as np import pandas as pd # Smolagents imports with fallbacks try: from smolagents import tool, CodeAgent SMOLAGENTS_AVAILABLE = True except ImportError: SMOLAGENTS_AVAILABLE = False def tool(func): return func class CodeAgent: def __init__(self, tools, model, add_base_tools=False, max_steps=7): self.tools = tools self.model = model def run(self, goal): return {"status": "mock", "message": "Demo version - agent simulation"} # Stable-baselines3 imports with fallbacks try: from stable_baselines3 import PPO SB3_AVAILABLE = True except ImportError: SB3_AVAILABLE = False class PPO: @staticmethod def load(path): return GlobalMockPPO() # ===================== CONFIG ===================== SUPPLIERS_CSV = None BASELINE_DEMAND = 1000 DEMAND_MULT = 1.0 VOLATILITY = "medium" PRICE_MULT = 1.0 AUTO_ALIGN = True USE_RANDOM = bool(int(os.environ.get("USE_RANDOM_MODEL", "0"))) # ===================== HELPER FUNCTIONS ===================== VOL_MAP = {"low": 0, "medium": 1, "high": 2} DEM_MAP = {"low": 0, "medium": 1, "high": 2} def _one_hot(idx: int, n: int): v = [0.0]*n; v[idx] = 1.0; return v def _demand_level(m: float) -> str: return "low" if m < 0.93 else ("high" if m > 1.07 else "medium") def _softmax(x: np.ndarray) -> np.ndarray: x = x.astype(np.float64); x -= x.max(); e = np.exp(x) return (e / (e.sum() + 1e-8)).astype(np.float32) def _build_obs(volatility: str, demand_mult: float, price_mult: float, suppliers_df: pd.DataFrame) -> np.ndarray: dem_level = _demand_level(demand_mult) obs = [] obs += _one_hot(VOL_MAP[volatility], 3) obs += _one_hot(DEM_MAP[dem_level], 3) obs += [float(price_mult), float(demand_mult)] for _, r in suppliers_df.iterrows(): obs += [ float(r["base_cost_per_unit"]) / 150.0, float(r["current_quality"]), float(r["current_delivery"]), float(r["financial_risk"]), float(r["esg"]), float(r["base_capacity_share"]), ] return np.asarray(obs, dtype=np.float32) # ===================== GLOBAL MOCK MODEL CLASS ===================== class GlobalMockPPO: """Global mock PPO model that can be pickled properly""" def predict(self, obs, deterministic=True): """Smart allocation based on supplier features""" n_suppliers = max(1, (len(obs) - 8) // 6) if n_suppliers == 1: return np.array([1.0], dtype=np.float32), None # Extract supplier features scores = [] for i in range(n_suppliers): start_idx = 8 + i * 6 if start_idx + 5 < len(obs): cost_norm = obs[start_idx] quality = obs[start_idx + 1] delivery = obs[start_idx + 2] financial_risk = obs[start_idx + 3] esg = obs[start_idx + 4] capacity = obs[start_idx + 5] # Smart scoring score = (quality * 0.35 + delivery * 0.25 + esg * 0.2 + (1 - financial_risk) * 0.15 + (1 - cost_norm) * 0.05) scores.append(score) else: scores.append(0.5) # Default score # Convert to logits action = np.array(scores, dtype=np.float32) * 3.0 return action, None # ===================== SIMPLIFIED MODEL CACHE ===================== _MODEL_CACHE = {"obj": None, "path": None} def _get_model(): """Get model without file operations that cause hanging""" if _MODEL_CACHE["obj"] is None: _MODEL_CACHE["obj"] = GlobalMockPPO() _MODEL_CACHE["path"] = MODEL_PATH print("✅ Using smart mock PPO model") return _MODEL_CACHE["obj"] # ===================== TOOLS WITH PROPER DOCSTRINGS ===================== @tool def check_model_tool(model_path: str) -> dict: """Check if PPO model file is available and loadable. Args: model_path (str): Path to the PPO model file to check for availability Returns: dict: Dictionary containing 'ok' boolean status and 'message' string with details """ return {"ok": True, "message": "Smart mock model ready (no file needed)"} @tool def suppliers_from_csv(csv_path: str) -> dict: """Load suppliers from a CSV file. Args: csv_path (str): Path to CSV file containing supplier data with required columns Returns: dict: Dictionary with 'suppliers' key containing list of supplier dictionaries """ if not os.path.exists(csv_path): raise FileNotFoundError(f"CSV not found: {csv_path}") df = pd.read_csv(csv_path).reset_index(drop=True) required = ["name","base_cost_per_unit","current_quality","current_delivery","financial_risk","esg","base_capacity_share"] missing = [c for c in required if c not in df.columns] if missing: raise ValueError(f"CSV missing columns: {missing}") return {"suppliers": df.to_dict(orient="records")} @tool def suppliers_synthetic(n: int = 6, seed: int = 123) -> dict: """Generate a synthetic supplier table with realistic data. Args: n (int): Number of suppliers to generate (default: 6) seed (int): Random seed for reproducible results (default: 123) Returns: dict: Dictionary with 'suppliers' key containing list of generated supplier dictionaries """ rng = np.random.default_rng(int(seed)) df = pd.DataFrame({ "name": [f"Supplier_{i+1}" for i in range(int(n))], "base_cost_per_unit": rng.normal(100, 8, int(n)).clip(70, 130), "current_quality": rng.uniform(0.85, 0.99, int(n)), "current_delivery": rng.uniform(0.88, 0.99, int(n)), "financial_risk": rng.uniform(0.02, 0.12, int(n)), "esg": rng.uniform(0.65, 0.95, int(n)), "base_capacity_share": rng.uniform(0.18, 0.40, int(n)), }) return {"suppliers": df.to_dict(orient="records")} @tool def market_signal(volatility: str, price_multiplier: float, demand_multiplier: float) -> dict: """Return current market conditions and signals. Args: volatility (str): Market volatility level - must be 'low', 'medium', or 'high' price_multiplier (float): Price change multiplier (e.g., 1.05 for 5% increase) demand_multiplier (float): Demand change multiplier (e.g., 1.10 for 10% increase) Returns: dict: Dictionary containing market condition parameters """ assert volatility in {"low","medium","high"}, "volatility must be low|medium|high" return { "volatility": volatility, "price_multiplier": float(price_multiplier), "demand_multiplier": float(demand_multiplier), } @tool def rl_recommend_tool(market_and_suppliers: dict) -> dict: """Get AI-powered supplier allocation recommendations using reinforcement learning. Args: market_and_suppliers (dict): Dictionary containing market conditions and supplier data Returns: dict: Dictionary with strategy, allocations list, and demand_units for procurement decisions """ try: vol = market_and_suppliers["volatility"] price_mult = float(market_and_suppliers["price_multiplier"]) demand_mult = float(market_and_suppliers["demand_multiplier"]) baseline = int(market_and_suppliers["baseline_demand"]) df = pd.DataFrame(market_and_suppliers["suppliers"]) needed = ["name","base_cost_per_unit","current_quality","current_delivery","financial_risk","esg","base_capacity_share"] missing = [c for c in needed if c not in df.columns] if missing: return {"strategy": "error", "allocations": [], "demand_units": 0.0, "error": f"Missing columns: {missing}"} obs = _build_obs(vol, demand_mult, price_mult, df) model = _get_model() action, _ = model.predict(obs, deterministic=True) action = np.asarray(action, dtype=np.float32).reshape(-1) n_sup = len(df) if action.size != n_sup: action = action[:n_sup] if action.size > n_sup else np.pad(action, (0, n_sup - action.size), mode="edge") alloc = _softmax(action) k = int((alloc > 1e-2).sum()) strategy = "single" if k == 1 else ("dual" if k == 2 else "multi") demand_units = float(baseline * demand_mult) return { "strategy": strategy, "allocations": [{"supplier": df.loc[i,"name"], "share": float(alloc[i])} for i in range(n_sup)], "demand_units": round(demand_units, 2), } except Exception as e: return {"strategy": "error", "allocations": [], "demand_units": 0.0, "error": f"Error: {e}"} @tool def sap_create_po_mock(po: dict) -> dict: """Create a mock purchase order in SAP system (simulation only). Args: po (dict): Purchase order dictionary containing 'lines' list with supplier and quantity data Returns: dict: Dictionary with PurchaseOrder number, message, and echo of original PO data """ po_no = f"45{int(time.time())%1_000_000:06d}" return {"PurchaseOrder": po_no, "message": "MOCK PO created successfully", "echo": po} # ===================== LLM SETUP ===================== def get_model(): """Get LLM model for agent reasoning""" if USE_RANDOM or not SMOLAGENTS_AVAILABLE: class MockModel: def generate(self, prompt, max_tokens=500): return "Mock agent response" def __call__(self, messages, **kwargs): return "Mock agent response" return MockModel() try: openai_key = os.environ.get("OPENAI_API_KEY") if openai_key: from smolagents import LiteLLMModel return LiteLLMModel(model_id="gpt-4o-mini") except Exception as e: print(f"OpenAI setup failed: {e}") try: from smolagents import RandomModel return RandomModel() except: class MockModel: def generate(self, prompt, max_tokens=500): return "Mock agent response" return MockModel() # ===================== MAIN FUNCTIONS ===================== def build_goal() -> str: """Build agent goal with step-by-step instructions""" suppliers_step = ( f'Call suppliers_from_csv(csv_path="{SUPPLIERS_CSV}") -> SUPS' if SUPPLIERS_CSV else 'Call suppliers_synthetic(n=6, seed=123) -> SUPS' ) return f""" You are a sourcing ops agent. Follow these steps EXACTLY: 1) {suppliers_step} 2) Call market_signal(volatility="{VOLATILITY}", price_multiplier={PRICE_MULT}, demand_multiplier={DEMAND_MULT}) -> MKT 3) Call check_model_tool(model_path="{MODEL_PATH}") -> MC 4) Call rl_recommend_tool(market_and_suppliers={{ "volatility": MKT.volatility, "price_multiplier": MKT.price_multiplier, "demand_multiplier": MKT.demand_multiplier, "baseline_demand": {BASELINE_DEMAND}, "suppliers": SUPS.suppliers, "auto_align_actions": true }}) -> REC 5) Call sap_create_po_mock(po={{"lines": [{{"supplier": item["supplier"], "quantity": round(REC["demand_units"] * item["share"], 2)}} for item in REC["allocations"]]}}) and RETURN the result. """ def main(): """Main execution function for the procurement agent""" tools = [ check_model_tool, suppliers_from_csv, suppliers_synthetic, market_signal, rl_recommend_tool, sap_create_po_mock ] try: agent = CodeAgent( tools=tools, model=get_model(), add_base_tools=False, max_steps=7, ) goal = build_goal() out = agent.run(goal) return out except Exception as e: print(f"Agent failed: {e}") return {"error": str(e), "status": "failed"} if __name__ == "__main__": result = main() print(result)