# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
"""
Command-line argument parsing and validation for the DeepSpeed async I/O (aio) performance tests.
"""

import argparse
import os
from .test_ds_aio_utils import refine_integer_value
from .ds_aio_constants import AIO_HANDLE, AIO_BASIC, TORCH_FAST_IO, TORCH_IO, VALID_ENGINES
from deepspeed.accelerator import get_accelerator

# Separator between the folder and the device id in a --folder_to_device_mapping entry.
MAPPING_DELIMITER = ':'


def refine_args(args):
    """Convert size arguments given as strings into integers.

    Each of ``io_size``, ``block_size`` and ``fast_io_size`` that is a non-empty string is
    converted with ``refine_integer_value`` (e.g. to resolve K/M/G suffixes). ``args`` is
    modified in place and also returned.
    """
    for name in ('io_size', 'block_size', 'fast_io_size'):
        value = getattr(args, name)
        if value and isinstance(value, str):
            setattr(args, name, refine_integer_value(value))
    return args


def _split_mapping(mapping):
    """Split a 'folder:device_id' entry at its LAST delimiter into ``(folder, device_id)``.

    Splitting at the last delimiter keeps folder paths that themselves contain the delimiter
    intact, since the device id is always the final field.
    """
    folder, _, device_id = mapping.rpartition(MAPPING_DELIMITER)
    return folder, device_id


def _get_mapping_dict(args):
    """Build the mapping from peer id to the folder it performs I/O in.

    With --folder, each process index in ``range(args.multi_process)`` (int keys) maps to
    that folder. Otherwise each --folder_to_device_mapping entry contributes
    ``{device_id: folder}``, with the device id kept as the string given on the command line.
    If a device id appears more than once, the last entry wins.
    Assumes the mapping entries were already checked by ``_validate_folder_mapping``.
    """
    if args.folder is not None:
        return {i: args.folder for i in range(args.multi_process)}

    d = {}
    for m in args.folder_to_device_mapping:
        folder, device_id = _split_mapping(m)
        d[device_id] = folder
    return d


def _validate_folder_mapping(args):
    """Check every --folder_to_device_mapping entry.

    Returns:
        ``(no_error, error_messages)``. ``no_error`` is False if any entry lacks the delimiter,
        names a folder that does not exist, or (when --gpu is set) names a device id that is not
        an integer in ``[0, device_count)``.
    """
    error_messages = []
    mappings = args.folder_to_device_mapping

    invalid_mappings = [m for m in mappings if MAPPING_DELIMITER not in m]
    if invalid_mappings:
        error_messages.append(
            f'Missing delimiter ({MAPPING_DELIMITER}) in folder_to_device_mapping {invalid_mappings}')

    # Only well-formed entries can be split; malformed ones were already reported above and
    # would otherwise make the device-id parsing below raise instead of reporting.
    pairs = [_split_mapping(m) for m in mappings if MAPPING_DELIMITER in m]

    invalid_folders = [folder for folder, _ in pairs if not os.path.exists(folder)]
    if invalid_folders:
        error_messages.append(f'Invalid folders in folder_to_device_mapping: {invalid_folders}')

    if args.gpu:
        device_count = get_accelerator().device_count()
        invalid_device_list = []
        for _, device_str in pairs:
            try:
                dev_id = int(device_str)
            except ValueError:
                # Non-integer ids are reported rather than crashing validation.
                invalid_device_list.append(device_str)
                continue
            if not 0 <= dev_id < device_count:
                invalid_device_list.append(dev_id)
        if invalid_device_list:
            error_messages.append(f'Invalid device ids in folder_to_device_mapping: {invalid_device_list}')

    return len(error_messages) == 0, error_messages


def validate_args(args):
    """Validate the parsed (and refined) command-line arguments.

    Every problem found is collected, then printed once as a numbered list.

    Returns:
        True if no validation errors were found, False otherwise.
    """
    error_messages = []

    if args.folder is not None and len(args.folder_to_device_mapping) > 0:
        error_messages.append('--folder and --folder_to_device_mapping cannot be specified together.')
    elif args.folder is None and len(args.folder_to_device_mapping) == 0:
        error_messages.append('At least one of --folder or --folder_to_device_mapping must be specified.')

    # Validate --folder
    if args.folder is not None and not os.path.exists(args.folder):
        error_messages.append(f'Invalid folder in --folder: {args.folder} ')

    # Validate --folder_to_device_mapping
    if len(args.folder_to_device_mapping) > 0:
        _, mapping_error_messages = _validate_folder_mapping(args)
        error_messages += mapping_error_messages

    # Validate --engine
    if args.engine not in VALID_ENGINES:
        error_messages.append(f'Invalid engine {args.engine}. Valid options = {VALID_ENGINES}')

    # Validate --engine=torch_io
    if args.engine == TORCH_IO and args.read:
        error_messages.append(f'Read not currently supported for --engine={TORCH_IO}')

    # Validate --gpu, --use_gds
    if args.use_gds and not args.gpu:
        error_messages.append('--gpu must be set to transfer with --use_gds')

    # Report only after ALL checks have run, so the count printed is complete and printed once.
    no_error = len(error_messages) == 0
    if not no_error:
        print(f'Found {len(error_messages)} validation errors')
        for i, msg in enumerate(error_messages, start=1):
            print(f'{i}: {msg}')
    return no_error


