import streamlit as st import numpy as np import pandas as pd import plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots from datetime import datetime, timedelta from sklearn.linear_model import LinearRegression import warnings warnings.filterwarnings('ignore') st.set_page_config( page_title="Profitability Intelligence Suite", page_icon="📊", layout="wide", initial_sidebar_state="collapsed" ) # Custom CSS st.markdown(""" """, unsafe_allow_html=True) @st.cache_data(show_spinner=False) def generate_synthetic_data(days=60, seed=42, rows_per_day=600): """Generate data with REALISTIC variance patterns""" rng = np.random.default_rng(seed) start_date = datetime.today().date() - timedelta(days=days) dates = pd.date_range(start_date, periods=days, freq="D") products = ["Premium Widget", "Standard Widget", "Economy Widget", "Deluxe Widget"] regions = ["Americas", "EMEA", "Asia Pacific"] channels = ["Direct Sales", "Distribution Partners", "E-Commerce"] base_price = {"Premium Widget": 120, "Standard Widget": 135, "Economy Widget": 110, "Deluxe Widget": 150} base_cost = {"Premium Widget": 70, "Standard Widget": 88, "Economy Widget": 60, "Deluxe Widget": 95} region_price_bump = {"Americas": 1.00, "EMEA": 1.03, "Asia Pacific": 0.97} region_cost_bump = {"Americas": 1.00, "EMEA": 1.02, "Asia Pacific": 1.01} channel_discount_mean = {"Direct Sales": 0.06, "Distribution Partners": 0.12, "E-Commerce": 0.04} channel_discount_std = {"Direct Sales": 0.02, "Distribution Partners": 0.03, "E-Commerce": 0.02} # Elasticity varies by segment seg_epsilon = {} for p in products: for r in regions: for c in channels: # More realistic elasticity range: -0.5 to -2.5 base_eps = rng.uniform(-2.5, -0.5) if c == "Distribution Partners": base_eps -= rng.uniform(0.3, 0.8) # More price sensitive if c == "E-Commerce": base_eps -= rng.uniform(0.2, 0.5) # Also price sensitive seg_epsilon[(p, r, c)] = base_eps records = [] for idx, d in enumerate(dates): dow = d.weekday() dow_mult = 1.0 + (0.08 if dow in (5, 6) else 0) # Add realistic seasonality and random shocks seasonal = 1.0 + 0.05*np.sin((d.toordinal()%365)/365*2*np.pi) # Random market shocks (some days have big changes) if rng.random() < 0.15: # 15% of days have shocks market_shock = rng.uniform(0.85, 1.15) else: market_shock = 1.0 # Gradual cost trends cost_trend = 1.0 + (idx / len(dates)) * 0.03 # 3% cost increase over period n = int(rows_per_day * market_shock * seasonal) prod = rng.choice(products, size=n, p=[0.35, 0.3, 0.2, 0.15]) reg = rng.choice(regions, size=n, p=[0.4, 0.35, 0.25]) ch = rng.choice(channels, size=n, p=[0.45, 0.35, 0.20]) base_p = np.array([base_price[x] for x in prod]) * np.array([region_price_bump[x] for x in reg]) base_c = np.array([base_cost[x] for x in prod]) * np.array([region_cost_bump[x] for x in reg]) * cost_trend # More variance in discounts discount = np.clip( np.array([channel_discount_mean[x] for x in ch]) + rng.normal(0, [channel_discount_std[x] * 2 for x in ch]), # Double the variance 0, 0.45 ) list_price = rng.normal(base_p, 8) # More price variance net_price = np.clip(list_price * (1 - discount), 20, None) unit_cost = np.clip(rng.normal(base_c, 6), 10, None) eps = np.array([seg_epsilon[(pp, rr, cc)] for pp, rr, cc in zip(prod, reg, ch)]) ref_price = np.array([base_price[x] for x in prod]) qty_mu = np.exp(eps * (net_price - ref_price) / np.maximum(ref_price, 1e-6)) qty = np.maximum(1, rng.poisson(8 * dow_mult * seasonal * market_shock * qty_mu)) revenue = net_price * qty cogs = unit_cost * qty gm_val = revenue - cogs gm_pct = np.where(revenue > 0, gm_val / revenue, 0.0) for i in range(n): records.append({ "date": d, "product": prod[i], "region": reg[i], "channel": ch[i], "list_price": float(list_price[i]), "discount_pct": float(discount[i]), "net_price": float(net_price[i]), "unit_cost": float(unit_cost[i]), "qty": int(qty[i]), "revenue": float(revenue[i]), "cogs": float(cogs[i]), "gm_value": float(gm_val[i]), "gm_pct": float(gm_pct[i]), "dow": dow }) df = pd.DataFrame(records) return df def analyze_margin_bridge(df, current_date, prior_date): """Professional Price-Volume-Mix (PVM) analysis""" current_data = df[df["date"] == current_date].copy() prior_data = df[df["date"] == prior_date].copy() current_total_revenue = current_data["revenue"].sum() current_total_cogs = current_data["cogs"].sum() current_total_gm = current_total_revenue - current_total_cogs current_gm_pct = current_total_gm / current_total_revenue if current_total_revenue > 0 else 0 prior_total_revenue = prior_data["revenue"].sum() prior_total_cogs = prior_data["cogs"].sum() prior_total_gm = prior_total_revenue - prior_total_cogs prior_gm_pct = prior_total_gm / prior_total_revenue if prior_total_revenue > 0 else 0 total_gm_variance = current_total_gm - prior_total_gm current_seg = current_data.groupby(["product", "region", "channel"]).agg({ "revenue": "sum", "cogs": "sum", "qty": "sum", "net_price": "mean", "unit_cost": "mean" }).reset_index() current_seg["gm"] = current_seg["revenue"] - current_seg["cogs"] current_seg["gm_pct"] = current_seg["gm"] / current_seg["revenue"] prior_seg = prior_data.groupby(["product", "region", "channel"]).agg({ "revenue": "sum", "cogs": "sum", "qty": "sum", "net_price": "mean", "unit_cost": "mean" }).reset_index() prior_seg["gm"] = prior_seg["revenue"] - prior_seg["cogs"] prior_seg["gm_pct"] = prior_seg["gm"] / prior_seg["revenue"] merged = pd.merge( current_seg, prior_seg, on=["product", "region", "channel"], suffixes=("_curr", "_prior"), how="outer" ).fillna(0) # PVM Decomposition merged["price_effect"] = (merged["net_price_curr"] - merged["net_price_prior"]) * merged["qty_curr"] merged["volume_effect"] = (merged["qty_curr"] - merged["qty_prior"]) * merged["net_price_prior"] * merged["gm_pct_prior"] merged["cost_effect"] = -(merged["unit_cost_curr"] - merged["unit_cost_prior"]) * merged["qty_curr"] merged["gm_variance"] = merged["gm_curr"] - merged["gm_prior"] merged["mix_effect"] = merged["gm_variance"] - (merged["price_effect"] + merged["volume_effect"] + merged["cost_effect"]) return merged, { "total_gm_variance": total_gm_variance, "price_effect_total": merged["price_effect"].sum(), "volume_effect_total": merged["volume_effect"].sum(), "cost_effect_total": merged["cost_effect"].sum(), "mix_effect_total": merged["mix_effect"].sum(), "current_gm": current_total_gm, "prior_gm": prior_total_gm, "current_gm_pct": current_gm_pct, "prior_gm_pct": prior_gm_pct } def estimate_segment_elasticity(df, product, region, channel): """Estimate price elasticity for a segment""" seg_df = df[(df["product"]==product)&(df["region"]==region)&(df["channel"]==channel)] if len(seg_df) < 100 or seg_df["net_price"].std() < 1e-6 or seg_df["qty"].std() < 1e-6: return -1.2, False # Default elasticity try: x = np.log(np.clip(seg_df["net_price"].values, 1e-6, None)).reshape(-1,1) y = np.log(np.clip(seg_df["qty"].values, 1e-6, None)) lin = LinearRegression().fit(x, y) elasticity = float(lin.coef_[0]) # Bound elasticity to realistic range elasticity = np.clip(elasticity, -5.0, -0.3) return elasticity, True except: return -1.2, False def find_optimal_discount(base_data, elasticity, search_range=(-10, 10)): """ Find profit-maximizing discount using price elasticity of demand Can recommend INCREASING or DECREASING discount """ current_discount = base_data["discount_pct"] current_list_price = base_data["list_price"] current_price = base_data["net_price"] current_cost = base_data["unit_cost"] current_qty = base_data["qty"] # Test discount changes from -10pp to +10pp discount_changes = np.linspace(search_range[0], search_range[1], 41) results = [] for disc_change in discount_changes: new_discount = np.clip(current_discount + (disc_change/100), 0.0, 0.50) new_price = current_list_price * (1 - new_discount) # Apply elasticity if current_price > 0: price_ratio = new_price / current_price new_qty = current_qty * (price_ratio ** elasticity) else: new_qty = current_qty new_revenue = new_price * new_qty new_cogs = current_cost * new_qty new_gm = new_revenue - new_cogs results.append({ "discount_change": disc_change, "new_discount": new_discount * 100, "new_price": new_price, "new_qty": new_qty, "new_gm": new_gm, "new_revenue": new_revenue }) results_df = pd.DataFrame(results) optimal_idx = results_df["new_gm"].idxmax() optimal = results_df.iloc[optimal_idx] current_gm = current_price * current_qty - current_cost * current_qty return { "current_discount": current_discount * 100, "optimal_discount": optimal["new_discount"], "discount_change": optimal["discount_change"], "current_price": current_price, "optimal_price": optimal["new_price"], "current_qty": current_qty, "optimal_qty": optimal["new_qty"], "current_gm": current_gm, "optimal_gm": optimal["new_gm"], "gm_uplift": optimal["new_gm"] - current_gm, "elasticity": elasticity, "all_scenarios": results_df } # Main App st.markdown('

