Spaces:
Paused
Paused
import os | |
import pathlib | |
import torch | |
import numpy as np | |
import skimage | |
from imageio import imread | |
from scipy import linalg | |
from torch.nn.functional import adaptive_avg_pool2d | |
from skimage.metrics import structural_similarity as compare_ssim | |
from skimage.metrics import peak_signal_noise_ratio as compare_psnr | |
import glob | |
import argparse | |
import matplotlib.pyplot as plt | |
from inception import InceptionV3 | |
#from scripts.PerceptualSimilarity.models import dist_model as dm | |
import lpips | |
import pandas as pd | |
import json | |
import imageio | |
import cv2 | |
print(skimage.__version__) | |
class FID():
    """Frechet Inception Distance (FID) between two sets of images.

    Calculates the Frechet Inception Distance (FID) to evaluate GANs.
    The FID metric calculates the distance between two distributions of images.
    Typically, we have summary statistics (mean & covariance matrix) of one
    of these distributions, while the 2nd distribution is given by a GAN.
    When run as a stand-alone program, it compares the distribution of
    images that are stored as PNG/JPEG at a specified location with a
    distribution given by summary statistics (in pickle format).
    The FID is calculated by assuming that X_1 and X_2 are the activations of
    the pool_3 layer of the inception net for generated samples and real world
    samples respectively.
    See --help to see further details.
    Code adapted from https://github.com/bioinf-jku/TTUR to use PyTorch instead
    of Tensorflow
    Copyright 2018 Institute of Bioinformatics, JKU Linz
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    """

    def __init__(self):
        """Build the InceptionV3 feature extractor (2048-d block)."""
        self.dims = 2048
        self.batch_size = 128
        # Use the GPU only when one is actually present; a hard-coded True
        # made construction fail on CPU-only machines.
        self.cuda = torch.cuda.is_available()
        self.verbose = False
        block_idx = InceptionV3.BLOCK_INDEX_BY_DIM[self.dims]
        self.model = InceptionV3([block_idx])
        if self.cuda:
            # TODO: put model into specific GPU
            self.model.cuda()

    def __call__(self, images, gt_path, img_size=None):
        """Compute the FID between in-memory images and a ground-truth folder.

        Params:
        -- images   : Array of generated images, shape (n_images, 3, h, w).
                      The values must lie between 0 and 1.
        -- gt_path  : Path of the ground truth images.
        -- img_size : (width, height) the ground-truth images are resized to.
                      Defaults to the spatial size of `images`.
        Returns:
        -- The Frechet distance between the two activation distributions.
        Raises:
        -- RuntimeError if `gt_path` does not exist.
        """
        if not os.path.exists(gt_path):
            raise RuntimeError('Invalid path: %s' % gt_path)
        images = np.asarray(images)
        if img_size is None:
            # cv2.resize expects (width, height); images are (n, c, h, w).
            img_size = (images.shape[3], images.shape[2])
        print('calculate gt_path statistics...')
        # compute_statistics_of_path requires img_size; it was previously
        # omitted here, which raised TypeError on every call.
        m1, s1 = self.compute_statistics_of_path(gt_path, self.verbose, img_size)
        print('calculate generated_images statistics...')
        m2, s2 = self.calculate_activation_statistics(images, self.verbose)
        fid_value = self.calculate_frechet_distance(m1, s1, m2, s2)
        return fid_value

    def calculate_from_disk(self, generated_path, gt_path, img_size):
        """Compute the FID between two image folders.

        Params:
        -- generated_path : Folder holding the generated images.
        -- gt_path        : Folder holding the ground truth images.
        -- img_size       : (width, height) every image is resized to.
        Returns:
        -- The Frechet distance between the two folders.
        Raises:
        -- RuntimeError if either path does not exist.
        """
        if not os.path.exists(gt_path):
            raise RuntimeError('Invalid path: %s' % gt_path)
        if not os.path.exists(generated_path):
            raise RuntimeError('Invalid path: %s' % generated_path)
        print ('exp-path - '+generated_path)
        print('calculate gt_path statistics...')
        m1, s1 = self.compute_statistics_of_path(gt_path, self.verbose, img_size)
        print('calculate generated_path statistics...')
        m2, s2 = self.compute_statistics_of_path(generated_path, self.verbose, img_size)
        print('calculate frechet distance...')
        fid_value = self.calculate_frechet_distance(m1, s1, m2, s2)
        print('fid_distance %f' % (fid_value))
        return fid_value

    def compute_statistics_of_path(self, path, verbose, img_size):
        """Return (mu, sigma) of the Inception activations for a folder.

        Statistics are cached in `<path>/<w>_<h>_statistics.npz`; when that
        file exists it is loaded instead of re-running the network.

        Params:
        -- path     : Folder containing *.jpg / *.png images.
        -- verbose  : Forwarded to the activation computation.
        -- img_size : (width, height) passed to cv2.resize.
        Raises:
        -- RuntimeError if the folder contains no *.jpg / *.png image.
        """
        size_flag = '{}_{}'.format(img_size[0], img_size[1])
        npz_file = os.path.join(path, size_flag + '_statistics.npz')
        if os.path.exists(npz_file):
            # Context manager closes the archive even if a key is missing.
            with np.load(npz_file) as f:
                return f['mu'][:], f['sigma'][:]

        folder = pathlib.Path(path)
        files = list(folder.glob('*.jpg')) + list(folder.glob('*.png'))
        if not files:
            raise RuntimeError('No .jpg/.png images found in: %s' % path)
        target_size = tuple(img_size)
        # Resize, stack and rescale the images to the [0, 1] range.
        imgs = np.array([
            cv2.resize(imread(str(fn)).astype(np.float32), target_size,
                       interpolation=cv2.INTER_CUBIC)
            for fn in files
        ]) / 255.0
        # Bring images to shape (B, 3, H, W)
        imgs = imgs.transpose((0, 3, 1, 2))
        m, s = self.calculate_activation_statistics(imgs, verbose)
        np.savez(npz_file, mu=m, sigma=s)
        return m, s

    def calculate_activation_statistics(self, images, verbose):
        """Calculation of the statistics used by the FID.

        Params:
        -- images  : Numpy array of dimension (n_images, 3, hi, wi). The values
                     must lie between 0 and 1.
        -- verbose : If set to True, the number of calculated batches is
                     reported.
        Returns:
        -- mu    : The mean over samples of the activations of the pool_3 layer
                   of the inception model.
        -- sigma : The covariance matrix of the activations of the pool_3 layer
                   of the inception model.
        """
        act = self.get_activations(images, verbose)
        mu = np.mean(act, axis=0)
        sigma = np.cov(act, rowvar=False)
        return mu, sigma

    def get_activations(self, images, verbose=False):
        """Calculates the activations of the pool_3 layer for all images.

        Every image is used: the final batch may be smaller than
        `self.batch_size`. `self.batch_size` itself is never modified.

        Params:
        -- images  : Numpy array of dimension (n_images, 3, hi, wi). The values
                     must lie between 0 and 1.
        -- verbose : If set to True, the number of calculated batches is
                     reported.
        Returns:
        -- A numpy array of dimension (num images, dims) that contains the
           activations of the given tensor when feeding inception with the
           query tensor.
        Raises:
        -- ValueError if `images` is empty.
        """
        self.model.eval()
        d0 = images.shape[0]
        if d0 == 0:
            raise ValueError('get_activations received no images')
        # Work on a local copy so one small input does not shrink the batch
        # size used by every later call on this instance.
        batch_size = self.batch_size
        if batch_size > d0:
            print(('Warning: batch size is bigger than the data size. '
                   'Setting batch size to data size'))
            batch_size = d0
        starts = range(0, d0, batch_size)
        n_batches = len(starts)
        pred_arr = np.empty((d0, self.dims))
        for i, start in enumerate(starts):
            if verbose:
                print('\rPropagating batch %d/%d' % (i + 1, n_batches))
            end = min(start + batch_size, d0)
            batch = torch.from_numpy(images[start:end]).type(torch.FloatTensor)
            if self.cuda:
                batch = batch.cuda()
            # Inference only: skip autograd bookkeeping to save memory.
            with torch.no_grad():
                pred = self.model(batch)[0]
                # If model output is not scalar, apply global spatial average
                # pooling. This happens if you choose a dimensionality not
                # equal 2048.
                if pred.shape[2] != 1 or pred.shape[3] != 1:
                    pred = adaptive_avg_pool2d(pred, output_size=(1, 1))
            pred_arr[start:end] = pred.cpu().numpy().reshape(end - start, -1)
        if verbose:
            print(' done')
        return pred_arr

    def calculate_frechet_distance(self, mu1, sigma1, mu2, sigma2, eps=1e-6):
        """Numpy implementation of the Frechet Distance.

        The Frechet distance between two multivariate Gaussians X_1 ~ N(mu_1, C_1)
        and X_2 ~ N(mu_2, C_2) is
        d^2 = ||mu_1 - mu_2||^2 + Tr(C_1 + C_2 - 2*sqrt(C_1*C_2)).
        Stable version by Dougal J. Sutherland.

        Params:
        -- mu1    : Sample mean over activations for the first image set.
        -- mu2    : Sample mean over activations for the second image set.
        -- sigma1 : The covariance matrix over activations for the first set.
        -- sigma2 : The covariance matrix over activations for the second set.
        -- eps    : Value added to both covariance diagonals when the matrix
                    square root is not finite.
        Returns:
        -- The Frechet Distance.
        Raises:
        -- ValueError if the matrix square root has a non-negligible
           imaginary component.
        """
        mu1 = np.atleast_1d(mu1)
        mu2 = np.atleast_1d(mu2)
        sigma1 = np.atleast_2d(sigma1)
        sigma2 = np.atleast_2d(sigma2)
        assert mu1.shape == mu2.shape, \
            'Training and test mean vectors have different lengths'
        assert sigma1.shape == sigma2.shape, \
            'Training and test covariances have different dimensions'
        diff = mu1 - mu2
        # Product might be almost singular. `disp` is not passed because
        # recent SciPy releases deprecate it; without it sqrtm returns just
        # the matrix on every version.
        covmean = linalg.sqrtm(sigma1.dot(sigma2))
        if not np.isfinite(covmean).all():
            msg = ('fid calculation produces singular product; '
                   'adding %s to diagonal of cov estimates') % eps
            print(msg)
            offset = np.eye(sigma1.shape[0]) * eps
            covmean = linalg.sqrtm((sigma1 + offset).dot(sigma2 + offset))
        # Numerical error might give slight imaginary component
        if np.iscomplexobj(covmean):
            if not np.allclose(np.diagonal(covmean).imag, 0, atol=1e-3):
                m = np.max(np.abs(covmean.imag))
                raise ValueError('Imaginary component {}'.format(m))
            covmean = covmean.real
        tr_covmean = np.trace(covmean)
        return (diff.dot(diff) + np.trace(sigma1) +
                np.trace(sigma2) - 2 * tr_covmean)