def parse_arguments():
    """Define and parse the command-line options of the aio performance test.

    Returns:
        The ``argparse.Namespace`` produced by ``parser.parse_args()``; it is also printed.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument('--folder', default=None, type=str, help='Folder to use for I/O.')

    # nargs='+' with the default 'store' action: all mappings must follow a single flag;
    # repeating the flag replaces the earlier values instead of accumulating them.
    parser.add_argument('--folder_to_device_mapping',
                        default=[],
                        nargs='+',
                        help='Specification of mapping of folder to (gpu) device id, (ignored for cpu accesses). '
                        'Multiple space-separated mappings can be given for multi-process runs, '
                        'e.g. --folder_to_device_mapping /mnt/nvme0:0 /mnt/nvme1:15 --gpu '
                        'means access /mnt/nvme0 with gpu 0 and /mnt/nvme1 with gpu 15. '
                        'Repeating the flag replaces earlier mappings rather than adding to them.')

    parser.add_argument('--io_size', type=str, default=None, required=True, help='Number of bytes to read or write.')

    parser.add_argument('--fast_io_size', type=str, default='64M', help='Size of fast_io pinned buffer (bytes).')

    parser.add_argument('--read', action='store_true', help='Perform read I/O (default is write)')

    parser.add_argument('--multi_process',
                        type=int,
                        default=1,
                        help='Number of parallel processes doing I/O (default 1).')

    parser.add_argument('--block_size',
                        type=str,
                        default='1M',
                        help='I/O block size. Can use K, M, or G suffix (default 1M for 1 megabytes).')

    parser.add_argument('--queue_depth', type=int, default=32, help='I/O queue depth (default 32).')

    parser.add_argument('--single_submit',
                        action='store_true',
                        help='Submit I/O requests in singles (default is submit queue_depth amount at once.).')

    parser.add_argument(
        '--sequential_requests',
        action='store_true',
        help=
        'Delay I/O request submission until completion of prior requests (default is overlap I/O submission and completion requests.).'
    )

    parser.add_argument('--validate', action='store_true', help='Perform validation of I/O transfer in library.')

    parser.add_argument(
        '--engine',
        type=str,
        default=AIO_HANDLE,
        help=
        f'Engine to perform I/O. Options are [{AIO_HANDLE}, {AIO_BASIC}, {TORCH_IO}, {TORCH_FAST_IO}]. Default is {AIO_HANDLE}'
    )

    parser.add_argument('--loops', type=int, default=3, help='Count of operation repetitions')

    parser.add_argument('--io_parallel', type=int, default=None, help='Per iop parallelism')

    parser.add_argument('--gpu', action='store_true', help='Use GPU memory')

    parser.add_argument('--use_gds', action='store_true', help='Enable GDS AIO')

    parser.add_argument('--slow_bounce_buffer',
                        action='store_true',
                        help='For GPU memory transfers, measure impact of bounce buffer pinning on critical path.')

    parser.add_argument('--torch_legacy_save', action='store_true', help='Use torch legacy save approach')

    parser.add_argument('--use_accelerator_pin_memory',
                        action='store_true',
                        help='Obtain pinned (CPU page-locked) tensors from accelerator')

    parser.add_argument('--warmup_loops', type=int, default=1, help='Count of operation warmup repetitions')

    parser.add_argument('--include_warmup_time', action='store_true', help='Include warmup latency in results')

    parser.add_argument('--different_file_each_iteration',
                        action='store_true',
                        help='Read/write a different file on each iteration.')

    args = parser.parse_args()
    print(f'args = {args}')
    return args


def get_validated_args():
    """Parse, refine and validate the command line, then attach derived fields.

    Adds ``total_loops`` (warmup + measured loops), ``mapping_dict`` (peer id -> folder) and
    ``mapping_list`` (the same pairs as a list of tuples) to the returned namespace.

    Raises:
        SystemExit: with status 1 if validation fails.
    """
    args = parse_arguments()
    args = refine_args(args)
    if not validate_args(args):
        # Non-zero status so callers/scripts can detect bad arguments (quit() exited with 0
        # and is only available when the `site` module is loaded).
        raise SystemExit(1)
    print('Successful validation of command line arguments')

    args.total_loops = args.warmup_loops + args.loops
    peer_tag = 'gpu' if args.gpu else 'process'
    args.mapping_dict = _get_mapping_dict(args)
    args.mapping_list = list(args.mapping_dict.items())
    print(f'Configuring {len(args.mapping_list)} {peer_tag} to folder mapping')
    for i, (device_id, folder) in enumerate(args.mapping_list):
        print(f'[{i}]: {peer_tag} {device_id} <----> {folder}')
    return args