Ahmed Ahmed
Add model-tracing code for p-value computation (without binary files)
de071e9
"""
Implementation of Chi-Squared Hypothesis (CSH) test for comparing neural network models.
This module provides functions to test whether two models have similar activation patterns
across different layers using Chi-Squared statistical tests.
"""
import torch
from collections import defaultdict
import scipy
import numpy as np
from scipy.stats import chi2
from scipy.optimize import linear_sum_assignment as LAP
from tracing.utils.utils import cossim
from tracing.utils.evaluate import evaluate
def statistic(base_model, ft_model, dataloader):
    """
    Run the CSH test on a pair of models.

    Thin entry point that forwards its arguments to ``csh_sp_dataloader``,
    leaving that function's block count at its default.

    Args:
        base_model: Reference model.
        ft_model: Fine-tuned or target model compared against ``base_model``.
        dataloader: DataLoader supplying the inputs used to collect activations.

    Returns:
        tuple: ``(p_value, p_values_per_layer)`` exactly as returned by
            ``csh_sp_dataloader``.
    """
    result = csh_sp_dataloader(base_model, ft_model, dataloader)
    return result
def hook(m, inp, op, feats, name):
    """
    Forward hook that records a module's output.

    The output is detached from the autograd graph and copied to the CPU
    before being appended to ``feats[name]``. Nothing is returned, so the
    module's output is left unmodified.

    Args:
        m: Module the hook is attached to (unused).
        inp: Inputs passed to the module (unused).
        op: Output produced by the module.
        feats: Mapping from key to a list of captured tensors; ``feats[name]``
            must support ``append``.
        name: Key under which the captured output is stored.
    """
    captured = op.detach().cpu()
    feats[name].append(captured)
def hook_in(m, inp, op, feats, name):
    """
    Forward hook that records a module's first positional input.

    ``inp[0]`` is detached from the autograd graph and copied to the CPU
    before being appended to ``feats[name]``. Nothing is returned, so the
    module's output is left unmodified.

    Args:
        m: Module the hook is attached to (unused).
        inp: Tuple of inputs passed to the module; only ``inp[0]`` is stored.
        op: Output produced by the module (unused).
        feats: Mapping from key to a list of captured tensors; ``feats[name]``
            must support ``append``.
        name: Key under which the captured input is stored.
    """
    first_input = inp[0]
    feats[name].append(first_input.detach().cpu())
def csh_sp_dataloader_block(base_model, ft_model, dataloader, i):
    """
    Apply the CSH test to a single block of two models.

    Output activations of ``model.layers[i].mlp.down_proj`` are captured from
    both models while each is run over ``dataloader``. The captured tensors
    are stacked and flattened so that, after transposition, each row holds
    one feature of the last activation dimension across all collected
    positions. Every base row is then matched to the fine-tuned row with the
    highest ``cossim`` score, and the Spearman correlation between the
    matched indices and the identity ordering is tested.

    The forward hooks are removed before returning (even if evaluation
    raises), so repeated calls do not stack hooks on the models.

    Args:
        base_model: Base model; must expose ``model.layers[i].mlp.down_proj``.
        ft_model: Fine-tuned or target model with the same attribute layout.
        dataloader: DataLoader passed to ``evaluate`` for both models.
        i: Index of the block to analyze.

    Returns:
        float: p-value reported by ``scipy.stats.spearmanr`` for block ``i``.
    """
    feats = defaultdict(list)

    def base_hook(*args):
        return hook(*args, feats, "base")

    def ft_hook(*args):
        return hook(*args, feats, "ft")

    handles = []
    try:
        handles.append(
            base_model.model.layers[i].mlp.down_proj.register_forward_hook(base_hook)
        )
        handles.append(
            ft_model.model.layers[i].mlp.down_proj.register_forward_hook(ft_hook)
        )

        # presumably evaluate() runs the model over the dataloader, which is
        # what triggers the hooks -- verify against tracing.utils.evaluate
        evaluate(base_model, dataloader)
        evaluate(ft_model, dataloader)
    finally:
        # Always detach the hooks; otherwise they stay on the models, keep
        # firing on later forward passes and keep ``feats`` alive.
        for handle in handles:
            handle.remove()

    base_mat = torch.vstack(feats["base"])
    ft_mat = torch.vstack(feats["ft"])

    # Collapse all leading dimensions, then transpose: rows index the last
    # activation dimension, columns index the collected positions.
    base_mat = base_mat.view(-1, base_mat.shape[-1]).T
    ft_mat = ft_mat.view(-1, ft_mat.shape[-1]).T

    # Greedy matching: for each base row take the best-scoring ft row.
    # NOTE(review): assumes cossim returns a (base_rows, ft_rows) matrix.
    matched = torch.argmax(cossim(base_mat, ft_mat), axis=-1)
    orig = torch.arange(len(matched))

    _, pvalue = scipy.stats.spearmanr(matched.tolist(), orig.tolist())
    return pvalue
