|
import pytest |
|
import torch |
|
from triton_kernels.routing import routing, routing_torch |
|
from triton_kernels.testing import assert_close |
|
from triton_kernels.testing import assert_equal |
|
|
|
|
|
def init_data(n_tokens, n_expts_tot, dtype=torch.float16, device="cuda"): |
|
logits = torch.randn((n_tokens, n_expts_tot), dtype=dtype, device=device, requires_grad=True) |
|
return logits |
|
|
|
|
|
n_tokens = [(x, None) for x in [371, 255, 256, 4096, 1023, 1024]] |
|
n_tokens += [(1152, 911)] |
|
|
|
|
|
@pytest.mark.parametrize("n_tokens_pad, n_tokens_raw", n_tokens) |
|
@pytest.mark.parametrize("n_expts_tot, n_expts_act", [(128, 32), (1500, 8)]) |
|
@pytest.mark.parametrize("use_expt_indx", [False, True]) |
|
@pytest.mark.parametrize("sm_first", [True, False]) |
|
def test_op(n_tokens_pad, n_tokens_raw, n_expts_tot, n_expts_act, sm_first, use_expt_indx, device): |
|
torch.manual_seed(2) |
|
if n_tokens_raw is None: |
|
n_tokens_raw = n_tokens_pad |
|
n_routing_rows = None |
|
else: |
|
n_routing_rows = torch.tensor([n_tokens_raw], dtype=torch.int32, device=device) |
|
n_gates_raw = n_tokens_raw * n_expts_act |
|
tri_logits = init_data(n_tokens_pad, n_expts_tot, device=device, dtype=torch.float32).detach() |
|
tri_logits[n_tokens_raw:, :] = float("inf") |
|
tri_logits = tri_logits.requires_grad_(True) |
|
ref_logits = tri_logits.clone().detach().requires_grad_(True) |
|
|
|
if use_expt_indx: |
|
rand_idx = lambda: torch.randperm(n_expts_tot, device="cuda", dtype=torch.int64) |
|
tri_expt_indx = torch.stack([rand_idx()[:n_expts_act] for _ in range(n_tokens_pad)]) |
|
tri_expt_indx, _ = torch.sort(tri_expt_indx, dim=1) |
|
tri_expt_indx[n_tokens_raw:] = -99999 |
|
ref_expt_indx = tri_expt_indx[:n_tokens_raw] |
|
else: |
|
tri_expt_indx = ref_expt_indx = None |
|
ref_routing_data, ref_gather, ref_scatter = routing_torch(ref_logits, n_expts_act, sm_first, ref_expt_indx, |
|
n_rows=n_routing_rows) |
|
tri_routing_data, tri_gather, tri_scatter = routing(tri_logits, n_expts_act, sm_first, tri_expt_indx, |
|
n_rows=n_routing_rows) |
|
|
|
def _assert_indx_equal(ref, tri): |
|
assert_equal(ref, tri[:len(ref)]) |
|
assert torch.all(tri[len(ref):] == -1) |
|
|
|
assert_close(ref_routing_data.gate_scal, tri_routing_data.gate_scal[:n_gates_raw], 2e-2, 4e-3) |
|
assert_equal(ref_routing_data.expt_hist, tri_routing_data.expt_hist) |
|
|
|
ref_expt_data = ref_routing_data.expt_data |
|
tri_expt_data = tri_routing_data.expt_data |
|
assert_equal(ref_expt_data.hist, tri_expt_data.hist) |
|
assert_equal(ref_expt_data.token_offs_raw, tri_expt_data.token_offs_raw) |
|
assert len(ref_expt_data.token_offs_pad) == len(tri_expt_data.token_offs_pad) |
|
assert len(ref_expt_data.block_pid_map) == len(tri_expt_data.block_pid_map) |
|
for block_m in ref_expt_data.token_offs_pad.keys(): |
|
assert_equal(ref_expt_data.token_offs_pad[block_m], tri_expt_data.token_offs_pad[block_m]) |
|
assert_equal(ref_expt_data.block_pid_map[block_m], tri_expt_data.block_pid_map[block_m]) |
|
|
|
assert ref_routing_data.n_expts_tot == ref_routing_data.n_expts_tot |
|
assert ref_routing_data.n_expts_act == ref_routing_data.n_expts_act |
|
|
|
_assert_indx_equal(ref_gather.src_indx, tri_gather.src_indx) |
|
_assert_indx_equal(ref_gather.dst_indx, tri_gather.dst_indx) |
|
_assert_indx_equal(ref_scatter.src_indx, tri_scatter.src_indx) |
|
_assert_indx_equal(ref_scatter.dst_indx, tri_scatter.dst_indx) |
|
|
|
scales_grad = torch.randn_like(tri_routing_data.gate_scal) |
|
ref_routing_data.gate_scal.backward(scales_grad[:n_gates_raw]) |
|
tri_routing_data.gate_scal.backward(scales_grad) |
|
|
|
assert_close(ref_logits.grad[:n_tokens_raw], tri_logits.grad[:n_tokens_raw]) |
|
|
|
|
|
def bench_routing(): |
|
import triton.profiler as proton |
|
n_tokens = 8192 |
|
n_expts_tot, n_expts_act = 128, 4 |
|
tri_logits = init_data(n_tokens, n_expts_tot) |
|
proton.start("routing") |
|
proton.activate() |
|
for i in range(100): |
|
tri_routing_data, tri_gather, tri_scatter = routing(tri_logits, n_expts_act) |
|
proton.finalize() |
|
try: |
|
import os |
|
os.system("proton-viewer -m time/ms routing.hatchet") |
|
except Exception: |
|
pass |
|
|
|
|
|
if __name__ == "__main__": |
|
bench_routing() |
|
|