import base64
import logging
import math
import os
from datetime import datetime, timedelta
from io import BytesIO

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio
import streamlit as st
from dotenv import load_dotenv
from prophet import Prophet
from reportlab.lib.pagesizes import letter
from reportlab.lib.units import inch
from reportlab.pdfgen import canvas
from reportlab.platypus import Image
from simple_salesforce import Salesforce


# Load environment variables via python-dotenv before the SF_* settings are read.
load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Open the Salesforce connection once, at import time. If anything goes wrong
# `sf` is set to None; the upload helper and main() both check for that and
# skip the Salesforce steps.
try:
    sf = Salesforce(
        username=os.getenv("SF_USERNAME"),
        password=os.getenv("SF_PASSWORD"),
        security_token=os.getenv("SF_SECURITY_TOKEN"),
        instance_url=os.getenv("SF_INSTANCE_URL"),
    )
    logger.info("✅ Connected to Salesforce")
    # NOTE(review): presumably the client exposes `username`; verify that this
    # logs the login name rather than a generic attribute proxy.
    logger.info(f"Connected Salesforce user: {sf.username}")
except Exception as e:
    logger.error(f"❌ Salesforce connection failed: {e}")
    sf = None


def prepare_prophet_data(usage_series):
    """Build the training frame for the forecaster from daily usage values.

    Values are dated one day apart in list order, with the last value dated
    at the current time.

    Returns:
        A DataFrame with columns ``ds`` (timestamps), ``y`` (the usage values),
        ``cap`` (constant 60) and ``floor`` (constant 0).
    """
    n_days = len(usage_series)
    last_day = datetime.now()
    first_day = last_day - timedelta(days=n_days - 1)
    timeline = [first_day + timedelta(days=offset) for offset in range(n_days)]
    frame = pd.DataFrame({'ds': timeline, 'y': usage_series})
    return frame.assign(cap=60, floor=0)


def train_model_with_usage(usage_series):
    """Fit a Prophet model on the given daily usage values and return it.

    The training frame comes from ``prepare_prophet_data``; the model uses
    logistic growth with weekly and daily seasonality enabled and yearly
    seasonality disabled.
    """
    training_frame = prepare_prophet_data(usage_series)
    settings = dict(
        yearly_seasonality=False,
        weekly_seasonality=True,
        daily_seasonality=True,
        changepoint_prior_scale=0.002,
        growth='logistic',
    )
    forecaster = Prophet(**settings)
    forecaster.fit(training_frame)
    return forecaster


def make_forecast(model, periods): |
|
future = model.make_future_dataframe(periods=periods) |
|
future['cap'] = 60 |
|
future['floor'] = 0 |
|
forecast = model.predict(future) |
|
daily_forecasts = forecast['yhat'].tail(periods).tolist() |
|
return round(sum(max(0, y) for y in daily_forecasts)) |
|
|
|
def get_daily_forecasts(model, periods=30):
    """Return the per-day forecast for the next *periods* days.

    Args:
        model: A fitted model exposing ``make_future_dataframe`` and
            ``predict`` (a Prophet model in this app).
        periods: Number of future days to forecast.

    Returns:
        A new DataFrame with columns ``ds`` and ``yhat``. ``yhat`` is rounded
        to whole units and negative predictions are clamped to 0.
    """
    future = model.make_future_dataframe(periods=periods)
    # Same bounds as the training frame built by prepare_prophet_data.
    future['cap'] = 60
    future['floor'] = 0
    forecast = model.predict(future)
    # Copy explicitly: assigning into the result of a chained selection
    # triggers pandas' SettingWithCopyWarning.
    daily_forecasts = forecast[['ds', 'yhat']].tail(periods).copy()
    daily_forecasts['yhat'] = daily_forecasts['yhat'].apply(lambda x: max(0, round(x)))
    return daily_forecasts


