|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import torch |
|
import torch.nn.functional as F |
|
from torch import nn |
|
|
|
|
|
def _get_gtnorm(gt):
    """Return the per-pixel magnitude of ``gt`` taken over dimension 1.

    When dimension 1 has size 1 the input tensor itself is returned
    (same object, values untouched -- no absolute value is applied).
    Otherwise the Euclidean norm over dimension 1 is returned, with that
    dimension kept as a singleton.
    """
    has_single_channel = gt.size(1) == 1
    if has_single_channel:
        return gt

    squared_sum = gt.pow(2).sum(dim=1, keepdim=True)
    return squared_sum.sqrt()
|
|
|
|
|
|
|
|
|
|
|
class L1Loss(nn.Module):
    """Mean absolute error restricted to valid ground-truth entries.

    An entry is valid when its ground-truth value is finite and, if
    ``max_gtnorm`` is set, when the ground-truth norm at that location
    (as given by ``_get_gtnorm``) is strictly below ``max_gtnorm``.

    Attributes:
        max_gtnorm: optional upper bound on the ground-truth norm.
        with_conf: always ``False``; this loss takes no confidence input.
    """

    def __init__(self, max_gtnorm=None):
        super().__init__()
        self.with_conf = False
        self.max_gtnorm = max_gtnorm

    def _error(self, gt, predictions):
        """Element-wise absolute difference between ``gt`` and ``predictions``."""
        return (gt - predictions).abs()

    def forward(self, predictions, gt, inspect=False):
        """Compute the loss.

        Args:
            predictions: tensor indexed with the same mask as ``gt``.
            gt: ground-truth tensor; non-finite entries are ignored.
            inspect: when true, return the full, *unmasked* element-wise
                error map instead of the scalar mean.

        Returns:
            The mean absolute error over valid entries, or the raw error
            map when ``inspect`` is true.
        """
        valid = torch.isfinite(gt)
        if self.max_gtnorm is not None:
            # Broadcast the single-channel norm back over every channel of gt.
            norm = _get_gtnorm(gt).expand(-1, gt.size(1), -1, -1)
            valid = valid & (norm < self.max_gtnorm)

        if inspect:
            return self._error(gt, predictions)

        return self._error(gt[valid], predictions[valid]).mean()
|
|
|
|
|
|
|
|
|
|
|
|
|
class LaplacianLoss(nn.Module):
    """Laplacian-style loss driven by a predicted confidence map.

    Per valid pixel the loss is ``|gt - pred|_1 / exp(conf) + conf``, where
    the L1 error is summed over dimension 1; the result is averaged over
    the valid pixels.  Validity is decided from channel 0 of ``gt`` only
    (finite value) and, optionally, from the ground-truth norm.

    Attributes:
        max_gtnorm: optional strict upper bound on the ground-truth norm.
        with_conf: always ``True``; ``forward`` requires ``conf``.
    """

    def __init__(self, max_gtnorm=None):
        super().__init__()
        self.with_conf = True
        self.max_gtnorm = max_gtnorm

    def forward(self, predictions, gt, conf):
        """Return the mean loss over valid pixels.

        Args:
            predictions: predicted tensor, indexed as a 4-D tensor like ``gt``.
            gt: 4-D ground-truth tensor; pixels whose channel 0 is not
                finite are ignored.
            conf: confidence tensor whose dimension 1 is squeezed away.
        """
        valid = torch.isfinite(gt)[:, 0, :, :]
        if self.max_gtnorm is not None:
            valid = valid & (_get_gtnorm(gt)[:, 0, :, :] < self.max_gtnorm)

        log_scale = conf.squeeze(1)[valid]
        abs_err = (gt - predictions).abs().sum(dim=1)[valid]

        per_pixel = abs_err / log_scale.exp() + log_scale
        return per_pixel.mean()
