# File: features/feature_engineer.py
# Enhanced Feature Engineering Pipeline for Priority 6

import joblib
import logging
import numpy as np
import pandas as pd
from datetime import datetime
from scipy.sparse import hstack, csr_matrix
from typing import Tuple

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.preprocessing import StandardScaler

import warnings
warnings.filterwarnings('ignore')

# Import feature analyzers
from features.sentiment_analyzer import SentimentAnalyzer
from features.readability_analyzer import ReadabilityAnalyzer
from features.entity_analyzer import EntityAnalyzer
from features.linguistic_analyzer import LinguisticAnalyzer

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AdvancedFeatureEngineer(BaseEstimator, TransformerMixin):
    """
    Advanced feature engineering pipeline combining TF-IDF with multiple
    NLP feature extractors for enhanced fake news detection performance.
    """

    def __init__(self,
                 enable_sentiment: bool = True,
                 enable_readability: bool = True,
                 enable_entities: bool = True,
                 enable_linguistic: bool = True,
                 feature_selection_k: int = 5000,
                 tfidf_max_features: int = 10000,
                 ngram_range: Tuple[int, int] = (1, 3),
                 min_df: int = 2,
                 max_df: float = 0.95):
        """
        Initialize the advanced feature engineering pipeline.

        Args:
            enable_sentiment: Enable sentiment analysis features
            enable_readability: Enable readability/complexity features
            enable_entities: Enable named entity recognition features
            enable_linguistic: Enable advanced linguistic features
            feature_selection_k: Number of features to keep after selection
            tfidf_max_features: Maximum number of TF-IDF features
            ngram_range: N-gram range for TF-IDF
            min_df: Minimum document frequency for TF-IDF
            max_df: Maximum document frequency for TF-IDF
        """
        self.enable_sentiment = enable_sentiment
        self.enable_readability = enable_readability
        self.enable_entities = enable_entities
        self.enable_linguistic = enable_linguistic
        self.feature_selection_k = feature_selection_k
        self.tfidf_max_features = tfidf_max_features
        self.ngram_range = ngram_range
        self.min_df = min_df
        self.max_df = max_df

        # Initialize feature extractors
        self.sentiment_analyzer = SentimentAnalyzer() if enable_sentiment else None
        self.readability_analyzer = ReadabilityAnalyzer() if enable_readability else None
        self.entity_analyzer = EntityAnalyzer() if enable_entities else None
        self.linguistic_analyzer = LinguisticAnalyzer() if enable_linguistic else None

        # Initialize TF-IDF components
        self.tfidf_vectorizer = None
        self.feature_selector = None
        self.feature_scaler = None

        # Feature metadata
        self.feature_names_ = []
        self.feature_importance_ = {}
        self.is_fitted_ = False

    def fit(self, X, y=None):
        """
        Fit the feature engineering pipeline.
        Args:
            X: Text data (array-like of strings)
            y: Target labels (optional, for supervised feature selection)
        """
        logger.info("Fitting advanced feature engineering pipeline...")

        # Convert to a NumPy array if needed
        if isinstance(X, pd.Series):
            X = X.values
        elif isinstance(X, list):
            X = np.array(X)

        # Validate input
        if len(X) == 0:
            raise ValueError("Cannot fit on empty data")

        # Initialize and fit the TF-IDF vectorizer
        self.tfidf_vectorizer = TfidfVectorizer(
            max_features=self.tfidf_max_features,
            ngram_range=self.ngram_range,
            min_df=self.min_df,
            max_df=self.max_df,
            stop_words='english',
            sublinear_tf=True,
            norm='l2',
            lowercase=True
        )
        logger.info("Fitting TF-IDF vectorizer...")
        tfidf_features = self.tfidf_vectorizer.fit_transform(X)

        # Extract the additional (dense, numerical) features
        additional_features = self._extract_additional_features(X, fit=True)

        # Standardize the additional features *before* combining and
        # selecting, so column positions stay aligned between fit() and
        # transform(). (Selecting first and then scaling "the last N
        # columns" would mis-identify them, because SelectKBest drops
        # columns from arbitrary positions.)
        if additional_features.shape[1] > 0:
            self.feature_scaler = StandardScaler()
            additional_features = self.feature_scaler.fit_transform(additional_features)
            all_features = hstack([tfidf_features, csr_matrix(additional_features)])
        else:
            all_features = tfidf_features

        logger.info(f"Total features before selection: {all_features.shape[1]}")

        # Supervised feature selection with chi2. chi2 requires
        # non-negative inputs, so a clipped copy is used for scoring only.
        if y is not None and self.feature_selection_k < all_features.shape[1]:
            logger.info(f"Performing feature selection (k={self.feature_selection_k})...")
            self.feature_selector = SelectKBest(
                score_func=chi2,
                k=min(self.feature_selection_k, all_features.shape[1])
            )
            features_dense = (all_features.toarray()
                              if hasattr(all_features, 'toarray') else np.asarray(all_features))
            self.feature_selector.fit(np.maximum(features_dense, 0), y)
            logger.info(f"Selected {int(self.feature_selector.get_support().sum())} features")

        # Generate feature names
        self._generate_feature_names()

        # Calculate feature importance if possible
        if y is not None and self.feature_selector is not None:
            self._calculate_feature_importance()

        self.is_fitted_ = True
        logger.info("Feature engineering pipeline fitted successfully")
        return self
    def transform(self, X):
        """
        Transform text data into enhanced feature vectors.

        Args:
            X: Text data (array-like of strings)

        Returns:
            Transformed feature matrix (dense NumPy array)
        """
        if not self.is_fitted_:
            raise ValueError("Pipeline must be fitted before transforming")

        # Convert to a NumPy array if needed
        if isinstance(X, pd.Series):
            X = X.values
        elif isinstance(X, list):
            X = np.array(X)

        # Extract TF-IDF features
        tfidf_features = self.tfidf_vectorizer.transform(X)

        # Extract and scale the additional features, mirroring fit()
        additional_features = self._extract_additional_features(X, fit=False)
        if self.feature_scaler is not None and additional_features.shape[1] > 0:
            additional_features = self.feature_scaler.transform(additional_features)
            all_features = hstack([tfidf_features, csr_matrix(additional_features)])
        else:
            all_features = tfidf_features

        features_dense = (all_features.toarray()
                          if hasattr(all_features, 'toarray') else np.asarray(all_features))

        # Apply the selection mask learned during fit(). The clipped copy
        # was only needed for chi2 scoring, so the returned features keep
        # their original (possibly negative, standardized) values.
        if self.feature_selector is not None:
            return features_dense[:, self.feature_selector.get_support()]
        return features_dense

    def _extract_additional_features(self, X, fit=False):
        """Extract the dense numerical features beyond TF-IDF."""
        feature_arrays = []

        try:
            # Sentiment features
            if self.sentiment_analyzer is not None:
                logger.info("Extracting sentiment features...")
                if fit:
                    sentiment_features = self.sentiment_analyzer.fit_transform(X)
                else:
                    sentiment_features = self.sentiment_analyzer.transform(X)
                feature_arrays.append(sentiment_features)

            # Readability features
            if self.readability_analyzer is not None:
                logger.info("Extracting readability features...")
                if fit:
                    readability_features = self.readability_analyzer.fit_transform(X)
                else:
                    readability_features = self.readability_analyzer.transform(X)
                feature_arrays.append(readability_features)

            # Entity features
            if self.entity_analyzer is not None:
                logger.info("Extracting entity features...")
                if fit:
                    entity_features = self.entity_analyzer.fit_transform(X)
                else:
                    entity_features = self.entity_analyzer.transform(X)
                feature_arrays.append(entity_features)

            # Linguistic features
            if self.linguistic_analyzer is not None:
                logger.info("Extracting linguistic features...")
                if fit:
                    linguistic_features = self.linguistic_analyzer.fit_transform(X)
                else:
                    linguistic_features = self.linguistic_analyzer.transform(X)
                feature_arrays.append(linguistic_features)

            # Combine all additional features
            if feature_arrays:
                additional_features = np.hstack(feature_arrays)
                logger.info(f"Extracted {additional_features.shape[1]} additional features")
            else:
                additional_features = np.empty((len(X), 0))

        except Exception as e:
            # Degrade gracefully: fall back to TF-IDF-only features
            logger.warning(f"Error extracting additional features: {e}")
            additional_features = np.empty((len(X), 0))

        return additional_features

    def _generate_feature_names(self):
        """Generate feature names for all output columns, in column order."""
        self.feature_names_ = []

        # TF-IDF feature names
        if self.tfidf_vectorizer is not None:
            tfidf_names = [f"tfidf_{name}"
                           for name in self.tfidf_vectorizer.get_feature_names_out()]
            self.feature_names_.extend(tfidf_names)

        # Additional feature names
        if self.sentiment_analyzer is not None:
            self.feature_names_.extend(self.sentiment_analyzer.get_feature_names())
        if self.readability_analyzer is not None:
            self.feature_names_.extend(self.readability_analyzer.get_feature_names())
        if self.entity_analyzer is not None:
            self.feature_names_.extend(self.entity_analyzer.get_feature_names())
        if self.linguistic_analyzer is not None:
            self.feature_names_.extend(self.linguistic_analyzer.get_feature_names())

        # Keep only the names of selected features
        if self.feature_selector is not None:
            selected_indices = self.feature_selector.get_support()

            # Bounds check to prevent an IndexError when an analyzer reports
            # a different number of names than the columns it produced
            if len(selected_indices) == len(self.feature_names_):
                self.feature_names_ = [name for i, name in enumerate(self.feature_names_)
                                       if selected_indices[i]]
            else:
                logger.warning(f"Mismatch: {len(selected_indices)} selected_indices "
                               f"vs {len(self.feature_names_)} feature_names")
                # Use the shorter length to avoid index errors
                min_length = min(len(selected_indices), len(self.feature_names_))
                self.feature_names_ = [name for i, name in enumerate(self.feature_names_[:min_length])
                                       if selected_indices[i]]

    def _calculate_feature_importance(self):
        """Calculate chi2 importance scores for the selected features."""
        if self.feature_selector is None:
            return

        scores = self.feature_selector.scores_
        selected_indices = self.feature_selector.get_support()

        # Get scores for the selected features only
        selected_scores = scores[selected_indices]

        # Map names to scores and sort by importance, descending
        self.feature_importance_ = {
            name: float(score)
            for name, score in zip(self.feature_names_, selected_scores)
        }
        self.feature_importance_ = dict(
            sorted(self.feature_importance_.items(), key=lambda x: x[1], reverse=True)
        )

    def get_feature_names(self):
        """Get the names of the output features."""
        if not self.is_fitted_:
            raise ValueError("Pipeline must be fitted first")
        return self.feature_names_

    def get_feature_importance(self, top_k=None):
        """Get feature importance scores, optionally limited to the top k."""
        if not self.feature_importance_:
            return {}
        if top_k is not None:
            return dict(list(self.feature_importance_.items())[:top_k])
        return self.feature_importance_

    def get_feature_metadata(self):
        """Get comprehensive feature metadata."""
        if not self.is_fitted_:
            raise ValueError("Pipeline must be fitted first")

        metadata = {
            'total_features': len(self.feature_names_),
            'feature_types': {
                'tfidf_features': sum(1 for name in self.feature_names_
                                      if name.startswith('tfidf_')),
                'sentiment_features': sum(1 for name in self.feature_names_
                                          if name.startswith('sentiment_')),
                'readability_features': sum(1 for name in self.feature_names_
                                            if name.startswith('readability_')),
                'entity_features': sum(1 for name in self.feature_names_
                                       if name.startswith('entity_')),
                'linguistic_features': sum(1 for name in self.feature_names_
                                           if name.startswith('linguistic_'))
            },
            'configuration': {
                'enable_sentiment': self.enable_sentiment,
                'enable_readability': self.enable_readability,
                'enable_entities': self.enable_entities,
                'enable_linguistic': self.enable_linguistic,
                'feature_selection_k': self.feature_selection_k,
                'tfidf_max_features': self.tfidf_max_features,
                'ngram_range': self.ngram_range
            },
            'feature_importance_available': bool(self.feature_importance_),
            'timestamp': datetime.now().isoformat()
        }
        return metadata

    def save_pipeline(self, filepath):
        """Save the fitted pipeline to disk."""
        if not self.is_fitted_:
            raise ValueError("Pipeline must be fitted before saving")

        save_data = {
            'feature_engineer': self,
            'metadata': self.get_feature_metadata(),
            'feature_names': self.feature_names_,
            'feature_importance': self.feature_importance_
        }
        joblib.dump(save_data, filepath)
        logger.info(f"Feature engineering pipeline saved to {filepath}")

    @classmethod
    def load_pipeline(cls, filepath):
        """Load a fitted pipeline from disk."""
        save_data = joblib.load(filepath)
        feature_engineer = save_data['feature_engineer']
        logger.info(f"Feature engineering pipeline loaded from {filepath}")
        return feature_engineer


def create_enhanced_pipeline(X_train, y_train,
                             enable_sentiment=True,
                             enable_readability=True,
                             enable_entities=True,
                             enable_linguistic=True,
                             feature_selection_k=5000):
    """
    Create and fit an enhanced feature engineering pipeline.

    Args:
        X_train: Training text data
        y_train: Training labels
        enable_sentiment: Enable sentiment analysis features
        enable_readability: Enable readability features
        enable_entities: Enable entity features
        enable_linguistic: Enable linguistic features
        feature_selection_k: Number of features to select

    Returns:
        Fitted AdvancedFeatureEngineer instance
    """
    logger.info("Creating enhanced feature engineering pipeline...")

    # Create and fit the feature engineer
    feature_engineer = AdvancedFeatureEngineer(
        enable_sentiment=enable_sentiment,
        enable_readability=enable_readability,
        enable_entities=enable_entities,
        enable_linguistic=enable_linguistic,
        feature_selection_k=feature_selection_k
    )
    feature_engineer.fit(X_train, y_train)

    # Log feature information
    metadata = feature_engineer.get_feature_metadata()
    logger.info(f"Enhanced pipeline created with {metadata['total_features']} features")
    logger.info(f"Feature breakdown: {metadata['feature_types']}")

    return feature_engineer


def analyze_feature_importance(feature_engineer, top_k=20):
    """
    Analyze and summarize feature importance.

    Args:
        feature_engineer: Fitted AdvancedFeatureEngineer instance
        top_k: Number of top features to analyze

    Returns:
        Dictionary with feature analysis results
    """
    if not feature_engineer.is_fitted_:
        raise ValueError("Feature engineer must be fitted first")

    # Get feature importance and metadata
    importance = feature_engineer.get_feature_importance(top_k=top_k)
    metadata = feature_engineer.get_feature_metadata()

    # Count feature types among the top features
    feature_type_counts = {}
    for feature in importance:
        if feature.startswith('tfidf_'):
            feature_type = 'tfidf'
        elif feature.startswith('sentiment_'):
            feature_type = 'sentiment'
        elif feature.startswith('readability_'):
            feature_type = 'readability'
        elif feature.startswith('entity_'):
            feature_type = 'entity'
        elif feature.startswith('linguistic_'):
            feature_type = 'linguistic'
        else:
            feature_type = 'other'
        feature_type_counts[feature_type] = feature_type_counts.get(feature_type, 0) + 1

    analysis = {
        'top_features': importance,
        'feature_type_distribution': feature_type_counts,
        'total_features': metadata['total_features'],
        'feature_breakdown': metadata['feature_types'],
        'analysis_timestamp': datetime.now().isoformat()
    }

    return analysis
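

# ---------------------------------------------------------------------------
# Minimal smoke-test sketch, not part of the pipeline itself. The toy texts
# and labels below are made up purely for illustration, and running this
# assumes the four analyzer modules imported at the top of this file are
# available and follow the fit_transform / transform / get_feature_names
# contract used above (extraction degrades gracefully to TF-IDF-only
# features if they fail).
if __name__ == "__main__":
    toy_texts = [
        "Scientists confirm the new exoplanet discovery after peer review.",
        "SHOCKING miracle cure that doctors do not want you to know about!",
        "The city council approves new funding for road repairs next spring.",
        "You will not believe this shocking trick to get rich overnight!",
    ]
    toy_labels = [0, 1, 0, 1]  # hypothetical encoding: 0 = real, 1 = fake

    # The toy corpus is far too small for the defaults, so relax min_df
    # and shrink the feature budget.
    engineer = AdvancedFeatureEngineer(
        feature_selection_k=50,
        tfidf_max_features=200,
        min_df=1,
    )
    engineer.fit(toy_texts, toy_labels)

    features = engineer.transform(toy_texts)
    print(f"Feature matrix shape: {features.shape}")
    print(analyze_feature_importance(engineer, top_k=10))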