class Reconstruction_Metrics():
    """Pairwise reconstruction metrics (SSIM, PSNR, L1, MAE) for images."""

    def __init__(self, metric_list=('ssim', 'psnr', 'l1', 'mae'), data_range=1, win_size=51, multichannel=True):
        """Select which metrics `__call__` computes.

        Params:
        -- metric_list  : Iterable of metric names; supported names are
                          'ssim', 'psnr', 'l1' and 'mae'. Unknown names are
                          reported and ignored.
        -- data_range   : Value range of the images handed to SSIM / PSNR.
        -- win_size     : SSIM window size.
        -- multichannel : Whether the last array axis is a channel axis.
        """
        # The default is an immutable tuple so it cannot be shared and
        # mutated across instances.
        self.data_range = data_range
        self.win_size = win_size
        self.multichannel = multichannel
        for metric in metric_list:
            if metric in ['ssim', 'psnr', 'l1', 'mae']:
                # Enabled metrics are flagged by the presence of an attribute.
                setattr(self, metric, True)
            else:
                print('unsupport reconstruction metric: %s'%metric)

    def __call__(self, inputs, gts):
        """Compute the enabled metrics for a batch of tensors.

        inputs: the generated image, size (b,c,w,h), data range(0, data_range)
        gts: the ground-truth image, size (b,c,w,h), data range(0, data_range)

        Both tensors are flattened to (b*c, w, h) and moved to a
        (w, h, b*c) float32 numpy array before the metrics are evaluated.
        Returns a dict keyed by metric name.
        """
        result = dict()
        [b, n, w, h] = inputs.size()
        # reshape (unlike view) also accepts non-contiguous tensors.
        inputs = inputs.reshape(b*n, w, h).detach().cpu().numpy().astype(np.float32).transpose(1, 2, 0)
        gts = gts.reshape(b*n, w, h).detach().cpu().numpy().astype(np.float32).transpose(1, 2, 0)
        if hasattr(self, 'ssim'):
            # channel_axis is passed alongside multichannel, exactly as in
            # calculate_from_disk, so releases that only understand
            # channel_axis still treat the last axis as channels.
            ssim_value = compare_ssim(inputs, gts, data_range=self.data_range,
                                      win_size=self.win_size, multichannel=self.multichannel,
                                      channel_axis=2 if self.multichannel else None)
            result['ssim'] = ssim_value
        if hasattr(self, 'psnr'):
            # data_range must be given by keyword: it is keyword-only in
            # current scikit-image releases.
            psnr_value = compare_psnr(inputs, gts, data_range=self.data_range)
            result['psnr'] = psnr_value
        if hasattr(self, 'l1'):
            l1_value = compare_l1(inputs, gts)
            result['l1'] = l1_value
        if hasattr(self, 'mae'):
            mae_value = compare_mae(inputs, gts)
            result['mae'] = mae_value
        return result

    def calculate_from_disk(self, inputs, gts, save_path=None, img_size=(176,256), sort=True, debug=0):
        """Compute PSNR / SSIM / MAE / L1 for image pairs stored on disk.

        inputs: .txt files, folders, image files (string), image files (list)
        gts: .txt files, folders, image files (string), image files (list)
        save_path: optional folder used to cache per-image results in
            `<w>_<h>_metrics.npz`; an existing cache file is loaded instead
            of recomputing. When None, nothing is cached.
        img_size: (width, height) passed to cv2.resize for every image.
        sort: sort both file lists before pairing them by index.
        debug: any non-zero value shows each image pair with matplotlib.

        Returns a dict of one-element lists holding the mean and variance of
        psnr, ssim_256, mae and l1.
        Raises ValueError when there are fewer ground-truth images than
        input images.
        """
        if sort:
            input_image_list = sorted(get_image_list(inputs))
            gt_image_list = sorted(get_image_list(gts))
        else:
            input_image_list = get_image_list(inputs)
            gt_image_list = get_image_list(gts)
        if len(gt_image_list) < len(input_image_list):
            # Fail up front instead of with an IndexError in mid-run.
            raise ValueError('found %d input images but only %d ground-truth images'
                             % (len(input_image_list), len(gt_image_list)))
        size_flag = '{}_{}'.format(img_size[0], img_size[1])
        # save_path defaults to None, so only build the cache path when a
        # folder was given (os.path.join(None, ...) raises TypeError).
        npz_file = os.path.join(save_path, size_flag + '_metrics.npz') if save_path else None
        if npz_file is not None and os.path.exists(npz_file):
            with np.load(npz_file) as f:
                psnr, ssim, ssim_256, mae, l1 = f['psnr'], f['ssim'], f['ssim_256'], f['mae'], f['l1']
        else:
            psnr = []
            ssim = []
            ssim_256 = []
            mae = []
            l1 = []
            names = []
            target_size = tuple(img_size)
            for index, input_image in enumerate(input_image_list):
                names.append(os.path.basename(input_image))
                img_gt = cv2.resize(imread(str(gt_image_list[index])).astype(np.float32),
                                    target_size, interpolation=cv2.INTER_CUBIC) / 255.0
                img_pred = cv2.resize(imread(str(input_image)).astype(np.float32),
                                      target_size, interpolation=cv2.INTER_CUBIC) / 255.0
                if debug != 0:
                    # Integer subplot spec; the string form ('121') is not
                    # accepted by current matplotlib.
                    plt.subplot(1, 2, 1)
                    plt.imshow(img_gt)
                    plt.title('Groud truth')
                    plt.subplot(1, 2, 2)
                    plt.imshow(img_pred)
                    plt.title('Output')
                    plt.show()
                psnr.append(compare_psnr(img_gt, img_pred, data_range=self.data_range))
                ssim.append(compare_ssim(img_gt, img_pred, data_range=self.data_range,
                                         win_size=self.win_size, multichannel=self.multichannel, channel_axis=2))
                mae.append(compare_mae(img_gt, img_pred))
                l1.append(compare_l1(img_gt, img_pred))
                # Second SSIM variant on the 0-255 scale with Gaussian weights.
                img_gt_256 = img_gt*255.0
                img_pred_256 = img_pred*255.0
                ssim_256.append(compare_ssim(img_gt_256, img_pred_256, gaussian_weights=True, sigma=1.2,
                                             use_sample_covariance=False, multichannel=True, channel_axis=2,
                                             data_range=img_pred_256.max() - img_pred_256.min()))
                if np.mod(index, 200) == 0:
                    print(
                        str(index) + ' images processed',
                        "PSNR: %.4f" % round(np.mean(psnr), 4),
                        "SSIM_256: %.4f" % round(np.mean(ssim_256), 4),
                        "MAE: %.4f" % round(np.mean(mae), 4),
                        "l1: %.4f" % round(np.mean(l1), 4),
                    )
            if npz_file is not None:
                os.makedirs(save_path, exist_ok=True)
                np.savez(npz_file, psnr=psnr, ssim=ssim, ssim_256=ssim_256, mae=mae, l1=l1, names=names)
        print(
            "PSNR: %.4f" % round(np.mean(psnr), 4),
            "PSNR Variance: %.4f" % round(np.var(psnr), 4),
            "SSIM_256: %.4f" % round(np.mean(ssim_256), 4),
            "SSIM_256 Variance: %.4f" % round(np.var(ssim_256), 4),
            "MAE: %.4f" % round(np.mean(mae), 4),
            "MAE Variance: %.4f" % round(np.var(mae), 4),
            "l1: %.4f" % round(np.mean(l1), 4),
            "l1 Variance: %.4f" % round(np.var(l1), 4)
        )
        dic = {"psnr": [round(np.mean(psnr), 6)],
               "psnr_variance": [round(np.var(psnr), 6)],
               "ssim_256": [round(np.mean(ssim_256), 6)],
               "ssim_256_variance": [round(np.var(ssim_256), 6)],
               "mae": [round(np.mean(mae), 6)],
               "mae_variance": [round(np.var(mae), 6)],
               "l1": [round(np.mean(l1), 6)],
               "l1_variance": [round(np.var(l1), 6)]}
        return dic
