Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| import importlib | |
| from omegaconf import OmegaConf, DictConfig, ListConfig | |
| import torch | |
| import torch.distributed as dist | |
| from typing import Union | |
| def get_config_from_file(config_file: str) -> Union[DictConfig, ListConfig]: | |
| config_file = OmegaConf.load(config_file) | |
| if 'base_config' in config_file.keys(): | |
| if config_file['base_config'] == "default_base": | |
| base_config = OmegaConf.create() | |
| # base_config = get_default_config() | |
| elif config_file['base_config'].endswith(".yaml"): | |
| base_config = get_config_from_file(config_file['base_config']) | |
| else: | |
| raise ValueError(f"{config_file} must be `.yaml` file or it contains `base_config` key.") | |
| config_file = {key: value for key, value in config_file if key != "base_config"} | |
| return OmegaConf.merge(base_config, config_file) | |
| return config_file | |
| def get_obj_from_str(string, reload=False): | |
| module, cls = string.rsplit(".", 1) | |
| if reload: | |
| module_imp = importlib.import_module(module) | |
| importlib.reload(module_imp) | |
| return getattr(importlib.import_module(module, package=None), cls) | |
| def get_obj_from_config(config): | |
| if "target" not in config: | |
| raise KeyError("Expected key `target` to instantiate.") | |
| return get_obj_from_str(config["target"]) | |
| def instantiate_from_config(config, **kwargs): | |
| if "target" not in config: | |
| raise KeyError("Expected key `target` to instantiate.") | |
| cls = get_obj_from_str(config["target"]) | |
| params = config.get("params", dict()) | |
| # params.update(kwargs) | |
| # instance = cls(**params) | |
| kwargs.update(params) | |
| instance = cls(**kwargs) | |
| return instance | |
| def is_dist_avail_and_initialized(): | |
| if not dist.is_available(): | |
| return False | |
| if not dist.is_initialized(): | |
| return False | |
| return True | |
| def get_rank(): | |
| if not is_dist_avail_and_initialized(): | |
| return 0 | |
| return dist.get_rank() | |
| def get_world_size(): | |
| if not is_dist_avail_and_initialized(): | |
| return 1 | |
| return dist.get_world_size() | |
| def all_gather_batch(tensors): | |
| """ | |
| Performs all_gather operation on the provided tensors. | |
| """ | |
| # Queue the gathered tensors | |
| world_size = get_world_size() | |
| # There is no need for reduction in the single-proc case | |
| if world_size == 1: | |
| return tensors | |
| tensor_list = [] | |
| output_tensor = [] | |
| for tensor in tensors: | |
| tensor_all = [torch.ones_like(tensor) for _ in range(world_size)] | |
| dist.all_gather( | |
| tensor_all, | |
| tensor, | |
| async_op=False # performance opt | |
| ) | |
| tensor_list.append(tensor_all) | |
| for tensor_all in tensor_list: | |
| output_tensor.append(torch.cat(tensor_all, dim=0)) | |
| return output_tensor | |