import streamlit as st
import pandas as pd
from prophet import Prophet
from datetime import datetime, timedelta
import numpy as np
import plotly.graph_objects as go
import os
from dotenv import load_dotenv
from simple_salesforce import Salesforce
import logging
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.lib.units import inch
from io import BytesIO
import base64
import ast
from reportlab.platypus import Image
import plotly.io as pio
import sys
import argparse
# Load environment variables from .env file
load_dotenv()

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Salesforce connection
try:
    sf = Salesforce(
        username=os.getenv("SF_USERNAME"),
        password=os.getenv("SF_PASSWORD"),
        security_token=os.getenv("SF_SECURITY_TOKEN"),
        instance_url=os.getenv("SF_INSTANCE_URL")
    )
    logger.info("Connected to Salesforce")
    logger.info(f"Connected Salesforce user: {sf.username}")
except Exception as e:
    logger.error(f"Salesforce connection failed: {e}")
    sf = None
# File to store forecast data
DATA_FILE = "/public/forecast_data.csv"
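
# prepare_prophet_data builds the two-column frame Prophet expects (ds = date, y = usage), plus the
# cap/floor columns required by logistic growth. Dates are synthesized backwards from "now" so the
# last usage value lines up with today.
# Illustrative sketch (hypothetical values, not real data):
#   prepare_prophet_data([5, 7, 6])
#   -> ds = three consecutive dates ending today, y = [5, 7, 6], cap = 60, floor = 0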
def prepare_prophet_data(usage_series):
    end_date = datetime.now()
    start_date = end_date - timedelta(days=len(usage_series) - 1)
    dates = [start_date + timedelta(days=i) for i in range(len(usage_series))]
    prophet_df = pd.DataFrame({'ds': dates, 'y': usage_series})
    prophet_df['cap'] = 60
    prophet_df['floor'] = 0
    return prophet_df
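
# train_model_with_usage fits a Prophet model with logistic growth, so cap/floor must be present
# both here (via prepare_prophet_data) and on any future dataframe passed to predict(). A small
# changepoint_prior_scale keeps the trend conservative; daily/weekly seasonality are enabled and
# yearly seasonality is off because only 60 days of history are supplied.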
def train_model_with_usage(usage_series):
    prophet_df = prepare_prophet_data(usage_series)
    model = Prophet(
        yearly_seasonality=False,
        weekly_seasonality=True,
        daily_seasonality=True,
        changepoint_prior_scale=0.002,
        growth='logistic'
    )
    model.fit(prophet_df)
    return model
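
# make_forecast sums the model's daily yhat values over the requested horizon and returns a single
# rounded total, clamping negative daily predictions to zero.
# Illustrative sketch (hypothetical output): make_forecast(model, 7) -> e.g. 42 units over 7 days.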
def make_forecast(model, periods):
    future = model.make_future_dataframe(periods=periods)
    future['cap'] = 60
    future['floor'] = 0
    forecast = model.predict(future)
    daily_forecasts = forecast['yhat'].tail(periods).tolist()
    return round(sum(max(0, y) for y in daily_forecasts))
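
# get_daily_forecasts returns the per-day breakdown for the same horizon as a DataFrame with
# columns ds (date) and yhat (non-negative integer units), used by the charts and the PDF report.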
def get_daily_forecasts(model, periods=30):
    future = model.make_future_dataframe(periods=periods)
    future['cap'] = 60
    future['floor'] = 0
    forecast = model.predict(future)
    # .copy() avoids pandas' SettingWithCopyWarning when modifying the slice below
    daily_forecasts = forecast[['ds', 'yhat']].tail(periods).copy()
    daily_forecasts['yhat'] = daily_forecasts['yhat'].apply(lambda x: max(0, round(x)))
    return daily_forecasts
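
# calculate_reorder_date walks the next 30 forecast days, depleting current_stock day by day, and
# returns the stockout date minus the lead time (never earlier than today), or None if stock is not
# projected to reach the safety threshold within 30 days.
# Illustrative sketch (hypothetical numbers): with stock 10, 2 units/day and a 3-day lead time,
# stock hits 0 on day 5, so the suggested reorder date is day 2.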
def calculate_reorder_date(model, current_stock, lead_time_days=3, safety_threshold=0):
    future = model.make_future_dataframe(periods=30)
    future['cap'] = 60
    future['floor'] = 0
    forecast = model.predict(future)
    daily_forecasts = forecast[['ds', 'yhat']].tail(30)
    stock = current_stock
    for _, row in daily_forecasts.iterrows():
        daily_usage = max(0, round(row['yhat']))
        stock -= daily_usage
        if stock <= safety_threshold:
            stockout_date = row['ds']
            reorder_date = stockout_date - timedelta(days=lead_time_days)
            if reorder_date < datetime.now():
                reorder_date = datetime.now().date()
            return reorder_date.strftime('%Y-%m-%d')
    return None
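
# validate_usage_series parses the comma-separated input and enforces exactly 60 non-negative
# values, returning (values, None) on success or (None, error_message) on failure.
# Illustrative usage (hypothetical input):
#   validate_usage_series("1,2,3")
#   -> (None, "Usage series must contain exactly 60 values. Found 3 values.")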
def validate_usage_series(usage_str):
    try:
        usage_list = [float(x) for x in usage_str.split(',')]
        logger.info(f"Input usage series length: {len(usage_list)}")
        if len(usage_list) != 60:
            return None, f"Usage series must contain exactly 60 values. Found {len(usage_list)} values."
        if any(x < 0 for x in usage_list):
            return None, "Usage values must be non-negative."
        return usage_list, None
    except (ValueError, AttributeError):
        return None, "Invalid usage series format. Please enter 60 comma-separated numbers."
def generate_forecast_pdf(forecast_data: dict, daily_forecasts: pd.DataFrame, alert_status: list,
                          current_stock: int, forecast_7: int, forecast_14: int, forecast_30: int,
                          fig_daily: go.Figure, fig_alerts: go.Figure, usage_series: str) -> BytesIO:
    try:
        logger.info("Starting PDF generation")
        if not isinstance(forecast_data, dict) or not forecast_data:
            logger.error("Invalid forecast_data: Must be a non-empty dictionary")
            return None
        if not isinstance(daily_forecasts, pd.DataFrame) or daily_forecasts.empty:
            logger.error("Invalid daily_forecasts: Must be a non-empty DataFrame")
            return None
        if not isinstance(alert_status, list) or len(alert_status) != 3:
            logger.error("Invalid alert_status: Must be a list of 3 booleans")
            return None
        if not isinstance(usage_series, str) or not usage_series:
            logger.error("Invalid usage_series: Must be a non-empty string")
            return None
        if not isinstance(fig_daily, go.Figure) or not isinstance(fig_alerts, go.Figure):
            logger.error("Invalid Plotly figures: fig_daily and fig_alerts must be valid go.Figure objects")
            return None
        pdf_file = BytesIO()
        c = canvas.Canvas(pdf_file, pagesize=letter)
        c.setFont("Helvetica", 12)
        c.drawString(1 * inch, 10 * inch, "Consumables Forecast Report")
        c.setFont("Helvetica", 10)
        y_position = 9.5 * inch
        logger.info("Initialized PDF canvas")
logger.info("Writing forecast data") | |
for key, value in forecast_data.items(): | |
display_key = key.replace('_', ' ').title() | |
value_str = str(value) | |
c.drawString(1 * inch, y_position, f"{display_key}: {value_str}") | |
y_position -= 0.3 * inch | |
y_position -= 0.3 * inch | |
c.drawString(1 * inch, y_position, "Last 60 Days Usage (comma-separated):") | |
y_position -= 0.3 * inch | |
text_object = c.beginText(1 * inch, y_position) | |
text_object.setFont("Helvetica", 10) | |
text_lines = [usage_series[i:i+50] for i in range(0, len(usage_series), 50)] | |
for line in text_lines: | |
text_object.textLine(line) | |
y_position -= 0.3 * inch | |
c.drawText(text_object) | |
logger.info("Added usage series") | |
y_position -= 0.3 * inch | |
c.drawString(1 * inch, y_position, "Daily Forecast Values (Next 30 Days):") | |
y_position -= 0.3 * inch | |
daily_values = ", ".join([str(int(x)) for x in daily_forecasts['yhat'].tolist()]) | |
text_object = c.beginText(1 * inch, y_position) | |
text_object.setFont("Helvetica", 10) | |
text_lines = [daily_values[i:i+50] for i in range(0, len(daily_values), 50)] | |
for line in text_lines: | |
text_object.textLine(line) | |
y_position -= 0.3 * inch | |
c.drawText(text_object) | |
logger.info("Added daily forecast values") | |
        y_position -= 0.3 * inch
        c.drawString(1 * inch, y_position, "Threshold Alerts:")
        y_position -= 0.3 * inch
        for forecast, period, alert in zip([forecast_7, forecast_14, forecast_30], ['7-day', '14-day', '30-day'], alert_status):
            flag_indicator = "[Flag] " if alert else ""
            if alert:
                alert_text = f"{flag_indicator}Alert: Current stock ({current_stock}) is below {period} forecast ({forecast})."
            else:
                alert_text = f"No alert for {period} forecast."
            c.drawString(1 * inch, y_position, alert_text)
            y_position -= 0.3 * inch
        logger.info("Added threshold alerts")
        y_position -= 0.3 * inch
        c.drawString(1 * inch, y_position, "Daily Forecast Visualization Data (Next 30 Days):")
        y_position -= 0.3 * inch
        for index, row in daily_forecasts.iterrows():
            date_str = row['ds'].strftime('%Y-%m-%d')
            forecast_value = int(row['yhat'])
            c.drawString(1 * inch, y_position, f"Date: {date_str}, Forecast: {forecast_value} units")
            y_position -= 0.3 * inch
            if y_position < 1 * inch:
                c.showPage()
                c.setFont("Helvetica", 10)
                y_position = 10 * inch
        logger.info("Added daily forecast visualization data")
        y_position -= 0.3 * inch
        if y_position < 4 * inch:
            c.showPage()
            y_position = 10 * inch
        c.drawString(1 * inch, y_position, "Daily Forecast Visualization (Next 30 Days):")
        y_position -= 0.3 * inch
        daily_chart_img = BytesIO()
        try:
            pio.write_image(fig_daily, daily_chart_img, format='png', width=600, height=400)
            daily_chart_img.seek(0)
            img = Image(daily_chart_img, width=6 * inch, height=4 * inch)
            img.drawOn(c, 1 * inch, y_position - 4 * inch)
            logger.info("Added daily forecast visualization image")
        except Exception as e:
            logger.error(f"Failed to export daily forecast image: {str(e)}")
            c.drawString(1 * inch, y_position - 0.3 * inch, "Error: Could not include daily forecast visualization.")
        y_position -= 4.5 * inch
        if y_position < 2 * inch:
            c.showPage()
            c.setFont("Helvetica", 10)
            y_position = 10 * inch
c.drawString(1 * inch, y_position, "Threshold Alerts Visualization Data:") | |
y_position -= 0.3 * inch | |
alert_data = pd.DataFrame({ | |
'Category': ['Current Stock', '7-Day Forecast', '14-Day Forecast', '30-Day Forecast'], | |
'Units': [current_stock, forecast_7, forecast_14, forecast_30], | |
'Alert': [False] + alert_status | |
}) | |
for _, row in alert_data.iterrows(): | |
alert_text = f"Category: {row['Category']}, Units: {row['Units']}, Alert: {'Yes' if row['Alert'] else 'No'}" | |
c.drawString(1 * inch, y_position, alert_text) | |
y_position -= 0.3 * inch | |
if y_position < 1 * inch: | |
c.showPage() | |
c.setFont("Helvetica", 10) | |
y_position = 10 * inch | |
logger.info("Added threshold alerts visualization data") | |
y_position -= 0.3 * inch | |
if y_position < 4 * inch: | |
c.showPage() | |
y_position = 10 * inch | |
c.drawString(1 * inch, y_position, "Threshold Alerts Visualization:") | |
y_position -= 0.3 * inch | |
alerts_chart_img = BytesIO() | |
try: | |
pio.write_image(fig_alerts, alerts_chart_img, format='png', width=600, height=400) | |
alerts_chart_img.seek(0) | |
img = Image(alerts_chart_img, width=6 * inch, height=4 * inch) | |
img.drawOn(c, 1 * inch, y_position - 4 * inch) | |
logger.info("Added threshold alerts visualization image") | |
except Exception as e: | |
logger.error(f"Failed to export alerts visualization image: {str(e)}") | |
c.drawString(1 * inch, y_position - 0.3 * inch, "Error: Could not include threshold alerts visualization.") | |
c.showPage() | |
c.save() | |
pdf_file.seek(0) | |
logger.info("PDF generation completed successfully") | |
return pdf_file | |
except Exception as e: | |
logger.error(f"Error generating PDF: {str(e)}") | |
return None | |
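
# upload_pdf_to_salesforce base64-encodes the PDF and creates a ContentVersion linked to the given
# record via FirstPublishLocationId, then builds a direct download URL from the new version id.
# Returns the URL, or None if Salesforce is unavailable or the insert fails.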
def upload_pdf_to_salesforce(pdf_file: BytesIO, consumable_type: str, record_id: str) -> str:
    try:
        if not sf:
            return None
        encoded_pdf_data = base64.b64encode(pdf_file.getvalue()).decode('utf-8')
        content_version_data = {
            "Title": f"{consumable_type} - Consumables Forecast PDF",
            "PathOnClient": f"{consumable_type}_Consumables_Forecast.pdf",
            "VersionData": encoded_pdf_data,
            "FirstPublishLocationId": record_id
        }
        content_version = sf.ContentVersion.create(content_version_data)
        content_version_id = content_version["id"]
        result = sf.query(f"SELECT Id, ContentDocumentId FROM ContentVersion WHERE Id = '{content_version_id}'")
        if not result['records']:
            return None
        file_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/{content_version_id}"
        return file_url
    except Exception as e:
        logger.error(f"Error uploading PDF to Salesforce: {str(e)}")
        return None
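
# save_forecast_data persists one row per consumable type to DATA_FILE, storing the usage history
# as a comma-separated string and the 30-day forecast dates/values as stringified lists. Any
# existing row for the same consumable type is replaced.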
def save_forecast_data(consumable_type, usage_series, current_stock, daily_forecasts):
    try:
        usage_str = ','.join(map(str, usage_series))
        forecast_data = {
            'consumable_type': [consumable_type],
            'usage_series': [usage_str],
            'current_stock': [current_stock],
            'forecast_date': [daily_forecasts['ds'].astype(str).tolist()],
            'forecast_yhat': [daily_forecasts['yhat'].tolist()]
        }
        df = pd.DataFrame(forecast_data)
        if os.path.exists(DATA_FILE):
            existing_df = pd.read_csv(DATA_FILE)
            existing_df = existing_df[existing_df['consumable_type'] != consumable_type]
            df = pd.concat([existing_df, df], ignore_index=True)
        df.to_csv(DATA_FILE, index=False)
        logger.info(f"Saved forecast data for {consumable_type} to {DATA_FILE}")
    except Exception as e:
        logger.error(f"Error saving forecast data: {str(e)}")
def load_forecast_data(consumable_type):
    try:
        if not os.path.exists(DATA_FILE):
            logger.warning(f"No forecast data file found at {DATA_FILE}")
            return None, None, None
        df = pd.read_csv(DATA_FILE)
        row = df[df['consumable_type'] == consumable_type]
        if row.empty:
            logger.warning(f"No data found for {consumable_type} in {DATA_FILE}")
            return None, None, None
        usage_series = [float(x) for x in row['usage_series'].iloc[0].split(',')]
        current_stock = float(row['current_stock'].iloc[0])
        # ast.literal_eval safely parses the stringified lists written by save_forecast_data
        forecast_dates = ast.literal_eval(row['forecast_date'].iloc[0])
        forecast_yhat = ast.literal_eval(row['forecast_yhat'].iloc[0])
        daily_forecasts = pd.DataFrame({'ds': pd.to_datetime(forecast_dates), 'yhat': forecast_yhat})
        return usage_series, current_stock, daily_forecasts
    except Exception as e:
        logger.error(f"Error loading forecast data: {str(e)}")
        return None, None, None
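
# process_forecast is the core pipeline: validate the usage series, train the model, compute the
# 7/14/30-day totals, the daily breakdown and the reorder date, render the Streamlit UI (unless
# is_automated=True, in which case only the figures are built for the PDF), and push a record plus
# the PDF report to Salesforce. Returns the daily forecast DataFrame, or None on failure.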
def process_forecast(consumable_type, usage_series, current_stock, is_automated=False):
    usage_list, error = validate_usage_series(','.join(map(str, usage_series)))
    if error:
        logger.error(error)
        if not is_automated:
            st.error(error)
        return None
    try:
        model = train_model_with_usage(usage_list)
    except Exception as e:
        logger.error(f"Error training model: {str(e)}")
        if not is_automated:
            st.error(f"Error training model: {str(e)}")
        return None
    forecast_7 = make_forecast(model, 7)
    forecast_14 = make_forecast(model, 14)
    forecast_30 = make_forecast(model, 30)
    daily_forecasts = get_daily_forecasts(model, 30)
    reorder_date = calculate_reorder_date(model, current_stock)
    if not is_automated:
        st.header("Forecast Results")
        col1, col2, col3 = st.columns(3)
        col1.metric("7-Day Forecast", f"{forecast_7} units")
        col2.metric("14-Day Forecast", f"{forecast_14} units")
        col3.metric("30-Day Forecast", f"{forecast_30} units")
        st.header("Daily Forecast Values (Next 30 Days)")
        daily_values = ", ".join([str(int(x)) for x in daily_forecasts['yhat'].tolist()])
        st.text_area("Comma-separated daily forecasts", daily_values, height=100)
        st.header("Threshold Alerts")
        alert_status = []
        for forecast, period in zip([forecast_7, forecast_14, forecast_30], ['7-day', '14-day', '30-day']):
            if current_stock < forecast:
                st.warning(f"Alert: Current stock ({current_stock}) is below {period} forecast ({forecast}). 🚩")
                alert_status.append(True)
            else:
                st.info(f"No alert for {period} forecast.")
                alert_status.append(False)
        st.header("Order Suggestions")
        st.write(f"**For 7 Days**: Order {max(0, forecast_7 - current_stock)} additional units.")
        st.write(f"**For 14 Days**: Order {max(0, forecast_14 - current_stock)} additional units.")
        st.write(f"**For 30 Days**: Order {max(0, forecast_30 - current_stock)} additional units.")
        st.header("Reorder Information")
        if any(alert_status):
            st.warning(f"Reorder recommended. Suggested reorder date: {reorder_date if reorder_date else 'Not within 30 days'}")
        else:
            st.info("No reorder required within 30 days.")
        st.header("Daily Forecast Visualization (Next 30 Days)")
        fig_daily = go.Figure()
        fig_daily.add_trace(go.Scatter(
            x=daily_forecasts['ds'],
            y=daily_forecasts['yhat'],
            mode='lines+markers',
            name='Daily Forecast',
            line=dict(color='royalblue', width=2),
            marker=dict(size=8, color='darkorange', line=dict(width=2, color='black')),
            fill='tozeroy',
            fillcolor='rgba(0, 176, 246, 0.2)'
        ))
        y_values = daily_forecasts['yhat'].tolist()
        fig_daily.update_layout(
            title='Daily Consumable Usage Forecast (30 Days)',
            xaxis_title='Date',
            yaxis_title='Units',
            template='plotly_white',
            xaxis=dict(tickformat="%Y-%m-%d", tickangle=45, tickmode='auto', nticks=10),
            yaxis=dict(range=[max(0, min(y_values) - 5), max(y_values) + 5], tickmode='linear', dtick=2),
            showlegend=True,
            legend=dict(x=0.01, y=0.99),
            hovermode='x unified',
            plot_bgcolor='rgba(0,0,0,0)',
            paper_bgcolor='rgba(0,0,0,0)',
            margin=dict(l=50, r=50, t=50, b=100)
        )
        st.plotly_chart(fig_daily, use_container_width=True)
        st.header("Threshold Alerts Visualization")
        alert_data = pd.DataFrame({
            'Category': ['Current Stock', '7-Day Forecast', '14-Day Forecast', '30-Day Forecast'],
            'Units': [current_stock, forecast_7, forecast_14, forecast_30],
            'Alert': [False] + alert_status
        })
        fig_alerts = go.Figure()
        fig_alerts.add_trace(go.Bar(
            x=alert_data['Category'],
            y=alert_data['Units'],
            marker_color=['green'] + ['red' if alert else 'blue' for alert in alert_data['Alert'][1:]],
            text=["🚩" if alert else "" for alert in alert_data['Alert']],
            textposition='auto'
        ))
        fig_alerts.update_layout(
            title='Stock vs Forecast with Alerts (🚩 indicates low stock)',
            xaxis_title='Category',
            yaxis_title='Units',
            template='plotly_white'
        )
        st.plotly_chart(fig_alerts)
    else:
        alert_status = [current_stock < forecast for forecast in [forecast_7, forecast_14, forecast_30]]
        fig_daily = go.Figure()
        fig_daily.add_trace(go.Scatter(
            x=daily_forecasts['ds'],
            y=daily_forecasts['yhat'],
            mode='lines+markers',
            name='Daily Forecast',
            line=dict(color='royalblue', width=2),
            marker=dict(size=8, color='darkorange', line=dict(width=2, color='black')),
            fill='tozeroy',
            fillcolor='rgba(0, 176, 246, 0.2)'
        ))
        y_values = daily_forecasts['yhat'].tolist()
        fig_daily.update_layout(
            title='Daily Consumable Usage Forecast (30 Days)',
            xaxis_title='Date',
            yaxis_title='Units',
            template='plotly_white',
            xaxis=dict(tickformat="%Y-%m-%d", tickangle=45, tickmode='auto', nticks=10),
            yaxis=dict(range=[max(0, min(y_values) - 5), max(y_values) + 5], tickmode='linear', dtick=2),
            showlegend=True,
            legend=dict(x=0.01, y=0.99),
            hovermode='x unified',
            plot_bgcolor='rgba(0,0,0,0)',
            paper_bgcolor='rgba(0,0,0,0)',
            margin=dict(l=50, r=50, t=50, b=100)
        )
        alert_data = pd.DataFrame({
            'Category': ['Current Stock', '7-Day Forecast', '14-Day Forecast', '30-Day Forecast'],
            'Units': [current_stock, forecast_7, forecast_14, forecast_30],
            'Alert': [False] + alert_status
        })
        fig_alerts = go.Figure()
        fig_alerts.add_trace(go.Bar(
            x=alert_data['Category'],
            y=alert_data['Units'],
            marker_color=['green'] + ['red' if alert else 'blue' for alert in alert_data['Alert'][1:]],
            text=["🚩" if alert else "" for alert in alert_data['Alert']],
            textposition='auto'
        ))
        fig_alerts.update_layout(
            title='Stock vs Forecast with Alerts (🚩 indicates low stock)',
            xaxis_title='Category',
            yaxis_title='Units',
            template='plotly_white'
        )
    if sf is not None:
        try:
            order_suggestions_text = f"7 Days: {max(0, forecast_7 - current_stock)} units, 14 Days: {max(0, forecast_14 - current_stock)} units, 30 Days: {max(0, forecast_30 - current_stock)} units"
            forecast_data = {
                "Consumable Type": consumable_type,
                "Current Stock": current_stock,
                "7-Day Forecast": f"{forecast_7} units",
                "14-Day Forecast": f"{forecast_14} units",
                "30-Day Forecast": f"{forecast_30} units",
                "Order Suggestions": order_suggestions_text,
                "Reorder Recommendation": "Yes" if any(alert_status) else "No",
                "Reorder Date": reorder_date if reorder_date else "Not within 30 days"
            }
            pdf_file = generate_forecast_pdf(forecast_data, daily_forecasts, alert_status, current_stock, forecast_7, forecast_14, forecast_30, fig_daily, fig_alerts, ','.join(map(str, usage_series)))
            sf_data = {
                'Consumable_Type__c': consumable_type,
                'Forecast_Period__c': '7days',
                'ForeCasted_Quantity__c': float(forecast_7),
                'ForeCasted_Quantity_14days__c': float(forecast_14),
                'ForeCasted_Quantity_30days__c': float(forecast_30),
                'Current_Stock__c': float(current_stock),
                'Order_Suggestions__c': order_suggestions_text,
                'Reorder_Recommendation__c': any(alert_status),
                'Reorder_Date__c': reorder_date,
                'Pdf_report__c': ''
            }
            result = sf.Consumables_Forecaste__c.create(sf_data)
            logger.info(f"Salesforce record created: {result}")
            if pdf_file:
                pdf_url = upload_pdf_to_salesforce(pdf_file, consumable_type, result['id'])
                if pdf_url:
                    sf.Consumables_Forecaste__c.update(
                        result['id'],
                        {"Pdf_report__c": pdf_url}
                    )
                    logger.info(f"PDF uploaded to Salesforce: {pdf_url}")
                    logger.info(f"PDF Report generated and uploaded to Salesforce: {pdf_url}")
                else:
                    logger.error("Failed to upload PDF to Salesforce")
                    if not is_automated:
                        st.error("Failed to upload PDF to Salesforce")
            else:
                logger.error("Failed to generate PDF")
                if not is_automated:
                    st.error("Failed to generate PDF")
        except Exception as e:
            logger.error(f"Error creating Salesforce record or uploading PDF: {e}", exc_info=True)
            if not is_automated:
                st.error(f"Error saving to Salesforce: {str(e)}")
            return None
    return daily_forecasts
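
# automate_daily_forecast rolls the stored history forward one day for each consumable type: the
# oldest usage value is dropped, the previously forecast value for today is appended as the new
# "actual", stock is reduced by that same amount, and a fresh forecast is generated and saved.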
def automate_daily_forecast():
    consumable_types = ['Filters', 'Reagents', 'Vials']
    for consumable_type in consumable_types:
        logger.info(f"Processing automated forecast for {consumable_type}")
        usage_series, current_stock, prev_daily_forecasts = load_forecast_data(consumable_type)
        if usage_series is None or current_stock is None or prev_daily_forecasts is None:
            logger.warning(f"No previous data for {consumable_type}. Skipping automation.")
            continue
        # Shift the usage series: drop the oldest day, append the forecasted usage for today
        next_day_usage = prev_daily_forecasts['yhat'].iloc[0]  # forecasted usage for today
        usage_series = usage_series[1:] + [next_day_usage]
        # Update stock: subtract the same forecasted usage just appended to the history
        yesterday_usage = prev_daily_forecasts['yhat'].iloc[0]
        current_stock = max(0, current_stock - yesterday_usage)
        # Generate a new forecast with the updated data
        daily_forecasts = process_forecast(consumable_type, usage_series, current_stock, is_automated=True)
        if daily_forecasts is not None:
            save_forecast_data(consumable_type, usage_series, current_stock, daily_forecasts)
            logger.info(f"Completed automated forecast for {consumable_type}")
        else:
            logger.error(f"Failed to process forecast for {consumable_type}")
def main():
    parser = argparse.ArgumentParser(description="SmartLab Consumables Forecast")
    parser.add_argument('--automated', action='store_true', help="Run in automated mode")
    args = parser.parse_args()
    if args.automated:
        automate_daily_forecast()
        return
    st.title("SmartLab Consumables Forecast")
    st.header("Input Parameters")
    consumable_type = st.selectbox("Consumable Type", ['Filters', 'Reagents', 'Vials'])
    usage_series = st.text_input("Last 60 Days Usage (comma-separated)", "")
    current_stock = st.number_input("Current Stock", min_value=0, value=0)
    if st.button("Generate Forecast"):
        usage_list, error = validate_usage_series(usage_series)
        if error:
            st.error(error)
            return
        daily_forecasts = process_forecast(consumable_type, usage_list, current_stock, is_automated=False)
        if daily_forecasts is not None:
            save_forecast_data(consumable_type, usage_list, current_stock, daily_forecasts)

if __name__ == "__main__":
    main()
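
# How to run (illustrative; the script name app.py is an assumption):
#   streamlit run app.py            # interactive dashboard
#   python app.py --automated       # headless daily forecast job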