import pytest
import torch

from triton_kernels.compaction import compaction, compaction_torch
|
|
@pytest.mark.parametrize("n_tokens, n_cols, k, p", [
    (8192, 64, 4, 0.5),
    (8192, 64, 4, 1.0),
    (131, 128, 16, 0.6),
    (496, 128, 16, 0.0),
])
def test_compaction(n_tokens, n_cols, k, p, device):
    # Random top-k data: for each token, pick k distinct column indices out of
    # n_cols (via argsort of uniform noise) and attach random bf16 values.
    yi = torch.rand((n_tokens, n_cols), device=device).argsort(dim=-1)
    yi = yi[:, :k].to(torch.int32)
    yv = torch.randn((n_tokens, k), dtype=torch.bfloat16, device=device)

    # Dense 0/1 mask over all n_cols columns; each selected index is kept
    # independently with probability p.
    mask = torch.zeros((n_tokens, n_cols), dtype=torch.int32, device=device)
    keep = (torch.rand(yi.shape, device=device) < p)
    if keep.any():
        rows = torch.arange(yi.size(0), device=device).unsqueeze(1).expand_as(yi)
        mask[rows[keep], yi[keep]] = 1

    # Pack the dense mask into 32-bit words: bit j of word w encodes column 32 * w + j.
    chunks = mask.view(*mask.shape[:-1], -1, 32)
    weights = (1 << torch.arange(32, dtype=torch.int32, device=device))
    bitmask = (chunks.int() * weights).sum(dim=-1)

    # The Triton kernel must match the PyTorch reference exactly.
    yv_ref, yi_ref = compaction_torch(yv, yi, bitmask)
    yv_tri, yi_tri = compaction(yv, yi, bitmask)
    assert torch.all(yi_ref == yi_tri)
    assert torch.all(yv_ref == yv_tri)
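

# The 32-bit packing above can be inverted with a shift-and-mask. The helper
# below is an illustrative sketch only (not a triton_kernels API); it recovers
# the dense 0/1 mask from the packed words, e.g. when debugging a failing case.
def _unpack_bitmask(bitmask, n_cols):
    # Bit j of word w encodes column 32 * w + j, mirroring the packing in
    # test_compaction above.
    bits = torch.arange(32, dtype=torch.int32, device=bitmask.device)
    unpacked = (bitmask.unsqueeze(-1) >> bits) & 1
    return unpacked.reshape(*bitmask.shape[:-1], -1)[..., :n_cols]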
|
|