Spaces:
Runtime error
Runtime error
| # Copyright (c) SenseTime Research. All rights reserved. | |
| # Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import codecs | |
| import os | |
| import time | |
| import yaml | |
| import numpy as np | |
| import cv2 | |
| import paddle | |
| import paddleseg.transforms as T | |
| from paddle.inference import create_predictor, PrecisionType | |
| from paddle.inference import Config as PredictConfig | |
| from paddleseg.core.infer import reverse_transform | |
| from paddleseg.cvlibs import manager | |
| from paddleseg.utils import TimeAverager | |
| from ..scripts.optic_flow_process import optic_flow_process | |
class DeployConfig:
    """Deployment configuration read from an exported-model YAML file.

    The YAML document must contain a ``Deploy`` mapping with the keys
    ``transforms``, ``model`` and ``params``. ``model`` and ``params`` are
    file names that are resolved relative to the directory of the YAML file.
    """

    def __init__(self, path):
        """Parse the YAML file at *path* and instantiate its transforms."""
        with codecs.open(path, 'r', 'utf-8') as file:
            # NOTE(review): FullLoader can construct more than plain data
            # types; only load configuration files from a trusted source.
            self.dic = yaml.load(file, Loader=yaml.FullLoader)

        self._transforms = self._load_transforms(
            self.dic['Deploy']['transforms'])
        self._dir = os.path.dirname(path)

    # The accessors below are properties because the rest of the file reads
    # them as attributes (``cfg.transforms``, ``cfg.model``, ``cfg.params``).
    @property
    def transforms(self):
        """List of transform objects built from ``Deploy.transforms``."""
        return self._transforms

    @property
    def model(self):
        """Path of the model file, joined onto the config file's directory."""
        return os.path.join(self._dir, self.dic['Deploy']['model'])

    @property
    def params(self):
        """Path of the params file, joined onto the config file's directory."""
        return os.path.join(self._dir, self.dic['Deploy']['params'])

    def _load_transforms(self, t_list):
        """Instantiate every transform described in *t_list*.

        Each entry is a mapping whose ``'type'`` value is looked up in
        ``manager.TRANSFORMS``; the remaining entries are passed to the
        looked-up callable as keyword arguments.

        Raises:
            KeyError: if an entry has no ``'type'`` key.
        """
        com = manager.TRANSFORMS
        transforms = []
        for t in t_list:
            # Work on a copy so the parsed config kept in ``self.dic`` is not
            # stripped of its 'type' keys as a side effect.
            kwargs = dict(t)
            ctype = kwargs.pop('type')
            transforms.append(com[ctype](**kwargs))
        return transforms
