import streamlit as st import pandas as pd import plotly.express as px from datetime import datetime, timedelta from simple_salesforce import Salesforce from transformers import pipeline from utils import fetch_salesforce_data, detect_anomalies, generate_pdf_report import os import logging # Configure logging logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # Streamlit app configuration try: st.set_page_config(page_title="LabOps Dashboard", layout="wide") logger.info("Streamlit page configuration set successfully.") except Exception as e: logger.error(f"Failed to set Streamlit page configuration: {e}") raise # Cache Salesforce connection @st.cache_resource def init_salesforce(): logger.info("Initializing Salesforce connection...") try: sf = Salesforce( username=os.getenv("SF_USERNAME", st.secrets.get("sf_username")), password=os.getenv("SF_PASSWORD", st.secrets.get("sf_password")), security_token=os.getenv("SF_SECURITY_TOKEN", st.secrets.get("sf_security_token")) ) logger.info("Salesforce connection initialized successfully.") return sf except Exception as e: logger.error(f"Failed to initialize Salesforce: {e}") st.error(f"Cannot connect to Salesforce: {e}") return None # Cache Hugging Face model @st.cache_resource def init_anomaly_detector(): logger.info("Initializing anomaly detector...") try: # Use lighter model for Hugging Face Spaces detector = pipeline( "text-classification", model="prajjwal1/bert-tiny", tokenizer="prajjwal1/bert-tiny", clean_up_tokenization_spaces=True ) logger.info("Anomaly detector initialized successfully.") return detector except Exception as e: logger.error(f"Failed to initialize anomaly detector: {e}") st.error(f"Cannot initialize anomaly detector: {e}") return None # Initialize connections sf = init_salesforce() anomaly_detector = init_anomaly_detector() # Cache data fetching @st.cache_data(ttl=10) def get_filtered_data(lab_site, equipment_type, date_start, date_end): logger.info(f"Fetching data for lab: {lab_site}, equipment: {equipment_type}, date range: {date_start} to {date_end}") try: query = f""" SELECT Equipment__c, Log_Timestamp__c, Status__c, Usage_Count__c, Lab__c, Equipment_Type__c FROM SmartLog__c WHERE Log_Timestamp__c >= {date_start.strftime('%Y-%m-%d')} AND Log_Timestamp__c <= {date_end.strftime('%Y-%m-%d')} """ if lab_site != "All": query += f" AND Lab__c = '{lab_site}'" if equipment_type != "All": query += f" AND Equipment_Type__c = '{equipment_type}'" query += " LIMIT 100" data = fetch_salesforce_data(sf, query) logger.info(f"Fetched {len(data)} records from Salesforce.") return data except Exception as e: logger.error(f"Failed to fetch data: {e}") return [] def main(): logger.info("Starting main application...") if sf is None or anomaly_detector is None: st.error("Application cannot start due to initialization failures. Check logs for details.") logger.error("Application initialization failed: Salesforce or anomaly detector not available.") return st.title("Multi-Device LabOps Dashboard") # Filters col1, col2 = st.columns(2) with col1: lab_site = st.selectbox("Select Lab Site", ["All", "Lab1", "Lab2", "Lab3"]) with col2: equipment_type = st.selectbox("Equipment Type", ["All", "Cell Analyzer", "Weight Log", "UV Verification"]) date_range = st.date_input("Date Range", [datetime.now() - timedelta(days=7), datetime.now()]) if len(date_range) != 2: st.warning("Please select a valid date range.") logger.warning("Invalid date range selected.") return date_start, date_end = date_range # Fetch and process data with st.spinner("Fetching data..."): data = get_filtered_data(lab_site, equipment_type, date_start, date_end) if not data: st.warning("No data available for the selected filters.") logger.warning("No data returned for the selected filters.") return df = pd.DataFrame(data) df["Log_Timestamp__c"] = pd.to_datetime(df["Log_Timestamp__c"]) df["Anomaly"] = df.apply( lambda row: detect_anomalies(f"{row['Status__c']} Usage:{row['Usage_Count__c']}", anomaly_detector), axis=1 ) # Pagination page_size = 10 total_pages = max(1, len(df) // page_size + (1 if len(df) % page_size else 0)) page = st.number_input("Page", min_value=1, max_value=total_pages, value=1, step=1) start_idx = (page - 1) * page_size end_idx = start_idx + page_size paginated_df = df[start_idx:end_idx] # Device Cards st.subheader("Device Status") for _, row in paginated_df.iterrows(): anomaly = "⚠️ Anomaly" if row["Anomaly"] == "POSITIVE" else "✅ Normal" st.markdown(f""" **{row['Equipment__c']}** | Lab: {row['Lab__c']} | Health: {row['Status__c']} | Usage: {row['Usage_Count__c']} | Last Log: {row['Log_Timestamp__c'].strftime('%Y-%m-%d %H:%M:%S')} | {anomaly} """) # Usage Chart st.subheader("Usage Trends") fig = px.line( df, x="Log_Timestamp__c", y="Usage_Count__c", color="Equipment__c", title="Daily Usage Trends", labels={"Log_Timestamp__c": "Timestamp", "Usage_Count__c": "Usage Count"} ) fig.update_layout(xaxis_title="Timestamp", yaxis_title="Usage Count") st.plotly_chart(fig, use_container_width=True) # Downtime Chart st.subheader("Downtime Patterns") downtime_df = df[df["Status__c"] == "Down"] if not downtime_df.empty: fig_downtime = px.histogram( downtime_df, x="Log_Timestamp__c", color="Equipment__c", title="Downtime Patterns", labels={"Log_Timestamp__c": "Timestamp"} ) fig_downtime.update_layout(xaxis_title="Timestamp", yaxis_title="Downtime Count") st.plotly_chart(fig_downtime, use_container_width=True) else: st.info("No downtime events found for the selected filters.") # AMC Reminders st.subheader("AMC Reminders") amc_query = "SELECT Equipment__c, AMC_Expiry_Date__c FROM Equipment__c WHERE AMC_Expiry_Date__c <= NEXT_N_DAYS:14" amc_data = fetch_salesforce_data(sf, amc_query, retries=3) if amc_data: for record in amc_data: st.write(f"Equipment {record['Equipment__c']} - AMC Expiry: {record['AMC_Expiry_Date__c']}") else: st.info("No AMC expiries within the next 14 days.") # Export PDF if st.button("Export PDF Report"): with st.spinner("Generating PDF..."): try: pdf_file = generate_pdf_report(df, lab_site, equipment_type, [date_start, date_end]) with open(pdf_file, "rb") as f: st.download_button("Download PDF", f, file_name="LabOps_Report.pdf", mime="application/pdf") logger.info("PDF report generated successfully.") except Exception as e: st.error(f"Failed to generate PDF: {e}") logger.error(f"Failed to generate PDF: {e}") if __name__ == "__main__": try: logger.info("Application starting...") main() except Exception as e: logger.error(f"Application failed to start: {e}") raise