def calculate_reorder_date(model, current_stock, lead_time_days=3, safety_threshold=0):
    """Estimate the date by which a reorder should be placed.

    Walks the 30-day daily forecast, subtracting each day's rounded,
    non-negative usage from *current_stock*. On the first day the remaining
    stock is at or below *safety_threshold*, the reorder date is that day
    minus *lead_time_days*, replaced by today's date if it is already past.

    Returns:
        The reorder date as ``'YYYY-MM-DD'``, or None when the stock stays
        above the threshold for all 30 forecast days.
    """
    horizon = model.make_future_dataframe(periods=30)
    horizon['cap'] = 60
    horizon['floor'] = 0
    upcoming = model.predict(horizon)[['ds', 'yhat']].tail(30)

    remaining = current_stock
    for day, predicted in zip(upcoming['ds'], upcoming['yhat']):
        remaining -= max(0, round(predicted))
        if remaining > safety_threshold:
            continue
        order_by = day - timedelta(days=lead_time_days)
        if order_by < datetime.now():
            order_by = datetime.now().date()
        return order_by.strftime('%Y-%m-%d')
    return None


def validate_usage_series(usage_str):
    """Parse and validate the comma-separated usage history entered by the user.

    Args:
        usage_str: Text such as ``"12, 15, 9, ..."`` holding exactly 60 values.

    Returns:
        ``(usage_list, None)`` on success, where ``usage_list`` is a list of
        60 floats, or ``(None, error_message)`` when the input is rejected.
    """
    try:
        usage_list = [float(x) for x in usage_str.split(',')]
    except (ValueError, AttributeError):
        # ValueError: a field is empty or not numeric.
        # AttributeError: the input is not a string (e.g. None).
        return None, "Invalid usage series format. Please enter 60 comma-separated numbers."

    logger.info(f"Input usage series length: {len(usage_list)}")
    if len(usage_list) != 60:
        return None, f"Usage series must contain exactly 60 values. Found {len(usage_list)} values."
    # float() accepts "nan" and "inf"; NaN would also slip past the `< 0` test.
    if not all(math.isfinite(x) for x in usage_list):
        return None, "Usage values must be finite numbers."
    if any(x < 0 for x in usage_list):
        return None, "Usage values must be non-negative."
    return usage_list, None


def _pdf_new_page(c):
    """Start a new page, restore the 10 pt body font and return the top y position."""
    c.showPage()
    # showPage() resets the canvas graphics state, including the current font.
    c.setFont("Helvetica", 10)
    return 10 * inch


def _pdf_write_line(c, y_position, text):
    """Draw one line at the left margin, breaking the page first if needed.

    Returns the y position for the following line.
    """
    if y_position < 1 * inch:
        y_position = _pdf_new_page(c)
    c.drawString(1 * inch, y_position, text)
    return y_position - 0.3 * inch


def _pdf_write_wrapped(c, y_position, heading, text, width=50):
    """Draw *heading* followed by *text* cut into chunks of *width* characters.

    Heading and body are kept together: if they do not fit above the bottom
    margin, a new page is started first. Returns the y position below the
    section.
    """
    chunks = [text[i:i + width] for i in range(0, len(text), width)]
    # One 0.3 in slot for the heading plus one per chunk.
    if y_position - (len(chunks) + 1) * 0.3 * inch < 1 * inch:
        y_position = _pdf_new_page(c)
    c.drawString(1 * inch, y_position, heading)
    y_position -= 0.3 * inch
    text_object = c.beginText(1 * inch, y_position)
    text_object.setFont("Helvetica", 10)
    for chunk in chunks:
        text_object.textLine(chunk)
    c.drawText(text_object)
    return y_position - len(chunks) * 0.3 * inch


def _pdf_write_chart(c, y_position, heading, figure, name, export_label):
    """Draw *heading* and a 6 x 4 inch PNG rendering of *figure* beneath it.

    If the image cannot be produced, the failure is logged and an error line
    is drawn instead. Returns the y position below the reserved area.
    """
    # Heading slot (0.3 in) plus image (4 in) must stay above the 1 in margin.
    if y_position - 4.3 * inch < 1 * inch:
        y_position = _pdf_new_page(c)
    c.drawString(1 * inch, y_position, heading)
    y_position -= 0.3 * inch
    image_buffer = BytesIO()
    try:
        pio.write_image(figure, image_buffer, format='png', width=600, height=400)
        image_buffer.seek(0)
        img = Image(image_buffer, width=6 * inch, height=4 * inch)
        img.drawOn(c, 1 * inch, y_position - 4 * inch)
        logger.info(f"Added {name} visualization image")
    except Exception as e:
        logger.error(f"Failed to export {export_label}: {str(e)}")
        c.drawString(1 * inch, y_position - 0.3 * inch, f"Error: Could not include {name} visualization.")
    return y_position - 4.5 * inch