class Predictor:
    """Run a Paddle Inference segmentation model on single images.

    The predictor preprocesses an image with the transforms from the deploy
    config, runs the inference engine, turns the raw output into an alpha
    matte and blends the image over a background with it.

    ``args`` must provide the attributes read in this class: ``cfg``,
    ``input_shape``, ``use_gpu``, ``test_speed``, ``save_dir``,
    ``soft_predict``, ``use_optic_flow`` and ``add_argmax``.
    """

    def __init__(self, args):
        """Load the deploy config and create the inference predictor."""
        self.cfg = DeployConfig(args.cfg)
        self.args = args
        self.compose = T.Compose(self.cfg.transforms)

        # State carried between frames for the optical-flow post-processing:
        # the previous gray frame and the previous fused score map.
        resize_h, resize_w = args.input_shape
        self.disflow = cv2.DISOpticalFlow_create(
            cv2.DISOPTICAL_FLOW_PRESET_ULTRAFAST)
        self.prev_gray = np.zeros((resize_h, resize_w), np.uint8)
        self.prev_cfd = np.zeros((resize_h, resize_w), np.float32)
        self.is_init = True

        pred_cfg = PredictConfig(self.cfg.model, self.cfg.params)
        pred_cfg.disable_glog_info()
        if self.args.use_gpu:
            # NOTE(review): presumably (initial memory pool size in MB,
            # device id) -- confirm against the Paddle Inference Config docs.
            pred_cfg.enable_use_gpu(100, 0)
        self.predictor = create_predictor(pred_cfg)

        # The averager only exists when speed testing was requested.
        if self.args.test_speed:
            self.cost_averager = TimeAverager()

    def preprocess(self, img):
        """Apply the configured transforms to one image.

        Returns:
            A pair ``(processed_imgs, ori_shapes)`` of one-element lists: the
            first item of ``self.compose(img)`` and ``img.shape``.
        """
        processed_img = self.compose(img)[0]
        return [processed_img], [img.shape]

    def run(self, img, bg):
        """Segment *img* and composite it over *bg*.

        Args:
            img: input image array.
            bg: background image array, or ``None`` for a white background.

        Returns:
            The tuple produced by :meth:`postprocess`.
        """
        input_names = self.predictor.get_input_names()
        input_handle = self.predictor.get_input_handle(input_names[0])

        processed_imgs, ori_shapes = self.preprocess(img)
        data = np.array(processed_imgs)
        input_handle.reshape(data.shape)
        input_handle.copy_from_cpu(data)

        timing = self.args.test_speed
        if timing:
            # perf_counter is monotonic, so the measured interval cannot be
            # distorted by system clock adjustments.
            start = time.perf_counter()

        self.predictor.run()

        if timing:
            self.cost_averager.record(time.perf_counter() - start)

        output_names = self.predictor.get_output_names()
        output_handle = self.predictor.get_output_handle(output_names[0])
        output = output_handle.copy_to_cpu()
        return self.postprocess(output, img, ori_shapes[0], bg)

    def postprocess(self, pred, img, ori_shape, bg):
        """Convert the raw network output into an alpha matte and blend.

        Args:
            pred: array copied from the predictor's first output.
            img: the original input image.
            ori_shape: ``img.shape`` as recorded by :meth:`preprocess`.
            bg: background image, or ``None`` for a white background.

        Returns:
            ``(comb, alpha, bg, img)``: the blended ``uint8`` image, the alpha
            matte used for blending, the background actually used (resized to
            the image size, or all-255), and the unmodified input image.
        """
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test.
        os.makedirs(self.args.save_dir, exist_ok=True)

        resize_w = pred.shape[-1]
        resize_h = pred.shape[-2]

        if self.args.soft_predict:
            if self.args.use_optic_flow:
                # Channel index 1 is used as the score map.
                # NOTE(review): presumably the foreground probability with a
                # batch size of one -- confirm against the exported model.
                score_map = pred[:, 1, :, :].squeeze(0)
                score_map = 255 * score_map

                cur_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                cur_gray = cv2.resize(cur_gray, (resize_w, resize_h))
                optflow_map = optic_flow_process(
                    cur_gray, score_map, self.prev_gray, self.prev_cfd,
                    self.disflow, self.is_init)

                # Remember this frame for the next call.
                self.prev_gray = cur_gray.copy()
                self.prev_cfd = optflow_map.copy()
                self.is_init = False

                # Replicate the fused map into 3 channels and move the
                # channel axis first, with a leading axis of length one.
                score_map = np.repeat(
                    optflow_map[:, :, np.newaxis], 3, axis=2)
                score_map = np.transpose(
                    score_map, [2, 0, 1])[np.newaxis, ...]
                score_map = reverse_transform(
                    paddle.to_tensor(score_map),
                    ori_shape,
                    self.cfg.transforms,
                    mode='bilinear')
                # Undo the earlier multiplication by 255.
                alpha = np.transpose(
                    score_map.numpy().squeeze(0), [1, 2, 0]) / 255
            else:
                score_map = pred[:, 1, :, :]
                score_map = score_map[np.newaxis, ...]
                score_map = reverse_transform(
                    paddle.to_tensor(score_map),
                    ori_shape,
                    self.cfg.transforms,
                    mode='bilinear')
                alpha = np.transpose(
                    score_map.numpy().squeeze(0), [1, 2, 0])
        else:
            # A 3-D prediction gets an axis of length one inserted at
            # position 1 before it is passed to reverse_transform.
            if pred.ndim == 3:
                pred = pred[:, np.newaxis, ...]
            result = reverse_transform(
                paddle.to_tensor(pred, dtype='float32'),
                ori_shape,
                self.cfg.transforms,
                mode='bilinear')
            result = np.array(result)

            if self.args.add_argmax:
                result = np.argmax(result, axis=1)
            else:
                result = result.squeeze(1)
            alpha = np.transpose(result, [1, 2, 0])

        # Background replacement: white when no background is supplied,
        # otherwise the supplied one resized to the image size.
        h, w, _ = img.shape
        if bg is None:
            bg = np.ones_like(img) * 255
        else:
            bg = cv2.resize(bg, (w, h))
            # Give a 2-D background a trailing channel axis so it
            # broadcasts against the image.
            if bg.ndim == 2:
                bg = bg[..., np.newaxis]

        comb = (alpha * img + (1 - alpha) * bg).astype(np.uint8)
        return comb, alpha, bg, img