def csh_sp_dataloader(base_model, ft_model, dataloader, n_blocks=32):
    """
    Apply the CSH test across model blocks using activations from a dataloader.

    Steps:
        1. Capture the output of ``model.layers[i].mlp.up_proj`` for the first
           ``n_blocks`` blocks of both models while each is run over
           ``dataloader``.
        2. For each block, compute a one-to-one matching between base and
           fine-tuned rows by solving a linear assignment problem that
           maximizes the ``cossim`` score.
        3. Compute the Spearman correlation between the matched indices and
           the identity ordering, and take its p-value.
        4. Combine the per-block p-values with Fisher's method
           (``-2 * sum(log p)`` against a chi-squared distribution with
           ``2 * count`` degrees of freedom).

    Blocks whose p-value is NaN are left out of the combined statistic but
    are still reported in the per-layer list. All forward hooks are removed
    before the statistics are computed (even if registration or evaluation
    raises), so repeated calls do not stack hooks on the models.

    Args:
        base_model: Base model; must expose ``model.layers[i].mlp.up_proj``.
        ft_model: Fine-tuned or target model with the same attribute layout.
        dataloader: DataLoader passed to ``evaluate`` for both models.
        n_blocks: Number of leading blocks to analyze (default: 32). Both
            models must have at least this many layers.

    Returns:
        tuple: ``(combined_p_value, list_of_p_values_per_layer)``.
    """
    feats = defaultdict(list)

    def _recorder(key):
        # Bind ``key`` per hook so every layer appends to its own list.
        return lambda m, inp, op: hook(m, inp, op, feats, key)

    handles = []
    try:
        for i in range(n_blocks):
            layer = str(i)
            handles.append(
                base_model.model.layers[i].mlp.up_proj.register_forward_hook(
                    _recorder("base_" + layer)
                )
            )
            handles.append(
                ft_model.model.layers[i].mlp.up_proj.register_forward_hook(
                    _recorder("ft_" + layer)
                )
            )

        # presumably evaluate() runs the model over the dataloader, which is
        # what triggers the hooks -- verify against tracing.utils.evaluate
        evaluate(base_model, dataloader)
        evaluate(ft_model, dataloader)
    finally:
        # Always detach the hooks; otherwise they stay on the models, keep
        # firing on later forward passes and keep ``feats`` alive.
        for handle in handles:
            handle.remove()

    chi_squared = 0
    count = 0
    p_values = []

    for i in range(n_blocks):
        base_mat = torch.vstack(feats["base_" + str(i)])
        ft_mat = torch.vstack(feats["ft_" + str(i)])

        # Merge the first two dimensions of the stacked 3-D activations, then
        # transpose so rows index the last activation dimension.
        base_mat = torch.reshape(
            base_mat, (base_mat.shape[0] * base_mat.shape[1], base_mat.shape[2])
        ).T
        ft_mat = torch.reshape(
            ft_mat, (ft_mat.shape[0] * ft_mat.shape[1], ft_mat.shape[2])
        ).T

        # Optimal one-to-one assignment in float64; element [1] of the result
        # holds the ft index assigned to each base row.
        # NOTE(review): assumes cossim returns a (base_rows, ft_rows) matrix.
        matched = LAP(
            cossim(base_mat.type(torch.float64), ft_mat.type(torch.float64)),
            maximize=True,
        )[1]
        orig = torch.arange(len(matched))

        _, temp = scipy.stats.spearmanr(matched.tolist(), orig.tolist())

        # Fisher's method: accumulate -2 * log(p) over blocks with a defined p.
        if not np.isnan(temp):
            chi_squared -= 2 * np.log(temp)
            count += 1

        print(i, temp)
        p_values.append(temp)

    p_value = chi2.sf(chi_squared, df=2 * count)
    return p_value, p_values