def generate_forecast_pdf(forecast_data: dict, daily_forecasts: pd.DataFrame, alert_status: list, current_stock: int, forecast_7: int, forecast_14: int, forecast_30: int, fig_daily: go.Figure, fig_alerts: go.Figure, usage_series: str) -> BytesIO:
    """Render the forecast report as a PDF held in memory.

    The report contains, in order: the key/value summary from *forecast_data*,
    the raw usage string, the daily forecast values, the threshold alerts, a
    per-day forecast listing, the daily forecast chart, the stock-vs-forecast
    table and the alerts chart.

    Args:
        forecast_data: Non-empty mapping of summary labels to values.
        daily_forecasts: Non-empty DataFrame with ``ds`` and ``yhat`` columns.
        alert_status: List of three flags for the 7-, 14- and 30-day forecasts.
        current_stock: Units currently in stock.
        forecast_7: Total forecast usage for 7 days.
        forecast_14: Total forecast usage for 14 days.
        forecast_30: Total forecast usage for 30 days.
        fig_daily: Plotly figure of the daily forecast.
        fig_alerts: Plotly figure of stock versus forecasts.
        usage_series: The raw comma-separated usage text entered by the user.

    Returns:
        A BytesIO positioned at the start of the PDF, or None when an argument
        is invalid or PDF generation fails (the reason is logged).
    """
    try:
        logger.info("Starting PDF generation")

        if not isinstance(forecast_data, dict) or not forecast_data:
            logger.error("Invalid forecast_data: Must be a non-empty dictionary")
            return None
        if not isinstance(daily_forecasts, pd.DataFrame) or daily_forecasts.empty:
            logger.error("Invalid daily_forecasts: Must be a non-empty DataFrame")
            return None
        if not isinstance(alert_status, list) or len(alert_status) != 3:
            logger.error("Invalid alert_status: Must be a list of 3 booleans")
            return None
        if not isinstance(usage_series, str) or not usage_series:
            logger.error("Invalid usage_series: Must be a non-empty string")
            return None
        if not isinstance(fig_daily, go.Figure) or not isinstance(fig_alerts, go.Figure):
            logger.error("Invalid Plotly figures: fig_daily and fig_alerts must be valid go.Figure objects")
            return None

        pdf_file = BytesIO()
        c = canvas.Canvas(pdf_file, pagesize=letter)
        c.setFont("Helvetica", 12)
        c.drawString(1 * inch, 10 * inch, "Consumables Forecast Report")
        c.setFont("Helvetica", 10)
        y_position = 9.5 * inch
        logger.info("Initialized PDF canvas")

        # Summary key/value pairs.
        logger.info("Writing forecast data")
        for key, value in forecast_data.items():
            display_key = key.replace('_', ' ').title()
            y_position = _pdf_write_line(c, y_position, f"{display_key}: {value}")

        # Raw usage input, wrapped at 50 characters.
        y_position -= 0.3 * inch
        y_position = _pdf_write_wrapped(c, y_position, "Last 60 Days Usage (comma-separated):", usage_series)
        logger.info("Added usage series")

        # Daily forecast values as one comma-separated string.
        y_position -= 0.3 * inch
        daily_values = ", ".join(str(int(x)) for x in daily_forecasts['yhat'].tolist())
        y_position = _pdf_write_wrapped(c, y_position, "Daily Forecast Values (Next 30 Days):", daily_values)
        logger.info("Added daily forecast values")

        # One alert line per forecast window.
        y_position -= 0.3 * inch
        y_position = _pdf_write_line(c, y_position, "Threshold Alerts:")
        windows = zip([forecast_7, forecast_14, forecast_30], ['7-day', '14-day', '30-day'], alert_status)
        for forecast, period, alert in windows:
            if alert:
                alert_text = f"[Flag] Alert: Current stock ({current_stock}) is below {period} forecast ({forecast})."
            else:
                alert_text = f"No alert for {period} forecast."
            y_position = _pdf_write_line(c, y_position, alert_text)
        logger.info("Added threshold alerts")

        # Per-day listing of the forecast.
        y_position -= 0.3 * inch
        y_position = _pdf_write_line(c, y_position, "Daily Forecast Visualization Data (Next 30 Days):")
        for day, value in zip(daily_forecasts['ds'], daily_forecasts['yhat']):
            line = f"Date: {day.strftime('%Y-%m-%d')}, Forecast: {int(value)} units"
            y_position = _pdf_write_line(c, y_position, line)
        logger.info("Added daily forecast visualization data")

        # Daily forecast chart.
        y_position -= 0.3 * inch
        y_position = _pdf_write_chart(
            c, y_position, "Daily Forecast Visualization (Next 30 Days):",
            fig_daily, "daily forecast", "daily forecast image",
        )

        # Stock-vs-forecast table.
        if y_position < 2 * inch:
            y_position = _pdf_new_page(c)
        y_position = _pdf_write_line(c, y_position, "Threshold Alerts Visualization Data:")
        categories = ['Current Stock', '7-Day Forecast', '14-Day Forecast', '30-Day Forecast']
        units = [current_stock, forecast_7, forecast_14, forecast_30]
        flags = [False] + alert_status
        for category, amount, flagged in zip(categories, units, flags):
            line = f"Category: {category}, Units: {amount}, Alert: {'Yes' if flagged else 'No'}"
            y_position = _pdf_write_line(c, y_position, line)
        logger.info("Added threshold alerts visualization data")

        # Alerts chart.
        y_position -= 0.3 * inch
        _pdf_write_chart(
            c, y_position, "Threshold Alerts Visualization:",
            fig_alerts, "threshold alerts", "alerts visualization image",
        )

        c.showPage()
        c.save()
        pdf_file.seek(0)
        logger.info("PDF generation completed successfully")
        return pdf_file
    except Exception as e:
        logger.error(f"Error generating PDF: {str(e)}", exc_info=True)
        return None


