|
|
|
|
|
|
|
import time |
|
import os |
|
import json |
|
from loguru import logger |
|
|
|
import cv2 |
|
|
|
import torch |
|
|
|
from yolox.data.data_augment import ValTransform |
|
from yolox.data.datasets import COCO_CLASSES |
|
from yolox.exp import get_exp |
|
from yolox.utils import fuse_model, get_model_info, postprocess, vis |
|
|
|
# File extensions (lower-case, with leading dot) that are treated as images.
IMAGE_EXT = [".jpg", ".jpeg", ".webp", ".bmp", ".png"]


def get_image_list(path):
    """Recursively collect the paths of all image files below ``path``.

    A file counts as an image when its extension, compared
    case-insensitively, is listed in ``IMAGE_EXT`` (so ``photo.JPG`` is
    picked up as well as ``photo.jpg``).

    Args:
        path: Root directory to walk.

    Returns:
        A list of file paths (each joined onto the directory it was found
        in), in ``os.walk`` order. Empty when nothing matches or ``path``
        cannot be walked.
    """
    image_names = []
    for maindir, _subdirs, file_name_list in os.walk(path):
        for filename in file_name_list:
            apath = os.path.join(maindir, filename)
            ext = os.path.splitext(apath)[1]
            # IMAGE_EXT holds lower-case entries; normalise before matching.
            if ext.lower() in IMAGE_EXT:
                image_names.append(apath)
    return image_names
|
|
|
|
|
class Predictor(object):
    """Single-image inference wrapper around a detection model.

    Bundles the model with its preprocessing (``ValTransform``) and its
    postprocessing (``postprocess``) using thresholds taken from ``exp``.
    """

    def __init__(
        self,
        model,
        exp,
        cls_names=COCO_CLASSES,
        trt_file=None,
        decoder=None,
        device="cpu",
        fp16=False,
        legacy=False,
    ):
        """Store the model and the inference settings.

        Args:
            model: Callable model used for the forward pass.
            exp: Experiment object; ``num_classes``, ``test_conf``,
                ``nmsthre`` and ``test_size`` are read from it.
            cls_names: Class names, stored on the instance as-is.
            trt_file: Optional path to a state dict that is loaded into a
                ``torch2trt.TRTModule``, which then replaces ``model``.
            decoder: Optional callable applied to the raw model output
                before ``postprocess``.
            device: ``"gpu"`` moves the input tensor to CUDA in
                ``inference``; any other value leaves it where it is.
            fp16: When true (and ``device == "gpu"``), the input tensor is
                converted to half precision.
            legacy: Forwarded to ``ValTransform``.
        """
        self.model = model
        self.cls_names = cls_names
        self.decoder = decoder
        self.num_classes = exp.num_classes
        self.confthre = exp.test_conf
        self.nmsthre = exp.nmsthre
        self.test_size = exp.test_size
        self.device = device
        self.fp16 = fp16
        self.preproc = ValTransform(legacy=legacy)
        if trt_file is not None:
            # Imported lazily so torch2trt is only required on this path.
            from torch2trt import TRTModule

            model_trt = TRTModule()
            model_trt.load_state_dict(torch.load(trt_file))

            # Run the original model once on a dummy CUDA batch before
            # swapping in the TensorRT module (presumably a warm-up /
            # initialisation step -- TODO confirm).
            x = torch.ones(1, 3, exp.test_size[0], exp.test_size[1]).cuda()
            self.model(x)
            self.model = model_trt

    def inference(self, img):
        """Run detection on one image.

        Args:
            img: Either a path (``str``) that is read with ``cv2.imread``
                or an already loaded image array exposing ``.shape``.

        Returns:
            ``(outputs, img_info)`` where ``outputs`` is whatever
            ``postprocess`` returns and ``img_info`` is a dict with the
            keys ``id``, ``file_name``, ``height``, ``width``, ``raw_img``,
            ``pad`` and ``ratio``.

        Raises:
            ValueError: If ``img`` is a path that OpenCV cannot read.
        """
        img_info = {"id": 0}
        if isinstance(img, str):
            img_path = img
            img_info["file_name"] = os.path.basename(img_path)
            img = cv2.imread(img_path)
            if img is None:
                # cv2.imread signals a missing/undecodable file by returning
                # None rather than raising; fail here with a clear message
                # instead of an AttributeError on ``img.shape`` below.
                raise ValueError("Failed to read image: {}".format(img_path))
        else:
            img_info["file_name"] = None

        height, width = img.shape[:2]
        img_info["height"] = height
        img_info["width"] = width
        img_info["raw_img"] = img

        # Largest uniform scale at which the image still fits in test_size.
        ratio = min(self.test_size[0] / img.shape[0], self.test_size[1] / img.shape[1])
        # No offset is recorded for the resized image; callers subtract this
        # (zero) padding before dividing box coordinates by ``ratio``.
        pad_h = 0
        pad_w = 0
        img_info["pad"] = (pad_w, pad_h)
        img_info["ratio"] = ratio

        img, _ = self.preproc(img, None, self.test_size)
        img = torch.from_numpy(img).unsqueeze(0)  # add a leading batch axis
        img = img.float()
        if self.device == "gpu":
            img = img.cuda()
            if self.fp16:
                img = img.half()

        with torch.no_grad():
            # The logged time covers forward pass, optional decode and
            # postprocess.
            t0 = time.time()
            outputs = self.model(img)
            if self.decoder is not None:
                outputs = self.decoder(outputs, dtype=outputs.type())
            outputs = postprocess(
                outputs, self.num_classes, self.confthre,
                self.nmsthre, class_agnostic=True
            )
            logger.info("Infer time: {:.4f}s".format(time.time() - t0))
        return outputs, img_info
