AgenticProcurement / agentic_sourcing_ppo_sap_colab.py
PD03's picture
Update agentic_sourcing_ppo_sap_colab.py
107143e verified
"""
agentic_sourcing_ppo_sap_colab.py - FIXED FOR STREAMLIT WITH PROPER DOCSTRINGS
------------------------------------------------------------------------------
Complete working version with proper smolagents docstring formatting
"""
# ===================== STREAMLIT COMPATIBILITY SETUP =====================
import os
os.environ["USE_RANDOM_MODEL"] = "0" # Enable OpenAI API
MODEL_PATH = "./supplier_selection_ppo_gymnasium.pkl"
# ===================== IMPORTS WITH ERROR HANDLING =====================
import json, time, pickle
import numpy as np
import pandas as pd
# Smolagents imports with fallbacks
try:
from smolagents import tool, CodeAgent
SMOLAGENTS_AVAILABLE = True
except ImportError:
SMOLAGENTS_AVAILABLE = False
def tool(func):
return func
class CodeAgent:
def __init__(self, tools, model, add_base_tools=False, max_steps=7):
self.tools = tools
self.model = model
def run(self, goal):
return {"status": "mock", "message": "Demo version - agent simulation"}
# Stable-baselines3 imports with fallbacks
try:
from stable_baselines3 import PPO
SB3_AVAILABLE = True
except ImportError:
SB3_AVAILABLE = False
class PPO:
@staticmethod
def load(path):
return GlobalMockPPO()
# ===================== CONFIG =====================
SUPPLIERS_CSV = None
BASELINE_DEMAND = 1000
DEMAND_MULT = 1.0
VOLATILITY = "medium"
PRICE_MULT = 1.0
AUTO_ALIGN = True
USE_RANDOM = bool(int(os.environ.get("USE_RANDOM_MODEL", "0")))
# ===================== HELPER FUNCTIONS =====================
VOL_MAP = {"low": 0, "medium": 1, "high": 2}
DEM_MAP = {"low": 0, "medium": 1, "high": 2}
def _one_hot(idx: int, n: int):
v = [0.0]*n; v[idx] = 1.0; return v
def _demand_level(m: float) -> str:
return "low" if m < 0.93 else ("high" if m > 1.07 else "medium")
def _softmax(x: np.ndarray) -> np.ndarray:
x = x.astype(np.float64); x -= x.max(); e = np.exp(x)
return (e / (e.sum() + 1e-8)).astype(np.float32)
def _build_obs(volatility: str, demand_mult: float, price_mult: float, suppliers_df: pd.DataFrame) -> np.ndarray:
dem_level = _demand_level(demand_mult)
obs = []
obs += _one_hot(VOL_MAP[volatility], 3)
obs += _one_hot(DEM_MAP[dem_level], 3)
obs += [float(price_mult), float(demand_mult)]
for _, r in suppliers_df.iterrows():
obs += [
float(r["base_cost_per_unit"]) / 150.0,
float(r["current_quality"]),
float(r["current_delivery"]),
float(r["financial_risk"]),
float(r["esg"]),
float(r["base_capacity_share"]),
]
return np.asarray(obs, dtype=np.float32)
# ===================== GLOBAL MOCK MODEL CLASS =====================
class GlobalMockPPO:
"""Global mock PPO model that can be pickled properly"""
def predict(self, obs, deterministic=True):
"""Smart allocation based on supplier features"""
n_suppliers = max(1, (len(obs) - 8) // 6)
if n_suppliers == 1:
return np.array([1.0], dtype=np.float32), None
# Extract supplier features
scores = []
for i in range(n_suppliers):
start_idx = 8 + i * 6
if start_idx + 5 < len(obs):
cost_norm = obs[start_idx]
quality = obs[start_idx + 1]
delivery = obs[start_idx + 2]
financial_risk = obs[start_idx + 3]
esg = obs[start_idx + 4]
capacity = obs[start_idx + 5]
# Smart scoring
score = (quality * 0.35 + delivery * 0.25 + esg * 0.2 +
(1 - financial_risk) * 0.15 + (1 - cost_norm) * 0.05)
scores.append(score)
else:
scores.append(0.5) # Default score
# Convert to logits
action = np.array(scores, dtype=np.float32) * 3.0
return action, None
# ===================== SIMPLIFIED MODEL CACHE =====================
_MODEL_CACHE = {"obj": None, "path": None}
def _get_model():
"""Get model without file operations that cause hanging"""
if _MODEL_CACHE["obj"] is None:
_MODEL_CACHE["obj"] = GlobalMockPPO()
_MODEL_CACHE["path"] = MODEL_PATH
print("✅ Using smart mock PPO model")
return _MODEL_CACHE["obj"]
# ===================== TOOLS WITH PROPER DOCSTRINGS =====================
@tool
def check_model_tool(model_path: str) -> dict:
"""Check if PPO model file is available and loadable.
Args:
model_path (str): Path to the PPO model file to check for availability
Returns:
dict: Dictionary containing 'ok' boolean status and 'message' string with details
"""
return {"ok": True, "message": "Smart mock model ready (no file needed)"}
@tool
def suppliers_from_csv(csv_path: str) -> dict:
"""Load suppliers from a CSV file.
Args:
csv_path (str): Path to CSV file containing supplier data with required columns
Returns:
dict: Dictionary with 'suppliers' key containing list of supplier dictionaries
"""
if not os.path.exists(csv_path):
raise FileNotFoundError(f"CSV not found: {csv_path}")
df = pd.read_csv(csv_path).reset_index(drop=True)
required = ["name","base_cost_per_unit","current_quality","current_delivery","financial_risk","esg","base_capacity_share"]
missing = [c for c in required if c not in df.columns]
if missing:
raise ValueError(f"CSV missing columns: {missing}")
return {"suppliers": df.to_dict(orient="records")}
@tool
def suppliers_synthetic(n: int = 6, seed: int = 123) -> dict:
"""Generate a synthetic supplier table with realistic data.
Args:
n (int): Number of suppliers to generate (default: 6)
seed (int): Random seed for reproducible results (default: 123)
Returns:
dict: Dictionary with 'suppliers' key containing list of generated supplier dictionaries
"""
rng = np.random.default_rng(int(seed))
df = pd.DataFrame({
"name": [f"Supplier_{i+1}" for i in range(int(n))],
"base_cost_per_unit": rng.normal(100, 8, int(n)).clip(70, 130),
"current_quality": rng.uniform(0.85, 0.99, int(n)),
"current_delivery": rng.uniform(0.88, 0.99, int(n)),
"financial_risk": rng.uniform(0.02, 0.12, int(n)),
"esg": rng.uniform(0.65, 0.95, int(n)),
"base_capacity_share": rng.uniform(0.18, 0.40, int(n)),
})
return {"suppliers": df.to_dict(orient="records")}
@tool
def market_signal(volatility: str, price_multiplier: float, demand_multiplier: float) -> dict:
"""Return current market conditions and signals.
Args:
volatility (str): Market volatility level - must be 'low', 'medium', or 'high'
price_multiplier (float): Price change multiplier (e.g., 1.05 for 5% increase)
demand_multiplier (float): Demand change multiplier (e.g., 1.10 for 10% increase)
Returns:
dict: Dictionary containing market condition parameters
"""
assert volatility in {"low","medium","high"}, "volatility must be low|medium|high"
return {
"volatility": volatility,
"price_multiplier": float(price_multiplier),
"demand_multiplier": float(demand_multiplier),
}
@tool
def rl_recommend_tool(market_and_suppliers: dict) -> dict:
"""Get AI-powered supplier allocation recommendations using reinforcement learning.
Args:
market_and_suppliers (dict): Dictionary containing market conditions and supplier data
Returns:
dict: Dictionary with strategy, allocations list, and demand_units for procurement decisions
"""
try:
vol = market_and_suppliers["volatility"]
price_mult = float(market_and_suppliers["price_multiplier"])
demand_mult = float(market_and_suppliers["demand_multiplier"])
baseline = int(market_and_suppliers["baseline_demand"])
df = pd.DataFrame(market_and_suppliers["suppliers"])
needed = ["name","base_cost_per_unit","current_quality","current_delivery","financial_risk","esg","base_capacity_share"]
missing = [c for c in needed if c not in df.columns]
if missing:
return {"strategy": "error", "allocations": [], "demand_units": 0.0,
"error": f"Missing columns: {missing}"}
obs = _build_obs(vol, demand_mult, price_mult, df)
model = _get_model()
action, _ = model.predict(obs, deterministic=True)
action = np.asarray(action, dtype=np.float32).reshape(-1)
n_sup = len(df)
if action.size != n_sup:
action = action[:n_sup] if action.size > n_sup else np.pad(action, (0, n_sup - action.size), mode="edge")
alloc = _softmax(action)
k = int((alloc > 1e-2).sum())
strategy = "single" if k == 1 else ("dual" if k == 2 else "multi")
demand_units = float(baseline * demand_mult)
return {
"strategy": strategy,
"allocations": [{"supplier": df.loc[i,"name"], "share": float(alloc[i])} for i in range(n_sup)],
"demand_units": round(demand_units, 2),
}
except Exception as e:
return {"strategy": "error", "allocations": [], "demand_units": 0.0,
"error": f"Error: {e}"}
@tool
def sap_create_po_mock(po: dict) -> dict:
"""Create a mock purchase order in SAP system (simulation only).
Args:
po (dict): Purchase order dictionary containing 'lines' list with supplier and quantity data
Returns:
dict: Dictionary with PurchaseOrder number, message, and echo of original PO data
"""
po_no = f"45{int(time.time())%1_000_000:06d}"
return {"PurchaseOrder": po_no, "message": "MOCK PO created successfully", "echo": po}
# ===================== LLM SETUP =====================
def get_model():
"""Get LLM model for agent reasoning"""
if USE_RANDOM or not SMOLAGENTS_AVAILABLE:
class MockModel:
def generate(self, prompt, max_tokens=500):
return "Mock agent response"
def __call__(self, messages, **kwargs):
return "Mock agent response"
return MockModel()
try:
openai_key = os.environ.get("OPENAI_API_KEY")
if openai_key:
from smolagents import LiteLLMModel
return LiteLLMModel(model_id="gpt-4o-mini")
except Exception as e:
print(f"OpenAI setup failed: {e}")
try:
from smolagents import RandomModel
return RandomModel()
except:
class MockModel:
def generate(self, prompt, max_tokens=500):
return "Mock agent response"
return MockModel()
# ===================== MAIN FUNCTIONS =====================
def build_goal() -> str:
"""Build agent goal with step-by-step instructions"""
suppliers_step = (
f'Call suppliers_from_csv(csv_path="{SUPPLIERS_CSV}") -> SUPS'
if SUPPLIERS_CSV else
'Call suppliers_synthetic(n=6, seed=123) -> SUPS'
)
return f"""
You are a sourcing ops agent. Follow these steps EXACTLY:
1) {suppliers_step}
2) Call market_signal(volatility="{VOLATILITY}", price_multiplier={PRICE_MULT}, demand_multiplier={DEMAND_MULT}) -> MKT
3) Call check_model_tool(model_path="{MODEL_PATH}") -> MC
4) Call rl_recommend_tool(market_and_suppliers={{
"volatility": MKT.volatility,
"price_multiplier": MKT.price_multiplier,
"demand_multiplier": MKT.demand_multiplier,
"baseline_demand": {BASELINE_DEMAND},
"suppliers": SUPS.suppliers,
"auto_align_actions": true
}}) -> REC
5) Call sap_create_po_mock(po={{"lines": [{{"supplier": item["supplier"], "quantity": round(REC["demand_units"] * item["share"], 2)}} for item in REC["allocations"]]}}) and RETURN the result.
"""
def main():
"""Main execution function for the procurement agent"""
tools = [
check_model_tool,
suppliers_from_csv,
suppliers_synthetic,
market_signal,
rl_recommend_tool,
sap_create_po_mock
]
try:
agent = CodeAgent(
tools=tools,
model=get_model(),
add_base_tools=False,
max_steps=7,
)
goal = build_goal()
out = agent.run(goal)
return out
except Exception as e:
print(f"Agent failed: {e}")
return {"error": str(e), "status": "failed"}
if __name__ == "__main__":
result = main()
print(result)