File size: 8,575 Bytes
73eff0b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
from .configuration_mobilenetv3_large_100 import RyzenAIORTModelForImageClassificationConfig
import numpy as np
import time
import os
import torch
from typing import Optional, Union
from pathlib import Path
import onnxruntime as ort
import datasets
from transformers.image_utils import load_image
from transformers import AutoConfig, AutoModel, AutoModelForImageClassification
from transformers.modeling_utils import custom_object_save
from transformers.modeling_outputs import ModelOutput
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import EntryNotFoundError
from optimum.onnxruntime import ORTModelForCustomTasks
from optimum.onnxruntime.modeling_ort import logger
from optimum.utils.save_utils import maybe_load_preprocessors


class RyzenAIORTModelForImageClassification(ORTModelForCustomTasks):
    model_type = "RyzenAIORTModelForImageClassification_mobilenetv3_large_100"
    config_class = AutoConfig
    auto_model_class = AutoModelForImageClassification

    def __init__(self,
        model: ort.InferenceSession,
        config: RyzenAIORTModelForImageClassificationConfig,
        **kwargs):
        super().__init__(model=model, config=config, **kwargs)

    def _forward(self, **kwargs):
        return super().forward(**kwargs)

    def forward(self, **kwargs):
        pixel_values = kwargs.pop("pixel_values")
        kwargs["MobileNetV3::input_0"] = pixel_values
        res = dict(self._forward(**kwargs))
        logits = res.pop("2208")
        res["logits"] = logits
        res = ModelOutput(res)
        return res

    def infer(self, image):
        image = load_image(image)
        output = self.forward(**self.preprocessors[0]([image], return_tensors="np"))
        top5_probabilities, top5_class_indices = torch.topk(torch.nn.functional.softmax(torch.tensor(output["logits"]), dim=-1), k=5)
        return top5_probabilities, top5_class_indices 

    def eval_imagenet(self, dataset="imagenet-1k", split="valid"):

        batch_time = AverageMeter()
        top1 = AverageMeter()
        top5 = AverageMeter()
        end = time.time()
        print_freq = 10
        def transforms(examples):
            images = [img.convert("RGB") for img in examples["image"]]
            examples["pixel_values"] = images
            return examples
        dataset = datasets.load_dataset("zh-plus/tiny-imagenet", split=split)
        dataset.set_transform(transforms)
        for i, data in enumerate(dataset):
            label = data['label']
            output = self.forward(**self.preprocessors[0]([data["pixel_values"]], return_tensors="np"))
            logits = output["logits"]

            # measure accuracy and record loss
            n = 1
            prec1, prec5 = accuracy_np(logits, np.array([label]))
            top1.update(prec1.item(), n)
            top5.update(prec5.item(), n)

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()
            if i % print_freq == 0:
                print(
                    f'Test: [{i}/{len(dataset)}]\t'
                    f'Time {batch_time.val:.3f} ({batch_time.avg:.3f}, {n / batch_time.avg:.3f}/s, '
                    f'{100 * batch_time.avg / n:.3f} ms/sample) \t'
                    f'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                    f'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'
                )

        print(f' * Prec@1 {top1.avg:.3f} ({100-top1.avg:.3f}) Prec@5 {top5.avg:.3f} ({100.-top5.avg:.3f})')

    @classmethod
    def register_for_auto_class(cls, auto_class="AutoModel"):
        """
        Register this class with a given auto class. This should only be used for custom models as the ones in the
        library are already mapped with an auto class.

        <Tip warning={true}>

        This API is experimental and may have some slight breaking changes in the next releases.

        </Tip>

        Args:
            auto_class (`str` or `type`, *optional*, defaults to `"AutoModel"`):
                The auto class to register this new model with.
        """
        if not isinstance(auto_class, str):
            auto_class = auto_class.__name__

        import transformers.models.auto as auto_module

        if not hasattr(auto_module, auto_class):
            raise ValueError(f"{auto_class} is not a valid auto class.")

        cls._auto_class = auto_class

    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        push_to_hub: bool = False,
        **kwargs,
    ):
        """
        Saves a model and its configuration file to a directory, so that it can be re-loaded using the
        [`from_pretrained`] class method.

        Args:
            save_directory (`Union[str, os.PathLike]`):
                Directory to which to save. Will be created if it doesn't exist.
            push_to_hub (`bool`, *optional*, defaults to `False`):
                Whether or not to push your model to the Hugging Face model hub after saving it.

                <Tip warning={true}>

                Using `push_to_hub=True` will synchronize the repository you are pushing to with `save_directory`,
                which requires `save_directory` to be a local clone of the repo you are pushing to if it's an existing
                folder. Pass along `temp_dir=True` to use a temporary directory instead.

                </Tip>
        """
        if os.path.isfile(save_directory):
            logger.error(f"Provided path ({save_directory}) should be a directory, not a file")
            return

        os.makedirs(save_directory, exist_ok=True)

        for preprocessor in self.preprocessors:
            preprocessor.save_pretrained(save_directory)
        self._save_pretrained(save_directory)
        if self._auto_class is not None:
            custom_object_save(self, save_directory, config=self.config)
        self._save_config(save_directory)

        if push_to_hub:
            return self.push_to_hub(save_directory, **kwargs)

    @staticmethod
    def _cached_file(
        model_path: Union[Path, str],
        use_auth_token: Optional[Union[bool, str]] = None,
        revision: Optional[str] = None,
        force_download: bool = False,
        cache_dir: Optional[str] = None,
        file_name: Optional[str] = None,
        subfolder: str = "",
        local_files_only: bool = False,
    ):
        model_path = Path(model_path)

        # locates a file in a local folder and repo, downloads and cache it if necessary.
        if model_path.is_dir():
            model_cache_path = model_path / file_name
            preprocessors = maybe_load_preprocessors(model_path.as_posix(), trust_remote_code=True)
        else:
            model_cache_path = hf_hub_download(
                repo_id=model_path.as_posix(),
                filename=file_name,
                subfolder=subfolder,
                use_auth_token=use_auth_token,
                revision=revision,
                cache_dir=cache_dir,
                force_download=force_download,
                local_files_only=local_files_only,
            )
            # try download external data
            try:
                hf_hub_download(
                    repo_id=model_path.as_posix(),
                    subfolder=subfolder,
                    filename=file_name + "_data",
                    use_auth_token=use_auth_token,
                    revision=revision,
                    cache_dir=cache_dir,
                    force_download=force_download,
                    local_files_only=local_files_only,
                )
            except EntryNotFoundError:
                # model doesn't use external data
                pass

            model_cache_path = Path(model_cache_path)
            preprocessors = maybe_load_preprocessors(model_path.as_posix(), subfolder=subfolder, trust_remote_code=True)

        return model_cache_path, preprocessors


class AverageMeter:
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count


def accuracy_np(output, target):
    max_indices = np.argsort(output, axis=1)[:, ::-1]
    top5 = 100 * np.equal(max_indices[:, :5], target[:, np.newaxis]).sum(axis=1).mean()
    top1 = 100 * np.equal(max_indices[:, 0], target).mean()
    return top1, top5