| import os |
| |
| from functools import partial |
| from pathlib import Path |
|
|
| import torch |
| from huggingface_hub import hf_hub_download |
| from torch import Tensor, nn |
| from torchvision import models, transforms |
| import pandas as pd |
|
|
|
|
class ModelInterface:
    """Runs the road classification pipeline.

    Loads model checkpoints (downloading them from the Hugging Face Hub when
    missing locally) and predicts road type, surface type, and per-surface
    quality for batches of images.
    """

    def __init__(self, config):
        """Initialize device, image-transform configs, and model locations.

        Parameters:
        - config: mapping providing 'gpu_kernel', 'transform_surface',
          'transform_road_type', 'model_root', 'models', 'hf_model_repo'.
        """
        self.device = torch.device(
            f"cuda:{config.get('gpu_kernel')}" if torch.cuda.is_available() else "cpu"
        )
        normalization = (const["NORM_MEAN"], const["NORM_SD"])

        # NOTE: the transform dicts taken from config are mutated in place
        # (a 'normalize' entry is added).
        transform = config.get("transform_surface")
        transform["normalize"] = normalization
        self.transform_surface = transform
        transform = config.get("transform_road_type")
        transform["normalize"] = normalization
        self.transform_road_type = transform
        self.model_root = Path(config.get("model_root"))
        self.models = config.get("models")
        self.hf_model_repo = config.get("hf_model_repo")

    @staticmethod
    def custom_crop(img, crop_style=None):
        """Crop a PIL image according to a named crop style.

        Parameters:
        - img: PIL image (anything exposing .size as (width, height)).
        - crop_style: const['CROP_LOWER_MIDDLE_HALF'],
          const['CROP_LOWER_HALF'], or anything else for no cropping.

        Returns:
        The cropped image, or *img* unchanged for unknown styles.
        """
        im_width, im_height = img.size
        if crop_style == const["CROP_LOWER_MIDDLE_HALF"]:
            # horizontally centered half of the lower half
            top = im_height / 2
            left = im_width / 4
            height = im_height / 2
            width = im_width / 2
        elif crop_style == const["CROP_LOWER_HALF"]:
            # full-width lower half
            top = im_height / 2
            left = 0
            height = im_height / 2
            width = im_width
        else:
            return img

        return transforms.functional.crop(img, top, left, height, width)

    def transform(
        self,
        resize=None,
        crop=None,
        to_tensor=True,
        normalize=None,
    ):
        """
        Create a PyTorch image transformation function based on specified parameters.

        Parameters:
        - resize (int, tuple or None): Target size for resizing, e.g. (height, width);
          a single int is expanded to a square (int, int).
        - crop (string or None): crop style, e.g. 'lower_middle_half'
          (see custom_crop).
        - to_tensor (bool): Converts the PIL Image (H x W x C) in the range
          [0, 255] to a torch.FloatTensor of shape (C x H x W) in the range
          [0.0, 1.0].
        - normalize (tuple of lists [r, g, b] or None): Mean and standard
          deviation for normalization.

        Returns:
        PyTorch image transformation function (transforms.Compose).
        """
        transform_list = []

        # order matters: crop on the PIL image first, then resize,
        # then tensor conversion, then normalization on the tensor
        if crop is not None:
            transform_list.append(
                transforms.Lambda(partial(self.custom_crop, crop_style=crop))
            )

        if resize is not None:
            if isinstance(resize, int):
                resize = (resize, resize)
            transform_list.append(transforms.Resize(resize))

        if to_tensor:
            transform_list.append(transforms.ToTensor())

        if normalize is not None:
            transform_list.append(transforms.Normalize(*normalize))

        return transforms.Compose(transform_list)

    def preprocessing(self, img_data_raw, transform):
        """Apply the transform described by *transform* (a kwargs dict for
        self.transform) to each raw image and stack the results into one
        batch tensor of shape (N, C, H, W)."""
        pipeline = self.transform(**transform)
        return torch.stack([pipeline(img) for img in img_data_raw])

    def load_model(self, model):
        """Load a model checkpoint, downloading it from the HF Hub if missing.

        Parameters:
        - model (str): checkpoint file name inside self.model_root.

        Returns:
        (net, class_to_idx, is_regression) on success, or
        (None, {}, False) when the checkpoint could not be obtained.
        """
        model_path = self.model_root / model

        if not model_path.exists():
            print(
                f"Model file not found at {model_path}. Downloading from Hugging Face..."
            )
            try:
                self.model_root.mkdir(parents=True, exist_ok=True)
                model_path = hf_hub_download(
                    repo_id=self.hf_model_repo, filename=model, local_dir=self.model_root
                )
                print(f"Model file downloaded to {model_path}.")
            except Exception as e:
                # Best effort: report and signal failure instead of raising,
                # so callers can skip this prediction level.
                print(f"An unexpected error occurred while downloading the model: {e}")
                return None, {}, False

        # NOTE(review): torch.load unpickles the checkpoint; only load
        # trusted files (here: local files or the configured HF repo).
        model_state = torch.load(model_path, map_location=self.device)
        model_name = model_state["model_name"]
        is_regression = model_state["is_regression"]
        class_to_idx = model_state["class_to_idx"]
        num_classes = 1 if is_regression else len(class_to_idx)
        net = model_mapping[model_name](num_classes=num_classes)
        net.load_state_dict(model_state["model_state_dict"])

        return net, class_to_idx, is_regression

    def predict(self, model, data):
        """Run *model* on a preprocessed batch and return its per-class
        probabilities (classification) or raw values (regression), as
        produced by model.get_class_probabilities."""
        model.to(self.device)
        model.eval()

        image_batch = data.to(self.device)

        with torch.no_grad():
            batch_outputs = model(image_batch)
            batch_values = model.get_class_probabilities(batch_outputs)

        return batch_values

    @staticmethod
    def predict_value_to_class(batch_values, class_to_idx, ids, level=""):
        """Turn model outputs into a long-format DataFrame plus class labels.

        Parameters:
        - batch_values: 1-D tensor (regression) or 2-D tensor of class
          probabilities (classification), one row per image.
        - class_to_idx: mapping class name -> class index.
        - ids: sequence of image ids aligned with batch_values.
        - level (str): prediction level tag written to the 'level' column.

        Returns:
        (df, batch_classes) where df has columns ['id', 'level', 'value',
        'class'] (one row per image for regression, one row per image and
        class for classification) and batch_classes holds the predicted
        class label per image.
        """
        columns = ["id", "level", "value", "class"]
        batch_size = list(batch_values.shape)
        if len(batch_size) < 2:
            # 1-D regression output: one value per image
            batch_size = [batch_size[0], 1]
        df = pd.DataFrame(columns=columns, index=range(batch_size[0] * batch_size[1]))
        idx_to_class = {i: cls for cls, i in class_to_idx.items()}

        if batch_size[1] == 1:
            # regression: round to the nearest class index, clamped to the
            # valid index range
            lo = min(class_to_idx.values())
            hi = max(class_to_idx.values())
            batch_classes = [
                idx_to_class[min(max(idx.item(), lo), hi)]
                for idx in batch_values.round().int()
            ]
            row = 0
            for img_id, value, cls in zip(ids, batch_values, batch_classes):
                df.iloc[row] = [img_id, level, value.item(), cls]
                row += 1
        else:
            # classification: label is the argmax; df still records every
            # class probability per image
            batch_classes = [
                idx_to_class[idx.item()] for idx in torch.argmax(batch_values, dim=1)
            ]
            row = 0
            for img_id, values in zip(ids, batch_values):
                for idx, value in enumerate(values.tolist()):
                    df.iloc[row] = [img_id, level, value, idx_to_class[idx]]
                    row += 1

        return df, batch_classes

    def batch_classifications(self, img_data_raw, img_ids=None):
        """Predict road type, surface type, and surface quality for a batch.

        Parameters:
        - img_data_raw: sequence of raw (PIL) images.
        - img_ids: optional sequence of ids aligned with img_data_raw;
          defaults to 0..len-1.

        Returns:
        DataFrame with columns ['id', 'level', 'value', 'class'] combining
        the rows of all prediction levels that actually ran.
        """
        if img_ids is None:
            img_ids = range(len(img_data_raw))

        df = pd.DataFrame()

        # Outputs of the surface-type stage, consumed by the quality stage.
        # Initialized here so the quality stage is skipped cleanly when the
        # surface-type stage did not run (previously raised NameError).
        data = None
        classes = None

        # --- road type ---
        level = "road_type"
        model_file = self.models.get(level)
        if model_file is not None:
            model, class_to_idx, _ = self.load_model(model=model_file)
            if model is None:
                print(f"Road type model '{model_file}' is not found.\n"
                      + "Road type prediction is skipped.")
            else:
                # own local so the quality stage never sees road-type tensors
                data_road = self.preprocessing(img_data_raw, self.transform_road_type)
                values = self.predict(model, data_road)
                df_tmp, _ = self.predict_value_to_class(
                    values,
                    class_to_idx,
                    img_ids,
                    level,
                )
                df = pd.concat([df, df_tmp], ignore_index=True)

        # --- surface type (also feeds the quality stage) ---
        level = "surface_type"
        model_file = self.models.get(level)
        if model_file is not None:
            model, class_to_idx, _ = self.load_model(model=model_file)
            if model is None:
                print(f"Surface type model '{model_file}' is not found.\n"
                      + "Surface type prediction is skipped.")
            else:
                data = self.preprocessing(img_data_raw, self.transform_surface)
                values = self.predict(model, data)
                df_tmp, classes = self.predict_value_to_class(
                    values,
                    class_to_idx,
                    img_ids,
                    level,
                )
                df = pd.concat([df, df_tmp], ignore_index=True)

        # --- surface quality (one sub-model per predicted surface type) ---
        level = "surface_quality"
        sub_models = self.models.get(level)
        if sub_models is not None:
            if classes is None:
                print("Surface quality prediction requires surface type results.\n"
                      + "Surface quality prediction is skipped.")
            else:
                # group image indices by their predicted surface type
                surface_indices = {}
                for i, surface_type in enumerate(classes):
                    surface_indices.setdefault(surface_type, []).append(i)

                for surface_type, indices in surface_indices.items():
                    model_file = sub_models.get(surface_type)
                    if model_file is None:
                        continue
                    model, class_to_idx, _ = self.load_model(model=model_file)
                    if model is None:
                        print(f"Quality model '{model_file}' is not found.\n"
                              + f"Quality prediction is skipped for surface '{surface_type}'.")
                    else:
                        # reuse the surface-type preprocessed tensors
                        values = self.predict(model, data[indices])
                        df_tmp, _ = self.predict_value_to_class(
                            values,
                            class_to_idx,
                            [img_ids[i] for i in indices],
                            level,
                        )
                        df = pd.concat([df, df_tmp], ignore_index=True)

        return df