|
|
|
|
|
class LaplacianLossBounded(nn.Module):
    """Laplacian-style loss whose scale is squashed between ``a`` and ``b``.

    The raw confidence is mapped to ``scale = (b - a) * sigmoid(conf) + a``
    and the per-pixel loss is ``|gt - pred|_1 / scale + log(scale)``, with
    the L1 error summed over dimension 1.  The result is averaged over the
    valid pixels (channel 0 of ``gt`` finite and, optionally, ground-truth
    norm strictly below ``max_gtnorm``).

    Attributes:
        max_gtnorm: optional strict upper bound on the ground-truth norm.
        with_conf: always ``True``; ``forward`` requires ``conf``.
        a, b: bounds of the sigmoid-mapped scale.
    """

    def __init__(self, max_gtnorm=10000.0, a=0.25, b=4.0):
        super().__init__()
        self.with_conf = True
        self.max_gtnorm = max_gtnorm
        self.a = a
        self.b = b

    def forward(self, predictions, gt, conf):
        """Return the mean loss over valid pixels.

        Args:
            predictions: predicted tensor, indexed as a 4-D tensor like ``gt``.
            gt: 4-D ground-truth tensor; pixels whose channel 0 is not
                finite are ignored.
            conf: raw confidence tensor whose dimension 1 is squeezed away.
        """
        valid = torch.isfinite(gt)[:, 0, :, :]
        if self.max_gtnorm is not None:
            valid = valid & (_get_gtnorm(gt)[:, 0, :, :] < self.max_gtnorm)

        # Squash the raw confidence into the open interval (a, b).
        scale = (self.b - self.a) * torch.sigmoid(conf.squeeze(1)) + self.a

        abs_err = (gt - predictions).abs().sum(dim=1)[valid]
        data_term = abs_err / scale[valid]
        penalty = torch.log(scale)[valid]
        return (data_term + penalty).mean()
|
|
|
|
|
class LaplacianLossBounded2(nn.Module):
    """Laplacian-style loss with a bounded log-scale.

    The raw confidence is mapped to
    ``s = 2 * a * (sigmoid(conf / b) - 0.5)``, which lies in ``(-a, a)``,
    and the per-pixel loss is ``|gt - pred|_1 / exp(s) + s`` with the L1
    error summed over dimension 1.  The result is averaged over the valid
    pixels (channel 0 of ``gt`` finite and, optionally, ground-truth norm
    strictly below ``max_gtnorm``).

    Attributes:
        max_gtnorm: optional strict upper bound on the ground-truth norm.
        with_conf: always ``True``; ``forward`` requires ``conf``.
        a: half-width of the log-scale range.
        b: divisor applied to the raw confidence before the sigmoid.
    """

    def __init__(self, max_gtnorm=None, a=3.0, b=3.0):
        super().__init__()
        self.with_conf = True
        self.max_gtnorm = max_gtnorm
        self.a = a
        self.b = b

    def forward(self, predictions, gt, conf):
        """Return the mean loss over valid pixels.

        Args:
            predictions: predicted tensor, indexed as a 4-D tensor like ``gt``.
            gt: 4-D ground-truth tensor; pixels whose channel 0 is not
                finite are ignored.
            conf: raw confidence tensor whose dimension 1 is squeezed away.
        """
        valid = torch.isfinite(gt)[:, 0, :, :]
        if self.max_gtnorm is not None:
            valid = valid & (_get_gtnorm(gt)[:, 0, :, :] < self.max_gtnorm)

        raw = conf.squeeze(1)
        log_scale = 2 * self.a * (torch.sigmoid(raw / self.b) - 0.5)
        log_scale = log_scale[valid]

        abs_err = (gt - predictions).abs().sum(dim=1)[valid]
        return (abs_err / log_scale.exp() + log_scale).mean()
