|
import streamlit as st |
|
import pandas as pd |
|
import plotly.express as px |
|
from datetime import datetime, timedelta |
|
from simple_salesforce import Salesforce |
|
from transformers import pipeline |
|
from utils import fetch_salesforce_data, detect_anomalies, generate_pdf_report |
|
import os |
|
import logging |
|
|
|
|
|
# Process-wide logging: timestamp, level and message on every record.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Apply the page title and wide layout; a failure is logged and then
# propagated so the script stops instead of rendering a half-configured page.
try:
    st.set_page_config(layout="wide", page_title="LabOps Dashboard")
except Exception as exc:
    logger.error(f"Failed to set Streamlit page configuration: {exc}")
    raise
else:
    logger.info("Streamlit page configuration set successfully.")
|
|
|
|
|
def _resolve_sf_credential(env_name, secret_key):
    """Return one Salesforce credential, preferring the environment.

    The environment variable ``env_name`` wins whenever it is set (even to an
    empty string). Streamlit secrets are consulted only as a fallback, so a
    deployment that is configured purely through environment variables never
    touches ``st.secrets`` at all.
    """
    value = os.getenv(env_name)
    if value is not None:
        return value
    # Lazy fallback: reading st.secrets may raise when no secrets file is
    # configured, so it must not run when the environment already has a value.
    return st.secrets.get(secret_key)


@st.cache_resource
def init_salesforce():
    """Create the shared Salesforce client used by the dashboard.

    Credentials are read from ``SF_USERNAME``, ``SF_PASSWORD`` and
    ``SF_SECURITY_TOKEN``, falling back to the ``sf_username``,
    ``sf_password`` and ``sf_security_token`` Streamlit secrets.

    Returns:
        The connected ``Salesforce`` client, or ``None`` when the connection
        (or credential lookup) fails. The failure is logged and shown in the
        UI rather than raised.
    """
    logger.info("Initializing Salesforce connection...")
    try:
        sf = Salesforce(
            username=_resolve_sf_credential("SF_USERNAME", "sf_username"),
            password=_resolve_sf_credential("SF_PASSWORD", "sf_password"),
            security_token=_resolve_sf_credential(
                "SF_SECURITY_TOKEN", "sf_security_token"
            ),
        )
        logger.info("Salesforce connection initialized successfully.")
        return sf
    except Exception as e:
        # NOTE(review): the None result is returned from a cache_resource
        # function, so a failed attempt is presumably cached as well - confirm
        # whether a retry on the next rerun is wanted.
        logger.error(f"Failed to initialize Salesforce: {e}")
        st.error(f"Cannot connect to Salesforce: {e}")
        return None
|
|
|
|
|
@st.cache_resource
def init_anomaly_detector():
    """Build the text-classification pipeline used to label log entries.

    Returns:
        The ``transformers`` pipeline object, or ``None`` if construction
        fails; the error is logged and surfaced through ``st.error``.
    """
    logger.info("Initializing anomaly detector...")
    # The same checkpoint name supplies both the model and its tokenizer.
    checkpoint = "prajjwal1/bert-tiny"
    try:
        classifier = pipeline(
            "text-classification",
            model=checkpoint,
            tokenizer=checkpoint,
            clean_up_tokenization_spaces=True,
        )
    except Exception as exc:
        logger.error(f"Failed to initialize anomaly detector: {exc}")
        st.error(f"Cannot initialize anomaly detector: {exc}")
        return None

    logger.info("Anomaly detector initialized successfully.")
    return classifier