def get_image_list(flist):
    """Resolve `flist` into a list of image file paths.

    Accepted inputs:
    -- a list: returned unchanged;
    -- a directory path: its *.jpg and *.png files, sorted;
    -- an image file path: a one-element list holding that path;
    -- any other file path: parsed as a text file of whitespace-separated
       paths; if it cannot be parsed, the path itself is returned in a list.
    Anything else prints a message and yields an empty list.
    """
    if isinstance(flist, list):
        return flist
    # flist: image file path, image directory path, text file flist path
    if isinstance(flist, str):
        if os.path.isdir(flist):
            found = glob.glob(flist + '/*.jpg') + glob.glob(flist + '/*.png')
            found.sort()
            return found
        if os.path.isfile(flist):
            image_exts = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tif', '.tiff', '.webp')
            # Decide by extension so binary image data is never fed to the
            # text parser.
            if flist.lower().endswith(image_exts):
                return [flist]
            try:
                # `np.str` no longer exists in current NumPy; the builtin
                # `str` is the equivalent dtype.
                entries = np.genfromtxt(flist, dtype=str)
            except (ValueError, OSError):
                # Not a parseable list file: treat it as a single image path.
                return [flist]
            # A one-line file parses to a 0-d array; normalise to a flat list
            # so callers can sort and index the result.
            return np.atleast_1d(entries).tolist()
    print('can not read files from %s return empty list'%flist)
    return []
