import streamlit as st import pandas as pd from prophet import Prophet from datetime import datetime, timedelta import numpy as np import plotly.graph_objects as go import os from dotenv import load_dotenv from simple_salesforce import Salesforce import logging from reportlab.lib.pagesizes import letter from reportlab.pdfgen import canvas from reportlab.lib.units import inch from io import BytesIO import base64 from reportlab.platypus import Image import plotly.io as pio import sys import argparse # Load environment variables from .env file load_dotenv() # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Salesforce connection try: sf = Salesforce( username=os.getenv("SF_USERNAME"), password=os.getenv("SF_PASSWORD"), security_token=os.getenv("SF_SECURITY_TOKEN"), instance_url=os.getenv("SF_INSTANCE_URL") ) logger.info("✅ Connected to Salesforce") logger.info(f"Connected Salesforce user: {sf.username}") except Exception as e: logger.error(f"❌ Salesforce connection failed: {e}") sf = None # File to store forecast data DATA_FILE = "/public/forecast_data.csv" def prepare_prophet_data(usage_series): end_date = datetime.now() start_date = end_date - timedelta(days=len(usage_series) - 1) dates = [start_date + timedelta(days=i) for i in range(len(usage_series))] prophet_df = pd.DataFrame({'ds': dates, 'y': usage_series}) prophet_df['cap'] = 60 prophet_df['floor'] = 0 return prophet_df def train_model_with_usage(usage_series): prophet_df = prepare_prophet_data(usage_series) model = Prophet( yearly_seasonality=False, weekly_seasonality=True, daily_seasonality=True, changepoint_prior_scale=0.002, growth='logistic' ) model.fit(prophet_df) return model def make_forecast(model, periods): future = model.make_future_dataframe(periods=periods) future['cap'] = 60 future['floor'] = 0 forecast = model.predict(future) daily_forecasts = forecast['yhat'].tail(periods).tolist() return round(sum(max(0, y) for y in daily_forecasts)) def get_daily_forecasts(model, periods=30): future = model.make_future_dataframe(periods=periods) future['cap'] = 60 future['floor'] = 0 forecast = model.predict(future) daily_forecasts = forecast[['ds', 'yhat']].tail(periods) daily_forecasts['yhat'] = daily_forecasts['yhat'].apply(lambda x: max(0, round(x))) return daily_forecasts def calculate_reorder_date(model, current_stock, lead_time_days=3, safety_threshold=0): future = model.make_future_dataframe(periods=30) future['cap'] = 60 future['floor'] = 0 forecast = model.predict(future) daily_forecasts = forecast[['ds', 'yhat']].tail(30) stock = current_stock for _, row in daily_forecasts.iterrows(): daily_usage = max(0, round(row['yhat'])) stock -= daily_usage if stock <= safety_threshold: stockout_date = row['ds'] reorder_date = stockout_date - timedelta(days=lead_time_days) if reorder_date < datetime.now(): reorder_date = datetime.now().date() return reorder_date.strftime('%Y-%m-%d') return None def validate_usage_series(usage_str): try: usage_list = [float(x) for x in usage_str.split(',')] logger.info(f"Input usage series length: {len(usage_list)}") if len(usage_list) != 60: return None, f"Usage series must contain exactly 60 values. Found {len(usage_list)} values." if any(x < 0 for x in usage_list): return None, "Usage values must be non-negative." return usage_list, None except: return None, "Invalid usage series format. Please enter 60 comma-separated numbers." def generate_forecast_pdf(forecast_data: dict, daily_forecasts: pd.DataFrame, alert_status: list, current_stock: int, forecast_7: int, forecast_14: int, forecast_30: int, fig_daily: go.Figure, fig_alerts: go.Figure, usage_series: str) -> BytesIO: try: logger.info("Starting PDF generation") if not isinstance(forecast_data, dict) or not forecast_data: logger.error("Invalid forecast_data: Must be a non-empty dictionary") return None if not isinstance(daily_forecasts, pd.DataFrame) or daily_forecasts.empty: logger.error("Invalid daily_forecasts: Must be a non-empty DataFrame") return None if not isinstance(alert_status, list) or len(alert_status) != 3: logger.error("Invalid alert_status: Must be a list of 3 booleans") return None if not isinstance(usage_series, str) or not usage_series: logger.error("Invalid usage_series: Must be a non-empty string") return None if not isinstance(fig_daily, go.Figure) or not isinstance(fig_alerts, go.Figure): logger.error("Invalid Plotly figures: fig_daily and fig_alerts must be valid go.Figure objects") return None pdf_file = BytesIO() c = canvas.Canvas(pdf_file, pagesize=letter) c.setFont("Helvetica", 12) c.drawString(1 * inch, 10 * inch, "Consumables Forecast Report") c.setFont("Helvetica", 10) y_position = 9.5 * inch logger.info("Initialized PDF canvas") logger.info("Writing forecast data") for key, value in forecast_data.items(): display_key = key.replace('_', ' ').title() value_str = str(value) c.drawString(1 * inch, y_position, f"{display_key}: {value_str}") y_position -= 0.3 * inch y_position -= 0.3 * inch c.drawString(1 * inch, y_position, "Last 60 Days Usage (comma-separated):") y_position -= 0.3 * inch text_object = c.beginText(1 * inch, y_position) text_object.setFont("Helvetica", 10) text_lines = [usage_series[i:i+50] for i in range(0, len(usage_series), 50)] for line in text_lines: text_object.textLine(line) y_position -= 0.3 * inch c.drawText(text_object) logger.info("Added usage series") y_position -= 0.3 * inch c.drawString(1 * inch, y_position, "Daily Forecast Values (Next 30 Days):") y_position -= 0.3 * inch daily_values = ", ".join([str(int(x)) for x in daily_forecasts['yhat'].tolist()]) text_object = c.beginText(1 * inch, y_position) text_object.setFont("Helvetica", 10) text_lines = [daily_values[i:i+50] for i in range(0, len(daily_values), 50)] for line in text_lines: text_object.textLine(line) y_position -= 0.3 * inch c.drawText(text_object) logger.info("Added daily forecast values") y_position -= 0.3 * inch c.drawString(1 * inch, y_position, "Threshold Alerts:") y_position -= 0.3 * inch for forecast, period, alert in zip([forecast_7, forecast_14, forecast_30], ['7-day', '14-day', '30-day'], alert_status): flag_indicator = "[Flag] " if alert else "" if alert: alert_text = f"{flag_indicator}Alert: Current stock ({current_stock}) is below {period} forecast ({forecast})." else: alert_text = f"No alert for {period} forecast." c.drawString(1 * inch, y_position, alert_text) y_position -= 0.3 * inch logger.info("Added threshold alerts") y_position -= 0.3 * inch c.drawString(1 * inch, y_position, "Daily Forecast Visualization Data (Next 30 Days):") y_position -= 0.3 * inch for index, row in daily_forecasts.iterrows(): date_str = row['ds'].strftime('%Y-%m-%d') forecast_value = int(row['yhat']) c.drawString(1 * inch, y_position, f"Date: {date_str}, Forecast: {forecast_value} units") y_position -= 0.3 * inch if y_position < 1 * inch: c.showPage() c.setFont("Helvetica", 10) y_position = 10 * inch logger.info("Added daily forecast visualization data") y_position -= 0.3 * inch if y_position < 4 * inch: c.showPage() y_position = 10 * inch c.drawString(1 * inch, y_position, "Daily Forecast Visualization (Next 30 Days):") y_position -= 0.3 * inch daily_chart_img = BytesIO() try: pio.write_image(fig_daily, daily_chart_img, format='png', width=600, height=400) daily_chart_img.seek(0) img = Image(daily_chart_img, width=6 * inch, height=4 * inch) img.drawOn(c, 1 * inch, y_position - 4 * inch) logger.info("Added daily forecast visualization image") except Exception as e: logger.error(f"Failed to export daily forecast image: {str(e)}") c.drawString(1 * inch, y_position - 0.3 * inch, "Error: Could not include daily forecast visualization.") y_position -= 4.5 * inch if y_position < 2 * inch: c.showPage() c.setFont("Helvetica", 10) y_position = 10 * inch c.drawString(1 * inch, y_position, "Threshold Alerts Visualization Data:") y_position -= 0.3 * inch alert_data = pd.DataFrame({ 'Category': ['Current Stock', '7-Day Forecast', '14-Day Forecast', '30-Day Forecast'], 'Units': [current_stock, forecast_7, forecast_14, forecast_30], 'Alert': [False] + alert_status }) for _, row in alert_data.iterrows(): alert_text = f"Category: {row['Category']}, Units: {row['Units']}, Alert: {'Yes' if row['Alert'] else 'No'}" c.drawString(1 * inch, y_position, alert_text) y_position -= 0.3 * inch if y_position < 1 * inch: c.showPage() c.setFont("Helvetica", 10) y_position = 10 * inch logger.info("Added threshold alerts visualization data") y_position -= 0.3 * inch if y_position < 4 * inch: c.showPage() y_position = 10 * inch c.drawString(1 * inch, y_position, "Threshold Alerts Visualization:") y_position -= 0.3 * inch alerts_chart_img = BytesIO() try: pio.write_image(fig_alerts, alerts_chart_img, format='png', width=600, height=400) alerts_chart_img.seek(0) img = Image(alerts_chart_img, width=6 * inch, height=4 * inch) img.drawOn(c, 1 * inch, y_position - 4 * inch) logger.info("Added threshold alerts visualization image") except Exception as e: logger.error(f"Failed to export alerts visualization image: {str(e)}") c.drawString(1 * inch, y_position - 0.3 * inch, "Error: Could not include threshold alerts visualization.") c.showPage() c.save() pdf_file.seek(0) logger.info("PDF generation completed successfully") return pdf_file except Exception as e: logger.error(f"Error generating PDF: {str(e)}") return None def upload_pdf_to_salesforce(pdf_file: BytesIO, consumable_type: str, record_id: str) -> str: try: if not sf: return None encoded_pdf_data = base64.b64encode(pdf_file.getvalue()).decode('utf-8') content_version_data = { "Title": f"{consumable_type} - Consumables Forecast PDF", "PathOnClient": f"{consumable_type}_Consumables_Forecast.pdf", "VersionData": encoded_pdf_data, "FirstPublishLocationId": record_id } content_version = sf.ContentVersion.create(content_version_data) content_version_id = content_version["id"] result = sf.query(f"SELECT Id, ContentDocumentId FROM ContentVersion WHERE Id = '{content_version_id}'") if not result['records']: return None file_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/{content_version_id}" return file_url except Exception as e: logger.error(f"Error uploading PDF to Salesforce: {str(e)}") return None def save_forecast_data(consumable_type, usage_series, current_stock, daily_forecasts): try: usage_str = ','.join(map(str, usage_series)) forecast_data = { 'consumable_type': [consumable_type], 'usage_series': [usage_str], 'current_stock': [current_stock], 'forecast_date': [daily_forecasts['ds'].astype(str).tolist()], 'forecast_yhat': [daily_forecasts['yhat'].tolist()] } df = pd.DataFrame(forecast_data) if os.path.exists(DATA_FILE): existing_df = pd.read_csv(DATA_FILE) existing_df = existing_df[existing_df['consumable_type'] != consumable_type] df = pd.concat([existing_df, df], ignore_index=True) df.to_csv(DATA_FILE, index=False) logger.info(f"Saved forecast data for {consumable_type} to {DATA_FILE}") except Exception as e: logger.error(f"Error saving forecast data: {str(e)}") def load_forecast_data(consumable_type): try: if not os.path.exists(DATA_FILE): logger.warning(f"No forecast data file found at {DATA_FILE}") return None, None, None df = pd.read_csv(DATA_FILE) row = df[df['consumable_type'] == consumable_type] if row.empty: logger.warning(f"No data found for {consumable_type} in {DATA_FILE}") return None, None, None usage_series = [float(x) for x in row['usage_series'].iloc[0].split(',')] current_stock = float(row['current_stock'].iloc[0]) forecast_dates = eval(row['forecast_date'].iloc[0]) forecast_yhat = eval(row['forecast_yhat'].iloc[0]) daily_forecasts = pd.DataFrame({'ds': pd.to_datetime(forecast_dates), 'yhat': forecast_yhat}) return usage_series, current_stock, daily_forecasts except Exception as e: logger.error(f"Error loading forecast data: {str(e)}") return None, None, None def process_forecast(consumable_type, usage_series, current_stock, is_automated=False): usage_list, error = validate_usage_series(','.join(map(str, usage_series))) if error: logger.error(error) if not is_automated: st.error(error) return None try: model = train_model_with_usage(usage_list) except Exception as e: logger.error(f"Error training model: {str(e)}") if not is_automated: st.error(f"Error training model: {str(e)}") return None forecast_7 = make_forecast(model, 7) forecast_14 = make_forecast(model, 14) forecast_30 = make_forecast(model, 30) daily_forecasts = get_daily_forecasts(model, 30) reorder_date = calculate_reorder_date(model, current_stock) if not is_automated: st.header("Forecast Results") col1, col2, col3 = st.columns(3) col1.metric("7-Day Forecast", f"{forecast_7} units") col2.metric("14-Day Forecast", f"{forecast_14} units") col3.metric("30-Day Forecast", f"{forecast_30} units") st.header("Daily Forecast Values (Next 30 Days)") daily_values = ", ".join([str(int(x)) for x in daily_forecasts['yhat'].tolist()]) st.text_area("Comma-separated daily forecasts", daily_values, height=100) st.header("Threshold Alerts") alert_status = [] for forecast, period in zip([forecast_7, forecast_14, forecast_30], ['7-day', '14-day', '30-day']): if current_stock < forecast: st.warning(f"Alert: Current stock ({current_stock}) is below {period} forecast ({forecast}). 🚩") alert_status.append(True) else: st.info(f"No alert for {period} forecast.") alert_status.append(False) st.header("Order Suggestions") st.write(f"**For 7 Days**: Order {max(0, forecast_7 - current_stock)} additional units.") st.write(f"**For 14 Days**: Order {max(0, forecast_14 - current_stock)} additional units.") st.write(f"**For 30 Days**: Order {max(0, forecast_30 - current_stock)} additional units.") st.header("Reorder Information") if any(alert_status): st.warning(f"Reorder recommended. Suggested reorder date: {reorder_date if reorder_date else 'Not within 30 days'}") else: st.info("No reorder required within 30 days.") st.header("Daily Forecast Visualization (Next 30 Days)") fig_daily = go.Figure() fig_daily.add_trace(go.Scatter( x=daily_forecasts['ds'], y=daily_forecasts['yhat'], mode='lines+markers', name='Daily Forecast', line=dict(color='royalblue', width=2), marker=dict(size=8, color='darkorange', line=dict(width=2, color='black')), fill='tozeroy', fillcolor='rgba(0, 176, 246, 0.2)' )) y_values = daily_forecasts['yhat'].tolist() fig_daily.update_layout( title='Daily Consumable Usage Forecast (30 Days)', xaxis_title='Date', yaxis_title='Units', template='plotly_white', xaxis=dict(tickformat="%Y-%m-%d", tickangle=45, tickmode='auto', nticks=10), yaxis=dict(range=[max(0, min(y_values) - 5), max(y_values) + 5], tickmode='linear', dtick=2), showlegend=True, legend=dict(x=0.01, y=0.99), hovermode='x unified', plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)', margin=dict(l=50, r=50, t=50, b=100) ) st.plotly_chart(fig_daily, use_container_width=True) st.header("Threshold Alerts Visualization") alert_data = pd.DataFrame({ 'Category': ['Current Stock', '7-Day Forecast', '14-Day Forecast', '30-Day Forecast'], 'Units': [current_stock, forecast_7, forecast_14, forecast_30], 'Alert': [False] + alert_status }) fig_alerts = go.Figure() fig_alerts.add_trace(go.Bar( x=alert_data['Category'], y=alert_data['Units'], marker_color=['green'] + ['red' if alert else 'blue' for alert in alert_data['Alert'][1:]], text=[f"🚩" if alert else "" for alert in alert_data['Alert']], textposition='auto' )) fig_alerts.update_layout( title='Stock vs Forecast with Alerts (🚩 indicates low stock)', xaxis_title='Category', yaxis_title='Units', template='plotly_white' ) st.plotly_chart(fig_alerts) else: alert_status = [current_stock < forecast for forecast in [forecast_7, forecast_14, forecast_30]] fig_daily = go.Figure() fig_daily.add_trace(go.Scatter( x=daily_forecasts['ds'], y=daily_forecasts['yhat'], mode='lines+markers', name='Daily Forecast', line=dict(color='royalblue', width=2), marker=dict(size=8, color='darkorange', line=dict(width=2, color='black')), fill='tozeroy', fillcolor='rgba(0, 176, 246, 0.2)' )) y_values = daily_forecasts['yhat'].tolist() fig_daily.update_layout( title='Daily Consumable Usage Forecast (30 Days)', xaxis_title='Date', yaxis_title='Units', template='plotly_white', xaxis=dict(tickformat="%Y-%m-%d", tickangle=45, tickmode='auto', nticks=10), yaxis=dict(range=[max(0, min(y_values) - 5), max(y_values) + 5], tickmode='linear', dtick=2), showlegend=True, legend=dict(x=0.01, y=0.99), hovermode='x unified', plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)', margin=dict(l=50, r=50, t=50, b=100) ) alert_data = pd.DataFrame({ 'Category': ['Current Stock', '7-Day Forecast', '14-Day Forecast', '30-Day Forecast'], 'Units': [current_stock, forecast_7, forecast_14, forecast_30], 'Alert': [False] + alert_status }) fig_alerts = go.Figure() fig_alerts.add_trace(go.Bar( x=alert_data['Category'], y=alert_data['Units'], marker_color=['green'] + ['red' if alert else 'blue' for alert in alert_data['Alert'][1:]], text=[f"🚩" if alert else "" for alert in alert_data['Alert']], textposition='auto' )) fig_alerts.update_layout( title='Stock vs Forecast with Alerts (🚩 indicates low stock)', xaxis_title='Category', yaxis_title='Units', template='plotly_white' ) if sf is not None: try: order_suggestions_text = f"7 Days: {max(0, forecast_7 - current_stock)} units, 14 Days: {max(0, forecast_14 - current_stock)} units, 30 Days: {max(0, forecast_30 - current_stock)} units" forecast_data = { "Consumable Type": consumable_type, "Current Stock": current_stock, "7-Day Forecast": f"{forecast_7} units", "14-Day Forecast": f"{forecast_14} units", "30-Day Forecast": f"{forecast_30} units", "Order Suggestions": order_suggestions_text, "Reorder Recommendation": "Yes" if any(alert_status) else "No", "Reorder Date": reorder_date if reorder_date else "Not within 30 days" } pdf_file = generate_forecast_pdf(forecast_data, daily_forecasts, alert_status, current_stock, forecast_7, forecast_14, forecast_30, fig_daily, fig_alerts, ','.join(map(str, usage_series))) sf_data = { 'Consumable_Type__c': consumable_type, 'Forecast_Period__c': '7days', 'ForeCasted_Quantity__c': float(forecast_7), 'ForeCasted_Quantity_14days__c': float(forecast_14), 'ForeCasted_Quantity_30days__c': float(forecast_30), 'Current_Stock__c': float(current_stock), 'Order_Suggestions__c': order_suggestions_text, 'Reorder_Recommendation__c': any(alert_status), 'Reorder_Date__c': reorder_date, 'Pdf_report__c': '' } result = sf.Consumables_Forecaste__c.create(sf_data) logger.info(f"Salesforce record created: {result}") if pdf_file: pdf_url = upload_pdf_to_salesforce(pdf_file, consumable_type, result['id']) if pdf_url: sf.Consumables_Forecaste__c.update( result['id'], {"Pdf_report__c": pdf_url} ) logger.info(f"PDF uploaded to Salesforce: {pdf_url}") logger.info(f"PDF Report generated and uploaded to Salesforce: {pdf_url}") else: logger.error("Failed to upload PDF to Salesforce") if not is_automated: st.error("Failed to upload PDF to Salesforce") else: logger.error("Failed to generate PDF") if not is_automated: st.error("Failed to generate PDF") except Exception as e: logger.error(f"Error creating Salesforce record or uploading PDF: {e}", exc_info=True) if not is_automated: st.error(f"Error saving to Salesforce: {str(e)}") return None return daily_forecasts def automate_daily_forecast(): consumable_types = ['Filters', 'Reagents', 'Vials'] for consumable_type in consumable_types: logger.info(f"Processing automated forecast for {consumable_type}") usage_series, current_stock, prev_daily_forecasts = load_forecast_data(consumable_type) if usage_series is None or current_stock is None or prev_daily_forecasts is None: logger.warning(f"No previous data for {consumable_type}. Skipping automation.") continue # Shift usage series: Remove oldest day, append forecasted usage for today next_day_usage = prev_daily_forecasts['yhat'].iloc[0] # Forecasted usage for today usage_series = usage_series[1:] + [next_day_usage] # Update stock: Subtract yesterday's forecasted usage yesterday_usage = prev_daily_forecasts['yhat'].iloc[0] current_stock = max(0, current_stock - yesterday_usage) # Generate new forecast with updated data daily_forecasts = process_forecast(consumable_type, usage_series, current_stock, is_automated=True) if daily_forecasts is not None: save_forecast_data(consumable_type, usage_series, current_stock, daily_forecasts) logger.info(f"Completed automated forecast for {consumable_type}") else: logger.error(f"Failed to process forecast for {consumable_type}") def main(): parser = argparse.ArgumentParser(description="SmartLab Consumables Forecast") parser.add_argument('--automated', action='store_true', help="Run in automated mode") args = parser.parse_args() if args.automated: automate_daily_forecast() return st.title("SmartLab Consumables Forecast") st.header("Input Parameters") consumable_type_label = st.selectbox("Consumable Type", ['Filters', 'Reagents', 'Vials']) consumable_type = consumable_type_label usage_series = st.text_input("Last 60 Days Usage (comma-separated)", "") current_stock = st.number_input("Current Stock", min_value=0, value=0) if st.button("Generate Forecast"): usage_list, error = validate_usage_series(usage_series) if error: st.error(error) return daily_forecasts = process_forecast(consumable_type, usage_list, current_stock, is_automated=False) if daily_forecasts is not None: save_forecast_data(consumable_type, usage_list, current_stock, daily_forecasts) if __name__ == "__main__": main() sf = None