import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
import google.generativeai as genai
import io
import base64
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter, A4
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
import plotly.io as pio
import tempfile
import os
import requests
import warnings
warnings.filterwarnings("ignore", message=".*secrets.*")
DESIGN_SYSTEM = {
'colors': {
'primary': '#1E40AF',
'secondary': '#059669',
'accent': '#DC2626',
'warning': '#D97706',
'success': '#10B981',
'background': '#F8FAFC',
'text': '#1F2937',
'border': '#E5E7EB'
},
'fonts': {
'title': 'font-family: "Inter", sans-serif; font-weight: 700;',
'subtitle': 'font-family: "Inter", sans-serif; font-weight: 600;',
'body': 'font-family: "Inter", sans-serif; font-weight: 400;'
}
}
st.set_page_config(
page_title="Production Monitor with AI Insights | Nilsen Service & Consulting",
page_icon="🏭",
layout="wide",
initial_sidebar_state="expanded"
)
def load_css():
st.markdown(f"""
""", unsafe_allow_html=True)
@st.cache_resource
def init_ai():
"""Initialize AI model with proper error handling for secrets"""
try:
# Try to get API key from Streamlit secrets
api_key = st.secrets.get("GOOGLE_API_KEY", "")
except (FileNotFoundError, KeyError, AttributeError):
# If secrets file doesn't exist or key not found, try environment variable
api_key = os.environ.get("GOOGLE_API_KEY", "")
if api_key:
try:
genai.configure(api_key=api_key)
return genai.GenerativeModel('gemini-1.5-flash')
except Exception as e:
st.error(f"AI configuration failed: {str(e)}")
return None
return None
@st.cache_data
def load_preset_data(year):
urls = {
"2024": "https://huggingface.co/spaces/entropy25/production-data-analysis/resolve/main/2024.csv",
"2025": "https://huggingface.co/spaces/entropy25/production-data-analysis/resolve/main/2025.csv"
}
try:
if year in urls:
response = requests.get(urls[year], timeout=10)
response.raise_for_status()
df = pd.read_csv(io.StringIO(response.text), sep='\t')
df['date'] = pd.to_datetime(df['date'], format='%m/%d/%Y')
df['day_name'] = df['date'].dt.day_name()
return df
else:
return generate_sample_data(year)
except Exception as e:
st.warning(f"Could not load remote {year} data: {str(e)}. Loading sample data instead.")
return generate_sample_data(year)
def generate_sample_data(year):
np.random.seed(42 if year == "2024" else 84)
start_date = f"01/01/{year}"
end_date = f"12/31/{year}"
dates = pd.date_range(start=start_date, end=end_date, freq='D')
weekdays = dates[dates.weekday < 5]
data = []
materials = ['steel', 'aluminum', 'plastic', 'copper']
shifts = ['day', 'night']
for date in weekdays:
for material in materials:
for shift in shifts:
base_weight = {
'steel': 1500,
'aluminum': 800,
'plastic': 600,
'copper': 400
}[material]
weight = base_weight + np.random.normal(0, base_weight * 0.2)
weight = max(weight, base_weight * 0.3)
data.append({
'date': date.strftime('%m/%d/%Y'),
'weight_kg': round(weight, 1),
'material_type': material,
'shift': shift
})
df = pd.DataFrame(data)
df['date'] = pd.to_datetime(df['date'], format='%m/%d/%Y')
df['day_name'] = df['date'].dt.day_name()
return df
@st.cache_data
def load_data(file):
df = pd.read_csv(file, sep='\t')
df['date'] = pd.to_datetime(df['date'], format='%m/%d/%Y')
df['day_name'] = df['date'].dt.day_name()
return df
def get_material_stats(df):
stats = {}
total = df['weight_kg'].sum()
total_work_days = df['date'].nunique()
for material in df['material_type'].unique():
data = df[df['material_type'] == material]
work_days = data['date'].nunique()
daily_avg = data.groupby('date')['weight_kg'].sum().mean()
stats[material] = {
'total': data['weight_kg'].sum(),
'percentage': (data['weight_kg'].sum() / total) * 100,
'daily_avg': daily_avg,
'work_days': work_days,
'records': len(data)
}
stats['_total_'] = {
'total': total,
'percentage': 100.0,
'daily_avg': df.groupby('date')['weight_kg'].sum().mean(),
'work_days': total_work_days,
'records': len(df)
}
return stats
def get_chart_theme():
return {
'layout': {
'plot_bgcolor': 'white',
'paper_bgcolor': 'white',
'font': {'family': 'Inter, sans-serif', 'color': DESIGN_SYSTEM['colors']['text']},
'colorway': [DESIGN_SYSTEM['colors']['primary'], DESIGN_SYSTEM['colors']['secondary'],
DESIGN_SYSTEM['colors']['accent'], DESIGN_SYSTEM['colors']['warning']],
'margin': {'t': 60, 'b': 40, 'l': 40, 'r': 40}
}
}
def create_total_production_chart(df, time_period='daily'):
if time_period == 'daily':
grouped = df.groupby('date')['weight_kg'].sum().reset_index()
fig = px.line(grouped, x='date', y='weight_kg',
title='Total Production Trend',
labels={'weight_kg': 'Weight (kg)', 'date': 'Date'})
elif time_period == 'weekly':
df_copy = df.copy()
df_copy['week'] = df_copy['date'].dt.isocalendar().week
df_copy['year'] = df_copy['date'].dt.year
grouped = df_copy.groupby(['year', 'week'])['weight_kg'].sum().reset_index()
grouped['week_label'] = grouped['year'].astype(str) + '-W' + grouped['week'].astype(str)
fig = px.bar(grouped, x='week_label', y='weight_kg',
title='Total Production Trend (Weekly)',
labels={'weight_kg': 'Weight (kg)', 'week_label': 'Week'})
else:
df_copy = df.copy()
df_copy['month'] = df_copy['date'].dt.to_period('M')
grouped = df_copy.groupby('month')['weight_kg'].sum().reset_index()
grouped['month'] = grouped['month'].astype(str)
fig = px.bar(grouped, x='month', y='weight_kg',
title='Total Production Trend (Monthly)',
labels={'weight_kg': 'Weight (kg)', 'month': 'Month'})
fig.update_layout(**get_chart_theme()['layout'], height=400, showlegend=False)
return fig
def create_materials_trend_chart(df, time_period='daily', selected_materials=None):
df_copy = df.copy()
if selected_materials:
df_copy = df_copy[df_copy['material_type'].isin(selected_materials)]
if time_period == 'daily':
grouped = df_copy.groupby(['date', 'material_type'])['weight_kg'].sum().reset_index()
fig = px.line(grouped, x='date', y='weight_kg', color='material_type',
title='Materials Production Trends',
labels={'weight_kg': 'Weight (kg)', 'date': 'Date', 'material_type': 'Material'})
elif time_period == 'weekly':
df_copy['week'] = df_copy['date'].dt.isocalendar().week
df_copy['year'] = df_copy['date'].dt.year
grouped = df_copy.groupby(['year', 'week', 'material_type'])['weight_kg'].sum().reset_index()
grouped['week_label'] = grouped['year'].astype(str) + '-W' + grouped['week'].astype(str)
fig = px.bar(grouped, x='week_label', y='weight_kg', color='material_type',
title='Materials Production Trends (Weekly)',
labels={'weight_kg': 'Weight (kg)', 'week_label': 'Week', 'material_type': 'Material'})
else:
df_copy['month'] = df_copy['date'].dt.to_period('M')
grouped = df_copy.groupby(['month', 'material_type'])['weight_kg'].sum().reset_index()
grouped['month'] = grouped['month'].astype(str)
fig = px.bar(grouped, x='month', y='weight_kg', color='material_type',
title='Materials Production Trends (Monthly)',
labels={'weight_kg': 'Weight (kg)', 'month': 'Month', 'material_type': 'Material'})
fig.update_layout(**get_chart_theme()['layout'], height=400)
return fig
def create_shift_trend_chart(df, time_period='daily'):
if time_period == 'daily':
grouped = df.groupby(['date', 'shift'])['weight_kg'].sum().reset_index()
pivot_data = grouped.pivot(index='date', columns='shift', values='weight_kg').fillna(0)
fig = go.Figure()
if 'day' in pivot_data.columns:
fig.add_trace(go.Bar(
x=pivot_data.index, y=pivot_data['day'], name='Day Shift',
marker_color=DESIGN_SYSTEM['colors']['warning'],
text=pivot_data['day'].round(0), textposition='inside'
))
if 'night' in pivot_data.columns:
fig.add_trace(go.Bar(
x=pivot_data.index, y=pivot_data['night'], name='Night Shift',
marker_color=DESIGN_SYSTEM['colors']['primary'],
base=pivot_data['day'] if 'day' in pivot_data.columns else 0,
text=pivot_data['night'].round(0), textposition='inside'
))
fig.update_layout(
**get_chart_theme()['layout'],
title='Daily Shift Production Trends (Stacked)',
xaxis_title='Date', yaxis_title='Weight (kg)',
barmode='stack', height=400, showlegend=True
)
else:
grouped = df.groupby(['date', 'shift'])['weight_kg'].sum().reset_index()
fig = px.bar(grouped, x='date', y='weight_kg', color='shift',
title=f'{time_period.title()} Shift Production Trends',
barmode='stack')
fig.update_layout(**get_chart_theme()['layout'], height=400)
return fig
def detect_outliers(df):
outliers = {}
for material in df['material_type'].unique():
material_data = df[df['material_type'] == material]
data = material_data['weight_kg']
Q1, Q3 = data.quantile(0.25), data.quantile(0.75)
IQR = Q3 - Q1
lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
outlier_mask = (data < lower) | (data > upper)
outlier_dates = material_data[outlier_mask]['date'].dt.strftime('%Y-%m-%d').tolist()
outliers[material] = {
'count': len(outlier_dates),
'range': f"{lower:.0f} - {upper:.0f} kg",
'dates': outlier_dates
}
return outliers
def generate_ai_summary(model, df, stats, outliers):
if not model:
return "AI analysis unavailable - Google API key not configured. Please set the GOOGLE_API_KEY environment variable or in Streamlit secrets to enable AI insights."
try:
materials = [k for k in stats.keys() if k != '_total_']
context_parts = [
"# Production Data Analysis Context",
f"## Overview",
f"- Total Production: {stats['_total_']['total']:,.0f} kg",
f"- Production Period: {stats['_total_']['work_days']} working days",
f"- Daily Average: {stats['_total_']['daily_avg']:,.0f} kg",
f"- Materials Tracked: {len(materials)}",
"",
"## Material Breakdown:"
]
for material in materials:
info = stats[material]
context_parts.append(f"- {material.title()}: {info['total']:,.0f} kg ({info['percentage']:.1f}%), avg {info['daily_avg']:,.0f} kg/day")
daily_data = df.groupby('date')['weight_kg'].sum()
trend_direction = "increasing" if daily_data.iloc[-1] > daily_data.iloc[0] else "decreasing"
volatility = daily_data.std() / daily_data.mean() * 100
context_parts.extend([
"",
"## Trend Analysis:",
f"- Overall trend: {trend_direction}",
f"- Production volatility: {volatility:.1f}% coefficient of variation",
f"- Peak production: {daily_data.max():,.0f} kg",
f"- Lowest production: {daily_data.min():,.0f} kg"
])
total_outliers = sum(info['count'] for info in outliers.values())
context_parts.extend([
"",
"## Quality Control:",
f"- Total outliers detected: {total_outliers}",
f"- Materials with quality issues: {sum(1 for info in outliers.values() if info['count'] > 0)}"
])
if 'shift' in df.columns:
shift_stats = df.groupby('shift')['weight_kg'].sum()
context_parts.extend([
"",
"## Shift Performance:",
f"- Day shift: {shift_stats.get('day', 0):,.0f} kg",
f"- Night shift: {shift_stats.get('night', 0):,.0f} kg"
])
context_text = "\n".join(context_parts)
prompt = f"""
{context_text}
As an expert AI analyst embedded within the "Production Monitor with AI Insights" platform, provide a comprehensive analysis based on the data provided. Your tone should be professional and data-driven. Your primary goal is to highlight how the platform's features reveal critical insights.
Structure your response in the following format:
**PRODUCTION ASSESSMENT**
Evaluate the overall production status (Excellent/Good/Needs Attention). Briefly justify your assessment using key metrics from the data summary.
**KEY FINDINGS**
Identify 3-4 of the most important insights. For each finding, explicitly mention the platform feature that made the discovery possible. Use formats like "(revealed by the 'Quality Check' module)" or "(visualized in the 'Production Trend' chart)".
Example Finding format:
• Finding X: [Your insight, e.g., "Liquid-Ctu production shows high volatility..."] (as identified by the 'Materials Analysis' view).
**RECOMMENDATIONS**
Provide 2-3 actionable recommendations. Frame these as steps the management can take, encouraging them to use the platform for further investigation.
Example Recommendation format:
• Recommendation Y: [Your recommendation, e.g., "Investigate the root causes of the 11 outliers..."] We recommend using the platform's interactive charts to drill down into the specific dates identified by the 'Quality Check' module.
Keep the entire analysis concise and under 300 words.
"""
response = model.generate_content(prompt)
return response.text
except Exception as e:
return f"AI analysis error: {str(e)}"
def query_ai(model, stats, question, df=None):
if not model:
return "AI assistant not available - Please configure Google API key"
context_parts = [
"Production Data Summary:",
*[f"- {mat.title()}: {info['total']:,.0f}kg ({info['percentage']:.1f}%)"
for mat, info in stats.items() if mat != '_total_'],
f"\nTotal Production: {stats['_total_']['total']:,.0f}kg across {stats['_total_']['work_days']} work days"
]
if df is not None:
available_cols = list(df.columns)
context_parts.append(f"\nAvailable data fields: {', '.join(available_cols)}")
if 'shift' in df.columns:
shift_stats = df.groupby('shift')['weight_kg'].sum()
context_parts.append(f"Shift breakdown: {dict(shift_stats)}")
if 'day_name' in df.columns:
day_stats = df.groupby('day_name')['weight_kg'].mean()
context_parts.append(f"Average daily production: {dict(day_stats.round(0))}")
context = "\n".join(context_parts) + f"\n\nQuestion: {question}\nAnswer based on available data:"
try:
response = model.generate_content(context)
return response.text
except Exception as e:
return f"Error getting AI response: {str(e)}"
def save_plotly_as_image(fig, filename):
try:
temp_dir = tempfile.gettempdir()
filepath = os.path.join(temp_dir, filename)
theme = get_chart_theme()['layout'].copy()
theme.update({
'font': dict(size=12, family="Arial"),
'plot_bgcolor': 'white',
'paper_bgcolor': 'white',
'margin': dict(t=50, b=40, l=40, r=40)
})
fig.update_layout(**theme)
try:
pio.write_image(fig, filepath, format='png', width=800, height=400, scale=2, engine='kaleido')
if os.path.exists(filepath):
return filepath
except:
pass
return None
except Exception as e:
return None
def create_pdf_charts(df, stats):
charts = {}
try:
materials = [k for k in stats.keys() if k != '_total_']
values = [stats[mat]['total'] for mat in materials]
labels = [mat.replace('_', ' ').title() for mat in materials]
if len(materials) > 0 and len(values) > 0:
try:
fig_pie = px.pie(values=values, names=labels, title="Production Distribution by Material")
charts['pie'] = save_plotly_as_image(fig_pie, "distribution.png")
except:
pass
if len(df) > 0:
try:
daily_data = df.groupby('date')['weight_kg'].sum().reset_index()
if len(daily_data) > 0:
fig_trend = px.line(daily_data, x='date', y='weight_kg', title="Daily Production Trend",
labels={'date': 'Date', 'weight_kg': 'Weight (kg)'},
color_discrete_sequence=[DESIGN_SYSTEM['colors']['primary']])
charts['trend'] = save_plotly_as_image(fig_trend, "trend.png")
except:
pass
if len(materials) > 0 and len(values) > 0:
try:
fig_bar = px.bar(x=labels, y=values, title="Production by Material Type",
labels={'x': 'Material Type', 'y': 'Weight (kg)'},
color_discrete_sequence=[DESIGN_SYSTEM['colors']['primary']])
charts['bar'] = save_plotly_as_image(fig_bar, "materials.png")
except:
pass
if 'shift' in df.columns and len(df) > 0:
try:
shift_data = df.groupby('shift')['weight_kg'].sum().reset_index()
if len(shift_data) > 0 and shift_data['weight_kg'].sum() > 0:
fig_shift = px.pie(shift_data, values='weight_kg', names='shift', title="Production by Shift")
charts['shift'] = save_plotly_as_image(fig_shift, "shifts.png")
except:
pass
except Exception as e:
pass
return charts
def create_enhanced_pdf_report(df, stats, outliers, model=None):
buffer = io.BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=50, leftMargin=50, topMargin=50, bottomMargin=50)
elements = []
styles = getSampleStyleSheet()
# Custom styles
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=24,
spaceAfter=30,
alignment=1,
textColor=colors.darkblue
)
subtitle_style = ParagraphStyle(
'CustomSubtitle',
parent=styles['Heading2'],
fontSize=16,
spaceAfter=20,
textColor=colors.darkblue
)
analysis_style = ParagraphStyle(
'AnalysisStyle',
parent=styles['Normal'],
fontSize=11,
spaceAfter=12,
leftIndent=20,
textColor=colors.darkgreen
)
# Title page
elements.append(Spacer(1, 100))
elements.append(Paragraph("Production Monitor Dashboard", title_style))
elements.append(Paragraph("Comprehensive Production Analysis Report", styles['Heading3']))
elements.append(Spacer(1, 50))
report_info = f"""
Production Analytics Division
Report Period: {df['date'].min().strftime('%B %d, %Y')} - {df['date'].max().strftime('%B %d, %Y')}
Generated: {datetime.now().strftime('%B %d, %Y at %H:%M')}
Total Records: {len(df):,}
Key Highlights:
• Total production: {total_production:,.0f} kg
• Daily average: {daily_avg:,.0f} kg
• Materials tracked: {len([k for k in stats.keys() if k != '_total_'])}
• Data quality: {len(df):,} records processed
')
elements.append(Paragraph(formatted_text, styles['Normal']))
elements.append(Spacer(1, 8))
else:
elements.append(PageBreak())
elements.append(Paragraph("Advanced Analysis", subtitle_style))
elements.append(Paragraph("Advanced analysis features unavailable - Google API key not configured. Please set the GOOGLE_API_KEY environment variable or configure it in Streamlit secrets to enable intelligent insights.", styles['Normal']))
# Footer
elements.append(Spacer(1, 30))
footer_text = f"""
Nilsen Service & Consulting AS - Production Analytics Division
Report contains {len(df):,} data records across {stats['_total_']['work_days']} working days