|
|
|
|
|
|
|
|
|
|
|
class StereoMetrics(nn.Module):
    """Per-batch error metrics computed over finite ground-truth entries.

    Attributes:
        bad_ths: absolute-error thresholds used for the ``bad@T`` metrics.
        do_quantile: stored as given; not read by ``forward``.
    """

    def __init__(self, do_quantile=False):
        super().__init__()
        self.do_quantile = do_quantile
        self.bad_ths = [0.5, 1, 2, 3]

    def forward(self, predictions, gt):
        """Return a dict of metrics averaged over the batch.

        Each metric is first computed per batch element over the entries
        where ``gt`` is finite, then averaged over the batch.

        Returns:
            dict with keys ``avgerr`` (mean absolute error), ``rmse``
            (root mean squared error) and ``bad@T`` for every threshold
            ``T`` in ``bad_ths`` (percentage of valid entries whose
            absolute error exceeds ``T``).
        """
        batch = predictions.size(0)
        valid = torch.isfinite(gt)

        # Replace non-finite ground truth by a finite placeholder so the
        # difference stays finite; those entries are zeroed by `valid` below.
        safe_gt = gt.masked_fill(~valid, 999999.0)

        flat_valid = valid.view(batch, -1)
        n_valid = flat_valid.sum(dim=1)

        abs_err = (torch.abs(safe_gt - predictions) * valid).view(batch, -1)
        sq_err = (torch.square(safe_gt - predictions) * valid).view(batch, -1)

        metrics = {
            "avgerr": torch.mean(abs_err.sum(dim=1) / n_valid),
            "rmse": torch.sqrt(sq_err.sum(dim=1) / n_valid).mean(dim=0),
        }
        for threshold in self.bad_ths:
            n_bad = ((abs_err > threshold) * flat_valid).sum(dim=1)
            metrics[f"bad@{threshold:.1f}"] = (n_bad / n_valid).mean(dim=0) * 100
        return metrics
