Skip to content

Commit

Permalink
[CPU support] Optionally bind each rank to different cores on host (#…
Browse files Browse the repository at this point in the history
…2881)

* add fallback path for kernels used in megatron

* temporary numactl WA for SPR 56core

* adapt core allocation according to number of ranks

* add switch to turn on numactl

* detect number of cores on the system

* allow select a subset of the cores on the system to bind

* remove unneeded changes

* use current_env to set OMP_NUM_THREADS in subprocess

* add test for ds_arguments

* change --bind_cores_to_rank option to store_true

* add test for parse_range_list

* add comment for parse range list

* add test for parse range list, rewrite parse_range_list

* fix format error

* fix format

* add -m parameter to numactl when necessary

* Check KMP_AFFINITY to avoid conflict with numactl

* fix format

* negative case for parse_range_list

* detect whether numactl is installed before use numactl to bind cores

* check numactl with package manager of distro

---------

Co-authored-by: sdp <[email protected]>
Co-authored-by: Olatunji Ruwase <[email protected]>
  • Loading branch information
3 people authored Apr 12, 2023
1 parent 9408a86 commit 0b5252b
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 1 deletion.
133 changes: 132 additions & 1 deletion deepspeed/launcher/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import time
import signal
import psutil
import distutils
from collections import defaultdict
from typing import Dict
from argparse import ArgumentParser, REMAINDER
Expand Down Expand Up @@ -89,6 +90,18 @@ def parse_args():
type=str,
help="redirect the stdout and stderr from each rank into different log files")

parser.add_argument("--bind_cores_to_rank",
action="store_true",
help="Bind each rank to different cores of the host. "
"This improves host efficiency especially for CPU backend")

parser.add_argument("--bind_core_list",
type=str,
default=None,
help="List of cores to bind to with comma separated list of "
"numbers and range. i.e. 1,3-5,7 => [1,3,4,5,7]. When not "
"specified, all cores on system would be used rank binding")