🎯 Daily Profitability Variance Analysis

', unsafe_allow_html=True) st.markdown('

Understanding What Drives Daily Margin Changes

', unsafe_allow_html=True) # Generate data with realistic variance with st.spinner("🔄 Loading business data..."): df = generate_synthetic_data(days=60, seed=42, rows_per_day=600) # Calculate daily aggregates daily = df.groupby("date").agg( revenue=("revenue","sum"), cogs=("cogs","sum"), gm_value=("gm_value","sum"), qty=("qty","sum") ).reset_index() daily["gm_pct"] = np.where(daily["revenue"]>0, daily["gm_value"]/daily["revenue"], 0.0) current_date = daily["date"].max() prior_date = current_date - timedelta(days=1) current_row = daily[daily["date"]==current_date].iloc[0] prior_row = daily[daily["date"]==prior_date].iloc[0] week_ago_row = daily.iloc[-8] if len(daily) > 7 else current_row roll7 = daily["gm_pct"].tail(7).mean() gm_variance_pp = (current_row["gm_pct"] - prior_row["gm_pct"]) * 100 gm_variance_dollar = current_row["gm_value"] - prior_row["gm_value"] # Executive Dashboard st.markdown("### 📊 Executive Summary") col1, col2, col3, col4 = st.columns(4) with col1: st.metric( label="Gross Margin %", value=f"{current_row['gm_pct']*100:.2f}%", delta=f"{gm_variance_pp:+.2f}pp", delta_color="normal" ) with col2: st.metric( label="Gross Margin $", value=f"${current_row['gm_value']/1e6:.2f}M", delta=f"${gm_variance_dollar/1e6:+.2f}M", delta_color="normal" ) with col3: revenue_var_pct = ((current_row["revenue"] - prior_row["revenue"]) / prior_row["revenue"] * 100) if prior_row["revenue"] > 0 else 0 st.metric( label="Revenue", value=f"${current_row['revenue']/1e6:.2f}M", delta=f"{revenue_var_pct:+.1f}%", delta_color="normal" ) with col4: volume_var_pct = ((current_row["qty"] - prior_row["qty"]) / prior_row["qty"] * 100) if prior_row["qty"] > 0 else 0 st.metric( label="Volume (Units)", value=f"{current_row['qty']:,.0f}", delta=f"{volume_var_pct:+.1f}%", delta_color="normal" ) # Trend chart with REAL variance st.markdown("#### 📈 Gross Margin Trend (Last 30 Days)") recent_daily = daily.tail(30) fig_trend = go.Figure() fig_trend.add_trace(go.Scatter( x=recent_daily["date"], y=recent_daily["gm_pct"]*100, mode='lines+markers', name="GM%", line=dict(color="#1f77b4", width=3), fill='tozeroy', fillcolor="rgba(31, 119, 180, 0.1)" )) fig_trend.add_hline(y=roll7*100, line_dash="dash", line_color="red", annotation_text=f"7-Day Avg: {roll7*100:.2f}%", annotation_position="right") fig_trend.update_layout( xaxis_title="Date", yaxis_title="Gross Margin %", height=350, hovermode="x unified" ) st.plotly_chart(fig_trend, use_container_width=True) st.markdown("---") # Perform margin bridge analysis with st.spinner("🔬 Performing Price-Volume-Mix analysis..."): variance_detail, summary = analyze_margin_bridge(df, current_date, prior_date) # Main Tabs tab1, tab2, tab3 = st.tabs(["📊 Margin Bridge (PVM)", "🔍 Segment Deep Dive", "💡 Optimal Pricing"]) with tab1: st.markdown(f"### Gross Margin Bridge: {prior_date.strftime('%b %d')} → {current_date.strftime('%b %d')}") st.markdown(f"""
📋 Variance Summary:
Gross margin changed by ${gm_variance_dollar/1000:+.1f}K ({gm_variance_pp:+.2f} percentage points)
from {prior_row['gm_pct']*100:.2f}% to {current_row['gm_pct']*100:.2f}%
""", unsafe_allow_html=True) # Waterfall Chart st.markdown("#### Price-Volume-Mix (PVM) Waterfall Analysis") waterfall_data = pd.DataFrame({ "Category": [ f"{prior_date.strftime('%b %d')}
Gross Margin", "Price
Effect", "Volume
Effect", "Cost
Effect", "Mix
Effect", f"{current_date.strftime('%b %d')}
Gross Margin" ], "Value": [ summary["prior_gm"], summary["price_effect_total"], summary["volume_effect_total"], summary["cost_effect_total"], summary["mix_effect_total"], summary["current_gm"] ], "Type": ["absolute", "relative", "relative", "relative", "relative", "total"] }) fig_waterfall = go.Figure(go.Waterfall( orientation="v", measure=waterfall_data["Type"], x=waterfall_data["Category"], y=waterfall_data["Value"], text=[f"${v/1000:.1f}K" if abs(v) > 100 else f"${v:.0f}" for v in waterfall_data["Value"]], textposition="outside", connector={"line": {"color": "rgb(63, 63, 63)"}}, increasing={"marker": {"color": "#28a745"}}, decreasing={"marker": {"color": "#dc3545"}}, totals={"marker": {"color": "#1f77b4"}} )) fig_waterfall.update_layout( title="Gross Margin Variance Breakdown", showlegend=False, height=450, yaxis_title="Gross Margin ($)" ) st.plotly_chart(fig_waterfall, use_container_width=True) # Explanations col_exp1, col_exp2 = st.columns(2) with col_exp1: st.markdown(f"""
💰 Price Effect: ${summary['price_effect_total']/1000:+.1f}K
Impact of changes in realized selling prices
""", unsafe_allow_html=True) st.markdown(f"""
📦 Volume Effect: ${summary['volume_effect_total']/1000:+.1f}K
Impact of selling more/fewer units
""", unsafe_allow_html=True) with col_exp2: st.markdown(f"""
🏭 Cost Effect: ${summary['cost_effect_total']/1000:+.1f}K
Impact of changes in unit costs
""", unsafe_allow_html=True) st.markdown(f"""
🔀 Mix Effect: ${summary['mix_effect_total']/1000:+.1f}K
Impact of product/channel mix shifts
""", unsafe_allow_html=True) with tab2: st.markdown("### Segment-Level Variance Analysis") variance_detail_sorted = variance_detail.sort_values("gm_variance", ascending=False) col_seg1, col_seg2 = st.columns(2) with col_seg1: st.markdown("#### 📈 Top 5 Margin Gainers") for _, row in variance_detail_sorted.head(5).iterrows(): if row["gm_variance"] > 0: st.markdown(f"""
{row['product']}
{row['region']} • {row['channel']}
+${row['gm_variance']:.2f}
Price: ${row['price_effect']:+.2f} | Volume: ${row['volume_effect']:+.2f} | Cost: ${row['cost_effect']:+.2f}
""", unsafe_allow_html=True) with col_seg2: st.markdown("#### 📉 Top 5 Margin Losers") for _, row in variance_detail_sorted.tail(5).iterrows(): if row["gm_variance"] < 0: st.markdown(f"""
{row['product']}
{row['region']} • {row['channel']}
${row['gm_variance']:.2f}
Price: ${row['price_effect']:+.2f} | Volume: ${row['volume_effect']:+.2f} | Cost: ${row['cost_effect']:+.2f}
""", unsafe_allow_html=True) with tab3: st.markdown("### Optimal Pricing Analysis") st.markdown("""
🎯 Profit Maximization: Using price elasticity of demand to find the optimal discount level.
May recommend increasing or decreasing discount depending on elasticity.
""", unsafe_allow_html=True) # Get segments with meaningful volume recent_segments = df[df["date"] >= (current_date - timedelta(days=7))].groupby(["product", "region", "channel"]).agg({ "qty": "sum", "gm_value": "sum" }).reset_index() recent_segments = recent_segments[recent_segments["qty"] > 100] # Minimum volume threshold optimization_results = [] for _, seg in recent_segments.iterrows(): p, r, c = seg["product"], seg["region"], seg["channel"] hist = df[(df["product"]==p)&(df["region"]==r)&(df["channel"]==c)].sort_values("date") if hist.empty or len(hist) < 100: continue elasticity, is_valid = estimate_segment_elasticity(hist, p, r, c) if not is_valid: continue current_state = hist.iloc[-1] optimal_result = find_optimal_discount(current_state, elasticity) if abs(optimal_result["gm_uplift"]) > 5: # Only show meaningful opportunities optimization_results.append({ "Product": p, "Region": r, "Channel": c, "Current Discount": optimal_result["current_discount"], "Optimal Discount": optimal_result["optimal_discount"], "Discount Change": optimal_result["discount_change"], "Price Elasticity": elasticity, "Current GM/Day": optimal_result["current_gm"], "Optimal GM/Day": optimal_result["optimal_gm"], "Daily GM Uplift": optimal_result["gm_uplift"], "Direction": "Increase Discount" if optimal_result["discount_change"] > 0 else "Decrease Discount", "all_scenarios": optimal_result["all_scenarios"] }) opt_df = pd.DataFrame(optimization_results).sort_values("Daily GM Uplift", ascending=False) if len(opt_df) > 0: st.markdown("#### 🏆 Top 5 Optimization Opportunities") for i, (_, rec) in enumerate(opt_df.head(5).iterrows()): direction_color = "#ff7f0e" if rec["Direction"] == "Increase Discount" else "#1f77b4" st.markdown(f"""

