import os
import io
import sys
import json
import time
import hashlib
import logging
import requests
import subprocess
import pandas as pd
import altair as alt
import streamlit as st
from pathlib import Path
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
# Import the new path manager
try:
from path_config import path_manager
except ImportError:
# Add current directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
sys.path.append('/app')
from path_config import path_manager
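# path_manager interface relied on in this module (defined in path_config.py):
#   path_manager.environment             - name of the detected runtime environment
#   path_manager.get_logs_path(name)     - Path to a file under the logs directory
#   path_manager.get_activity_log_path() - Path to the activity log
#   path_manager.get_metadata_path()     - Path to the model metadata JSON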
# Configure logging with error handling for restricted environments
def setup_streamlit_logging():
"""Setup logging with fallback for restricted file access"""
try:
# Try to create a log file in logs directory
log_file_path = path_manager.get_logs_path('streamlit_app.log')
log_file_path.parent.mkdir(parents=True, exist_ok=True)
# Test write access
with open(log_file_path, 'a') as test_file:
test_file.write('')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file_path),
logging.StreamHandler()
]
)
return True
except (PermissionError, OSError):
# Fallback to console-only logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
return False
# Setup logging
file_logging_enabled = setup_streamlit_logging()
logger = logging.getLogger(__name__)
if not file_logging_enabled:
logger.warning("File logging disabled due to permission restrictions")
# Log environment info at startup
logger.info(f"Streamlit starting in {path_manager.environment} environment")
class StreamlitAppManager:
"""Manages Streamlit application state and functionality with dynamic paths"""
def __init__(self):
self.setup_config()
self.setup_api_client()
self.initialize_session_state()
def setup_config(self):
"""Setup application configuration"""
self.config = {
'api_url': "http://localhost:8000",
'max_upload_size': 1000 * 1024 * 1024, # 1000 MB
'supported_file_types': ['csv', 'txt', 'json'],
'max_text_length': 10000,
'prediction_timeout': 30,
'refresh_interval': 60,
'max_batch_size': 100
}
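        # The API URL above is hard-coded for local use; reading it from an
        # environment variable (e.g. os.environ.get("API_URL", "http://localhost:8000"),
        # variable name illustrative) is a common alternative when the API runs
        # in a separate container.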
def setup_api_client(self):
"""Setup API client with error handling"""
        self.session = requests.Session()
        # requests.Session has no session-wide timeout setting, so the configured
        # prediction_timeout is passed explicitly with each request instead.
# Test API connection
self.api_available = self.test_api_connection()
def test_api_connection(self) -> bool:
"""Test API connection"""
try:
response = self.session.get(
f"{self.config['api_url']}/health", timeout=5)
return response.status_code == 200
        except requests.exceptions.RequestException:
return False
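    # Note: self.api_available is evaluated once when the manager is constructed;
    # the getter methods below check this cached flag rather than re-probing
    # /health on every call.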
def initialize_session_state(self):
"""Initialize Streamlit session state"""
if 'prediction_history' not in st.session_state:
st.session_state.prediction_history = []
if 'upload_history' not in st.session_state:
st.session_state.upload_history = []
if 'last_refresh' not in st.session_state:
st.session_state.last_refresh = datetime.now()
if 'auto_refresh' not in st.session_state:
st.session_state.auto_refresh = False
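    # Most of the getter methods below follow the same pattern: bail out if the
    # API is unavailable, GET an endpoint with a 10-second timeout, and return
    # the parsed JSON on HTTP 200 or None otherwise. A shared helper along the
    # lines of the sketch below could consolidate them; `_get_json` is
    # illustrative only and is not called by the existing code.
    def _get_json(self, endpoint: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
        """Illustrative sketch: fetch a JSON payload from the API, or None on failure"""
        try:
            if not self.api_available:
                return None
            response = self.session.get(f"{self.config['api_url']}{endpoint}", timeout=timeout)
            return response.json() if response.status_code == 200 else None
        except Exception as e:
            logger.warning(f"Could not fetch {endpoint}: {e}")
            return None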
def get_cv_results_from_api(self):
"""Get cross-validation results from API"""
try:
if not self.api_available:
return None
response = self.session.get(
f"{self.config['api_url']}/cv/results",
timeout=10
)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
return {'error': 'No CV results available'}
else:
return None
except Exception as e:
logger.warning(f"Could not fetch CV results: {e}")
return None
def get_model_comparison_from_api(self):
"""Get model comparison results from API"""
try:
if not self.api_available:
return None
response = self.session.get(
f"{self.config['api_url']}/cv/comparison",
timeout=10
)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
return {'error': 'No comparison results available'}
else:
return None
except Exception as e:
logger.warning(f"Could not fetch model comparison: {e}")
return None
def get_validation_statistics_from_api(self):
"""Get validation statistics from API"""
try:
if not self.api_available:
return None
response = self.session.get(
f"{self.config['api_url']}/validation/statistics",
timeout=10
)
if response.status_code == 200:
return response.json()
else:
return None
except Exception as e:
logger.warning(f"Could not fetch validation statistics: {e}")
return None
def get_validation_health_from_api(self):
"""Get validation system health from API"""
try:
if not self.api_available:
return None
response = self.session.get(
f"{self.config['api_url']}/validation/health",
timeout=10
)
if response.status_code == 200:
return response.json()
else:
return None
except Exception as e:
logger.warning(f"Could not fetch validation health: {e}")
return None
def get_validation_quality_report_from_api(self):
"""Get validation quality report from API"""
try:
if not self.api_available:
return None
response = self.session.get(f"{self.config['api_url']}/validation/quality-report", timeout=10)
return response.json() if response.status_code == 200 else None
except Exception as e:
logger.warning(f"Could not fetch quality report: {e}")
return None
def get_monitoring_metrics_from_api(self):
"""Get current monitoring metrics from API"""
try:
if not self.api_available:
return None
response = self.session.get(f"{self.config['api_url']}/monitor/metrics/current", timeout=10)
return response.json() if response.status_code == 200 else None
except Exception as e:
logger.warning(f"Could not fetch monitoring metrics: {e}")
return None
def get_monitoring_alerts_from_api(self):
"""Get monitoring alerts from API"""
try:
if not self.api_available:
return None
response = self.session.get(f"{self.config['api_url']}/monitor/alerts", timeout=10)
return response.json() if response.status_code == 200 else None
except Exception as e:
logger.warning(f"Could not fetch monitoring alerts: {e}")
return None
def get_prediction_patterns_from_api(self, hours: int = 24):
"""Get prediction patterns from API"""
try:
if not self.api_available:
return None
response = self.session.get(f"{self.config['api_url']}/monitor/patterns?hours={hours}", timeout=10)
return response.json() if response.status_code == 200 else None
except Exception as e:
logger.warning(f"Could not fetch prediction patterns: {e}")
return None
def get_automation_status_from_api(self):
"""Get automation status from API"""
try:
if not self.api_available:
return None
response = self.session.get(f"{self.config['api_url']}/automation/status", timeout=10)
return response.json() if response.status_code == 200 else None
except Exception as e:
logger.warning(f"Could not fetch automation status: {e}")
return None
# Blue-Green Deployment
def get_deployment_status_from_api(self):
"""Get deployment status from API"""
try:
if not self.api_available:
return None
response = self.session.get(f"{self.config['api_url']}/deployment/status", timeout=10)
return response.json() if response.status_code == 200 else None
except Exception as e:
logger.warning(f"Could not fetch deployment status: {e}")
return None
def get_traffic_status_from_api(self):
"""Get traffic routing status from API"""
try:
if not self.api_available:
return None
response = self.session.get(f"{self.config['api_url']}/deployment/traffic", timeout=10)
return response.json() if response.status_code == 200 else None
except Exception as e:
logger.warning(f"Could not fetch traffic status: {e}")
return None
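# For reference, the API endpoints used in this module (all relative to
# config['api_url']) are:
#   /health, /predict, /cv/results, /cv/comparison,
#   /validation/statistics, /validation/health, /validation/quality-report,
#   /monitor/metrics/current, /monitor/alerts, /monitor/patterns?hours=N,
#   /automation/status, /deployment/status, /deployment/traffic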
# Initialize app manager
app_manager = StreamlitAppManager()
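# Streamlit re-executes this script on every user interaction, so the manager
# (including its startup /health probe) is rebuilt on each rerun. Wrapping the
# construction in a factory decorated with st.cache_resource is one way to
# reuse a single instance across reruns (suggestion only; not applied here).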
# Page configuration
st.set_page_config(
page_title="Fake News Detection System",
page_icon="📰",
layout="wide",
initial_sidebar_state="expanded"
)
# Custom CSS for better styling
st.markdown("""
""", unsafe_allow_html=True)
def load_json_file(file_path: Path, default: Any = None) -> Any:
"""Safely load JSON file with error handling"""
try:
if file_path.exists():
with open(file_path, 'r') as f:
return json.load(f)
        return default if default is not None else {}
except Exception as e:
logger.error(f"Failed to load {file_path}: {e}")
        return default if default is not None else {}
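# Example (illustrative) usage of the helper above:
#   history = load_json_file(path_manager.get_logs_path("drift_history.json"), default=[])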
def show_logs_section():
"""Display system logs in Streamlit"""
st.subheader("System Logs")
log_files = {
"Activity Log": path_manager.get_activity_log_path(),
"Prediction Log": path_manager.get_logs_path("prediction_log.json"),
"Scheduler Log": path_manager.get_logs_path("scheduler_execution.json"),
"Drift History": path_manager.get_logs_path("drift_history.json"),
"Drift Alerts": path_manager.get_logs_path("drift_alerts.json"),
"Prediction Monitor": path_manager.get_logs_path("monitor/predictions.json"),
"Metrics Log": path_manager.get_logs_path("monitor/metrics.json"),
"Alerts Log": path_manager.get_logs_path("monitor/alerts.json")
}
col1, col2 = st.columns([2, 1])
with col1:
selected_log = st.selectbox("Select log file:", list(log_files.keys()))
with col2:
max_entries = st.number_input("Max entries:", min_value=10, max_value=1000, value=50)
if st.button("Load Log", type="primary"):
log_path = log_files[selected_log]
if log_path.exists():
try:
with open(log_path, 'r') as f:
log_data = json.load(f)
if log_data:
st.info(f"Total entries: {len(log_data)}")
                    if isinstance(log_data, list) and len(log_data) > max_entries:
                        log_data = log_data[-max_entries:]
                        st.warning(f"Showing last {max_entries} entries")
with st.expander("Raw JSON Data"):
st.json(log_data)
if isinstance(log_data, list) and log_data:
df = pd.DataFrame(log_data)
st.dataframe(df, use_container_width=True)
else:
st.warning("Log file is empty")
except Exception as e:
st.error(f"Error reading log: {e}")
else:
st.warning(f"Log file not found: {log_path}")
def render_cv_results_section():
"""Render cross-validation results section"""
st.subheader("🎯 Cross-Validation Results")
cv_results = app_manager.get_cv_results_from_api()
if cv_results is None:
st.warning("API not available - showing local CV results if available")
# Try to load local metadata
try:
from path_config import path_manager
metadata_path = path_manager.get_metadata_path()
if metadata_path.exists():
with open(metadata_path, 'r') as f:
metadata = json.load(f)
cv_results = {'cross_validation': metadata.get('cross_validation', {})}
else:
st.info("No local CV results found")
return
except Exception as e:
st.error(f"Could not load local CV results: {e}")
return
if cv_results and 'error' not in cv_results:
# Display model information
if 'model_version' in cv_results:
st.info(f"**Model Version:** {cv_results.get('model_version', 'Unknown')} | "
f"**Type:** {cv_results.get('model_type', 'Unknown')} | "
f"**Trained:** {cv_results.get('training_timestamp', 'Unknown')}")
cv_data = cv_results.get('cross_validation', {})
if cv_data:
# CV Methodology
methodology = cv_data.get('methodology', {})
col1, col2, col3 = st.columns(3)
with col1:
st.metric("CV Folds", methodology.get('n_splits', 'Unknown'))
with col2:
st.metric("CV Type", methodology.get('cv_type', 'StratifiedKFold'))
with col3:
st.metric("Random State", methodology.get('random_state', 42))
# Performance Metrics Summary
st.subheader("📊 Performance Summary")
test_scores = cv_data.get('test_scores', {})
if test_scores:
metrics_cols = st.columns(len(test_scores))
for idx, (metric, scores) in enumerate(test_scores.items()):
with metrics_cols[idx]:
if isinstance(scores, dict):
mean_val = scores.get('mean', 0)
std_val = scores.get('std', 0)
st.metric(
f"{metric.upper()}",
f"{mean_val:.4f}",
delta=f"±{std_val:.4f}"
)
# Detailed CV Scores Visualization
st.subheader("📈 Cross-Validation Scores by Metric")
# Create a comprehensive chart
chart_data = []
fold_results = cv_data.get('individual_fold_results', [])
if fold_results:
for fold_result in fold_results:
fold_num = fold_result.get('fold', 0)
test_scores_fold = fold_result.get('test_scores', {})
for metric, score in test_scores_fold.items():
chart_data.append({
'Fold': f"Fold {fold_num}",
'Metric': metric.upper(),
'Score': score,
'Type': 'Test'
})
# Add train scores if available
train_scores_fold = fold_result.get('train_scores', {})
for metric, score in train_scores_fold.items():
chart_data.append({
'Fold': f"Fold {fold_num}",
'Metric': metric.upper(),
'Score': score,
'Type': 'Train'
})
if chart_data:
df_cv = pd.DataFrame(chart_data)
# Create separate charts for each metric
for metric in df_cv['Metric'].unique():
metric_data = df_cv[df_cv['Metric'] == metric]
fig = px.bar(
metric_data,
x='Fold',
y='Score',
color='Type',
title=f'{metric} Scores Across CV Folds',
barmode='group'
)
fig.update_layout(height=400)
st.plotly_chart(fig, use_container_width=True)
# Performance Indicators
st.subheader("🔍 Model Quality Indicators")
performance_indicators = cv_data.get('performance_indicators', {})
col1, col2 = st.columns(2)
with col1:
overfitting_score = performance_indicators.get('overfitting_score', 'Unknown')
if isinstance(overfitting_score, (int, float)):
if overfitting_score < 0.05:
st.success(f"**Overfitting Score:** {overfitting_score:.4f} (Low)")
elif overfitting_score < 0.15:
st.warning(f"**Overfitting Score:** {overfitting_score:.4f} (Moderate)")
else:
st.error(f"**Overfitting Score:** {overfitting_score:.4f} (High)")
else:
st.info(f"**Overfitting Score:** {overfitting_score}")
with col2:
stability_score = performance_indicators.get('stability_score', 'Unknown')
if isinstance(stability_score, (int, float)):
if stability_score > 0.9:
st.success(f"**Stability Score:** {stability_score:.4f} (High)")
elif stability_score > 0.7:
st.warning(f"**Stability Score:** {stability_score:.4f} (Moderate)")
else:
st.error(f"**Stability Score:** {stability_score:.4f} (Low)")
else:
st.info(f"**Stability Score:** {stability_score}")
# Statistical Validation Results
if 'statistical_validation' in cv_results:
st.subheader("📈 Statistical Validation")
stat_validation = cv_results['statistical_validation']
for metric, validation_data in stat_validation.items():
if isinstance(validation_data, dict):
with st.expander(f"Statistical Tests - {metric.upper()}"):
col1, col2 = st.columns(2)
with col1:
st.write(f"**Improvement:** {validation_data.get('improvement', 0):.4f}")
st.write(f"**Effect Size:** {validation_data.get('effect_size', 0):.4f}")
with col2:
sig_improvement = validation_data.get('significant_improvement', False)
if sig_improvement:
st.success("**Significant Improvement:** Yes")
else:
st.info("**Significant Improvement:** No")
# Display test results
tests = validation_data.get('tests', {})
if tests:
st.write("**Statistical Test Results:**")
for test_name, test_result in tests.items():
if isinstance(test_result, dict):
p_value = test_result.get('p_value', 1.0)
significant = test_result.get('significant', False)
status = "✅ Significant" if significant else "❌ Not Significant"
st.write(f"- {test_name}: p-value = {p_value:.4f} ({status})")
# Promotion Validation
if 'promotion_validation' in cv_results:
st.subheader("🚀 Model Promotion Validation")
promotion_val = cv_results['promotion_validation']
col1, col2, col3 = st.columns(3)
with col1:
confidence = promotion_val.get('decision_confidence', 'Unknown')
if isinstance(confidence, (int, float)):
st.metric("Decision Confidence", f"{confidence:.2%}")
else:
st.metric("Decision Confidence", str(confidence))
with col2:
st.write(f"**Promotion Reason:**")
st.write(promotion_val.get('promotion_reason', 'Unknown'))
with col3:
st.write(f"**Comparison Method:**")
st.write(promotion_val.get('comparison_method', 'Unknown'))
# Raw CV Data (expandable)
with st.expander("🔍 Detailed CV Data"):
st.json(cv_data)
else:
st.info("No detailed CV test scores available")
else:
st.info("No cross-validation data available")
else:
error_msg = cv_results.get('error', 'Unknown error') if cv_results else 'No CV results available'
st.warning(f"Cross-validation results not available: {error_msg}")
def render_validation_statistics_section():
"""Render validation statistics section"""
st.subheader("📊 Data Validation Statistics")
validation_stats = app_manager.get_validation_statistics_from_api()
if validation_stats and validation_stats.get('statistics_available'):
overall_metrics = validation_stats.get('overall_metrics', {})
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Total Validations", overall_metrics.get('total_validations', 0))
with col2:
st.metric("Articles Processed", overall_metrics.get('total_articles_processed', 0))
with col3:
success_rate = overall_metrics.get('overall_success_rate', 0)
st.metric("Success Rate", f"{success_rate:.1%}")
with col4:
quality_score = overall_metrics.get('average_quality_score', 0)
st.metric("Avg Quality", f"{quality_score:.3f}")
else:
st.info("No validation statistics available yet. Please make predictions first to generate validation statistics")
def render_validation_quality_report():
"""Render validation quality report section"""
st.subheader("📋 Data Quality Report")
quality_report = app_manager.get_validation_quality_report_from_api()
if quality_report and 'error' not in quality_report:
overall_stats = quality_report.get('overall_statistics', {})
quality_assessment = quality_report.get('quality_assessment', {})
col1, col2 = st.columns(2)
with col1:
st.metric("Total Articles", overall_stats.get('total_articles', 0))
st.metric("Success Rate", f"{overall_stats.get('overall_success_rate', 0):.1%}")
with col2:
quality_level = quality_assessment.get('quality_level', 'unknown')
if quality_level == 'excellent':
st.success(f"Quality Level: {quality_level.title()}")
elif quality_level == 'good':
st.info(f"Quality Level: {quality_level.title()}")
elif quality_level == 'fair':
st.warning(f"Quality Level: {quality_level.title()}")
else:
st.error(f"Quality Level: {quality_level.title()}")
recommendations = quality_report.get('recommendations', [])
if recommendations:
st.subheader("💡 Recommendations")
for i, rec in enumerate(recommendations, 1):
st.write(f"{i}. {rec}")
else:
st.info("Quality report not available yet. Please make predictions first to generate data quality report")
def render_model_comparison_section():
"""Render model comparison results section"""
st.subheader("⚖️ Model Comparison Results")
comparison_results = app_manager.get_model_comparison_from_api()
if comparison_results is None:
st.warning("API not available - comparison results not accessible")
return
if comparison_results and 'error' not in comparison_results:
# Comparison Summary
summary = comparison_results.get('summary', {})
models_compared = comparison_results.get('models_compared', {})
st.info(f"**Comparison:** {models_compared.get('model1_name', 'Model 1')} vs "
f"{models_compared.get('model2_name', 'Model 2')} | "
f"**Timestamp:** {comparison_results.get('comparison_timestamp', 'Unknown')}")
# Decision Summary
col1, col2, col3 = st.columns(3)
with col1:
decision = summary.get('decision', False)
if decision:
st.success("**Decision:** Promote New Model")
else:
st.info("**Decision:** Keep Current Model")
with col2:
confidence = summary.get('confidence', 0)
st.metric("Decision Confidence", f"{confidence:.2%}")
with col3:
st.write("**Reason:**")
st.write(summary.get('reason', 'Unknown'))
# Performance Comparison
st.subheader("📊 Performance Comparison")
prod_performance = comparison_results.get('model_performance', {}).get('production_model', {})
cand_performance = comparison_results.get('model_performance', {}).get('candidate_model', {})
# Create comparison chart
if prod_performance.get('test_scores') and cand_performance.get('test_scores'):
comparison_data = []
prod_scores = prod_performance['test_scores']
cand_scores = cand_performance['test_scores']
for metric in set(prod_scores.keys()) & set(cand_scores.keys()):
prod_mean = prod_scores[metric].get('mean', 0)
cand_mean = cand_scores[metric].get('mean', 0)
comparison_data.extend([
{'Model': 'Production', 'Metric': metric.upper(), 'Score': prod_mean},
{'Model': 'Candidate', 'Metric': metric.upper(), 'Score': cand_mean}
])
if comparison_data:
df_comparison = pd.DataFrame(comparison_data)
fig = px.bar(
df_comparison,
x='Metric',
y='Score',
color='Model',
title='Model Performance Comparison',
barmode='group'
)
fig.update_layout(height=400)
st.plotly_chart(fig, use_container_width=True)
# Detailed Metric Comparisons
st.subheader("🔍 Detailed Metric Analysis")
metric_comparisons = comparison_results.get('metric_comparisons', {})
if metric_comparisons:
for metric, comparison_data in metric_comparisons.items():
if isinstance(comparison_data, dict):
with st.expander(f"{metric.upper()} Analysis"):
col1, col2, col3 = st.columns(3)
with col1:
improvement = comparison_data.get('improvement', 0)
rel_improvement = comparison_data.get('relative_improvement', 0)
if improvement > 0:
st.success(f"**Improvement:** +{improvement:.4f}")
st.success(f"**Relative:** +{rel_improvement:.2f}%")
else:
st.info(f"**Improvement:** {improvement:.4f}")
st.info(f"**Relative:** {rel_improvement:.2f}%")
with col2:
effect_size = comparison_data.get('effect_size', 0)
if abs(effect_size) > 0.8:
st.success(f"**Effect Size:** {effect_size:.4f} (Large)")
elif abs(effect_size) > 0.5:
st.warning(f"**Effect Size:** {effect_size:.4f} (Medium)")
else:
st.info(f"**Effect Size:** {effect_size:.4f} (Small)")
with col3:
sig_improvement = comparison_data.get('significant_improvement', False)
practical_sig = comparison_data.get('practical_significance', False)
if sig_improvement:
st.success("**Statistical Significance:** Yes")
else:
st.info("**Statistical Significance:** No")
if practical_sig:
st.success("**Practical Significance:** Yes")
else:
st.info("**Practical Significance:** No")
# Statistical test results
tests = comparison_data.get('tests', {})
if tests:
st.write("**Statistical Tests:**")
for test_name, test_result in tests.items():
if isinstance(test_result, dict):
p_value = test_result.get('p_value', 1.0)
significant = test_result.get('significant', False)
status = "✅" if significant else "❌"
st.write(f"- {test_name}: p = {p_value:.4f} {status}")
# CV Methodology
cv_methodology = comparison_results.get('cv_methodology', {})
if cv_methodology:
st.subheader("🎯 Cross-Validation Methodology")
st.info(f"**CV Folds:** {cv_methodology.get('cv_folds', 'Unknown')} | "
f"**Session ID:** {comparison_results.get('session_id', 'Unknown')}")
# Raw comparison data (expandable)
with st.expander("🔍 Raw Comparison Data"):
st.json(comparison_results)
else:
error_msg = comparison_results.get('error', 'Unknown error') if comparison_results else 'No comparison results available'
st.warning(f"Model comparison results not available: {error_msg}")
def save_prediction_to_history(text: str, prediction: str, confidence: float):
"""Save prediction to session history"""
prediction_entry = {
'timestamp': datetime.now().isoformat(),
        'text': (text[:100] + "...") if len(text) > 100 else text,
'prediction': prediction,
'confidence': confidence,
'text_length': len(text)
}
st.session_state.prediction_history.append(prediction_entry)
# Keep only last 50 predictions
if len(st.session_state.prediction_history) > 50:
st.session_state.prediction_history = st.session_state.prediction_history[-50:]
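# Example (illustrative; label and confidence are placeholder values):
#   save_prediction_to_history(user_text, "FAKE", 0.93)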
def make_prediction_request(text: str) -> Dict[str, Any]:
"""Make prediction request to API"""
try:
if not app_manager.api_available:
return {'error': 'API is not available'}
response = app_manager.session.post(
f"{app_manager.config['api_url']}/predict",
json={"text": text},
timeout=app_manager.config['prediction_timeout']
)
if response.status_code == 200:
return response.json()
else:
return {'error': f'API Error: {response.status_code} - {response.text}'}
except requests.exceptions.Timeout:
return {'error': 'Request timed out. Please try again.'}
except requests.exceptions.ConnectionError:
return {'error': 'Cannot connect to prediction service.'}
except Exception as e:
return {'error': f'Unexpected error: {str(e)}'}
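# A returned dict containing an 'error' key signals failure; any other return
# value is the raw JSON payload from the /predict endpoint. Example (illustrative):
#   result = make_prediction_request("Some article text to classify ...")
#   if 'error' in result:
#       st.error(result['error'])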
def validate_text_input(text: str) -> tuple[bool, str]:
"""Validate text input"""
if not text or not text.strip():
return False, "Please enter some text to analyze."
if len(text) < 10:
return False, "Text must be at least 10 characters long."
if len(text) > app_manager.config['max_text_length']:
return False, f"Text must be less than {app_manager.config['max_text_length']} characters."
# Check for suspicious content
suspicious_patterns = ['