def upload_pdf_to_salesforce(pdf_file: BytesIO, consumable_type: str, record_id: str) -> str:
    """Upload the PDF to Salesforce as a ContentVersion published on *record_id*.

    Returns:
        The download URL of the new version, or None when there is no
        Salesforce connection, the created version cannot be queried back, or
        any error occurs (errors are logged).
    """
    try:
        if not sf:
            return None

        payload = {
            "Title": f"{consumable_type} - Consumables Forecast PDF",
            "PathOnClient": f"{consumable_type}_Consumables_Forecast.pdf",
            # The file content is sent base64-encoded.
            "VersionData": base64.b64encode(pdf_file.getvalue()).decode('utf-8'),
            "FirstPublishLocationId": record_id,
        }
        version_id = sf.ContentVersion.create(payload)["id"]

        # Confirm the version exists before handing out a link to it.
        lookup = sf.query(f"SELECT Id, ContentDocumentId FROM ContentVersion WHERE Id = '{version_id}'")
        if lookup['records']:
            return f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/{version_id}"
        return None
    except Exception as e:
        logger.error(f"Error uploading PDF to Salesforce: {str(e)}")
        return None


def _build_daily_forecast_figure(daily_forecasts):
    """Return the line chart of the per-day forecast (``ds`` against ``yhat``)."""
    y_values = daily_forecasts['yhat'].tolist()
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=daily_forecasts['ds'],
        y=daily_forecasts['yhat'],
        mode='lines+markers',
        name='Daily Forecast',
        line=dict(color='royalblue', width=2),
        marker=dict(size=8, color='darkorange', line=dict(width=2, color='black')),
        fill='tozeroy',
        fillcolor='rgba(0, 176, 246, 0.2)',
    ))
    fig.update_layout(
        title='Daily Consumable Usage Forecast (30 Days)',
        xaxis_title='Date',
        yaxis_title='Units',
        template='plotly_white',
        xaxis=dict(tickformat="%Y-%m-%d", tickangle=45, tickmode='auto', nticks=10),
        # Pad the y range by 5 units on each side without going below zero.
        yaxis=dict(range=[max(0, min(y_values) - 5), max(y_values) + 5], tickmode='linear', dtick=2),
        showlegend=True,
        legend=dict(x=0.01, y=0.99),
        hovermode='x unified',
        plot_bgcolor='rgba(0,0,0,0)',
        paper_bgcolor='rgba(0,0,0,0)',
        margin=dict(l=50, r=50, t=50, b=100),
    )
    return fig