|
|
|
def build_predictor(
    exp_file, model_name, ckpt_path, device="cpu", fp16=False, fuse=False, trt=False, conf=0, nms=0, tsize=None
):
    """Build a ready-to-use ``Predictor`` from an experiment and a checkpoint.

    Args:
        exp_file: Experiment description file passed to ``get_exp``.
        model_name: Model name passed to ``get_exp``.
        ckpt_path: Checkpoint file; its ``"model"`` entry is loaded as the
            model state dict.
        device: ``"gpu"`` moves the model to CUDA; anything else keeps it
            where ``exp.get_model()`` created it.
        fp16: Convert the model to half precision (GPU path only).
        fuse: When true, the loaded model is passed through ``fuse_model``
            before being handed to the predictor.
        trt: Not supported by this builder (no TensorRT engine path is
            available here); a warning is logged and the PyTorch model is
            used.
        conf: Overrides ``exp.test_conf`` unless ``None``.
        nms: Overrides ``exp.nmsthre`` unless ``None``.
        tsize: If given, the test size becomes ``(tsize, tsize)``.

    Returns:
        A ``Predictor`` wrapping the loaded model.
    """
    exp = get_exp(exp_file, model_name)
    # NOTE(review): the defaults are 0, not None, so unless the caller passes
    # None explicitly the experiment's own thresholds are always replaced by
    # 0 -- confirm this is intended.
    if conf is not None:
        exp.test_conf = conf
    if nms is not None:
        exp.nmsthre = nms
    if tsize is not None:
        exp.test_size = (tsize, tsize)

    model = exp.get_model()
    if device == "gpu":
        model.cuda()
        if fp16:
            model.half()
    model.eval()
    # NOTE(review): the summary is computed after the optional half()
    # conversion; verify get_model_info copes with an fp16 model.
    logger.info("Model Summary: {}".format(get_model_info(model, exp.test_size)))

    logger.info("loading checkpoint")
    # SECURITY: weights_only=False unpickles arbitrary objects; only load
    # checkpoints from trusted sources.
    ckpt = torch.load(ckpt_path, map_location="cpu", weights_only=False)

    model.load_state_dict(ckpt["model"])
    logger.info("loaded checkpoint done.")

    if fuse:
        # Previously this flag was accepted but silently ignored.
        logger.info("\tFusing model...")
        model = fuse_model(model)

    if trt:
        logger.warning(
            "trt=True is ignored: no TensorRT engine path is available; "
            "running the PyTorch model instead."
        )

    predictor = Predictor(
        model, exp, COCO_CLASSES,
        None, decoder=None,
        device=device, fp16=fp16, legacy=False
    )

    return predictor
|
|
|
def run_detection(predictor, path):
    """Run the predictor over one image or a directory and dump JSON results.

    Args:
        predictor: Object exposing ``inference(image_path)`` that returns
            ``(outputs, img_info)``; ``img_info`` must provide ``"ratio"``
            and ``"pad"``, and ``outputs[0]`` is either ``None`` or an
            iterable of detection rows.
        path: A directory (searched recursively via ``get_image_list``) or
            a single image file.

    Returns:
        Path of the written ``results.json``. For a directory it is placed
        inside that directory; for a single file it is placed next to the
        file (writing "below" a regular file can never succeed).

    The JSON document has two lists: ``images`` (``id``, ``filename``) and
    ``annotations`` (``id``, ``image_id``, ``bbox`` as
    ``[x1, y1, x2, y2]`` mapped back through ``pad``/``ratio``, ``cls``
    and ``score``).
    """
    if os.path.isdir(path):
        files = get_image_list(path)
        out_dir = path
    else:
        files = [path]
        # A bare file name has an empty dirname; fall back to the CWD.
        out_dir = os.path.dirname(path) or "."
    files.sort()

    img_list = []
    ann_list = []

    for img_id, image_name in enumerate(files):
        outputs, img_info = predictor.inference(image_name)
        ratio = img_info["ratio"]
        pad_w, pad_h = img_info["pad"]

        img_list.append({"id": img_id, "filename": image_name})

        detections = outputs[0]
        if detections is None:
            # Nothing detected for this image.
            continue

        # NOTE(review): annotation ids restart at 0 for every image, so they
        # are only unique together with image_id -- confirm consumers expect
        # that.
        for det_id, output in enumerate(detections):
            x1, y1, x2, y2 = output[:4]
            # Undo the preprocessing offset and scale to get coordinates in
            # the original image.
            x1 = (x1 - pad_w) / ratio
            y1 = (y1 - pad_h) / ratio
            x2 = (x2 - pad_w) / ratio
            y2 = (y2 - pad_h) / ratio
            ann_list.append(
                {
                    "id": det_id,
                    "image_id": img_id,
                    "bbox": [float(x1), float(y1), float(x2), float(y2)],
                    "cls": output[6].item(),
                    # Row entries 4 and 5 are multiplied to form the score.
                    "score": (output[4] * output[5]).item(),
                }
            )

    data_dict = {"images": img_list, "annotations": ann_list}

    result_path = f"{out_dir}/results.json"
    with open(result_path, 'w') as f:
        json.dump(data_dict, f)

    return result_path
|
|