From 0b5252bbd3347a25e1a541119129be269a6899cc Mon Sep 17 00:00:00 2001 From: "Ma, Guokai" Date: Thu, 13 Apr 2023 07:00:34 +0800 Subject: [PATCH] [CPU support] Optionally bind each rank to different cores on host (#2881) * add fallback path for kernels used in megatron * temporary numactl WA for SPR 56core * adapt core allocation according to number of ranks * add switch to turn on numactl * detect number of cores on the system * allow select a subset of the cores on the system to bind * remove unneeded changes * use current_env to set OMP_NUM_THREADS in subprocess * add test for ds_arguments * change --bind_cores_to_rank option to store_true * add test for parse_range_list * add comment for parse range list * add test for parse range list, rewrite parse_range_list * fix format error * fix format * add -m parameter to numactl when necessary * Check KMP_AFFINITY to avoid conflict with numactl * fix format * negative case for parse_range_list * detect whether numactl is installed before use numactl to bind cores * check numactl with package manager of distro --------- Co-authored-by: sdp Co-authored-by: Olatunji Ruwase --- deepspeed/launcher/launch.py | 133 ++++++++++++++++++++++- deepspeed/launcher/runner.py | 13 +++ tests/unit/launcher/test_ds_arguments.py | 33 ++++++ 3 files changed, 178 insertions(+), 1 deletion(-) diff --git a/deepspeed/launcher/launch.py b/deepspeed/launcher/launch.py index fc188d0baa5c..d84fe9586b96 100755 --- a/deepspeed/launcher/launch.py +++ b/deepspeed/launcher/launch.py @@ -19,6 +19,7 @@ import time import signal import psutil +import distutils from collections import defaultdict from typing import Dict from argparse import ArgumentParser, REMAINDER @@ -89,6 +90,18 @@ def parse_args(): type=str, help="redirect the stdout and stderr from each rank into different log files") + parser.add_argument("--bind_cores_to_rank", + action="store_true", + help="Bind each rank to different cores of the host. " + "This improves host efficiency especially for CPU backend") + + parser.add_argument("--bind_core_list", + type=str, + default=None, + help="List of cores to bind to with comma separated list of " + "numbers and range. i.e. 1,3-5,7 => [1,3,4,5,7]. When not " + "specified, all cores on system would be used rank binding") + # positional parser.add_argument("training_script", type=str, @@ -117,6 +130,89 @@ def terminate_process_tree(pid): p.kill() +def parse_range(rng): + try: + value = int(rng) + return range(value, value + 1) + except ValueError: + # value is not a single number + parts = rng.split('-') + if len(parts) != 2: + raise ValueError("Bad range: '%s', range must be either a number or two number separated by dash" % + (rng, )) + start = int(parts[0]) + end = int(parts[1]) + if start > end: + raise ValueError("Bad range: '%s', range end must larger than or equal to start" % (rng, )) + return range(start, end + 1) + + +# parse comma and dash separated range list into list +# i.e. "0,2-4,6" --> [0, 2, 3, 4, 6] +# rules: +# 1. Range list numser be comma sepeaated, each item are either a single number, +# or a range marked by two numbers (both number are included in the range) +# 2. Sub ranges must be in ascend order and not overlap with each other +# 3. No space in the range expression +def parse_range_list(range_str): + number_list = [] + last = -1 + range_list = range_str.split(',') + for sub_range in range_list: + sub_number_list = parse_range(sub_range) + if sub_number_list[0] <= last: + raise ValueError( + "Bad range: '%s', sub ranges must not overlap with each other and should be in ascend order" % + (range_str, )) + last = sub_number_list[-1] + number_list.extend(sub_number_list) + return number_list + + +# return a list of list for cores to numa mapping +# [ +# [ cores for numa 0 ] +# [ cores belong to numa 1 ] +# ... +# ] +def get_numa_cores(): + ret = [] + output = subprocess.check_output(['numactl', '--hardware']).decode("utf-8") + lines = output.split('\n') + for line in lines: + if line.startswith('available:'): + num_numas = int(line.split(' ')[1]) + break + for numa in range(num_numas): + for line in lines: + if line.startswith(f'node {numa} cpus:'): + cores = line.split(' ')[3:] + ret.append([int(core) for core in cores]) + return ret + + +def check_for_numactl_pkg(): + libs = dict( + dpkg=["-l", "numactl", "apt"], + pacman=["-Q", "numactl", "pacman"], + rpm=["-q", "numactl", "yum"], + ) + + found = False + for pkgmgr, data in libs.items(): + flag, lib, tool = data + path = distutils.spawn.find_executable(pkgmgr) + if path is not None: + cmd = f"{pkgmgr} {flag} {lib}" + result = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + if result.wait() == 0: + found = True + else: + print(f"please install the {lib} package with {tool}") + break + return found + + def main(): args = parse_args() current_env = os.environ.copy() @@ -211,8 +307,43 @@ def main(): # spawn the processes cmd = [] + if args.bind_cores_to_rank: + check_for_numactl_pkg() + if 'KMP_AFFINITY' in os.environ.keys(): + raise ValueError("Environment variable KMP_AFFINITY conflicts with numactl " + "because it interfere with how many CPU cores numactl can set. " + "Unset KMP_AFFINITY before launching deepspeed.\n\n" + "\t$ unset KMP_AFFINITY\n" + "\t$ deepspeed ") + if args.bind_core_list != None: + core_list = parse_range_list(args.bind_core_list) + total_cores = len(core_list) + else: + total_cores = psutil.cpu_count(logical=False) + core_list = range(total_cores) + cores_per_rank = total_cores // num_local_procs + assert cores_per_rank >= 1, "At least one core needs to be assigned to each rank" + core_list_for_rank = core_list[cores_per_rank * local_rank:cores_per_rank * (local_rank + 1)] + current_env["OMP_NUM_THREADS"] = f"{cores_per_rank}" + cmd.append("numactl") + + # check if all cores belong to same numa, if true, bind process to that numa domain with -m parameter + numa_cores = get_numa_cores() + num_numas = len(numa_cores) + for i in range(num_numas): + if set(core_list_for_rank) <= set(numa_cores[i]): + cmd.append("-m") + cmd.append(f"{i}") + break + + cmd.append("-C") + core_list_str = f"{core_list_for_rank[0]}" + for core_id in core_list_for_rank[1:]: + core_list_str = f"{core_list_str},{core_id}" + cmd.append(f"{core_list_str}") if not args.no_python: - cmd = [sys.executable, "-u"] + cmd.append(sys.executable) + cmd.append("-u") if args.module: cmd.append("-m") else: diff --git a/deepspeed/launcher/runner.py b/deepspeed/launcher/runner.py index e3c15e119e57..cc34af81b2fe 100755 --- a/deepspeed/launcher/runner.py +++ b/deepspeed/launcher/runner.py @@ -173,6 +173,15 @@ def parse_args(args=None): parser.add_argument("user_script", type=str, help="User script to launch, followed by any required " "arguments.") parser.add_argument('user_args', nargs=argparse.REMAINDER) + parser.add_argument("--bind_cores_to_rank", + action="store_true", + help="Bind each rank to different cores of the host") + parser.add_argument("--bind_core_list", + type=str, + default=None, + help="List of cores to bind to with comma separated list of " + "numbers and range. i.e. 1,3-5,7 => [1,3,4,5,7]. When not " + "specified, all cores on system would be used rank binding") return parser.parse_args(args=args) @@ -481,6 +490,10 @@ def main(args=None): deepspeed_launch.append("--enable_elastic_training") deepspeed_launch.append(f"--max_elastic_nodes={args.max_elastic_nodes}") deepspeed_launch.append(f"--min_elastic_nodes={args.min_elastic_nodes}") + if args.bind_cores_to_rank: + deepspeed_launch.append("--bind_cores_to_rank") + if args.bind_core_list != None: + deepspeed_launch.append(f"--bind_core_list={args.bind_core_list}") cmd = deepspeed_launch + [args.user_script] + args.user_args else: args.launcher = args.launcher.lower() diff --git a/tests/unit/launcher/test_ds_arguments.py b/tests/unit/launcher/test_ds_arguments.py index 7defcb3590aa..7155beebc902 100644 --- a/tests/unit/launcher/test_ds_arguments.py +++ b/tests/unit/launcher/test_ds_arguments.py @@ -6,6 +6,7 @@ import argparse import pytest import deepspeed +from deepspeed.launcher.launch import parse_range_list def basic_parser(): @@ -98,3 +99,35 @@ def test_core_deepscale_arguments(): assert hasattr(args, 'deepspeed_config') assert type(args.deepspeed_config) == str assert args.deepspeed_config == 'foo.json' + + +def test_core_binding_arguments(): + core_list = parse_range_list("0,2-4,6,8-9") + assert core_list == [0, 2, 3, 4, 6, 8, 9] + + try: + # negative case for range overlapping + core_list = parse_range_list("0,2-6,5-9") + except ValueError as e: + pass + else: + # invalid core list must fail + assert False + + try: + # negative case for reverse order -- case 1 + core_list = parse_range_list("8,2-6") + except ValueError as e: + pass + else: + # invalid core list must fail + assert False + + try: + # negative case for reverse order -- case 2 + core_list = parse_range_list("1,6-2") + except ValueError as e: + pass + else: + # invalid core list must fail + assert False