|
|
|
|
|
class FlowMetrics(nn.Module):
    """Per-batch error metrics for two-channel ground truth.

    A pixel is valid when channel 0 of ``gt`` is finite.

    Attributes:
        bad_ths: end-point-error thresholds used for the ``bad@T`` metrics.
    """

    def __init__(self):
        super().__init__()
        self.bad_ths = [1, 3, 5]

    def forward(self, predictions, gt):
        """Return a dict of metrics averaged over the batch.

        Each metric is first computed per batch element over the valid
        pixels, then averaged over the batch.

        Returns:
            dict with keys ``L1err`` (L1 error summed over dimension 1),
            ``EPE`` (Euclidean error over dimension 1) and ``bad@T`` for
            every threshold ``T`` in ``bad_ths`` (percentage of valid
            pixels whose Euclidean error exceeds ``T``).
        """
        batch = predictions.size(0)
        valid = torch.isfinite(gt[:, 0, :, :])
        flat_valid = valid.view(batch, -1)
        n_valid = flat_valid.sum(dim=1)

        # Give invalid pixels a finite placeholder in channels 0 and 1 so
        # the differences stay finite; `valid` zeroes them out afterwards.
        safe_gt = gt.clone()
        invalid = ~valid
        for channel in (0, 1):
            safe_gt[:, channel, :, :][invalid] = 999999.0

        diff = safe_gt - predictions
        l1_err = (torch.abs(diff).sum(dim=1) * valid).view(batch, -1)
        epe = (torch.sqrt(torch.sum(torch.square(diff), dim=1)) * valid).view(batch, -1)

        metrics = {
            "L1err": torch.mean(l1_err.sum(dim=1) / n_valid),
            "EPE": torch.mean(epe.sum(dim=1) / n_valid),
        }
        for threshold in self.bad_ths:
            n_bad = ((epe > threshold) * flat_valid).sum(dim=1)
            metrics[f"bad@{threshold:.1f}"] = (n_bad / n_valid).mean(dim=0) * 100
        return metrics
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StereoDatasetMetrics(nn.Module):
    """Accumulate single-channel error statistics over many batches.

    Usage: call ``add_batch`` for every batch, then ``get_results``.
    ``reset`` clears the accumulated state; it is also called by
    ``__init__`` so a fresh instance is immediately usable.

    Attributes:
        bad_ths: absolute-error thresholds used for the ``bad@T`` metrics.
        agg_N: number of valid pixels accumulated so far.
        agg_L1err: running mean (CPU tensor) of the L1 error over valid pixels.
        agg_Nbad: per-threshold count of valid pixels whose error exceeds it.
    """

    # (row, column) offsets of the four sub-grids of a 2x-resolution ground truth.
    _SUBGRID_OFFSETS = ((0, 0), (1, 0), (0, 1), (1, 1))

    def __init__(self):
        super().__init__()
        self.bad_ths = [0.5, 1, 2, 3]
        # Create the accumulators up front so add_batch()/get_results() do not
        # fail with AttributeError when the caller forgets to call reset().
        self.reset()

    def reset(self):
        """Clear all accumulated statistics and the cached results."""
        self.agg_N = 0
        self.agg_L1err = torch.tensor(0.0)
        self.agg_Nbad = [0 for _ in self.bad_ths]
        self._metrics = None

    @staticmethod
    def _l1(diff):
        """L1 norm of ``diff`` over dimension 1."""
        return torch.sum(torch.abs(diff), dim=1)

    def _min_subgrid_l1(self, gt, predictions):
        """Element-wise minimum of the L1 error over the four 2x sub-grids of ``gt``."""
        best = None
        for dy, dx in self._SUBGRID_OFFSETS:
            err = self._l1(gt[:, :, dy::2, dx::2] - predictions)
            best = err if best is None else torch.minimum(best, err)
        return best

    def add_batch(self, predictions, gt):
        """Fold one batch into the running statistics.

        Args:
            predictions: tensor with exactly one channel (dimension 1).
            gt: ground truth with exactly one channel.  When its two last
                dimensions are exactly twice those of ``predictions``, each
                prediction is compared against the four ground-truth pixels
                it covers and the smallest error is kept.

        Pixels are valid when that error is finite (2x case) or when the
        ground truth is finite (same-resolution case).  A batch without
        any valid pixel leaves the statistics unchanged.
        """
        assert predictions.size(1) == 1, predictions.size()
        assert gt.size(1) == 1, gt.size()

        gt_is_2x = (
            gt.size(2) == predictions.size(2) * 2
            and gt.size(3) == predictions.size(3) * 2
        )
        if gt_is_2x:
            L1err = self._min_subgrid_l1(gt, predictions)
            valid = torch.isfinite(L1err)
        else:
            valid = torch.isfinite(gt[:, 0, :, :])
            L1err = self._l1(gt - predictions)

        n_valid = int(valid.sum())
        if n_valid == 0:
            # The mean of an empty selection is NaN and would poison the
            # running mean for good; an empty batch carries no information.
            return

        # New data makes any previously computed results stale.
        self._metrics = None

        valid_err = L1err[valid]
        n_prev = int(self.agg_N)
        n_total = n_prev + n_valid

        # Weighted update of the running mean, kept on CPU.
        self.agg_L1err = (
            float(n_prev) / n_total * self.agg_L1err
            + valid_err.mean().cpu() * float(n_valid) / n_total
        )
        self.agg_N = n_total
        for i, th in enumerate(self.bad_ths):
            self.agg_Nbad[i] += int((valid_err > th).sum())

    def _compute_metrics(self):
        """Fill the results cache from the accumulators, if not already done."""
        if self._metrics is not None:
            return

        n_total = int(self.agg_N)
        out = {}
        if n_total == 0:
            # Nothing accumulated: report NaN rather than dividing by zero.
            out["L1err"] = float("nan")
            for th in self.bad_ths:
                out["bad@{:.1f}".format(th)] = float("nan")
        else:
            out["L1err"] = self.agg_L1err.item()
            for th, n_bad in zip(self.bad_ths, self.agg_Nbad):
                out["bad@{:.1f}".format(th)] = float(n_bad) / n_total * 100.0
        self._metrics = out

    def get_results(self):
        """Return a dict with ``L1err`` and one ``bad@T`` percentage per threshold.

        All values are NaN when no valid pixel has been accumulated.
        """
        self._compute_metrics()
        return self._metrics
