Ahmed Ahmed
Add model-tracing code for p-value computation (without binary files)
de071e9
"""
Implementation of activation matching algorithms for comparing neural network models.
This module provides functions for matching neurons between two models based on
their activation patterns, helping identify corresponding functional units despite
permutation differences.
"""
import torch
from collections import defaultdict
import scipy
import numpy as np
from tracing.utils.evaluate import evaluate
from tracing.utils.llama.matching import match_wmats
def hook_in(m, inp, op, feats, name):
"""
Forward hook to capture input activations to model layers.
Args:
m: Module being hooked
inp: Input to the module (tuple)
op: Output from the module
feats: Dictionary to store activations
name: Key to store the activations under
"""
feats[name].append(inp[0].detach().cpu())
def hook_out(m, inp, op, feats, name):
"""
Forward hook to capture output activations from model layers.
Args:
m: Module being hooked
inp: Input to the module
op: Output from the module
feats: Dictionary to store activations
name: Key to store the activations under
"""
feats[name].append(op.detach().cpu())
def statistic(base_model, ft_model, dataloader, n_blocks=32):
"""
Compute neuron matching statistics across all transformer blocks.
For each block, compares the gate and up projections to determine if
the permutation patterns are consistent, which would indicate functionally
corresponding neurons.
Args:
base_model: Base model to compare
ft_model: Fine-tuned or target model to compare against the base model
dataloader: DataLoader providing input data for activation collection
n_blocks: Number of transformer blocks to analyze (default: 32)
Returns:
list: Spearman correlation p-values for each block
"""
stats = []
for i in range(n_blocks):
gate_match = mlp_matching_gate(base_model, ft_model, dataloader, i=i)
up_match = mlp_matching_up(base_model, ft_model, dataloader, i=i)
cor, pvalue = scipy.stats.spearmanr(gate_match.tolist(), up_match.tolist())
print(i, pvalue, len(gate_match))
stats.append(pvalue)
return stats
def statistic_layer(base_model, ft_model, dataloader, i=0):
"""
Compute neuron matching statistics for a specific transformer block.
Args:
base_model: Base model to compare
ft_model: Fine-tuned or target model to compare against the base model
dataloader: DataLoader providing input data for activation collection
i: Block index to analyze (default: 0)
Returns:
float: Spearman correlation p-value for the specified block
"""
gate_perm = mlp_matching_gate(base_model, ft_model, dataloader, i=i)
up_perm = mlp_matching_up(base_model, ft_model, dataloader, i=i)
cor, pvalue = scipy.stats.spearmanr(gate_perm.tolist(), up_perm.tolist())
return pvalue
def mlp_matching_gate(base_model, ft_model, dataloader, i=0):
"""
Match neurons between models by comparing gate projection activations.
Collects activations from the gate projection layer for both models
and computes a permutation that would align corresponding neurons.
Args:
base_model: Base model to compare
ft_model: Fine-tuned or target model to compare against the base model
dataloader: DataLoader providing input data for activation collection
i: Block index to analyze (default: 0)
Returns:
torch.Tensor: Permutation indices that match neurons between models
"""
feats = defaultdict(list)
base_hook = lambda *args: hook_out(*args, feats, "base")
base_handle = base_model.model.layers[i].mlp.gate_proj.register_forward_hook(base_hook)
ft_hook = lambda *args: hook_out(*args, feats, "ft")
ft_handle = ft_model.model.layers[i].mlp.gate_proj.register_forward_hook(ft_hook)
evaluate(base_model, dataloader)
evaluate(ft_model, dataloader)
base_mat = torch.vstack(feats["base"])
ft_mat = torch.vstack(feats["ft"])
base_mat.to("cuda")
ft_mat.to("cuda")
base_mat = base_mat.view(-1, base_mat.shape[-1]).T
ft_mat = ft_mat.view(-1, ft_mat.shape[-1]).T
base_handle.remove()
ft_handle.remove()
perm = match_wmats(base_mat, ft_mat)
return perm
def mlp_matching_up(base_model, ft_model, dataloader, i=0):
"""
Match neurons between models by comparing up projection activations.
Collects activations from the up projection layer for both models
and computes a permutation that would align corresponding neurons.
Args:
base_model: Base model to compare
ft_model: Fine-tuned or target model to compare against the base model
dataloader: DataLoader providing input data for activation collection
i: Block index to analyze (default: 0)
Returns:
torch.Tensor: Permutation indices that match neurons between models
"""
feats = defaultdict(list)
base_hook = lambda *args: hook_out(*args, feats, "base")
base_handle = base_model.model.layers[i].mlp.up_proj.register_forward_hook(base_hook)
ft_hook = lambda *args: hook_out(*args, feats, "ft")
ft_handle = ft_model.model.layers[i].mlp.up_proj.register_forward_hook(ft_hook)
evaluate(base_model, dataloader)
evaluate(ft_model, dataloader)
base_mat = torch.vstack(feats["base"])
ft_mat = torch.vstack(feats["ft"])
base_mat.to("cuda")
ft_mat.to("cuda")
base_mat = base_mat.view(-1, base_mat.shape[-1]).T
ft_mat = ft_mat.view(-1, ft_mat.shape[-1]).T
base_handle.remove()
ft_handle.remove()
perm = match_wmats(base_mat, ft_mat)
return perm
def mlp_layers(base_model, ft_model, dataloader, i, j):
"""
Compare gate and up projections between specific layers of two models.
Useful for comparing non-corresponding layers to find functional similarities.
Args:
base_model: Base model to compare
ft_model: Fine-tuned or target model to compare against the base model
dataloader: DataLoader providing input data for activation collection
i: Layer index in the base model
j: Layer index in the fine-tuned model
Returns:
float: Spearman correlation p-value between gate and up projections
"""
gate_match = mlp_matching_gate(base_model, ft_model, dataloader, i, j)
up_match = mlp_matching_up(base_model, ft_model, dataloader, i, j)
cor, pvalue = scipy.stats.spearmanr(gate_match.tolist(), up_match.tolist())
return pvalue
def statistic_all(model_1, model_2, dataloader):
"""
Perform comprehensive layer matching between two models.
Tests all combinations of layers between the models to identify corresponding
functional units, regardless of their position in the network architecture.
Args:
model_1: First model to compare
model_2: Second model to compare
dataloader: DataLoader providing input data for activation collection
Returns:
None: Prints matching results during execution
"""
model_1_matched = np.zeros(model_1.config.num_hidden_layers)
model_2_matched = np.zeros(model_2.config.num_hidden_layers)
for i in range(model_1.config.num_hidden_layers):
for j in range(model_2.config.num_hidden_layers):
if model_1_matched[i] == 1 or model_2_matched[j] == 1:
continue
stat = mlp_layers(model_1, model_2, dataloader, i, j)
print(i, j, stat)
if stat < 0.000001:
model_1_matched[i] = 1
model_2_matched[j] = 1
break