#{i+1}: {rec['Product']} • {rec['Region']} • {rec['Channel']}

Elasticity: {rec['Price Elasticity']:.2f} ({"Elastic" if rec['Price Elasticity'] < -1.5 else "Inelastic"})

Recommendation: {rec['Direction']} by {abs(rec['Discount Change']):.1f}pp

Current: {rec['Current Discount']:.1f}% → Optimal: {rec['Optimal Discount']:.1f}%

💰 Expected Uplift: ${rec['Daily GM Uplift']:.2f}/day

Annual Impact: ${rec['Daily GM Uplift']*365/1000:.1f}K

""", unsafe_allow_html=True) # Show elasticity curve with st.expander(f"📊 View Profit Curve for {rec['Product']} • {rec['Region']} • {rec['Channel']}"): scenario_df = rec["all_scenarios"] fig_curve = go.Figure() fig_curve.add_trace(go.Scatter( x=scenario_df["new_discount"], y=scenario_df["new_gm"], mode='lines', name='Gross Margin', line=dict(color='#1f77b4', width=3) )) fig_curve.add_vline(x=rec["Current Discount"], line_dash="dash", line_color="red", annotation_text=f"Current: {rec['Current Discount']:.1f}%") fig_curve.add_vline(x=rec["Optimal Discount"], line_dash="dash", line_color="green", annotation_text=f"Optimal: {rec['Optimal Discount']:.1f}%") fig_curve.update_layout( title=f"Profit Maximization Curve (Elasticity: {rec['Price Elasticity']:.2f})", xaxis_title="Discount Level (%)", yaxis_title="Expected Gross Margin ($)", height=400 ) st.plotly_chart(fig_curve, use_container_width=True) st.markdown("---") st.markdown("#### Complete Optimization List") display_opt = opt_df[[ "Product", "Region", "Channel", "Current Discount", "Optimal Discount", "Discount Change", "Price Elasticity", "Daily GM Uplift", "Direction" ]].copy() st.dataframe(display_opt.style.format({ "Current Discount": "{:.1f}%", "Optimal Discount": "{:.1f}%", "Discount Change": "{:+.1f}pp", "Price Elasticity": "{:.2f}", "Daily GM Uplift": "${:,.2f}" }).background_gradient(subset=["Daily GM Uplift"], cmap="Greens"), use_container_width=True, height=400) st.download_button( label="📥 Download Optimization Plan (CSV)", data=opt_df.drop(columns=["all_scenarios"]).to_csv(index=False).encode("utf-8"), file_name=f"optimal_pricing_plan_{current_date.strftime('%Y%m%d')}.csv", mime="text/csv" ) else: st.info("All segments are currently near optimal pricing levels.") st.markdown("---") st.markdown("""
🔒 Demo Mode: Using synthetic transaction data with realistic variance patterns
""", unsafe_allow_html=True)