def _build_alerts_figure(current_stock, forecast_7, forecast_14, forecast_30, alert_status):
    """Return the bar chart comparing current stock with the three forecasts."""
    categories = ['Current Stock', '7-Day Forecast', '14-Day Forecast', '30-Day Forecast']
    units = [current_stock, forecast_7, forecast_14, forecast_30]
    # The stock bar itself never carries an alert.
    flags = [False] + alert_status
    fig = go.Figure()
    fig.add_trace(go.Bar(
        x=categories,
        y=units,
        marker_color=['green'] + ['red' if alert else 'blue' for alert in flags[1:]],
        text=["🚩" if alert else "" for alert in flags],
        textposition='auto',
    ))
    fig.update_layout(
        title='Stock vs Forecast with Alerts (🚩 indicates low stock)',
        xaxis_title='Category',
        yaxis_title='Units',
        template='plotly_white',
    )
    return fig


def _save_forecast_to_salesforce(consumable_type, usage_series, current_stock, forecast_7, forecast_14, forecast_30, daily_forecasts, alert_status, reorder_date, fig_daily, fig_alerts):
    """Create the forecast record in Salesforce and attach the PDF report.

    Failures are logged and reported through ``st.error``; nothing is raised.
    """
    try:
        order_suggestions_text = f"7 Days: {max(0, forecast_7 - current_stock)} units, 14 Days: {max(0, forecast_14 - current_stock)} units, 30 Days: {max(0, forecast_30 - current_stock)} units"
        forecast_data = {
            "Consumable Type": consumable_type,
            "Current Stock": current_stock,
            "7-Day Forecast": f"{forecast_7} units",
            "14-Day Forecast": f"{forecast_14} units",
            "30-Day Forecast": f"{forecast_30} units",
            "Order Suggestions": order_suggestions_text,
            "Reorder Recommendation": "Yes" if any(alert_status) else "No",
            "Reorder Date": reorder_date if reorder_date else "Not within 30 days",
        }
        pdf_file = generate_forecast_pdf(forecast_data, daily_forecasts, alert_status, current_stock, forecast_7, forecast_14, forecast_30, fig_daily, fig_alerts, usage_series)

        # The record is created first with an empty PDF link; the link is
        # filled in once the upload has succeeded.
        sf_data = {
            'Consumable_Type__c': consumable_type,
            'Forecast_Period__c': '7days',
            'ForeCasted_Quantity__c': float(forecast_7),
            'ForeCasted_Quantity_14days__c': float(forecast_14),
            'ForeCasted_Quantity_30days__c': float(forecast_30),
            'Current_Stock__c': float(current_stock),
            'Order_Suggestions__c': order_suggestions_text,
            'Reorder_Recommendation__c': any(alert_status),
            'Reorder_Date__c': reorder_date,
            'Pdf_report__c': '',
        }
        result = sf.Consumables_Forecaste__c.create(sf_data)
        logger.info(f"Salesforce record created: {result}")

        if not pdf_file:
            logger.error("Failed to generate PDF")
            st.error("Failed to generate PDF")
            return

        pdf_url = upload_pdf_to_salesforce(pdf_file, consumable_type, result['id'])
        if not pdf_url:
            logger.error("Failed to upload PDF to Salesforce")
            st.error("Failed to upload PDF to Salesforce")
            return

        sf.Consumables_Forecaste__c.update(result['id'], {"Pdf_report__c": pdf_url})
        logger.info(f"PDF uploaded to Salesforce: {pdf_url}")
    except Exception as e:
        logger.error(f"Error creating Salesforce record or uploading PDF: {e}", exc_info=True)
        st.error(f"Error saving to Salesforce: {str(e)}")


