In [None]:
# Fake News Detection using BERT-BiLSTM-Attention

This notebook is tuned to run on the free tier of Google Colab. It applies the following optimizations:
- Reduced model size
- Optimized memory usage
- Efficient data loading
- Gradient checkpointing
- Mixed precision training


In [None]:
## 1. Setup and Installation


In [None]:
# Install required packages
!pip install torch==2.0.1 transformers==4.30.2 nltk==3.8.1 pandas==2.0.3 numpy==1.24.3 scikit-learn==1.3.0 tqdm==4.65.0


In [None]:
# Import required libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import BertModel, BertTokenizer
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import re
from tqdm import tqdm
import gc

# Download NLTK data (one resource package per call)
for _nltk_resource in ('punkt', 'stopwords', 'wordnet'):
    nltk.download(_nltk_resource)


In [None]:
## 2. Configuration and Constants


In [None]:
# Optimized for Colab free version
class Config:
    """Central place for the hyper-parameters and runtime switches of this notebook.

    All values are class attributes, so they can be read from the class itself or
    from an instance. The "reduced from" notes record the larger values the
    settings were scaled down from.
    """

    # ---- Model ----
    MAX_SEQUENCE_LENGTH = 128  # tokens per sample (reduced from 256)
    VOCAB_SIZE = 10000         # reduced from 15000
    EMBEDDING_DIM = 64         # reduced from 128
    HIDDEN_DIM = 128           # reduced from 256

    # ---- Training ----
    BATCH_SIZE = 4             # reduced from 8
    NUM_EPOCHS = 2             # reduced from 3
    LEARNING_RATE = 2e-5

    # ---- Dataset ----
    MAX_SAMPLES = 5000         # reduced from 10000

    # ---- Runtime ----
    # Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU
    DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    # Name of the pretrained checkpoint to load
    MODEL_NAME = 'bert-base-uncased'

    # Mixed precision switch
    USE_AMP = True

    # Gradient checkpointing switch
    USE_GRADIENT_CHECKPOINTING = True

config = Config()

# Report which device will be used and, when a GPU is present, its name and memory
_cuda_ready = torch.cuda.is_available()
print(f"Using device: {config.DEVICE}")
print(f"CUDA available: {_cuda_ready}")
if _cuda_ready:
    _gpu_props = torch.cuda.get_device_properties(0)
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {_gpu_props.total_memory / 1024**3:.1f} GB")


In [None]:
## 3. Data Loading and Preprocessing


In [None]:
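# Dataset sources:
# 1. Kaggle Fake and Real News Dataset: https://www.kaggle.com/datasets/clmentbisaillon/fake-and-real-news-dataset
#    (direct download requires a Kaggle API key; see the optional cell further down)
# 2. LIAR Dataset: https://sites.cs.ucsb.edu/~william/data/liar_dataset.zip
# Note: download_datasets() does not fetch source 1 itself. It downloads a sample CSV
# from the FakeNewsCorpus GitHub repository as a stand-in.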

import zipfile
import urllib.request
import os

def download_datasets():
    """Download the LIAR archive and the sample news CSV into the working directory.

    Both downloads are best-effort: a failure is reported with ``print`` and does
    not raise, so callers can fall back to whatever data is available. Files left
    by a previous run are reused, which keeps "Restart & Run All" cheap and lets
    the notebook work offline after the first run.

    Side effects:
        Creates ``liar_dataset/`` (extracted archive) and
        ``kaggle_news_sample.csv`` in the current working directory.
    """
    # --- LIAR dataset -------------------------------------------------------
    liar_url = "https://sites.cs.ucsb.edu/~william/data/liar_dataset.zip"
    liar_zip = "liar_dataset.zip"
    liar_dir = "liar_dataset/"
    # These are the three files load_liar_dataset() reads
    liar_files = ("train.tsv", "valid.tsv", "test.tsv")

    if all(os.path.exists(os.path.join(liar_dir, name)) for name in liar_files):
        print("LIAR dataset already present, skipping download")
    else:
        print("Downloading LIAR dataset...")
        try:
            urllib.request.urlretrieve(liar_url, liar_zip)

            # Extract the zip file
            with zipfile.ZipFile(liar_zip, 'r') as zip_ref:
                zip_ref.extractall(liar_dir)

            print("LIAR dataset downloaded and extracted successfully")
        except Exception as e:
            print(f"Error downloading LIAR dataset: {e}")
        finally:
            # Clean up the archive even when the download or extraction failed,
            # so a truncated zip is never left behind
            if os.path.exists(liar_zip):
                os.remove(liar_zip)

    # --- Kaggle stand-in ----------------------------------------------------
    # For Kaggle dataset, we'll use a sample since direct download requires API key
    print("Setting up Kaggle dataset alternative...")
    kaggle_url = "https://raw.githubusercontent.com/several27/FakeNewsCorpus/master/news_sample.csv"
    sample_csv = "kaggle_news_sample.csv"
    partial_csv = sample_csv + ".part"

    if os.path.exists(sample_csv):
        print("Kaggle sample dataset already present, skipping download")
        return

    try:
        # Download to a temporary name first so an interrupted transfer cannot
        # be mistaken for a complete file on the next run
        urllib.request.urlretrieve(kaggle_url, partial_csv)
        os.replace(partial_csv, sample_csv)
        print("Kaggle sample dataset downloaded successfully")
    except Exception as e:
        print(f"Could not download Kaggle sample: {e}")
        if os.path.exists(partial_csv):
            os.remove(partial_csv)

def load_liar_dataset(max_samples=None):
 """Load and process LIAR dataset"""
 try:
 # Load train, validation, and test sets
 train_df = pd.read_csv("liar_dataset/train.tsv", sep='\t', header=None)
 val_df = pd.read_csv("liar_dataset/valid.tsv", sep='\t', header=None)
 test_df = pd.read_csv("liar_dataset/test.tsv", sep='\t', header=None)
 
 # Column names for LIAR dataset
 columns = ['id', 'label', 'statement', 'subjects', 'speaker', 'speaker_job', 
 'state_info', 'party_affiliation', 'barely_true_counts', 'false_counts',
 'half_true_counts', 'mostly_true_counts', 'pants_on_fire_counts', 'context']
 
 train_df.columns = columns
 val_df.columns = columns
 test_df.columns = columns
 
 # Combine all datasets
 df = pd.concat([train_df, val_df, test_df], ignore_index=True)
 
 # Convert labels to binary (fake/real)
 # Consider 'false', 'barely-true', 'pants-fire' as fake (1)
 # Consider 'true', 'mostly-true', 'half-true' as real (0)
 fake_labels = ['false', 'barely-true', 'pants-fire']
 df['binary_label'] = df['label'].apply(lambda x: 1 if x in fake_labels else 0)
 
 # Use statement as text
 df = df[['statement', 'binary_label']].rename(columns={'statement': 'text', 'binary_label': 'label'})
 
 print(f"LIAR dataset loaded: {len(df)} samples")
 return df
 
 except Exception as e:
 print(f"Error loading LIAR dataset: {e}")
 return None

def load_kaggle_dataset(max_samples=None):
    """Load ``kaggle_news_sample.csv`` and reduce it to a text/label frame.

    The file must contain a ``label`` column and at least one of the text columns
    ``text``, ``title``, ``content`` or ``article`` (checked in that order; the
    first one present is used). Non-numeric labels are matched case-insensitively
    against ``fake`` (1) and ``real`` (0). Rows with missing text or an
    unrecognised label are dropped.

    Args:
        max_samples: Optional upper bound on the number of rows returned. When
            the data is larger, a random sample (``random_state=42``) is taken.

    Returns:
        A DataFrame with columns ``text`` and ``label`` (int), or ``None`` when
        the file cannot be read or lacks the required columns.
    """
    try:
        df = pd.read_csv("kaggle_news_sample.csv")

        # Without labels the file cannot be used for supervised training
        if 'label' not in df.columns:
            print("Error loading Kaggle dataset: no 'label' column found")
            return None

        # Use appropriate text column
        text_columns = ['text', 'title', 'content', 'article']
        text_col = next((col for col in text_columns if col in df.columns), None)
        if text_col is None:
            print(f"Error loading Kaggle dataset: none of {text_columns} columns found")
            return None

        df = df[[text_col, 'label']].rename(columns={text_col: 'text'})

        # Map labels to binary if needed; tolerate case and surrounding whitespace
        if not pd.api.types.is_numeric_dtype(df['label']):
            normalized = df['label'].astype(str).str.strip().str.lower()
            df = df.assign(label=normalized.map({'fake': 1, 'real': 0}))

        # Unmapped labels are NaN at this point and are removed with missing text
        df = df.dropna(subset=['text', 'label'])
        df = df.assign(label=df['label'].astype(int)).reset_index(drop=True)

        if max_samples is not None and len(df) > max_samples:
            df = df.sample(n=max_samples, random_state=42).reset_index(drop=True)

        print(f"Kaggle dataset loaded: {len(df)} samples")
        return df

    except Exception as e:
        print(f"Error loading Kaggle dataset: {e}")
        return None

def load_combined_data(max_samples=config.MAX_SAMPLES):
    """Download, load and merge the available datasets into one text/label frame.

    Calls ``download_datasets()``, then ``load_liar_dataset()`` and
    ``load_kaggle_dataset()``. Every source that loaded and exposes ``text`` and
    ``label`` columns is concatenated. If no source is usable, a small synthetic
    dataset is generated so the rest of the notebook can still be exercised.

    Args:
        max_samples: Upper bound on the number of rows returned; larger data is
            randomly sampled (``random_state=42``). A falsy value (``None`` / 0)
            disables the cap.

    Returns:
        A DataFrame with columns ``text`` and ``label`` (int), without missing
        values, indexed 0..n-1.
    """
    # Download datasets
    download_datasets()

    # Load datasets and keep only the ones that are usable
    dfs = []
    for source_name, source_df in (("LIAR", load_liar_dataset()),
                                   ("Kaggle", load_kaggle_dataset())):
        if source_df is None:
            continue
        if not {'text', 'label'}.issubset(source_df.columns):
            # A frame with other columns would inject NaN-filled rows into the merge
            print(f"Skipping {source_name} dataset: missing 'text' or 'label' column")
            continue
        dfs.append(source_df[['text', 'label']])
        print(f"{source_name} dataset: {len(source_df)} samples")

    if dfs:
        df = pd.concat(dfs, ignore_index=True)
        print(f"Combined dataset: {len(df)} samples")
    else:
        # Fallback to dummy data
        print("Creating dummy dataset for testing...")
        base_texts = [
            "President announces new economic policy to boost growth",
            "Scientists confirm breakthrough in renewable energy technology",
            "False: Celebrities endorse dangerous health treatment",
            "Misleading: Government hiding alien contact information",
            "Local community rallies to support flood victims",
            "Breaking: Major scientific discovery changes understanding of physics"
        ]
        base_labels = [0, 0, 1, 1, 0, 0]

        # Without a cap, fall back to a fixed size so the repeat count is defined
        dummy_size = max_samples if max_samples else 600
        # Ceiling division so that small or non-multiple-of-6 sizes are still filled
        repeats = -(-dummy_size // len(base_texts))

        df = pd.DataFrame({
            'text': (base_texts * repeats)[:dummy_size],
            'label': (base_labels * repeats)[:dummy_size]
        })
        print(f"Created dummy dataset with {len(df)} samples")

    # Remove missing values (only the two columns used downstream matter)
    df = df.dropna(subset=['text', 'label'])
    # Concatenating sources can upcast labels to float; normalise to int
    df = df.assign(label=df['label'].astype(int))

    # Sample data for faster training if needed
    if max_samples and len(df) > max_samples:
        df = df.sample(n=max_samples, random_state=42)
        print(f"Sampled to {len(df)} samples for faster training")

    return df.reset_index(drop=True)

# Text preprocessing
def preprocess_text(text, max_chars=1000):
    """Normalise a raw text value into a cleaned, lower-cased string.

    Steps, in order: convert to ``str``, lower-case, delete every character that
    is not a word character, whitespace or one of ``. , ! ?``, collapse runs of
    whitespace to a single space, and truncate to ``max_chars`` characters.

    Args:
        text: The raw value. Missing values (``None`` / NaN) yield ``""``.
        max_chars: Maximum length of the result in characters. Defaults to 1000;
            pass ``None`` to disable truncation.

    Returns:
        The cleaned string (possibly empty).
    """
    if pd.isna(text):
        return ""
    text = str(text)
    # Convert to lowercase
    text = text.lower()
    # Remove special characters but keep basic punctuation
    text = re.sub(r'[^\w\s.,!?]', '', text)
    # Remove extra whitespace
    text = ' '.join(text.split())
    # Limit length to prevent very long texts (a None bound keeps everything)
    return text[:max_chars]

# Load the datasets and print a short summary of what was obtained
print("Loading datasets...")
df = load_combined_data()
print(f"Final dataset shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

if len(df) > 0:
    _first_text = df.iloc[0]['text']
    _label_counts = df['label'].value_counts()
    _label_share = df['label'].value_counts(normalize=True) * 100

    print(f"Sample text: {_first_text[:100]}...")
    print("Label distribution:")
    print(_label_counts)
    print("Label distribution percentage:")
    print(_label_share)


In [None]:
### Optional: Download the Kaggle Dataset Directly (requires Kaggle API credentials)

If you have Kaggle API credentials, you can download the full dataset by uncommenting and running the commands in the next cell. Otherwise, the notebook uses the alternative sources loaded above.


In [None]:
# Optional: Kaggle API setup (uncomment and run if you have Kaggle credentials)
# !pip install kaggle
# !mkdir -p ~/.kaggle
# # Upload your kaggle.json file to Colab files, then run:
# # !cp kaggle.json ~/.kaggle/
# # !chmod 600 ~/.kaggle/kaggle.json

# Download the full Kaggle dataset (uncomment if you have API access)
# !kaggle datasets download -d clmentbisaillon/fake-and-real-news-dataset
# !unzip fake-and-real-news-dataset.zip

def load_full_kaggle_dataset():
    """Load the full Kaggle dataset from ``Fake.csv`` and ``True.csv`` if present.

    Rows from ``Fake.csv`` are labelled 1 and rows from ``True.csv`` 0. When both
    ``title`` and ``text`` columns exist, the text becomes ``"<title>. <text>"``;
    a missing title or body no longer turns the whole value into NaN — the part
    that is present is used on its own. Rows with no text at all are dropped.

    Returns:
        A DataFrame with columns ``text`` and ``label``, or ``None`` when the
        files are unavailable, unreadable, or contain no text column.
    """
    try:
        # Try to load the full dataset files
        fake_df = pd.read_csv("Fake.csv")
        real_df = pd.read_csv("True.csv")

        # Add labels
        fake_df['label'] = 1
        real_df['label'] = 0

        # Combine datasets
        df = pd.concat([fake_df, real_df], ignore_index=True)

        if 'text' not in df.columns:
            print("Full Kaggle dataset not available: no 'text' column found")
            return None

        body = df['text'].fillna('').astype(str)
        if 'title' in df.columns:
            # Use title + text as the full text, falling back to the body alone
            # for rows that have no title
            title = df['title'].fillna('').astype(str)
            has_title = title.str.strip() != ''
            body = (title + ". " + body).where(has_title, body)

        df = pd.DataFrame({'text': body, 'label': df['label']})
        df = df[df['text'].str.strip() != ''].reset_index(drop=True)

        print(f"Full Kaggle dataset loaded: {len(df)} samples")
        return df

    except Exception as e:
        print(f"Full Kaggle dataset not available: {e}")
        return None

# Try to load full Kaggle dataset; when it is available, actually switch `df` to it
# (capped at config.MAX_SAMPLES). Otherwise `df` from the earlier cell is kept.
full_kaggle_df = load_full_kaggle_dataset()
if (
    full_kaggle_df is not None
    and len(full_kaggle_df) > 0
    and {'text', 'label'}.issubset(full_kaggle_df.columns)
):
    print("Using full Kaggle dataset")
    df = full_kaggle_df.dropna(subset=['text', 'label'])
    if config.MAX_SAMPLES and len(df) > config.MAX_SAMPLES:
        df = df.sample(n=config.MAX_SAMPLES, random_state=42)
        print(f"Sampled to {len(df)} samples for faster training")
    df = df.reset_index(drop=True)


In [None]:
# Create dataset class
class FakeNewsDataset(Dataset):
    """Map-style dataset that tokenizes one text sample per item.

    Args:
        texts: Sequence of raw texts (list, NumPy array or pandas Series).
        labels: Sequence of integer class labels, same length as ``texts``.
        tokenizer: Object exposing ``encode_plus`` (called with the keyword
            arguments shown in ``__getitem__``).
        max_length: Length every sample is padded / truncated to.

    Raises:
        ValueError: If ``texts`` and ``labels`` differ in length.
    """

    def __init__(self, texts, labels, tokenizer, max_length):
        # Materialise as lists so that `[idx]` is always positional. A pandas
        # Series with a shuffled index would otherwise be looked up by label.
        self.texts = list(texts)
        self.labels = list(labels)
        if len(self.texts) != len(self.labels):
            raise ValueError(
                f"texts and labels must have the same length "
                f"({len(self.texts)} != {len(self.labels)})"
            )
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenized sample at position ``idx``.

        Returns:
            A dict with 1-D tensors ``input_ids`` and ``attention_mask`` and a
            scalar ``label`` tensor of dtype ``torch.long``.
        """
        text = str(self.texts[idx])
        label = self.labels[idx]

        # Preprocess text
        text = preprocess_text(text)

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        return {
            # encode_plus returns a leading batch dimension of 1; drop it
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            # int() also accepts NumPy integer / whole-number float labels
            'label': torch.tensor(int(label), dtype=torch.long)
        }

print("Dataset class created successfully")


In [None]:
## 4. Model Architecture


In [None]:
class FakeNewsModel(nn.Module):
    """BERT encoder followed by a BiLSTM, attention pooling and a 2-class head.

    Args:
        config: Object providing ``MODEL_NAME`` (checkpoint passed to
            ``BertModel.from_pretrained``), ``USE_GRADIENT_CHECKPOINTING`` and
            ``HIDDEN_DIM`` (LSTM hidden size per direction).
    """

    def __init__(self, config):
        super(FakeNewsModel, self).__init__()

        # BERT layer
        self.bert = BertModel.from_pretrained(config.MODEL_NAME)
        if config.USE_GRADIENT_CHECKPOINTING:
            self.bert.gradient_checkpointing_enable()

        # BiLSTM layer. The input size is read from the loaded encoder instead of
        # hard-coding 768, so other checkpoints work too. No `dropout` argument:
        # nn.LSTM only applies it between stacked layers, so with num_layers=1
        # it has no effect and only triggers a warning.
        self.lstm = nn.LSTM(
            input_size=self.bert.config.hidden_size,
            hidden_size=config.HIDDEN_DIM,
            num_layers=1,
            batch_first=True,
            bidirectional=True
        )

        # Attention layer: scores each time step of the BiLSTM output
        self.attention = nn.Sequential(
            nn.Linear(config.HIDDEN_DIM * 2, config.HIDDEN_DIM),
            nn.Tanh(),
            nn.Linear(config.HIDDEN_DIM, 1)
        )

        # Classification head
        self.classifier = nn.Sequential(
            nn.Dropout(0.3),
            nn.Linear(config.HIDDEN_DIM * 2, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 2)
        )

    def forward(self, input_ids, attention_mask):
        """Compute class logits for a batch.

        Args:
            input_ids: Token id tensor, batch-first (the LSTM is ``batch_first``).
            attention_mask: Tensor of the same shape as ``input_ids``; positions
                equal to 0 are treated as padding and excluded from the
                attention pooling.

        Returns:
            Logits with a last dimension of size 2.
        """
        # BERT (first output = per-token hidden states)
        bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)[0]

        # BiLSTM
        lstm_output, _ = self.lstm(bert_output)

        # Attention mechanism
        attention_scores = self.attention(lstm_output)
        # Exclude padding positions from the softmax; otherwise padded tokens
        # receive attention weight. The dtype's minimum is used instead of -inf
        # so a fully masked row cannot produce NaN (also safe under fp16 autocast).
        padding_positions = attention_mask.unsqueeze(-1) == 0
        attention_scores = attention_scores.masked_fill(
            padding_positions, torch.finfo(attention_scores.dtype).min
        )
        attention_weights = torch.softmax(attention_scores, dim=1)
        attended_output = torch.sum(attention_weights * lstm_output, dim=1)

        # Classification
        logits = self.classifier(attended_output)

        return logits

print("Model architecture defined successfully")


In [None]:
## 5. Training Functions


In [None]:
def train_epoch(model, train_loader, optimizer, criterion, scaler, config):
    """Run one training pass over ``train_loader`` and return the mean batch loss.

    Args:
        model: Module called as ``model(input_ids, attention_mask)``.
        train_loader: Sized iterable of batches; each batch is a dict with
            ``input_ids``, ``attention_mask`` and ``label`` tensors.
        optimizer: Optimizer over the model parameters.
        criterion: Loss called as ``criterion(outputs, labels)``.
        scaler: Gradient scaler for mixed precision, or ``None``. Mixed
            precision is used only when a scaler is supplied, ``config.USE_AMP``
            is true and CUDA is available.
        config: Object providing ``DEVICE`` and ``USE_AMP``.

    Returns:
        The average loss per batch as a float.

    Raises:
        ValueError: If ``train_loader`` contains no batches.
    """
    num_batches = len(train_loader)
    if num_batches == 0:
        raise ValueError("train_loader contains no batches")

    # Decide once per epoch; a missing scaler falls back to full precision
    # instead of failing inside the AMP branch.
    use_amp = scaler is not None and config.USE_AMP and torch.cuda.is_available()

    model.train()
    total_loss = 0.0

    progress_bar = tqdm(train_loader, desc='Training')
    for batch in progress_bar:
        input_ids = batch['input_ids'].to(config.DEVICE)
        attention_mask = batch['attention_mask'].to(config.DEVICE)
        labels = batch['label'].to(config.DEVICE)

        optimizer.zero_grad()

        if use_amp:
            with torch.cuda.amp.autocast():
                outputs = model(input_ids, attention_mask)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        # Read the scalar once; each .item() forces a device synchronisation
        batch_loss = loss.item()
        total_loss += batch_loss
        progress_bar.set_postfix({'loss': batch_loss})

    # No per-batch torch.cuda.empty_cache(): it does not give PyTorch more usable
    # memory and forces the allocator to re-request blocks on every step.
    return total_loss / num_batches

def evaluate(model, val_loader, criterion, config):
    """Evaluate ``model`` on ``val_loader`` without gradient tracking.

    Args:
        model: Module called as ``model(input_ids, attention_mask)``; it is put
            into eval mode and left there.
        val_loader: Sized iterable of batches; each batch is a dict with
            ``input_ids``, ``attention_mask`` and ``label`` tensors.
        criterion: Loss called as ``criterion(outputs, labels)``.
        config: Object providing ``DEVICE``.

    Returns:
        A dict with ``loss`` (mean per batch), ``accuracy`` and weighted
        ``precision``, ``recall`` and ``f1``.

    Raises:
        ValueError: If ``val_loader`` contains no batches.
    """
    num_batches = len(val_loader)
    if num_batches == 0:
        raise ValueError("val_loader contains no batches")

    model.eval()
    total_loss = 0.0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        progress_bar = tqdm(val_loader, desc='Evaluating')
        for batch in progress_bar:
            input_ids = batch['input_ids'].to(config.DEVICE)
            attention_mask = batch['attention_mask'].to(config.DEVICE)
            labels = batch['label'].to(config.DEVICE)

            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)

            total_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())

    # zero_division=0 keeps the score at 0 for a class that is never predicted
    # (as the default does) but without emitting a warning on every call.
    metrics = {
        'loss': total_loss / num_batches,
        'accuracy': accuracy_score(all_labels, all_preds),
        'precision': precision_score(all_labels, all_preds, average='weighted', zero_division=0),
        'recall': recall_score(all_labels, all_preds, average='weighted', zero_division=0),
        'f1': f1_score(all_labels, all_preds, average='weighted', zero_division=0)
    }

    return metrics

print("Training functions defined successfully")


In [None]:
## 6. Main Training Process


In [None]:
# Setup training
def setup_training(df, config, seed=42):
    """Build everything needed for training from a text/label DataFrame.

    Cleans the texts with ``preprocess_text``, drops samples that end up empty,
    makes a stratified 80/20 train/validation split, and creates the tokenizer,
    datasets, data loaders, model, optimizer, loss and (optionally) the
    mixed-precision gradient scaler.

    Args:
        df: DataFrame with ``text`` and ``label`` columns.
        config: Object providing ``MODEL_NAME``, ``MAX_SEQUENCE_LENGTH``,
            ``BATCH_SIZE``, ``LEARNING_RATE``, ``DEVICE`` and ``USE_AMP``.
        seed: Seed applied to the PyTorch RNG and the data split so that model
            initialisation, shuffling and the split are repeatable.

    Returns:
        ``(model, train_loader, val_loader, optimizer, criterion, scaler,
        tokenizer)``; ``scaler`` is ``None`` unless AMP is enabled and CUDA is
        available.

    Raises:
        ValueError: If ``df`` is ``None`` or empty, or no sample has any text
            left after preprocessing.
    """
    # Ensure we have valid data
    if df is None or len(df) == 0:
        raise ValueError("No valid dataset available")

    # Seed before the model is created and before the loader draws its shuffle order
    torch.manual_seed(seed)

    print("Dataset info:")
    print(f"- Total samples: {len(df)}")
    print(f"- Label distribution: {df['label'].value_counts().to_dict()}")

    # Preprocess data
    print("Preprocessing text data...")
    texts = df['text'].apply(preprocess_text).values
    # Class indices must be integers for CrossEntropyLoss targets
    labels = df['label'].astype(int).values

    # Remove empty texts
    non_empty = np.array([len(text.strip()) > 0 for text in texts], dtype=bool)
    texts = texts[non_empty]
    labels = labels[non_empty]

    print(f"After preprocessing: {len(texts)} valid samples")
    if len(texts) == 0:
        raise ValueError("No valid dataset available")

    # Split data
    train_texts, val_texts, train_labels, val_labels = train_test_split(
        texts, labels, test_size=0.2, random_state=seed, stratify=labels
    )

    print("Data split:")
    print(f"- Train samples: {len(train_texts)}")
    print(f"- Validation samples: {len(val_texts)}")
    print(f"- Train label distribution: {pd.Series(train_labels).value_counts().to_dict()}")
    print(f"- Val label distribution: {pd.Series(val_labels).value_counts().to_dict()}")

    # Initialize tokenizer
    print("Initializing BERT tokenizer...")
    tokenizer = BertTokenizer.from_pretrained(config.MODEL_NAME)

    # Create datasets
    print("Creating datasets...")
    train_dataset = FakeNewsDataset(train_texts, train_labels, tokenizer, config.MAX_SEQUENCE_LENGTH)
    val_dataset = FakeNewsDataset(val_texts, val_labels, tokenizer, config.MAX_SEQUENCE_LENGTH)

    # Create dataloaders
    train_loader = DataLoader(train_dataset, batch_size=config.BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=config.BATCH_SIZE)

    print("DataLoaders created:")
    print(f"- Train batches: {len(train_loader)}")
    print(f"- Val batches: {len(val_loader)}")

    # Initialize model
    print("Initializing model...")
    model = FakeNewsModel(config).to(config.DEVICE)

    # Count parameters
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print("Model parameters:")
    print(f"- Total parameters: {total_params:,}")
    print(f"- Trainable parameters: {trainable_params:,}")
    # 4 bytes per parameter, i.e. the size when stored as 32-bit floats
    print(f"- Model size (MB): {total_params * 4 / 1024 / 1024:.2f}")

    # Initialize optimizer
    optimizer = optim.AdamW(model.parameters(), lr=config.LEARNING_RATE, weight_decay=0.01)

    # Initialize loss function
    criterion = nn.CrossEntropyLoss()

    # Initialize scaler for mixed precision
    scaler = torch.cuda.amp.GradScaler() if config.USE_AMP and torch.cuda.is_available() else None

    return model, train_loader, val_loader, optimizer, criterion, scaler, tokenizer

print("Training setup function defined successfully")


In [None]:
# Run the complete training pipeline
def main():
    """Train the model on the module-level ``df`` using the module-level ``config``.

    Runs ``config.NUM_EPOCHS`` epochs of ``train_epoch`` + ``evaluate``, saving
    the weights to ``best_model_colab.pt`` whenever validation accuracy improves.
    Before returning, the best saved weights are loaded back into the model, so
    the returned model matches the reported best validation accuracy rather than
    whatever the final epoch produced.

    Returns:
        ``(model, tokenizer)``.
    """
    print("Starting fake news detection training...")

    # Setup training
    model, train_loader, val_loader, optimizer, criterion, scaler, tokenizer = setup_training(df, config)

    # Training loop
    checkpoint_path = 'best_model_colab.pt'
    checkpoint_saved = False
    best_val_loss = float('inf')
    best_val_acc = 0.0

    print(f"Starting training for {config.NUM_EPOCHS} epochs...")

    for epoch in range(config.NUM_EPOCHS):
        print(f'=== Epoch {epoch + 1}/{config.NUM_EPOCHS} ===')

        # Train
        train_loss = train_epoch(model, train_loader, optimizer, criterion, scaler, config)
        print(f'Train Loss: {train_loss:.4f}')

        # Evaluate
        val_metrics = evaluate(model, val_loader, criterion, config)
        print(f'Val Loss: {val_metrics["loss"]:.4f}')
        print(f'Val Accuracy: {val_metrics["accuracy"]:.4f}')
        print(f'Val Precision: {val_metrics["precision"]:.4f}')
        print(f'Val Recall: {val_metrics["recall"]:.4f}')
        print(f'Val F1: {val_metrics["f1"]:.4f}')

        # Save best model (best_val_loss is the loss of that best-accuracy epoch)
        if val_metrics['accuracy'] > best_val_acc:
            best_val_acc = val_metrics['accuracy']
            best_val_loss = val_metrics['loss']
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
            print(f'New best model saved! Accuracy: {best_val_acc:.4f}')

        # Clear memory
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    # Hand back the best epoch's weights, not the last epoch's. Only a checkpoint
    # written during this run is loaded, never a stale file from an earlier one.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=config.DEVICE))

    print('Training completed!')
    print(f'Best validation accuracy: {best_val_acc:.4f}')
    print(f'Best validation loss: {best_val_loss:.4f}')

    return model, tokenizer

# Run training
model, tokenizer = main()


In [None]:
## 7. Model Testing and Prediction


In [None]:
def predict_single(text, model, tokenizer, config):
    """Classify one piece of text as fake or real news.

    The text is cleaned with ``preprocess_text``, encoded with
    ``tokenizer.encode_plus`` (padded / truncated to
    ``config.MAX_SEQUENCE_LENGTH``) and passed through ``model`` in eval mode
    without gradient tracking.

    Returns:
        A dict with ``prediction`` (class index), ``label`` (``'FAKE'`` for
        class 1, otherwise ``'REAL'``), ``confidence`` (highest class
        probability) and ``probabilities`` (per-class values keyed by
        ``'REAL'`` and ``'FAKE'``).
    """
    model.eval()
    cleaned = preprocess_text(text)

    encoded = tokenizer.encode_plus(
        cleaned,
        add_special_tokens=True,
        max_length=config.MAX_SEQUENCE_LENGTH,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt'
    )
    ids = encoded['input_ids'].to(config.DEVICE)
    mask = encoded['attention_mask'].to(config.DEVICE)

    with torch.no_grad():
        logits = model(ids, mask)
        class_probs = torch.softmax(logits, dim=1)
        predicted = torch.argmax(logits, dim=1)
        top_prob = torch.max(class_probs, dim=1)[0]

    class_index = predicted.item()
    verdict = 'FAKE' if class_index == 1 else 'REAL'
    per_class = {
        'REAL': class_probs[0][0].item(),
        'FAKE': class_probs[0][1].item()
    }

    return {
        'prediction': class_index,
        'label': verdict,
        'confidence': top_prob.item(),
        'probabilities': per_class
    }

# Try the trained model on a few hand-written headlines
test_texts = [
    "Breaking: Scientists discover new planet in our solar system",
    "Local community comes together to help flood victims",
    "Shocking: Aliens spotted in downtown area last night",
    "Government announces new healthcare policy to benefit citizens"
]

print("Testing model predictions:")
print("=" * 50)

for position, headline in enumerate(test_texts, start=1):
    result = predict_single(headline, model, tokenizer, config)
    class_probs = result['probabilities']
    print(f"Text {position}: {headline[:60]}...")
    print(f"Prediction: {result['label']} (Confidence: {result['confidence']:.3f})")
    print(f"Probabilities: REAL={class_probs['REAL']:.3f}, FAKE={class_probs['FAKE']:.3f}")
    print("-" * 50)


In [None]:
# Training is defined and executed exactly once, in section 6 ("Main Training Process").
# This cell used to hold a corrupted copy of `main()`: escaped quotes inside f-string
# expressions and literal "\n" sequences made it a SyntaxError, and running it would
# have retrained the model from scratch after the predictions above.
# `main`, `model` and `tokenizer` from section 6 remain in scope for any later cells.
