#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import time
import os
import json

from loguru import logger

import cv2
import torch

from yolox.data.data_augment import ValTransform
from yolox.data.datasets import COCO_CLASSES
from yolox.exp import get_exp
from yolox.utils import fuse_model, get_model_info, postprocess, vis

IMAGE_EXT = [".jpg", ".jpeg", ".webp", ".bmp", ".png"]


def get_image_list(path):
    # Recursively collect all image files under `path`.
    image_names = []
    for maindir, subdir, file_name_list in os.walk(path):
        for filename in file_name_list:
            apath = os.path.join(maindir, filename)
            ext = os.path.splitext(apath)[1]
            if ext in IMAGE_EXT:
                image_names.append(apath)
    return image_names


class Predictor(object):
    def __init__(
        self,
        model,
        exp,
        cls_names=COCO_CLASSES,
        trt_file=None,
        decoder=None,
        device="cpu",
        fp16=False,
        legacy=False,
    ):
        self.model = model
        self.cls_names = cls_names
        self.decoder = decoder
        self.num_classes = exp.num_classes
        self.confthre = exp.test_conf
        self.nmsthre = exp.nmsthre
        self.test_size = exp.test_size
        self.device = device
        self.fp16 = fp16
        self.preproc = ValTransform(legacy=legacy)
        if trt_file is not None:
            from torch2trt import TRTModule

            model_trt = TRTModule()
            model_trt.load_state_dict(torch.load(trt_file))

            # Run one dummy forward pass through the original model, then
            # swap in the TensorRT module.
            x = torch.ones(1, 3, exp.test_size[0], exp.test_size[1]).cuda()
            self.model(x)
            self.model = model_trt

    def inference(self, img):
        img_info = {"id": 0}
        if isinstance(img, str):
            img_info["file_name"] = os.path.basename(img)
            img = cv2.imread(img)
        else:
            img_info["file_name"] = None

        height, width = img.shape[:2]
        img_info["height"] = height
        img_info["width"] = width
        img_info["raw_img"] = img

        ratio = min(self.test_size[0] / img.shape[0], self.test_size[1] / img.shape[1])
        # ValTransform pads on the bottom/right only, so no pad offset is
        # needed. For centered padding this would instead be:
        #   pad_h = (self.test_size[0] - img.shape[0] * ratio) / 2
        #   pad_w = (self.test_size[1] - img.shape[1] * ratio) / 2
        pad_h = 0
        pad_w = 0
        img_info["pad"] = (pad_w, pad_h)
        img_info["ratio"] = ratio

        img, _ = self.preproc(img, None, self.test_size)
        img = torch.from_numpy(img).unsqueeze(0)
        img = img.float()
        if self.device == "gpu":
            img = img.cuda()
            if self.fp16:
                img = img.half()  # to FP16

        with torch.no_grad():
            t0 = time.time()
            outputs = self.model(img)
            if self.decoder is not None:
                outputs = self.decoder(outputs, dtype=outputs.type())
            outputs = postprocess(
                outputs, self.num_classes, self.confthre,
                self.nmsthre, class_agnostic=True
            )
            logger.info("Infer time: {:.4f}s".format(time.time() - t0))
        return outputs, img_info


def build_predictor(
    exp_file,
    model_name,
    ckpt_path,
    device="cpu",
    fp16=False,
    fuse=False,
    trt=False,
    conf=None,
    nms=None,
    tsize=None,
):
    # Load the experiment and override test-time settings where given.
    # (Defaults are None so that conf=0 / nms=0 are not silently ignored.)
    exp = get_exp(exp_file, model_name)
    if conf is not None:
        exp.test_conf = conf
    if nms is not None:
        exp.nmsthre = nms
    if tsize is not None:
        exp.test_size = (tsize, tsize)

    # Create and initialize the model.
    model = exp.get_model()
    if device == "gpu":
        model.cuda()
        if fp16:
            model.half()
    model.eval()
    logger.info("Model Summary: {}".format(get_model_info(model, exp.test_size)))

    logger.info("loading checkpoint")
    ckpt = torch.load(ckpt_path, map_location="cpu", weights_only=False)
    # load the model state dict
    model.load_state_dict(ckpt["model"])
    logger.info("loaded checkpoint done.")

    if fuse:
        model = fuse_model(model)

    # Note: the trt flag is currently unused here; to run with TensorRT,
    # pass a serialized model path as Predictor's trt_file argument.
    predictor = Predictor(
        model, exp, COCO_CLASSES, None,
        decoder=None, device=device, fp16=fp16, legacy=False,
    )
    return predictor


def run_detection(predictor, path):
    # Output format (COCO-style, but bboxes are [x1, y1, x2, y2] in original
    # image coordinates, not COCO's [x, y, w, h]):
    #   { "images": [{"id": 0, "filename": "x.jpg"}, ...],
    #     "annotations": [{"id": 0, "image_id": 0, "bbox": [x1, y1, x2, y2],
    #                      "cls": 1.0, "score": 0.35}, ...] }
    if os.path.isdir(path):
        files = get_image_list(path)
    else:
        files = [path]
    files.sort()

    img_list = []
    ann_list = []
    ann_id = 0  # global annotation counter, unique across all images
    for img_id, image_name in enumerate(files):
        outputs, img_info = predictor.inference(image_name)
        ratio = img_info["ratio"]
        pad_w, pad_h = img_info["pad"]

        img_list.append({"id": img_id, "filename": image_name})

        if outputs[0] is not None:
            for output in outputs[0]:
                # Undo the letterbox transform to map the box back to the
                # original image coordinates.
                x1, y1, x2, y2 = output[:4]
                x1 = (x1 - pad_w) / ratio
                y1 = (y1 - pad_h) / ratio
                x2 = (x2 - pad_w) / ratio
                y2 = (y2 - pad_h) / ratio
                ann_list.append({
                    "id": ann_id,
                    "image_id": img_id,
                    "bbox": [float(x1), float(y1), float(x2), float(y2)],
                    "cls": output[6].item(),
                    # Final score = objectness * class confidence.
                    "score": (output[4] * output[5]).item(),
                })
                ann_id += 1

    data_dict = {"images": img_list, "annotations": ann_list}
    # Write results into the input directory, or beside a single input file.
    out_dir = path if os.path.isdir(path) else (os.path.dirname(path) or ".")
    result_path = os.path.join(out_dir, "results.json")
    with open(result_path, "w") as f:
        json.dump(data_dict, f)
    return result_path
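

# A minimal usage sketch of the API above. The checkpoint path and image
# directory are placeholders; substitute your own. With exp_file=None,
# get_exp resolves a built-in experiment by name (e.g. "yolox-s").
if __name__ == "__main__":
    predictor = build_predictor(
        exp_file=None,            # or a custom exp file path
        model_name="yolox-s",     # built-in experiment name
        ckpt_path="yolox_s.pth",  # placeholder checkpoint path
        device="cpu",
        conf=0.25,
        nms=0.45,
    )
    result_file = run_detection(predictor, "assets")  # placeholder image dir
    logger.info("Wrote {}".format(result_file))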