import os
import time
from pathlib import Path
from typing import Optional, Union

import datasets
import numpy as np
import onnxruntime as ort
import torch
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import EntryNotFoundError
from optimum.onnxruntime import ORTModelForCustomTasks
from optimum.onnxruntime.modeling_ort import logger
from optimum.utils.save_utils import maybe_load_preprocessors
from transformers import AutoConfig, AutoModel, AutoModelForImageClassification
from transformers.image_utils import load_image
from transformers.modeling_outputs import ModelOutput
from transformers.modeling_utils import custom_object_save

from .configuration_mobilenetv3_large_100 import RyzenAIORTModelForImageClassificationConfig


class RyzenAIORTModelForImageClassification(ORTModelForCustomTasks):
    """ONNX Runtime image-classification model wrapping a mobilenetv3_large_100 export for Ryzen AI."""

    model_type = "RyzenAIORTModelForImageClassification_mobilenetv3_large_100"
    config_class = AutoConfig
    auto_model_class = AutoModelForImageClassification

    def __init__(
        self,
        model: ort.InferenceSession,
        config: RyzenAIORTModelForImageClassificationConfig,
        **kwargs,
    ):
        super().__init__(model=model, config=config, **kwargs)

    def _forward(self, **kwargs):
        return super().forward(**kwargs)

    def forward(self, **kwargs):
        # Map the HF-style "pixel_values" argument onto the ONNX graph's input name,
        # and rename the raw ONNX output node ("2208") to the conventional "logits" key.
        pixel_values = kwargs.pop("pixel_values")
        kwargs["MobileNetV3::input_0"] = pixel_values
        res = dict(self._forward(**kwargs))
        res["logits"] = res.pop("2208")
        return ModelOutput(res)

    def infer(self, image):
        # Accepts a local path, URL, or PIL image; returns the top-5 class probabilities and indices.
        image = load_image(image)
        output = self.forward(**self.preprocessors[0]([image], return_tensors="np"))
        probabilities = torch.nn.functional.softmax(torch.tensor(output["logits"]), dim=-1)
        top5_probabilities, top5_class_indices = torch.topk(probabilities, k=5)
        return top5_probabilities, top5_class_indices

    def eval_imagenet(self, dataset="zh-plus/tiny-imagenet", split="valid"):
        batch_time = AverageMeter()
        top1 = AverageMeter()
        top5 = AverageMeter()
        end = time.time()
        print_freq = 10

        def transforms(examples):
            images = [img.convert("RGB") for img in examples["image"]]
            examples["pixel_values"] = images
            return examples

        dataset = datasets.load_dataset(dataset, split=split)
        dataset.set_transform(transforms)
        for i, data in enumerate(dataset):
            label = data["label"]
            output = self.forward(**self.preprocessors[0]([data["pixel_values"]], return_tensors="np"))
            logits = output["logits"]

            # Samples are processed one at a time.
            n = 1
            prec1, prec5 = accuracy_np(logits, np.array([label]))
            top1.update(prec1.item(), n)
            top5.update(prec5.item(), n)

            batch_time.update(time.time() - end)
            end = time.time()
            if i % print_freq == 0:
                print(
                    f'Test: [{i}/{len(dataset)}]\t'
                    f'Time {batch_time.val:.3f} ({batch_time.avg:.3f}, {n / batch_time.avg:.3f}/s, '
                    f'{1000 * batch_time.avg / n:.3f} ms/sample) \t'
                    f'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                    f'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'
                )

        print(f' * Prec@1 {top1.avg:.3f} ({100 - top1.avg:.3f}) Prec@5 {top5.avg:.3f} ({100 - top5.avg:.3f})')

    @classmethod
    def register_for_auto_class(cls, auto_class="AutoModel"):
        """
        Register this class with a given auto class. This should only be used for custom models as the ones in the
        library are already mapped with an auto class.

        <Tip warning={true}>

        This API is experimental and may have some slight breaking changes in the next releases.

        </Tip>

        Args:
            auto_class (`str` or `type`, *optional*, defaults to `"AutoModel"`):
                The auto class to register this new model with.
        """
        if not isinstance(auto_class, str):
            auto_class = auto_class.__name__

        import transformers.models.auto as auto_module

        if not hasattr(auto_module, auto_class):
            raise ValueError(f"{auto_class} is not a valid auto class.")

        cls._auto_class = auto_class

    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        push_to_hub: bool = False,
        **kwargs,
    ):
        """
        Saves a model and its configuration file to a directory, so that it can be re-loaded using the
        [`from_pretrained`] class method.

        Args:
            save_directory (`Union[str, os.PathLike]`):
                Directory to which to save. Will be created if it doesn't exist.
            push_to_hub (`bool`, *optional*, defaults to `False`):
                Whether or not to push your model to the Hugging Face model hub after saving it.

                <Tip warning={true}>

                Using `push_to_hub=True` will synchronize the repository you are pushing to with `save_directory`,
                which requires `save_directory` to be a local clone of the repo you are pushing to if it's an existing
                folder. Pass along `temp_dir=True` to use a temporary directory instead.

                </Tip>
        """
        if os.path.isfile(save_directory):
            logger.error(f"Provided path ({save_directory}) should be a directory, not a file")
            return

        os.makedirs(save_directory, exist_ok=True)

        for preprocessor in self.preprocessors:
            preprocessor.save_pretrained(save_directory)
        self._save_pretrained(save_directory)
        if self._auto_class is not None:
            custom_object_save(self, save_directory, config=self.config)
        self._save_config(save_directory)

        if push_to_hub:
            return self.push_to_hub(save_directory, **kwargs)

    @staticmethod
    def _cached_file(
        model_path: Union[Path, str],
        use_auth_token: Optional[Union[bool, str]] = None,
        revision: Optional[str] = None,
        force_download: bool = False,
        cache_dir: Optional[str] = None,
        file_name: Optional[str] = None,
        subfolder: str = "",
        local_files_only: bool = False,
    ):
        model_path = Path(model_path)

        # Locate the ONNX file and preprocessors either in a local directory or on the Hugging Face Hub.
        if model_path.is_dir():
            model_cache_path = model_path / file_name
            preprocessors = maybe_load_preprocessors(model_path.as_posix(), trust_remote_code=True)
        else:
            model_cache_path = hf_hub_download(
                repo_id=model_path.as_posix(),
                filename=file_name,
                subfolder=subfolder,
                use_auth_token=use_auth_token,
                revision=revision,
                cache_dir=cache_dir,
                force_download=force_download,
                local_files_only=local_files_only,
            )
            # Also fetch the external data file if the ONNX model stores its weights separately.
            try:
                hf_hub_download(
                    repo_id=model_path.as_posix(),
                    subfolder=subfolder,
                    filename=file_name + "_data",
                    use_auth_token=use_auth_token,
                    revision=revision,
                    cache_dir=cache_dir,
                    force_download=force_download,
                    local_files_only=local_files_only,
                )
            except EntryNotFoundError:
                # The model does not use external data.
                pass

            model_cache_path = Path(model_cache_path)
            preprocessors = maybe_load_preprocessors(model_path.as_posix(), subfolder=subfolder, trust_remote_code=True)

        return model_cache_path, preprocessors


class AverageMeter:
    """Computes and stores the average and current value."""

    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count


def accuracy_np(output, target):
    """Computes top-1 and top-5 accuracy (in percent) for a batch of logits."""
    # Sort class indices by descending score, then check whether the target label
    # appears among the top-1 / top-5 predictions.
    max_indices = np.argsort(output, axis=1)[:, ::-1]
    top5 = 100 * np.equal(max_indices[:, :5], target[:, np.newaxis]).sum(axis=1).mean()
    top1 = 100 * np.equal(max_indices[:, 0], target).mean()
    return top1, top5
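

# Minimal usage sketch (not part of the original module). The repository id below is a
# placeholder; substitute the actual Hub repo or local directory that contains the exported
# ONNX model, its configuration, and its image preprocessor.
if __name__ == "__main__":
    model = RyzenAIORTModelForImageClassification.from_pretrained("<org>/<mobilenetv3_large_100-repo>")
    # Run single-image inference on any local path, URL, or PIL image.
    top5_probabilities, top5_class_indices = model.infer(
        "http://images.cocodataset.org/val2017/000000039769.jpg"
    )
    print(top5_probabilities, top5_class_indices)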