def compare_l1(img_true, img_test):
    """Return the mean absolute element-wise difference of two arrays.

    Both arrays are converted to float32 before the difference is taken.
    """
    reference, candidate = (arr.astype(np.float32) for arr in (img_true, img_test))
    absolute_error = np.abs(reference - candidate)
    return absolute_error.mean()
def compare_mae(img_true, img_test):
    """Return the summed absolute error divided by the summed intensity.

    Both arrays are converted to float32; the numerator is the sum of
    |img_true - img_test| and the denominator the sum of
    (img_true + img_test).
    """
    reference = img_true.astype(np.float32)
    candidate = img_test.astype(np.float32)
    total_abs_error = np.abs(reference - candidate).sum()
    total_intensity = (reference + candidate).sum()
    return total_abs_error / total_intensity
def preprocess_path_for_deform_task(gt_path, distorted_path):
    """Pair every distorted image with its ground-truth file.

    The ground-truth name is derived from the distorted file's basename:
    the first character is dropped, everything up to the last '_to_' is
    discarded, and 'jpg' is replaced by 'png'. Pairs whose ground-truth
    file does not exist under `gt_path` are printed and skipped.

    Returns (gt_list, distorted_list), two lists aligned by index.
    """
    gt_list, distorated_list = [], []
    for distorted_image in sorted(get_image_list(distorted_path)):
        target_name = os.path.basename(distorted_image)[1:].split('_to_')[-1]
        gt_image = gt_path + '/' + target_name.replace('jpg', 'png')
        if os.path.isfile(gt_image):
            gt_list.append(gt_image)
            distorated_list.append(distorted_image)
        else:
            # Report the unmatched pair and move on.
            print(distorted_image, gt_image)
            print('=====')
    return gt_list, distorated_list