|
|
|
|
|
class FlowDatasetMetrics(nn.Module):
    """Accumulate two-channel error statistics over many batches.

    Usage: call ``add_batch`` for every batch, then ``get_results``.
    ``reset`` clears the accumulated state; it is also called by
    ``__init__`` so a fresh instance is immediately usable.

    Attributes:
        bad_ths: end-point-error thresholds used for the ``bad@T`` metrics.
        speed_ths: ``(low, high)`` ground-truth magnitude bins; a pixel is
            in a bin when ``low <= magnitude < high``.
        agg_N: number of valid pixels accumulated so far.
        agg_L1err, agg_L2err: running means (CPU tensors) of the L1 and
            Euclidean errors over valid pixels.
        agg_Nbad: per-threshold count of valid pixels whose Euclidean
            error exceeds the threshold.
        agg_EPEspeed, agg_Nspeed: per-bin running mean of the Euclidean
            error and per-bin pixel count.
        pairname_results: dict emptied by ``reset``; not used by this class.
    """

    # (row, column) offsets of the four sub-grids of a 2x-resolution ground
    # truth, in the order used when averaging the ground-truth magnitude.
    _SUBGRID_OFFSETS = ((0, 0), (0, 1), (1, 0), (1, 1))

    def __init__(self):
        super().__init__()
        self.bad_ths = [0.5, 1, 3, 5]
        self.speed_ths = [(0, 10), (10, 40), (40, torch.inf)]
        # Create the accumulators up front so add_batch()/get_results() do not
        # fail with AttributeError when the caller forgets to call reset().
        self.reset()

    def reset(self):
        """Clear all accumulated statistics and the cached results."""
        self.agg_N = 0
        self.agg_L1err = torch.tensor(0.0)
        self.agg_L2err = torch.tensor(0.0)
        self.agg_Nbad = [0 for _ in self.bad_ths]
        self.agg_EPEspeed = [torch.tensor(0.0) for _ in self.speed_ths]
        self.agg_Nspeed = [0 for _ in self.speed_ths]
        self._metrics = None
        self.pairname_results = {}

    @staticmethod
    def _l1(diff):
        """L1 norm of ``diff`` over dimension 1."""
        return torch.sum(torch.abs(diff), dim=1)

    @staticmethod
    def _l2(diff):
        """Euclidean norm of ``diff`` over dimension 1."""
        return torch.sqrt(torch.sum(torch.square(diff), dim=1))

    def _subgrids(self, gt):
        """Return the four stride-2 sub-grids of ``gt`` along its last two dims."""
        return [gt[:, :, dy::2, dx::2] for dy, dx in self._SUBGRID_OFFSETS]

    @staticmethod
    def _elementwise_min(tensors):
        """Element-wise minimum of a non-empty list of same-shaped tensors."""
        best = tensors[0]
        for other in tensors[1:]:
            best = torch.minimum(best, other)
        return best

    def add_batch(self, predictions, gt):
        """Fold one batch into the running statistics.

        Args:
            predictions: tensor with exactly two channels (dimension 1).
            gt: ground truth with exactly two channels.  When its two last
                dimensions are exactly twice those of ``predictions``, each
                prediction is compared against the four ground-truth pixels
                it covers: the smallest L1 / Euclidean error is kept and the
                ground-truth magnitude is averaged over the four pixels.

        Pixels are valid when the L1 error is finite (2x case) or when
        channel 0 of the ground truth is finite (same-resolution case).
        A batch without any valid pixel leaves the statistics unchanged.
        """
        assert predictions.size(1) == 2, predictions.size()
        assert gt.size(1) == 2, gt.size()

        gt_is_2x = (
            gt.size(2) == predictions.size(2) * 2
            and gt.size(3) == predictions.size(3) * 2
        )
        if gt_is_2x:
            subgrids = self._subgrids(gt)
            L1err = self._elementwise_min(
                [self._l1(sub - predictions) for sub in subgrids]
            )
            L2err = self._elementwise_min(
                [self._l2(sub - predictions) for sub in subgrids]
            )
            valid = torch.isfinite(L1err)
            speed_sum = self._l2(subgrids[0])
            for sub in subgrids[1:]:
                speed_sum = speed_sum + self._l2(sub)
            gtspeed = speed_sum / 4.0
        else:
            valid = torch.isfinite(gt[:, 0, :, :])
            L1err = self._l1(gt - predictions)
            L2err = self._l2(gt - predictions)
            gtspeed = self._l2(gt)

        n_valid = int(valid.sum())
        if n_valid == 0:
            # The mean of an empty selection is NaN and would poison the
            # running means for good; an empty batch carries no information.
            return

        # New data makes any previously computed results stale.
        self._metrics = None

        l1_valid = L1err[valid]
        l2_valid = L2err[valid]
        speed_valid = gtspeed[valid]

        n_prev = int(self.agg_N)
        n_total = n_prev + n_valid

        # Weighted updates of the running means, kept on CPU.
        self.agg_L1err = (
            float(n_prev) / n_total * self.agg_L1err
            + l1_valid.mean().cpu() * float(n_valid) / n_total
        )
        self.agg_L2err = (
            float(n_prev) / n_total * self.agg_L2err
            + l2_valid.mean().cpu() * float(n_valid) / n_total
        )
        self.agg_N = n_total

        for i, th in enumerate(self.bad_ths):
            self.agg_Nbad[i] += int((l2_valid > th).sum())

        for i, (th1, th2) in enumerate(self.speed_ths):
            in_bin = (speed_valid >= th1) & (speed_valid < th2)
            n_bin = int(in_bin.sum())
            if n_bin == 0:
                continue
            n_bin_prev = int(self.agg_Nspeed[i])
            n_bin_total = n_bin_prev + n_bin
            self.agg_EPEspeed[i] = (
                float(n_bin_prev) / n_bin_total * self.agg_EPEspeed[i]
                + float(n_bin) / n_bin_total * l2_valid[in_bin].mean().cpu()
            )
            self.agg_Nspeed[i] = n_bin_total

    def _compute_metrics(self):
        """Fill the results cache from the accumulators, if not already done."""
        if self._metrics is not None:
            return

        n_total = int(self.agg_N)
        out = {}
        if n_total == 0:
            # Nothing accumulated: report NaN rather than dividing by zero.
            out["L1err"] = float("nan")
            out["EPE"] = float("nan")
            for th in self.bad_ths:
                out["bad@{:.1f}".format(th)] = float("nan")
        else:
            out["L1err"] = self.agg_L1err.item()
            out["EPE"] = self.agg_L2err.item()
            for th, n_bad in zip(self.bad_ths, self.agg_Nbad):
                out["bad@{:.1f}".format(th)] = float(n_bad) / n_total * 100.0

        # A bin that never received a pixel reports its initial value, 0.0.
        for i, (th1, th2) in enumerate(self.speed_ths):
            out[
                "s{:d}{:s}".format(th1, "-" + str(th2) if th2 < torch.inf else "+")
            ] = self.agg_EPEspeed[i].item()
        self._metrics = out

    def get_results(self):
        """Return the accumulated metrics as a dict of Python floats.

        Keys: ``L1err``, ``EPE``, one ``bad@T`` percentage per threshold in
        ``bad_ths`` and one ``s<low>-<high>`` / ``s<low>+`` mean Euclidean
        error per bin in ``speed_ths``.  ``L1err``, ``EPE`` and ``bad@T``
        are NaN when no valid pixel has been accumulated.
        """
        self._compute_metrics()
        return self._metrics
|
|