|
|
|
|
|
# Module-level singletons shared by the functions below. Either one is None
# when its initializer failed; main() checks for that before rendering.
sf = init_salesforce()
anomaly_detector = init_anomaly_detector()
|
|
|
|
|
def _soql_quote(value):
    """Return ``value`` as a single-quoted SOQL string literal.

    Backslashes and single quotes are backslash-escaped so a filter value
    containing either character cannot terminate the literal early or change
    the shape of the query.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


@st.cache_data(ttl=10)
def get_filtered_data(lab_site, equipment_type, date_start, date_end):
    """Fetch SmartLog__c records matching the dashboard filters.

    Args:
        lab_site: Lab name to filter on, or ``"All"`` for no lab filter.
        equipment_type: Equipment type to filter on, or ``"All"`` for none.
        date_start: Inclusive lower bound; must provide ``strftime``.
        date_end: Upper bound; must provide ``strftime``.

    Returns:
        Whatever ``fetch_salesforce_data`` returns for the query (at most 100
        records), or an empty list when building or running the query fails.
    """
    logger.info(f"Fetching data for lab: {lab_site}, equipment: {equipment_type}, date range: {date_start} to {date_end}")
    try:
        # NOTE(review): the bounds are emitted as date-only literals. If
        # Log_Timestamp__c is a DateTime field, SOQL expects full dateTime
        # literals here - confirm against the object schema.
        query = f"""
            SELECT Equipment__c, Log_Timestamp__c, Status__c, Usage_Count__c, Lab__c, Equipment_Type__c
            FROM SmartLog__c
            WHERE Log_Timestamp__c >= {date_start.strftime('%Y-%m-%d')}
            AND Log_Timestamp__c <= {date_end.strftime('%Y-%m-%d')}
        """
        # Optional equality filters; values are escaped before interpolation.
        if lab_site != "All":
            query += f" AND Lab__c = {_soql_quote(lab_site)}"
        if equipment_type != "All":
            query += f" AND Equipment_Type__c = {_soql_quote(equipment_type)}"
        query += " LIMIT 100"

        data = fetch_salesforce_data(sf, query)
        logger.info(f"Fetched {len(data)} records from Salesforce.")
        return data
    except Exception as e:
        # Best-effort: the caller treats an empty result as "no data".
        logger.error(f"Failed to fetch data: {e}")
        return []
|
|
|
def _render_device_status(df, page_size=10):
    """Render the page selector and the status line for each log on that page."""
    # Ceiling division; always offer at least one page.
    total_pages = max(1, -(-len(df) // page_size))
    page = st.number_input("Page", min_value=1, max_value=total_pages, value=1, step=1)
    start_idx = (page - 1) * page_size
    # Positional slice, independent of whatever index labels df carries.
    paginated_df = df.iloc[start_idx:start_idx + page_size]

    st.subheader("Device Status")
    for _, row in paginated_df.iterrows():
        anomaly = "⚠️ Anomaly" if row["Anomaly"] == "POSITIVE" else "✅ Normal"
        st.markdown(f"""
            **{row['Equipment__c']}** | Lab: {row['Lab__c']} | Health: {row['Status__c']} |
            Usage: {row['Usage_Count__c']} | Last Log: {row['Log_Timestamp__c'].strftime('%Y-%m-%d %H:%M:%S')} | {anomaly}
        """)


def _render_usage_trends(df):
    """Render the per-equipment usage line chart."""
    st.subheader("Usage Trends")
    # Line traces join points in row order and the query has no ORDER BY, so
    # sort chronologically (for the chart only) to avoid zigzagging lines.
    chronological = df.sort_values("Log_Timestamp__c")
    fig = px.line(
        chronological,
        x="Log_Timestamp__c",
        y="Usage_Count__c",
        color="Equipment__c",
        title="Daily Usage Trends",
        labels={"Log_Timestamp__c": "Timestamp", "Usage_Count__c": "Usage Count"},
    )
    fig.update_layout(xaxis_title="Timestamp", yaxis_title="Usage Count")
    st.plotly_chart(fig, use_container_width=True)


def _render_downtime_patterns(df):
    """Render a histogram of the logs whose status is "Down", if any."""
    st.subheader("Downtime Patterns")
    downtime_df = df[df["Status__c"] == "Down"]
    if downtime_df.empty:
        st.info("No downtime events found for the selected filters.")
        return

    fig_downtime = px.histogram(
        downtime_df,
        x="Log_Timestamp__c",
        color="Equipment__c",
        title="Downtime Patterns",
        labels={"Log_Timestamp__c": "Timestamp"},
    )
    fig_downtime.update_layout(xaxis_title="Timestamp", yaxis_title="Downtime Count")
    st.plotly_chart(fig_downtime, use_container_width=True)


def _render_amc_reminders():
    """Query and list equipment AMC expiry dates."""
    st.subheader("AMC Reminders")
    # NOTE(review): "<= NEXT_N_DAYS:14" looks like it also matches contracts
    # that have already expired, while the empty-state message below says
    # "within the next 14 days" - confirm which behavior is intended.
    amc_query = "SELECT Equipment__c, AMC_Expiry_Date__c FROM Equipment__c WHERE AMC_Expiry_Date__c <= NEXT_N_DAYS:14"
    amc_data = fetch_salesforce_data(sf, amc_query, retries=3)
    if not amc_data:
        st.info("No AMC expiries within the next 14 days.")
        return

    for record in amc_data:
        st.write(f"Equipment {record['Equipment__c']} - AMC Expiry: {record['AMC_Expiry_Date__c']}")


def _render_pdf_export(df, lab_site, equipment_type, date_start, date_end):
    """Render the export button and, once clicked, the PDF download button."""
    if not st.button("Export PDF Report"):
        return

    with st.spinner("Generating PDF..."):
        try:
            pdf_file = generate_pdf_report(df, lab_site, equipment_type, [date_start, date_end])
            with open(pdf_file, "rb") as f:
                st.download_button("Download PDF", f, file_name="LabOps_Report.pdf", mime="application/pdf")
            logger.info("PDF report generated successfully.")
        except Exception as e:
            st.error(f"Failed to generate PDF: {e}")
            logger.error(f"Failed to generate PDF: {e}")


def main():
    """Render the LabOps dashboard for the current Streamlit run.

    Flow: verify the shared Salesforce client and anomaly detector exist,
    collect the lab / equipment / date filters, fetch matching logs, label
    each log via ``detect_anomalies``, then render the device status list,
    usage trends, downtime patterns, AMC reminders and the PDF export.

    Returns early (after showing a message) when initialization failed, the
    date range is incomplete, or no data matches the filters.
    """
    logger.info("Starting main application...")
    if sf is None or anomaly_detector is None:
        st.error("Application cannot start due to initialization failures. Check logs for details.")
        logger.error("Application initialization failed: Salesforce or anomaly detector not available.")
        return

    st.title("Multi-Device LabOps Dashboard")

    # Filter widgets.
    col1, col2 = st.columns(2)
    with col1:
        lab_site = st.selectbox("Select Lab Site", ["All", "Lab1", "Lab2", "Lab3"])
    with col2:
        equipment_type = st.selectbox("Equipment Type", ["All", "Cell Analyzer", "Weight Log", "UV Verification"])

    date_range = st.date_input("Date Range", [datetime.now() - timedelta(days=7), datetime.now()])

    # Both a start and an end date are required before anything is fetched.
    if len(date_range) != 2:
        st.warning("Please select a valid date range.")
        logger.warning("Invalid date range selected.")
        return
    date_start, date_end = date_range

    with st.spinner("Fetching data..."):
        data = get_filtered_data(lab_site, equipment_type, date_start, date_end)
        if not data:
            st.warning("No data available for the selected filters.")
            logger.warning("No data returned for the selected filters.")
            return

    df = pd.DataFrame(data)
    df["Log_Timestamp__c"] = pd.to_datetime(df["Log_Timestamp__c"])
    # One detector call per log, fed a short "<status> Usage:<count>" text.
    df["Anomaly"] = df.apply(
        lambda row: detect_anomalies(f"{row['Status__c']} Usage:{row['Usage_Count__c']}", anomaly_detector),
        axis=1,
    )

    _render_device_status(df)
    _render_usage_trends(df)
    _render_downtime_patterns(df)
    _render_amc_reminders()
    _render_pdf_export(df, lab_site, equipment_type, date_start, date_end)
|
|
|
# Script entry point: run the dashboard, logging any uncaught failure before
# letting it propagate.
if __name__ == "__main__":
    logger.info("Application starting...")
    try:
        main()
    except Exception as exc:
        logger.error(f"Application failed to start: {exc}")
        raise