class LPIPS():
    """Learned Perceptual Image Patch Similarity using the AlexNet backbone."""

    def __init__(self, use_gpu=True):
        """Load the LPIPS network.

        use_gpu: run on CUDA when requested and a CUDA device is available;
            otherwise the model and all inputs stay on the CPU.
        """
        # Previously the model was moved to CUDA unconditionally, so
        # use_gpu=False still required a GPU.
        self.use_gpu = bool(use_gpu) and torch.cuda.is_available()
        self.model = lpips.LPIPS(net='alex').eval()
        if self.use_gpu:
            self.model.cuda()

    def __call__(self, image_1, image_2):
        """Return the LPIPS distance for two batches of tensors.

        image_1: images with size (n, 3, w, h) with value [-1, 1]
        image_2: images with size (n, 3, w, h) with value [-1, 1]
        """
        result = self.model.forward(image_1, image_2)
        return result

    def calculate_from_disk(self, path_1, path_2, img_size, batch_size=64, verbose=False, sort=True, normalize=True):
        """Compute the mean LPIPS distance between two sets of image files.

        path_1, path_2: anything accepted by get_image_list.
        img_size: (width, height) passed to cv2.resize for every image.
        batch_size: images per forward pass; the final batch may be smaller,
            so every image pair contributes to the result.
        verbose: print batch progress.
        sort: sort both file lists before pairing them by index.
        normalize: images are loaded in [0, 1]; True lets the LPIPS model
            rescale them to the [-1, 1] range it expects. Pass False to
            reproduce values computed without that rescaling.

        Returns the mean distance as a 0-dim tensor.
        Raises ValueError when the first list is empty or the second list is
        shorter than the first.
        """
        files_1 = get_image_list(path_1)
        files_2 = get_image_list(path_2)
        if sort:
            files_1 = sorted(files_1)
            files_2 = sorted(files_2)
        d0 = len(files_1)
        if d0 == 0:
            raise ValueError('no images found in %s' % str(path_1))
        if len(files_2) < d0:
            raise ValueError('found %d images in the first set but only %d in the second'
                             % (d0, len(files_2)))
        if batch_size > d0:
            print(('Warning: batch size is bigger than the data size. '
                   'Setting batch size to data size'))
            batch_size = d0
        target_size = tuple(img_size)
        starts = range(0, d0, batch_size)
        n_batches = len(starts)
        results = []
        for i, start in enumerate(starts):
            if verbose:
                print('\rPropagating batch %d/%d' % (i + 1, n_batches))
            end = min(start + batch_size, d0)
            imgs_1 = np.array([cv2.resize(imread(str(fn)).astype(np.float32), target_size,
                                          interpolation=cv2.INTER_CUBIC) / 255.0
                               for fn in files_1[start:end]])
            imgs_2 = np.array([cv2.resize(imread(str(fn)).astype(np.float32), target_size,
                                          interpolation=cv2.INTER_CUBIC) / 255.0
                               for fn in files_2[start:end]])
            # (B, H, W, 3) -> (B, 3, H, W)
            imgs_1 = imgs_1.transpose((0, 3, 1, 2))
            imgs_2 = imgs_2.transpose((0, 3, 1, 2))
            img_1_batch = torch.from_numpy(imgs_1).type(torch.FloatTensor)
            img_2_batch = torch.from_numpy(imgs_2).type(torch.FloatTensor)
            if self.use_gpu:
                img_1_batch = img_1_batch.cuda()
                img_2_batch = img_2_batch.cuda()
            with torch.no_grad():
                result = self.model.forward(img_1_batch, img_2_batch, normalize=normalize)
            results.append(result)
        distance = torch.cat(results, 0)[:, 0, 0, 0].mean()
        print('lpips: %.3f'%distance)
        return distance