# positional
parser.add_argument("training_script",
type=str,
Expand Down Expand Up @@ -117,6 +130,89 @@ def terminate_process_tree(pid):
p.kill()


def parse_range(rng):
try:
value = int(rng)
return range(value, value + 1)
except ValueError:
# value is not a single number
parts = rng.split('-')
if len(parts) != 2:
raise ValueError("Bad range: '%s', range must be either a number or two number separated by dash" %
(rng, ))
start = int(parts[0])
end = int(parts[1])
if start > end:
raise ValueError("Bad range: '%s', range end must larger than or equal to start" % (rng, ))
return range(start, end + 1)


# parse comma and dash separated range list into list
# i.e. "0,2-4,6" --> [0, 2, 3, 4, 6]
# rules:
# 1. Range list numser be comma sepeaated, each item are either a single number,
# or a range marked by two numbers (both number are included in the range)
# 2. Sub ranges must be in ascend order and not overlap with each other
# 3. No space in the range expression
def parse_range_list(range_str):
number_list = []
last = -1
range_list = range_str.split(',')
for sub_range in range_list:
sub_number_list = parse_range(sub_range)
if sub_number_list[0] <= last:
raise ValueError(
"Bad range: '%s', sub ranges must not overlap with each other and should be in ascend order" %
(range_str, ))
last = sub_number_list[-1]
number_list.extend(sub_number_list)
return number_list


# return a list of list for cores to numa mapping
# [
# [ cores for numa 0 ]
# [ cores belong to numa 1 ]
# ...
# ]
def get_numa_cores():
ret = []
output = subprocess.check_output(['numactl', '--hardware']).decode("utf-8")
lines = output.split('\n')
for line in lines:
if line.startswith('available:'):
num_numas = int(line.split(' ')[1])
break
for numa in range(num_numas):
for line in lines:
if line.startswith(f'node {numa} cpus:'):
cores = line.split(' ')[3:]
ret.append([int(core) for core in cores])
return ret


def check_for_numactl_pkg():
libs = dict(
dpkg=["-l", "numactl", "apt"],
pacman=["-Q", "numactl", "pacman"],
rpm=["-q", "numactl", "yum"],
)

found = False
for pkgmgr, data in libs.items():
flag, lib, tool = data
path = distutils.spawn.find_executable(pkgmgr)
if path is not None:
cmd = f"{pkgmgr} {flag} {lib}"
result = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
if result.wait() == 0:
found = True
else:
print(f"please install the {lib} package with {tool}")
break
return found


def main():
args = parse_args()
current_env = os.environ.copy()
Expand Down Expand Up @@ -211,8 +307,43 @@ def main():

# spawn the processes
cmd = []
if args.bind_cores_to_rank:
check_for_numactl_pkg()
if 'KMP_AFFINITY' in os.environ.keys():
raise ValueError("Environment variable KMP_AFFINITY conflicts with numactl "
"because it interfere with how many CPU cores numactl can set. "
"Unset KMP_AFFINITY before launching deepspeed.\n\n"
"\t$ unset KMP_AFFINITY\n"
"\t$ deepspeed <deepspeed command parameters>")
if args.bind_core_list != None:
core_list = parse_range_list(args.bind_core_list)
total_cores = len(core_list)
else:
total_cores = psutil.cpu_count(logical=False)
core_list = range(total_cores)
cores_per_rank = total_cores // num_local_procs
assert cores_per_rank >= 1, "At least one core needs to be assigned to each rank"
core_list_for_rank = core_list[cores_per_rank * local_rank:cores_per_rank * (local_rank + 1)]
current_env["OMP_NUM_THREADS"] = f"{cores_per_rank}"
cmd.append("numactl")

# check if all cores belong to same numa, if true, bind process to that numa domain with -m parameter
numa_cores = get_numa_cores()
num_numas = len(numa_cores)
for i in range(num_numas):
if set(core_list_for_rank) <= set(numa_cores[i]):
cmd.append("-m")
cmd.append(f"{i}")
break

cmd.append("-C")
core_list_str = f"{core_list_for_rank[0]}"
for core_id in core_list_for_rank[1:]:
core_list_str = f"{core_list_str},{core_id}"
cmd.append(f"{core_list_str}")
if not args.no_python:
cmd = [sys.executable, "-u"]
cmd.append(sys.executable)
cmd.append("-u")
if args.module:
cmd.append("-m")
else:
Expand Down
13 changes: 13 additions & 0 deletions deepspeed/launcher/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ def parse_args(args=None):
parser.add_argument("user_script", type=str, help="User script to launch, followed by any required "
"arguments.")
parser.add_argument('user_args', nargs=argparse.REMAINDER)
parser.add_argument("--bind_cores_to_rank",
action="store_true",
help="Bind each rank to different cores of the host")
parser.add_argument("--bind_core_list",
type=str,
default=None,
help="List of cores to bind to with comma separated list of "
"numbers and range. i.e. 1,3-5,7 => [1,3,4,5,7]. When not "
"specified, all cores on system would be used rank binding")
return parser.parse_args(args=args)


Expand Down Expand Up @@ -481,6 +490,10 @@ def main(args=None):
deepspeed_launch.append("--enable_elastic_training")
deepspeed_launch.append(f"--max_elastic_nodes={args.max_elastic_nodes}")
deepspeed_launch.append(f"--min_elastic_nodes={args.min_elastic_nodes}")
if args.bind_cores_to_rank:
deepspeed_launch.append("--bind_cores_to_rank")
if args.bind_core_list != None:
deepspeed_launch.append(f"--bind_core_list={args.bind_core_list}")
cmd = deepspeed_launch + [args.user_script] + args.user_args
else:
args.launcher = args.launcher.lower()
Expand Down
33 changes: 33 additions & 0 deletions tests/unit/launcher/test_ds_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import argparse
import pytest
import deepspeed
from deepspeed.launcher.launch import parse_range_list


def basic_parser():
Expand Down Expand Up @@ -98,3 +99,35 @@ def test_core_deepscale_arguments():
assert hasattr(args, 'deepspeed_config')
assert type(args.deepspeed_config) == str
assert args.deepspeed_config == 'foo.json'


def test_core_binding_arguments():
core_list = parse_range_list("0,2-4,6,8-9")
assert core_list == [0, 2, 3, 4, 6, 8, 9]

try:
# negative case for range overlapping
core_list = parse_range_list("0,2-6,5-9")
except ValueError as e:
pass
else:
# invalid core list must fail
assert False

try:
# negative case for reverse order -- case 1
core_list = parse_range_list("8,2-6")
except ValueError as e:
pass
else:
# invalid core list must fail
assert False

try:
# negative case for reverse order -- case 2
core_list = parse_range_list("1,6-2")
except ValueError as e:
pass
else:
# invalid core list must fail
assert False

0 comments on commit 0b5252b

Please sign in to comment.