|
|
|
|
class CustomEfficientNetV2SLinear(nn.Module):
    """EfficientNetV2-S backbone with a replaced linear head.

    A single output unit puts the model in regression mode (MSE criterion,
    flattened outputs); any other width makes it a classifier
    (cross-entropy, softmax probabilities).
    """

    def __init__(self, num_classes, avg_pool=1):
        super().__init__()

        backbone = models.efficientnet_v2_s(weights="IMAGENET1K_V1")

        # The head's input width grows with the pooled spatial footprint.
        head_in = backbone.classifier[-1].in_features * avg_pool * avg_pool
        backbone.classifier[-1] = nn.Linear(head_in, num_classes, bias=True)

        self.features = backbone.features
        self.avgpool = nn.AdaptiveAvgPool2d(avg_pool)
        self.classifier = backbone.classifier

        # Mode is decided purely by output width. NOTE(review): criterion is
        # stored as the loss *class*, not an instance — presumably callers
        # instantiate it; confirm before changing.
        self.is_regression = num_classes == 1
        self.criterion = nn.MSELoss if self.is_regression else nn.CrossEntropyLoss

    def get_class_probabilities(self, x):
        """Flatten raw outputs (regression) or softmax them over classes
        (classification)."""
        return x.flatten() if self.is_regression else nn.functional.softmax(x, dim=1)

    def forward(self, x: Tensor) -> Tensor:
        """Backbone features -> adaptive pool -> flatten -> linear head."""
        feats = self.features(x)
        pooled = self.avgpool(feats)
        flat = torch.flatten(pooled, 1)
        return self.classifier(flat)
|
|
| |
| |
|
|
|
|
| |
# Shared string keys and normalization statistics used across this module.
const = {
    # Model-name identifier stored in checkpoints; see model_mapping.
    "EFFNET_LINEAR": "efficientNetV2SLinear",
    # Crop styles understood by ModelInterface.custom_crop.
    "CROP_LOWER_MIDDLE_HALF": "lower_middle_half",
    "CROP_LOWER_HALF": "lower_half",
    # Per-channel (RGB) mean / std used by transforms.Normalize.
    # NOTE(review): presumably computed on the training dataset — confirm.
    "NORM_MEAN": [0.42834484577178955, 0.4461250305175781, 0.4350937306880951],
    "NORM_SD": [0.22991590201854706, 0.23555299639701843, 0.26348039507865906],
}
|
|
# Maps the 'model_name' stored in a checkpoint to the class used to rebuild
# the network (consumed by ModelInterface.load_model).
model_mapping = {
    const["EFFNET_LINEAR"]: CustomEfficientNetV2SLinear,
}
|
|