Fake-News-Detection-with-MLOps / initialize_system.py
Ahmedik95316's picture
Update initialize_system.py
075b701 verified
import os
import sys
import json
import shutil
import joblib
import pandas as pd
from pathlib import Path
from datetime import datetime
from sklearn.pipeline import Pipeline
from sklearn.ensemble import VotingClassifier
from sklearn.model_selection import cross_validate
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
# Import the new path manager# Cal
try:
from path_config import path_manager
except ImportError:
# Add current directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from path_config import path_manager
def log_step(message):
"""Log initialization steps"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")
def create_directories():
"""Create necessary directories"""
log_step("Creating directory structure...")
# Directories are already created by path_manager initialization
directories = [
path_manager.get_data_path(),
path_manager.get_model_path(),
path_manager.get_logs_path(),
path_manager.get_cache_path(),
path_manager.get_temp_path()
]
for dir_path in directories:
if dir_path.exists():
log_step(f"βœ… Directory exists: {dir_path}")
else:
try:
dir_path.mkdir(parents=True, exist_ok=True)
log_step(f"βœ… Created directory: {dir_path}")
except Exception as e:
log_step(f"⚠️ Failed to create {dir_path}: {e}")
return False
# Create kaggle subdirectory
kaggle_dir = path_manager.get_data_path('kaggle')
kaggle_dir.mkdir(parents=True, exist_ok=True)
log_step(f"βœ… Created kaggle directory: {kaggle_dir}")
return True
def check_existing_datasets():
"""Check for existing datasets in the project structure"""
log_step("Checking for existing datasets...")
# Check for datasets in the current project structure
base_dir = path_manager.base_paths['base']
# Possible source locations
source_locations = [
base_dir / "data" / "kaggle" / "Fake.csv",
base_dir / "data" / "kaggle" / "True.csv",
base_dir / "data" / "combined_dataset.csv"
]
found_files = []
for source_file in source_locations:
if source_file.exists():
found_files.append(source_file)
log_step(f"βœ… Found existing dataset: {source_file}")
return found_files
def copy_existing_datasets():
"""Copy existing datasets if they're not in the target location"""
log_step("Copying existing datasets to target locations...")
base_dir = path_manager.base_paths['base']
target_data_dir = path_manager.get_data_path()
# Define source-target pairs
copy_operations = [
(base_dir / "data" / "kaggle" / "Fake.csv", target_data_dir / "kaggle" / "Fake.csv"),
(base_dir / "data" / "kaggle" / "True.csv", target_data_dir / "kaggle" / "True.csv"),
(base_dir / "data" / "combined_dataset.csv", target_data_dir / "combined_dataset.csv")
]
copied_count = 0
for source, target in copy_operations:
# Skip if source and target are the same (already in correct location)
if source == target:
if source.exists():
log_step(f"βœ… Dataset already in correct location: {target}")
copied_count += 1
continue
if source.exists():
try:
# Ensure target directory exists
target.parent.mkdir(parents=True, exist_ok=True)
# Copy file
shutil.copy2(source, target)
log_step(f"βœ… Copied {source} β†’ {target}")
copied_count += 1
except Exception as e:
log_step(f"⚠️ Failed to copy {source}: {e}")
else:
log_step(f"⚠️ Source file not found: {source}")
return copied_count > 0
def create_minimal_dataset():
"""Create a minimal dataset if no existing dataset is found"""
log_step("Creating minimal dataset...")
combined_path = path_manager.get_combined_dataset_path()
if combined_path.exists():
log_step(f"βœ… Combined dataset already exists: {combined_path}")
return True
try:
# Create minimal training data with diverse examples
minimal_data = pd.DataFrame({
'text': [
# Real news examples
'Scientists at MIT have developed a new renewable energy technology that could revolutionize solar power generation.',
'The Federal Reserve announced interest rate decisions following their latest economic review meeting.',
'Local authorities report significant improvements in air quality following new environmental regulations.',
'Research published in Nature journal reveals new insights about climate change adaptation strategies.',
'Economic indicators show steady growth in the manufacturing sector across multiple regions.',
'Healthcare officials recommend updated vaccination schedules based on latest medical research findings.',
'Transportation department announces infrastructure improvements for major highway systems nationwide.',
'Educational institutions implement new digital learning platforms to enhance student engagement.',
'Agricultural experts develop drought-resistant crop varieties to improve food security globally.',
'Technology companies invest heavily in cybersecurity measures to protect user data privacy.',
# Fake news examples
'SHOCKING: Government officials secretly planning to control population through mind control technology.',
'EXCLUSIVE: Celebrities caught in massive alien communication scandal that mainstream media won\'t report.',
'BREAKING: Scientists discover time travel but government hiding the truth from public knowledge.',
'EXPOSED: Pharmaceutical companies deliberately spreading diseases to increase their massive profits.',
'URGENT: Social media platforms using secret algorithms to brainwash users into political compliance.',
'LEAKED: Banking system about to collapse completely, insiders reveal financial catastrophe coming soon.',
'CONFIRMED: Weather modification technology being used to create artificial natural disasters worldwide.',
'REVEALED: Food companies adding dangerous chemicals that cause instant health problems and addiction.',
'CONSPIRACY: Educational system designed to suppress critical thinking and create obedient citizens.',
'TRUTH: Technology giants working with foreign powers to undermine national sovereignty completely.'
],
'label': [
# Real news labels (0)
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
# Fake news labels (1)
1, 1, 1, 1, 1, 1, 1, 1, 1, 1
]
})
# Save the dataset
minimal_data.to_csv(combined_path, index=False)
log_step(f"βœ… Created minimal dataset with {len(minimal_data)} samples at {combined_path}")
# Verify the file was created correctly
if combined_path.exists():
df_check = pd.read_csv(combined_path)
log_step(f"βœ… Verified dataset: {len(df_check)} rows loaded successfully")
return True
else:
log_step("❌ Failed to verify created dataset")
return False
except Exception as e:
log_step(f"❌ Failed to create minimal dataset: {str(e)}")
return False
def run_initial_training():
"""Run basic model training"""
log_step("Starting initial model training...")
try:
# Get all the paths
model_path = path_manager.get_model_file_path()
vectorizer_path = path_manager.get_vectorizer_path()
pipeline_path = path_manager.get_pipeline_path()
log_step(f"Model path: {model_path}")
log_step(f"Vectorizer path: {vectorizer_path}")
log_step(f"Pipeline path: {pipeline_path}")
# Check if model already exists
if pipeline_path.exists() or (model_path.exists() and vectorizer_path.exists()):
log_step("βœ… Model files already exist, checking if pipeline needs to be created...")
# If individual components exist but pipeline doesn't, create pipeline
if model_path.exists() and vectorizer_path.exists() and not pipeline_path.exists():
log_step("Creating pipeline from existing components...")
try:
# Load existing components
# model = joblib.load(model_path)
vectorizer = joblib.load(vectorizer_path)
# Create pipeline
# pipeline = Pipeline([
# ('vectorizer', vectorizer),
# ('model', model)
# ])
# Create ensemble method pipeline
# Initialize ensemble model
ensemble_model = VotingClassifier(
estimators=[
('logistic', LogisticRegression(max_iter=1000, random_state=42, class_weight='balanced')),
('random_forest', RandomForestClassifier(n_estimators=50, random_state=42, class_weight='balanced'))
],
voting='soft'
)
pipeline = Pipeline([
('vectorizer', TfidfVectorizer(...)),
('model', ensemble_model) # Use ensemble instead of single model
])
# Save pipeline
joblib.dump(pipeline, pipeline_path)
log_step(f"βœ… Created pipeline from existing components: {pipeline_path}")
except Exception as e:
log_step(f"⚠️ Failed to create pipeline from existing components: {e}")
return True
# Load dataset
dataset_path = path_manager.get_combined_dataset_path()
if not dataset_path.exists():
log_step("❌ No dataset available for training")
return False
df = pd.read_csv(dataset_path)
log_step(f"Loaded dataset with {len(df)} samples")
# Validate dataset
if len(df) < 10:
log_step("❌ Dataset too small for training")
return False
# Prepare data
X = df['text'].values
y = df['label'].values
# Check class distribution
class_counts = pd.Series(y).value_counts()
log_step(f"Class distribution: {class_counts.to_dict()}")
# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y if len(class_counts) > 1 else None
)
# Create pipeline with preprocessing
pipeline = Pipeline([
('vectorizer', TfidfVectorizer(
max_features=5000,
stop_words='english',
ngram_range=(1, 2),
min_df=1,
max_df=0.95
)),
('model', LogisticRegression(
max_iter=1000,
random_state=42,
class_weight='balanced'
))
])
# Train model with cross-validation
log_step("Training model with cross-validation...")
# Perform cross-validation before final training
cv_results = cross_validate(
pipeline, X_train, y_train,
cv=3,
scoring=['accuracy', 'f1_weighted', 'precision_weighted', 'recall_weighted'],
return_train_score=True
)
# Train final model on all training data
pipeline.fit(X_train, y_train)
# Evaluate on test set
y_pred = pipeline.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average='weighted')
# Calculate quality indicators directly from cv_results
train_acc_mean = float(cv_results['train_accuracy'].mean())
test_acc_mean = float(cv_results['test_accuracy'].mean())
test_acc_std = float(cv_results['test_accuracy'].std())
overfitting_score = train_acc_mean - test_acc_mean
stability_score = 1 - (test_acc_std / test_acc_mean) if test_acc_mean > 0 else 0
# Save CV results for API access
cv_data = {
"methodology": {
"n_splits": 3,
"cv_type": "StratifiedKFold",
"random_state": 42
},
"test_scores": {
"accuracy": {
"mean": test_acc_mean,
"std": test_acc_std,
"scores": cv_results['test_accuracy'].tolist()
},
"f1": {
"mean": float(cv_results['test_f1_weighted'].mean()),
"std": float(cv_results['test_f1_weighted'].std()),
"scores": cv_results['test_f1_weighted'].tolist()
}
},
"train_scores": {
"accuracy": {
"mean": train_acc_mean,
"std": float(cv_results['train_accuracy'].std()),
"scores": cv_results['train_accuracy'].tolist()
}
},
"performance_indicators": {
"overfitting_score": overfitting_score,
"stability_score": stability_score
}
}
# Save CV results
cv_results_path = path_manager.get_logs_path("cv_results.json")
with open(cv_results_path, 'w') as f:
json.dump(cv_data, f, indent=2)
log_step(f"Saved CV results to: {cv_results_path}")
# Ensure model directory exists
model_path.parent.mkdir(parents=True, exist_ok=True)
# Save complete pipeline FIRST (this is the priority)
log_step(f"Saving pipeline to: {pipeline_path}")
joblib.dump(pipeline, pipeline_path)
# Verify pipeline was saved
if pipeline_path.exists():
log_step(f"βœ… Pipeline saved successfully to {pipeline_path}")
# Test loading the pipeline
try:
test_pipeline = joblib.load(pipeline_path)
test_pred = test_pipeline.predict(["This is a test"])
log_step(f"βœ… Pipeline verification successful: {test_pred}")
except Exception as e:
log_step(f"⚠️ Pipeline verification failed: {e}")
else:
log_step(f"❌ Pipeline was not saved to {pipeline_path}")
# Save individual components for backward compatibility
try:
joblib.dump(pipeline.named_steps['model'], model_path)
joblib.dump(pipeline.named_steps['vectorizer'], vectorizer_path)
log_step(f"βœ… Saved individual components")
except Exception as e:
log_step(f"⚠️ Failed to save individual components: {e}")
# Save metadata
metadata = {
"model_version": "v1.0_init",
"model_type": "ensemble_voting_pipeline", # "logistic_regression_pipeline",
"test_accuracy": float(accuracy),
"test_f1": float(f1),
"train_size": len(X_train),
"test_size": len(X_test),
"timestamp": datetime.now().isoformat(),
"training_method": "initialization",
"environment": path_manager.environment,
"data_path": str(dataset_path),
"class_distribution": class_counts.to_dict(),
"pipeline_created": pipeline_path.exists(),
"individual_components_created": model_path.exists() and vectorizer_path.exists(),
"cv_f1_mean": float(cv_results['test_f1_weighted'].mean()),
"cv_f1_std": float(cv_results['test_f1_weighted'].std()),
"cv_accuracy_mean": float(cv_results['test_accuracy'].mean()),
"cv_accuracy_std": float(cv_results['test_accuracy'].std()),
"overfitting_score": cv_data['performance_indicators']['overfitting_score'],
"stability_score": cv_data['performance_indicators']['stability_score']
}
metadata_path = path_manager.get_metadata_path()
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
log_step(f"βœ… Training completed successfully")
log_step(f" Accuracy: {accuracy:.4f}")
log_step(f" F1 Score: {f1:.4f}")
log_step(f" Pipeline saved: {pipeline_path.exists()}")
log_step(f" Model saved to: {model_path}")
log_step(f" Vectorizer saved to: {vectorizer_path}")
return True
except Exception as e:
log_step(f"❌ Training failed: {str(e)}")
import traceback
log_step(f"❌ Traceback: {traceback.format_exc()}")
return False
def create_initial_logs():
"""Create initial log files"""
log_step("Creating initial log files...")
try:
# Activity log
activity_log = [{
"timestamp": datetime.now().strftime("%Y-%m-%d %I:%M %p"),
"event": "System initialized successfully",
"level": "INFO",
"environment": path_manager.environment
}]
activity_log_path = path_manager.get_activity_log_path()
with open(activity_log_path, 'w') as f:
json.dump(activity_log, f, indent=2)
log_step(f"βœ… Created activity log: {activity_log_path}")
# Create monitoring directory structure
monitor_dir = path_manager.get_logs_path("monitor")
monitor_dir.mkdir(parents=True, exist_ok=True)
log_step(f"βœ… Created monitor directory: {monitor_dir}")
# Create empty monitoring logs
monitoring_log_path = path_manager.get_logs_path("monitoring_log.json")
with open(monitoring_log_path, 'w') as f:
json.dump([], f)
log_step(f"βœ… Created monitoring log: {monitoring_log_path}")
# Create monitor-specific log files
monitor_log_files = [
"monitor/predictions.json",
"monitor/metrics.json",
"monitor/alerts.json"
]
for log_file in monitor_log_files:
log_path = path_manager.get_logs_path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True) # Ensure parent dir exists
if not log_path.exists():
with open(log_path, 'w') as f:
json.dump([], f)
log_step(f"βœ… Created {log_file}")
# Create other necessary log files
log_files = [
"drift_history.json",
"drift_alerts.json",
"scheduler_execution.json",
"scheduler_errors.json"
]
for log_file in log_files:
log_path = path_manager.get_logs_path(log_file)
if not log_path.exists():
with open(log_path, 'w') as f:
json.dump([], f)
log_step(f"βœ… Created {log_file}")
return True
except Exception as e:
log_step(f"❌ Log creation failed: {str(e)}")
return False
def verify_system():
"""Verify that the system is properly initialized"""
log_step("Verifying system initialization...")
# Check critical files
critical_files = [
(path_manager.get_combined_dataset_path(), "Combined dataset"),
(path_manager.get_model_file_path(), "Model file"),
(path_manager.get_vectorizer_path(), "Vectorizer file"),
(path_manager.get_pipeline_path(), "Pipeline file"),
(path_manager.get_metadata_path(), "Metadata file"),
(path_manager.get_activity_log_path(), "Activity log")
]
all_good = True
for file_path, description in critical_files:
if file_path.exists():
log_step(f"βœ… {description}: {file_path}")
else:
log_step(f"❌ Missing {description}: {file_path}")
if description == "Pipeline file":
# Pipeline is critical, mark as not all good
all_good = False
# Test model loading - prioritize pipeline
try:
import joblib
pipeline_path = path_manager.get_pipeline_path()
if pipeline_path.exists():
pipeline = joblib.load(pipeline_path)
test_pred = pipeline.predict(["This is a test text"])
log_step(f"βœ… Pipeline test prediction successful: {test_pred}")
else:
log_step("⚠️ Pipeline not available, testing individual components...")
model_path = path_manager.get_model_file_path()
vectorizer_path = path_manager.get_vectorizer_path()
if model_path.exists() and vectorizer_path.exists():
model = joblib.load(model_path)
vectorizer = joblib.load(vectorizer_path)
test_text_vec = vectorizer.transform(["This is a test text"])
test_pred = model.predict(test_text_vec)
log_step(f"βœ… Individual components test prediction successful: {test_pred}")
else:
log_step("❌ No working model components found")
all_good = False
except Exception as e:
log_step(f"❌ Model test failed: {e}")
all_good = False
return all_good
def main():
"""Main initialization function"""
log_step("πŸš€ Starting system initialization...")
log_step(f"🌍 Environment: {path_manager.environment}")
log_step(f"πŸ“ Base directory: {path_manager.base_paths['base']}")
log_step(f"πŸ“Š Data directory: {path_manager.base_paths['data']}")
log_step(f"πŸ€– Model directory: {path_manager.base_paths['model']}")
steps = [
("Directory Creation", create_directories),
("Existing Dataset Copy", copy_existing_datasets),
("Minimal Dataset Creation", create_minimal_dataset),
("Model Training", run_initial_training),
("Log File Creation", create_initial_logs),
("System Verification", verify_system)
]
failed_steps = []
completed_steps = []
for step_name, step_function in steps:
try:
log_step(f"πŸ”„ Starting: {step_name}")
if step_function():
log_step(f"βœ… {step_name} completed")
completed_steps.append(step_name)
else:
log_step(f"❌ {step_name} failed")
failed_steps.append(step_name)
except Exception as e:
log_step(f"❌ {step_name} failed: {str(e)}")
failed_steps.append(step_name)
# Summary
log_step(f"\nπŸ“Š Initialization Summary:")
log_step(f" βœ… Completed: {len(completed_steps)}/{len(steps)} steps")
log_step(f" ❌ Failed: {len(failed_steps)}/{len(steps)} steps")
if completed_steps:
log_step(f" Completed steps: {', '.join(completed_steps)}")
if failed_steps:
log_step(f" Failed steps: {', '.join(failed_steps)}")
log_step(f"⚠️ Initialization completed with {len(failed_steps)} failed steps")
else:
log_step("πŸŽ‰ System initialization completed successfully!")
# Environment info
log_step(f"\nπŸ” Environment Information:")
env_info = path_manager.get_environment_info()
log_step(f" Environment: {env_info['environment']}")
log_step(f" Available datasets: {sum(env_info['available_datasets'].values())}")
log_step(f" Available models: {sum(env_info['available_models'].values())}")
# Final pipeline check
pipeline_path = path_manager.get_pipeline_path()
log_step(f"\n🎯 Final Pipeline Check:")
log_step(f" Pipeline path: {pipeline_path}")
log_step(f" Pipeline exists: {pipeline_path.exists()}")
if pipeline_path.exists():
try:
import joblib
pipeline = joblib.load(pipeline_path)
log_step(f" Pipeline loadable: βœ…")
log_step(f" Pipeline steps: {list(pipeline.named_steps.keys())}")
except Exception as e:
log_step(f" Pipeline load error: {e}")
log_step("\n🎯 System ready for use!")
return len(failed_steps) == 0
if __name__ == "__main__":
success = main()
if not success:
sys.exit(1)