File size: 5,442 Bytes
de071e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""
Implementation of Chi-Squared Hypothesis (CSH) test for comparing neural network models.

This module provides functions to test whether two models have similar activation patterns
across different layers using Chi-Squared statistical tests.
"""

import torch
from collections import defaultdict
import scipy
import numpy as np
from scipy.stats import chi2

from scipy.optimize import linear_sum_assignment as LAP

from tracing.utils.utils import cossim
from tracing.utils.evaluate import evaluate


def statistic(base_model, ft_model, dataloader):
    """
    Run the all-block CSH test on a pair of models.

    This is a thin entry point that forwards its arguments unchanged to
    ``csh_sp_dataloader`` (using that function's default ``n_blocks``).

    Args:
        base_model: Base model to compare
        ft_model: Fine-tuned or target model to compare against the base model
        dataloader: DataLoader providing input data for activation collection

    Returns:
        tuple: (p_value, p_values_per_layer) exactly as returned by
        ``csh_sp_dataloader``
    """
    result = csh_sp_dataloader(
        base_model=base_model,
        ft_model=ft_model,
        dataloader=dataloader,
    )
    return result


def hook(m, inp, op, feats, name):
    """
    Forward hook that records a module's output.

    The output is detached from the autograd graph and copied to the CPU
    before being appended to ``feats[name]``.

    Args:
        m: Module being hooked (unused)
        inp: Input to the module (unused)
        op: Output from the module
        feats: Mapping from key to a list of captured activations
        name: Key under which this activation is appended
    """
    captured = op.detach()
    bucket = feats[name]
    bucket.append(captured.cpu())


def hook_in(m, inp, op, feats, name):
    """
    Forward hook that records a module's first positional input.

    Only ``inp[0]`` is kept; it is detached from the autograd graph and
    copied to the CPU before being appended to ``feats[name]``.

    Args:
        m: Module being hooked (unused)
        inp: Input to the module (tuple); only the first element is read
        op: Output from the module (unused)
        feats: Mapping from key to a list of captured activations
        name: Key under which this activation is appended
    """
    first_input = inp[0]
    bucket = feats[name]
    bucket.append(first_input.detach().cpu())


def csh_sp_dataloader_block(base_model, ft_model, dataloader, i):
    """
    Apply the CSH test to a single block of the two models.

    Outputs of ``model.layers[i].mlp.down_proj`` are captured from both models
    while each is run over ``dataloader``. Every unit of the base model is then
    matched to the fine-tuned unit with the highest ``cossim`` score (a greedy
    per-row argmax, not a one-to-one assignment), and the Spearman correlation
    between the matched indices and the identity ordering is tested.

    The forward hooks installed here are removed before returning, including
    when evaluation raises, so repeated calls do not pile up hooks on the
    models.

    Args:
        base_model: Base model to compare
        ft_model: Fine-tuned or target model to compare against the base model
        dataloader: DataLoader providing input data for activation collection
        i: Block index to analyze

    Returns:
        float: p-value of the Spearman rank correlation between matched and
        original unit indices at block i
    """
    feats = defaultdict(list)

    def base_hook(*args):
        return hook(*args, feats, "base")

    def ft_hook(*args):
        return hook(*args, feats, "ft")

    handles = []
    try:
        handles.append(
            base_model.model.layers[i].mlp.down_proj.register_forward_hook(base_hook)
        )
        handles.append(
            ft_model.model.layers[i].mlp.down_proj.register_forward_hook(ft_hook)
        )

        evaluate(base_model, dataloader)
        evaluate(ft_model, dataloader)
    finally:
        # Always detach the hooks: left in place they would keep `feats` alive
        # and keep copying activations to the CPU on every later forward pass.
        for handle in handles:
            handle.remove()

    base_mat = torch.vstack(feats["base"])
    ft_mat = torch.vstack(feats["ft"])

    # Collapse every leading dimension, then transpose so that each row holds
    # all collected samples for one entry of the last (feature) dimension.
    base_mat = base_mat.view(-1, base_mat.shape[-1]).T
    ft_mat = ft_mat.view(-1, ft_mat.shape[-1]).T

    # NOTE(review): assumes cossim returns a (base_units, ft_units) similarity
    # matrix as a torch tensor - confirm against tracing.utils.utils.
    matched = torch.argmax(cossim(base_mat, ft_mat), axis=-1)
    orig = torch.arange(len(matched))

    _, pvalue = scipy.stats.spearmanr(matched.tolist(), orig.tolist())
    return pvalue


def csh_sp_dataloader(base_model, ft_model, dataloader, n_blocks=32):
    """
    Apply the CSH test across model blocks using activations from a dataloader.

    The procedure is:
    1. Capture the outputs of ``model.layers[i].mlp.up_proj`` for the first
       ``n_blocks`` blocks of both models while each is run over ``dataloader``
    2. For each block, compute a one-to-one matching between the units of the
       two models that maximizes total ``cossim`` (linear sum assignment)
    3. Compute the Spearman correlation p-value between the matched indices
       and the original (identity) indices
    4. Combine the per-block p-values with Fisher's method
       (``-2 * sum(log p)`` against a chi-squared with ``2 * count`` degrees
       of freedom); blocks whose p-value is NaN are left out of the
       combination but still reported in the returned list

    The forward hooks installed here are removed before the statistics are
    computed, including when evaluation raises, so repeated calls do not pile
    up hooks on the models.

    Args:
        base_model: Base model to compare
        ft_model: Fine-tuned or target model to compare against the base model
        dataloader: DataLoader providing input data for activation collection
        n_blocks: Number of transformer blocks to analyze (default: 32)

    Returns:
        tuple: (combined_p_value, list_of_p_values_per_layer)
    """
    feats = defaultdict(list)

    def make_capture(key):
        # A factory gives every hook its own `key`, avoiding the late-binding
        # closure pitfall without relying on lambda default arguments.
        def capture(m, inp, op):
            return hook(m, inp, op, feats, key)

        return capture

    handles = []
    try:
        for i in range(n_blocks):
            layer = str(i)
            handles.append(
                base_model.model.layers[i].mlp.up_proj.register_forward_hook(
                    make_capture("base_" + layer)
                )
            )
            handles.append(
                ft_model.model.layers[i].mlp.up_proj.register_forward_hook(
                    make_capture("ft_" + layer)
                )
            )

        evaluate(base_model, dataloader)
        evaluate(ft_model, dataloader)
    finally:
        # Always detach the hooks: left in place they would keep `feats` alive
        # and keep copying activations to the CPU on every later forward pass.
        for handle in handles:
            handle.remove()

    chi_squared = 0
    count = 0
    p_values = []

    for i in range(n_blocks):
        base_mat = torch.vstack(feats["base_" + str(i)])
        ft_mat = torch.vstack(feats["ft_" + str(i)])

        # Merge the first two dimensions so each row is one sample; the
        # explicit shape means the stacked activations must be 3-D.
        base_mat = torch.reshape(
            base_mat, (base_mat.shape[0] * base_mat.shape[1], base_mat.shape[2])
        )
        ft_mat = torch.reshape(ft_mat, (ft_mat.shape[0] * ft_mat.shape[1], ft_mat.shape[2]))

        # Transpose so that each row holds all samples for one unit.
        base_mat = base_mat.T
        ft_mat = ft_mat.T

        # linear_sum_assignment returns (row_ind, col_ind); col_ind is the
        # fine-tuned unit assigned to each base unit.
        _, matched = LAP(
            cossim(base_mat.type(torch.float64), ft_mat.type(torch.float64)), maximize=True
        )
        orig = torch.arange(len(matched))

        _, temp = scipy.stats.spearmanr(matched.tolist(), orig.tolist())

        if not np.isnan(temp):
            # Fisher's method accumulates -2 * ln(p) over the usable blocks.
            chi_squared -= 2 * np.log(temp)
            count += 1
            print(i, temp)
        p_values.append(temp)

    p_value = chi2.sf(chi_squared, df=2 * count)

    return p_value, p_values