def main():
    """Render the Streamlit forecasting page.

    Collects the consumable type, 60-day usage history and current stock,
    then on "Generate Forecast" trains the model, shows the 7/14/30-day
    forecasts, alerts, order suggestions and charts, and, when a Salesforce
    connection exists, saves the result and the PDF report there.
    """
    st.title("SmartLab Consumables Forecast")
    st.header("Input Parameters")

    consumable_type = st.selectbox("Consumable Type", ['Filters', 'Reagents', 'Vials'])
    usage_series = st.text_input("Last 60 Days Usage (comma-separated)", "")
    current_stock = st.number_input("Current Stock", min_value=0, value=0)

    if not st.button("Generate Forecast"):
        return

    usage_list, error = validate_usage_series(usage_series)
    if error:
        st.error(error)
        return

    try:
        model = train_model_with_usage(usage_list)
    except Exception as e:
        st.error(f"Error training model: {str(e)}")
        return

    forecast_7 = make_forecast(model, 7)
    forecast_14 = make_forecast(model, 14)
    forecast_30 = make_forecast(model, 30)
    daily_forecasts = get_daily_forecasts(model, 30)
    reorder_date = calculate_reorder_date(model, current_stock)

    st.header("Forecast Results")
    col1, col2, col3 = st.columns(3)
    col1.metric("7-Day Forecast", f"{forecast_7} units")
    col2.metric("14-Day Forecast", f"{forecast_14} units")
    col3.metric("30-Day Forecast", f"{forecast_30} units")

    st.header("Daily Forecast Values (Next 30 Days)")
    daily_values = ", ".join(str(int(x)) for x in daily_forecasts['yhat'].tolist())
    st.text_area("Comma-separated daily forecasts", daily_values, height=100)

    # An alert is raised for every window whose forecast exceeds the stock.
    st.header("Threshold Alerts")
    alert_status = []
    for forecast, period in zip([forecast_7, forecast_14, forecast_30], ['7-day', '14-day', '30-day']):
        if current_stock < forecast:
            st.warning(f"Alert: Current stock ({current_stock}) is below {period} forecast ({forecast}). 🚩")
            alert_status.append(True)
        else:
            st.info(f"No alert for {period} forecast.")
            alert_status.append(False)

    st.header("Order Suggestions")
    st.write(f"**For 7 Days**: Order {max(0, forecast_7 - current_stock)} additional units.")
    st.write(f"**For 14 Days**: Order {max(0, forecast_14 - current_stock)} additional units.")
    st.write(f"**For 30 Days**: Order {max(0, forecast_30 - current_stock)} additional units.")

    st.header("Reorder Information")
    if any(alert_status):
        st.warning(f"Reorder recommended. Suggested reorder date: {reorder_date if reorder_date else 'Not within 30 days'}")
    else:
        st.info("No reorder required within 30 days.")

    st.header("Daily Forecast Visualization (Next 30 Days)")
    fig_daily = _build_daily_forecast_figure(daily_forecasts)
    st.plotly_chart(fig_daily, use_container_width=True)

    st.header("Threshold Alerts Visualization")
    fig_alerts = _build_alerts_figure(current_stock, forecast_7, forecast_14, forecast_30, alert_status)
    st.plotly_chart(fig_alerts)

    if sf is not None:
        _save_forecast_to_salesforce(
            consumable_type, usage_series, current_stock,
            forecast_7, forecast_14, forecast_30,
            daily_forecasts, alert_status, reorder_date,
            fig_daily, fig_alerts,
        )


# Run the Streamlit page when this file is executed as a script.
if __name__ == "__main__":
    main()
    # NOTE(review): the original indentation of this assignment was lost in
    # the source. It is kept inside the guard so that importing the module
    # does not discard the Salesforce connection; confirm the intended
    # placement, or whether the statement is needed at all.
    sf = None