sglang_v0.5.2/nvshmem_src/perftest/perftestCommon.py

466 lines
20 KiB
Python

#!/usr/bin/env python3
import sys
import os
import re
import time
import signal
from subprocess import Popen, PIPE
from threading import Thread
# Handle of the most recently launched test process. thread_func() rebinds it
# to a Popen object and run_cmd() reads it after joining the worker thread.
test_process = 0
# (test_path, str(cmd_line)) tuples appended by report_failure().
failed_binary_cmdlines_list = []
# Internal launcher identifiers compared against `launcher_choice` when the
# command line is assembled. NOTE: walk_dir() accepts a different, user-facing
# numbering (0: mpirun, 1: all three, 2: openshmem, 3: nvshmem) and maps it
# onto these constants.
NVSHMEM_LAUNCHER = 0
MPI_LAUNCHER = 1
SHMEM_LAUNCHER = 2
def to_bytes(s):
    """Return *s* as UTF-8 encoded bytes.

    Parameters:
        s (bytes or str): Value to convert. Bytes are returned unchanged;
            text is encoded as UTF-8.

    Returns:
        bytes: The encoded value.

    Raises:
        TypeError: If *s* is neither bytes nor text.
    """
    if isinstance(s, bytes):
        return s
    # The `unicode` name only exists on Python 2; the version check
    # short-circuits so it is never evaluated on Python 3.
    if isinstance(s, str) or (sys.version_info[0] < 3 and isinstance(s, unicode)):  # noqa: F821
        # Encode with the string method: the file never imported `codecs`,
        # so the previous codecs.encode() call raised NameError.
        return s.encode('utf-8')
    raise TypeError("Expected bytes or string, but got %s." % type(s))
def display_time(func):
    """Decorator that prints the wall-clock duration of each call to *func*.

    The wrapped function's return value is passed through unchanged, and both
    positional and keyword arguments are forwarded.
    """
    import functools

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        t1 = time.time()
        req = func(*args, **kwargs)
        t2 = time.time()
        print('Total time {:.4}s'.format(t2 - t1))
        return req
    return wrapper
def report_failure(cmd_line, test_path, ftesto, fteste):
    """Record a failed test in the error log and in the module failure list.

    Parameters:
        cmd_line (list): Command-line elements of the failed run.
        test_path (str): Test path stored alongside the command line.
        ftesto: Stdout log file object (unused here, kept for the interface).
        fteste: Stderr log file object; receives the "<cmd> failed" line.
    """
    # Write through the file object (as thread_func/run_cmd do) instead of
    # spawning an `echo` child that was never waited on and could interleave
    # with buffered writes. The trailing '\n' reproduces the newline `echo`
    # used to append, so the log text is unchanged.
    fteste.write(' '.join([str(elem) for elem in cmd_line]) + ' failed\r\n\n')
    fteste.flush()
    failed_binary_cmdlines_list.append((test_path, str(cmd_line)))
    return
def get_all_tests(ftestlist):
    """Read the test list file and return all tests plus the skipped subset.

    Each non-blank line names one test. A line starting with '#' marks a test
    as skippable: it is returned in BOTH lists, so the caller decides whether
    skipping is enabled.

    Parameters:
        ftestlist (str): Path of the test list file.

    Returns:
        tuple: (tests_set, skipped_tests_set), two lists of test paths in
        file order.
    """
    tests_set = []
    skipped_tests_set = []
    with open(ftestlist, 'r') as f:
        for line in f:
            is_skipped = line.startswith("#")
            # Drop only the leading '#'; strip() removes the newline. The
            # old line[1:-1] slice cut the last character of the name when
            # the final line had no trailing newline.
            name = (line[1:] if is_skipped else line).strip()
            if not name:
                # A blank entry would later resolve to the install directory
                # itself and be launched as if it were a test binary.
                continue
            if is_skipped:
                skipped_tests_set.append(name)
            tests_set.append(name)
    return (tests_set, skipped_tests_set)
def get_args_combinations_pe_range(full_test_path, npe_start_end_step, max_pes):
    """Return the argument lines and PE counts to use for one test binary.

    Parameters:
        full_test_path (str): Path of the test binary. An optional sibling
            file '<full_test_path>.args' supplies one argument string per line.
        npe_start_end_step (iterable of int): Candidate PE counts.
        max_pes (int): Upper bound; larger candidates are dropped.

    Returns:
        tuple: (args_combs, npe_range). args_combs holds the raw lines of the
        .args file (newline included; empty list when the file is missing).
        npe_range holds the filtered PE counts, or exactly [2] for tests
        whose path contains 'pt-to-pt'.
    """
    args_combs = []
    if 'pt-to-pt' in full_test_path:
        # Point-to-point tests always use two PEs. Building the list directly
        # avoids the IndexError the old in-place edit raised when every
        # candidate exceeded max_pes.
        npe_range = [2]
    else:
        npe_range = [npe for npe in npe_start_end_step if npe <= max_pes]

    full_args_path = full_test_path + '.args'
    if not os.path.isfile(full_args_path):
        return (args_combs, npe_range)

    print(full_args_path)
    with open(full_args_path) as f:
        # Lines keep their trailing newline, so even a blank line is a
        # non-empty string and is kept as a run without arguments.
        for line in f:
            print("Add parameters: %s" % line)
            args_combs.append(line)
    return (args_combs, npe_range)
def get_env_combinations(full_test_path):
    """Load the environment combinations declared for a test binary.

    Reads the optional '<full_test_path>.env' file; every line is split on
    whitespace into a list of tokens.

    Returns:
        list: One token list per line of the file, or an empty list when the
        file does not exist.
    """
    env_file = full_test_path + '.env'
    if not os.path.isfile(env_file):
        return []
    with open(env_file) as handle:
        return [entry.split() for entry in handle]
def show_table_partial_data_only(data):
    """
    Prints the first data row and the last data row of each table and table's header lines in the provided data(output).

    Nothing is returned; the selected lines are written with print().
    The parsing mode is chosen by the NVSHMEM_MACHINE_READABLE_OUTPUT
    environment variable: '1' selects the '&&&&'-prefixed line format, any
    other value selects the '+---+---+' bordered table format.

    NOTE(review): the source lost its indentation; the nesting below was
    reconstructed from the control flow and should be confirmed against the
    upstream file.

    Parameters:
    data (str): A string containing one or more text-based tables.
    """
    lines = data.split("\n")
    env_value = os.getenv('NVSHMEM_MACHINE_READABLE_OUTPUT')
    no_new_program = True
    i = 0  # index of the line currently being examined
    total_lines = len(lines)
    if env_value == '1':
        # NVSHMEM_MACHINE_READABLE_OUTPUT is 1
        table_sep_pattern = r'^&&&&'
        separator = re.compile(table_sep_pattern)
        result_pattern = r'^&&&& PERF\s(\w+?)__+(.+?)_size\D+(\d+)_+(\w+)\s+(\S+)\s+(\S+)$'
        result_line = re.compile(result_pattern)
        while i < total_lines:
            no_new_program = False
            found_first_report = False
            # Advance to the next line starting with '&&&&'.
            while i < total_lines and not separator.match(lines[i]):
                # found_first_report is reset to False just above and is not
                # set before this loop, so this condition cannot hold here.
                if found_first_report and not no_new_program and len(lines[i]) > 0 and lines[i] != '\n':
                    no_new_program = True
                    break
                i += 1
                continue
            if i >= total_lines or no_new_program:
                continue
            perf_line = result_line.match(lines[i])
            if perf_line:
                # Only catch the first and the last for each paragraph.
                # NOTE(review): lines[i - 1] wraps to the last line when
                # i == 0, and lines[i + 1] is out of range when a PERF line
                # is the very last element -- confirm inputs rule this out.
                if "&&&&" not in lines[i - 1] or "&&&&" not in lines[i + 1]:
                    print(lines[i])
            # Step past the current '&&&&' line, or stop on the last line.
            if i < total_lines - 1:
                i += 1
            else:
                break
    else:
        # Tables
        table_header1_pattern = r'\|\s*(.+)\s*\|\s*(\w[\w -]*?)\s*\|'
        table_header2_pattern = r'\|\s*([\w-]+)[\s\w\(\)-]*\|\s*([\w-]+)\s*([\w/]+)\s*\|'
        table_content_pattern = r'\|\s*([\d]*)\s*\|\s*([\d.]+)\s*\|'
        table_sep_pattern = r'^\+\-+\+\-+\+$'
        separator = re.compile(table_sep_pattern)
        theader1 = re.compile(table_header1_pattern)
        theader2 = re.compile(table_header2_pattern)
        tcont = re.compile(table_content_pattern)
        while i < total_lines:
            # Maybe include multi tables in one output.
            no_new_program = False
            found_first_report = False
            # Advance to the next '+---+---+' border line.
            while i < total_lines and not separator.match(lines[i]):
                # As in the branch above, found_first_report is always False
                # at this point, so this early exit never triggers.
                if found_first_report and not no_new_program and len(lines[i]) > 0 and lines[i] != '\n':
                    no_new_program = True
                    break
                i += 1
                continue
            if i >= total_lines or no_new_program:
                continue
            found_first_report = True
            # lines[i] is a border; the first header row is expected next.
            i += 1
            th1 = theader1.match(lines[i])
            print(lines[i - 1])
            print(lines[i])
            # Skip the border after header 1 and land on header row 2.
            i += 2
            th2 = theader2.match(lines[i])
            print(lines[i - 1])
            print(lines[i])
            # Skip the border after header 2 and land on the first data row.
            i += 2
            if not th1 or not th2:
                # Not a recognised table; resume the search from here.
                continue
            # From here on `data` is rebound to the list of
            # (first column, second column) pairs collected from data rows.
            data = []
            content = tcont.match(lines[i])
            while content is not None:
                # Rows whose second column is not positive are ignored.
                if float(content.group(2)) > 0.0:
                    data.append((content.group(1), content.group(2)))
                    if len(data) == 1:
                        # First kept row of this table.
                        print(lines[i])
                # Data rows alternate with border lines, hence the step of 2.
                i += 2
                content = tcont.match(lines[i])
            if len(data) != 1:
                # lines[i-2] is the last data row, lines[i-1] its border.
                print(lines[i-2])
                print(lines[i-1])
            else:
                # The only kept row was already printed; print the border.
                print(lines[i-1])
            print("")
            i += 1
def thread_func(cmd_line, ftesto, fteste):
    """Run one test command in a shell and log its output.

    Intended to run in a worker thread started by run_cmd(). The launched
    process is published through the module-level `test_process` so the
    caller can kill it on timeout and inspect its return code afterwards.

    Parameters:
        cmd_line (list): Command-line elements; joined with spaces and
            executed through the shell.
        ftesto: Text file object receiving the command's stdout.
        fteste: Text file object receiving the command's stderr.

    Returns:
        int: The process return code, or 254 if launching or logging raised.
    """
    global test_process
    cmd_line_str = ' '.join([str(elem) for elem in cmd_line])
    print(cmd_line_str)
    banner = 'Running ' + cmd_line_str + '\r\n'
    fteste.write(banner)
    fteste.flush()
    ftesto.write(banner)
    ftesto.flush()
    try:
        # Run the command in its own session (os.setsid) so run_cmd() can
        # signal the whole process group on timeout.
        test_process = Popen(cmd_line_str, stdout=PIPE, stderr=PIPE, shell=True, preexec_fn=os.setsid)
        stdout_data, stderr_data = test_process.communicate()
        # Attach the raw output first so it is available to run_cmd() even
        # if logging or table printing below fails.
        test_process.stderr_data = stderr_data
        test_process.stdout_data = stdout_data
        # Decode once, replacing undecodable bytes: a strict decode raised on
        # non-UTF-8 output, which dropped the log and returned 254.
        stderr_text = stderr_data.decode('utf-8', 'replace')
        stdout_text = stdout_data.decode('utf-8', 'replace')
        fteste.write(stderr_text)
        fteste.flush()
        ftesto.write(stdout_text)
        ftesto.flush()
        # Optionally print stdout data if SHOW_PERF_DATA is set to "Yes"
        if os.environ.get('SHOW_PERF_DATA', 'No') == "Yes":
            show_table_partial_data_only(stdout_text)
        return test_process.returncode
    except Exception as err:
        print(str(err))
        return 254
@display_time
def run_cmd(cmd_line, test_path, timeout, ftesto, fteste):
    """Run one test command with a timeout and report pass/fail.

    The command runs in a worker thread (thread_func). If it is still running
    after *timeout* seconds, its process group is sent SIGTERM. A run counts
    as failed when it timed out, could not be launched, or exited non-zero;
    each failed run is reported exactly once through report_failure().
    Afterwards leftover '/dev/shm/nvshmem-shm*' files are removed.

    Parameters:
        cmd_line (list): Command-line elements to execute.
        test_path (str): Test path recorded on failure.
        timeout (float): Seconds to wait before terminating the test.
        ftesto: Text file object for the stdout log.
        fteste: Text file object for the stderr log.
    """
    global test_process
    cmd_line_str = ' '.join([str(elem) for elem in cmd_line])
    # Forget the previous run's process so that a launch failure in this run
    # is not judged by a stale return code (or by the initial value 0).
    test_process = None
    th = Thread(target=thread_func, args=(cmd_line, ftesto, fteste))
    th.start()
    th.join(timeout)
    timed_out = th.is_alive()
    if timed_out:
        fteste.write('Timed out ' + cmd_line_str + '\r\n')
        fteste.flush()
        print("Timed out " + cmd_line_str)
        proc = test_process
        if proc is not None:
            try:
                os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
            except OSError:
                # The process group exited between the timeout and the kill.
                pass
        th.join()
    proc = test_process
    # Previously a timed-out run was reported twice (once for the timeout,
    # once for the non-zero exit) and could even print PASSED when the killed
    # process exited with status 0.
    if timed_out or proc is None or proc.returncode:
        if hasattr(proc, 'stderr_data'):
            print(proc.stderr_data.decode('utf-8', 'replace'))
        # The extra '\n' keeps the blank line the former `echo` pipeline
        # produced after the status text.
        print('EXPECTING PASSED, GOT FAILURE\n')
        report_failure(cmd_line, test_path, ftesto, fteste)
    else:
        print('PASSED\n')
    cmd = 'rm'
    args = "%s*" % '/dev/shm/nvshmem-shm'
    # Wait for the cleanup so it finishes (and the child is reaped) before
    # the next test starts.
    Popen("%s %s" % (cmd, args), shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True).communicate()
    return
def run_cmd_given_pes(cmd_line_prefix, cmd_line_suffix, test_install_path, full_test_path, npe_all, ppn, timeout, launcher_choice, ftesto, fteste):
    """Complete the launcher command for a fixed PE layout and run it.

    Appends, in order: the total PE count, the launcher-specific
    processes-per-node flag (when the launcher is known), the per-node count,
    the optional GPUBIND_SCRIPT wrapper from the environment, the test binary
    and the test arguments. The test is recorded under its path with
    *test_install_path* removed.
    """
    launch_cmd = cmd_line_prefix[:]
    launch_cmd.append(str(npe_all))

    # hydra and Open MPI style launchers spell the per-node flag differently.
    if launcher_choice == NVSHMEM_LAUNCHER:
        launch_cmd.append('-ppn')
    elif launcher_choice in (MPI_LAUNCHER, SHMEM_LAUNCHER):
        launch_cmd.append('-npernode')
    launch_cmd.append(str(ppn))

    binding_script = os.environ.get("GPUBIND_SCRIPT")
    if binding_script:
        launch_cmd.append("%s" % binding_script)

    launch_cmd.append(full_test_path)
    if cmd_line_suffix:
        launch_cmd.append(cmd_line_suffix)

    relative_test_path = full_test_path.replace(test_install_path, '')
    run_cmd(launch_cmd, relative_test_path, timeout, ftesto, fteste)
def run_cmd_vary_pes(cmd_line_prefix, cmd_line_suffix, test_install_path, full_test_path, npe_range, nhosts, timeout, launcher_choice, ftesto, fteste):
    """Run a test once per applicable PE layout.

    * Paths containing 'pt-to-pt' run once with two PEs: both on the single
      host, or one per node when several hosts are used.
    * Paths containing 'coll' or 'init' run once per entry of *npe_range*
      (treated as PEs per node); on a single host the first entry is skipped.
    * Any other path is not run.

    Parameters:
        cmd_line_prefix (list): Launcher command up to and including '-n'.
        cmd_line_suffix (str): Test arguments ('' for none).
        test_install_path (str): Root directory of the installed tests.
        full_test_path (str): Path of the test binary.
        npe_range (list of int): Candidate PEs-per-node values.
        nhosts (int): Number of hosts in the run.
        timeout (float): Per-run timeout in seconds.
        launcher_choice (int): One of the *_LAUNCHER constants.
        ftesto, fteste: Stdout / stderr log file objects.
    """
    # The former unused local copy of cmd_line_prefix was removed;
    # run_cmd_given_pes() already copies the prefix before extending it.
    if 'pt-to-pt' in full_test_path:
        layouts = [(2, 2 if nhosts == 1 else 1)]
    elif 'coll' in full_test_path or 'init' in full_test_path:
        per_node_counts = npe_range[1:] if nhosts == 1 else npe_range
        layouts = [(nhosts * ppn, ppn) for ppn in per_node_counts]
    else:
        layouts = []
    for npe_all, ppn in layouts:
        run_cmd_given_pes(cmd_line_prefix, cmd_line_suffix, test_install_path, full_test_path, npe_all, ppn, timeout, launcher_choice, ftesto, fteste)
    return
def _build_launcher_prefix(launcher_choice, nvshmem_install_path, mpi_install_path, hosts, bind_to, bootstrap_str, ld_library_path, extra_parameters):
    """Return the launcher command prefix, without per-test env vars or '-n'."""
    if launcher_choice == NVSHMEM_LAUNCHER:
        prefix = [nvshmem_install_path + '/bin/nvshmrun.hydra', '--bind-to', bind_to]
        # Extra parameters go right before '--launcher', in their given order.
        prefix.extend("-genv=%s" % item for item in extra_parameters)
        prefix.extend(['--launcher', 'ssh', '--hosts', hosts])
        return prefix
    if launcher_choice == MPI_LAUNCHER:
        binary, launcher_env = 'mpirun', bootstrap_str
    else:
        binary, launcher_env = 'oshrun', 'NVSHMEMTEST_USE_SHMEM_LAUNCHER=1'
    prefix = [mpi_install_path + '/bin/' + binary, '--mca', 'btl', '^uct', '--allow-run-as-root', '-oversubscribe', '--bind-to', bind_to]
    # Extra parameters are exported before the first built-in '-x' option.
    for item in extra_parameters:
        prefix.extend(['-x', item])
    prefix.extend(['-x', ld_library_path, '-x', launcher_env, '--host', hosts])
    return prefix


def enumerate_env_lines(env_combs, cmd_line_suffix, nvshmem_install_path, test_install_path, full_test_path, npe_range, hosts, timeout, launcher_choice, mpi_install_path, extra_parameters_string, ftesto, fteste):
    """Run a test once per environment combination with the chosen launcher.

    Builds the launcher command prefix (nvshmrun.hydra, mpirun or oshrun),
    adds the variables of each environment combination, appends '-n' and
    hands over to run_cmd_vary_pes(). With no combinations the test is run
    once without additional variables. An unknown launcher runs nothing.

    Environment variables consulted: CUDA_HOME, GDRCOPY_HOME, NCCL_HOME,
    PMIX_HOME, QA_BOOTSTRAP (default "pmi") and QA_BIND_TO (default "socket").

    Parameters:
        env_combs (list): Token lists of alternating variable names and
            values, one list per combination.
        cmd_line_suffix (str): Test arguments ('' for none).
        nvshmem_install_path (str): NVSHMEM install root.
        test_install_path (str): Root directory of the installed tests.
        full_test_path (str): Path of the test binary.
        npe_range (list of int): Candidate PE counts.
        hosts (str): Comma-separated host list.
        timeout (float): Per-run timeout in seconds.
        launcher_choice (int): One of the *_LAUNCHER constants.
        mpi_install_path (str): MPI / OpenSHMEM install root.
        extra_parameters_string (str): Whitespace-separated extra settings
            passed to the launcher.
        ftesto, fteste: Stdout / stderr log file objects.

    Raises:
        IndexError: If a combination has an odd number of tokens.
    """
    nhosts = hosts.count(",") + 1
    if 'CUDA_HOME' in os.environ:
        cuda_install_path = os.environ['CUDA_HOME']
    else:
        print('CUDA_HOME not set. Try to use the default value: /usr/local/cuda')
        cuda_install_path = '/usr/local/cuda'
    if 'GDRCOPY_HOME' in os.environ:
        gdrcopy_install_path = "%s/lib:%s/lib64" % (os.environ['GDRCOPY_HOME'], os.environ['GDRCOPY_HOME'])
    else:
        gdrcopy_install_path = ""
        print('GDRCOPY_HOME not set, will not use gdrcopy')
    if 'NCCL_HOME' in os.environ:
        nccl_install_lib = ":%s/lib64:%s/lib" % (os.environ['NCCL_HOME'], os.environ['NCCL_HOME'])
    else:
        nccl_install_lib = ""
    if 'PMIX_HOME' in os.environ:
        pmix_install_lib = ":%s/lib" % os.environ['PMIX_HOME']
    else:
        pmix_install_lib = ""
    QA_BOOTSTRAP = os.environ.get('QA_BOOTSTRAP', "pmi")
    QA_BIND_TO = os.environ.get('QA_BIND_TO', "socket")
    if QA_BOOTSTRAP == "uid":
        bootstrap_str = "NVSHMEMTEST_USE_UID_BOOTSTRAP=1"
    elif QA_BOOTSTRAP == "mpi":
        bootstrap_str = "NVSHMEM_BOOTSTRAP=MPI"
    else:
        bootstrap_str = "NVSHMEMTEST_USE_MPI_LAUNCHER=1"

    if launcher_choice not in (NVSHMEM_LAUNCHER, MPI_LAUNCHER, SHMEM_LAUNCHER):
        return

    ld_library_path = 'LD_LIBRARY_PATH=' + cuda_install_path + '/lib64:' + gdrcopy_install_path + nccl_install_lib + pmix_install_lib + ':' + nvshmem_install_path + '/lib' + ':$LD_LIBRARY_PATH'
    # Split once for every launcher: the oshrun path with env combinations
    # previously used `extra_parameters` before it was assigned.
    extra_parameters = extra_parameters_string.split()
    base_prefix = _build_launcher_prefix(launcher_choice, nvshmem_install_path, mpi_install_path, hosts, QA_BIND_TO, bootstrap_str, ld_library_path, extra_parameters)

    # No combinations means a single run without additional variables.
    for env_comb in (env_combs or [[]]):
        cmd_line_prefix = list(base_prefix)
        # Walk this combination's own name/value pairs; the old code used the
        # length of the first combination for every line of the .env file.
        for envidx in range(0, len(env_comb), 2):
            var = env_comb[envidx]
            val = env_comb[envidx + 1]
            if launcher_choice == NVSHMEM_LAUNCHER:
                cmd_line_prefix.extend(['-genv', var, val])
            else:
                cmd_line_prefix.extend(['-x', var + '=' + val])
        cmd_line_prefix.append('-n')
        run_cmd_vary_pes(cmd_line_prefix, cmd_line_suffix, test_install_path, full_test_path, npe_range, nhosts, timeout, launcher_choice, ftesto, fteste)
    return
def enumerate_args_lines(args_combs, env_combs, nvshmem_install_path, test_install_path, full_test_path, npe_range, hosts, timeout, launcher_choice, mpi_install_path, extra_parameters_string, ftesto, fteste):
    """Run the test once per argument line, or once without arguments.

    Each entry of *args_combs* has its trailing whitespace removed and is
    passed on as the command-line suffix; when *args_combs* is empty a single
    run with an empty suffix is made.
    """
    if args_combs:
        suffixes = (arg_line.rstrip() for arg_line in args_combs)
    else:
        suffixes = ('',)
    for suffix in suffixes:
        enumerate_env_lines(env_combs, suffix, nvshmem_install_path, test_install_path, full_test_path, npe_range, hosts, timeout, launcher_choice, mpi_install_path, extra_parameters_string, ftesto, fteste)
    return
def walk_dir_on_set(nvshmem_install_path, test_install_path, npe_start_end_step, max_pes, hosts, timeout, enable_skip, tests_set, skipped_tests_set, launcher_choice, mpi_install_path, extra_parameters_string, ftesto, fteste):
    """Run every test of *tests_set* with one launcher.

    For each test path the binary is located under *test_install_path*.
    Tests listed in *skipped_tests_set* are logged to *ftesto* and skipped
    when *enable_skip* is true; tests whose binary is missing or not
    executable are logged to *fteste* and skipped. All other tests are run
    for every argument / environment / PE combination found next to them.
    """
    for test_path in tests_set:
        full_test_path = os.path.join(test_install_path, test_path.lstrip(os.path.sep))
        if enable_skip and (test_path in skipped_tests_set):
            # Write through the log object rather than an un-reaped `echo`
            # child; the trailing '\n' is the newline echo used to append.
            ftesto.write(full_test_path + ' found in list and skipped\r\n\n')
            ftesto.flush()
            continue
        if not os.access(full_test_path, os.X_OK):
            fteste.write(full_test_path + ' found in list and binary missing\r\n\n')
            fteste.flush()
            continue
        env_combs = get_env_combinations(full_test_path)
        args_combs, npe_range = get_args_combinations_pe_range(full_test_path, npe_start_end_step, max_pes)
        enumerate_args_lines(args_combs, env_combs, nvshmem_install_path, test_install_path, full_test_path, npe_range, hosts, timeout, launcher_choice, mpi_install_path, extra_parameters_string, ftesto, fteste)
    return
def walk_dir(nvshmem_install_path, mpi_install_path, test_install_path, launcher_choice, npe_start_end_step, max_pes, hosts, timeout, enable_skip, ftestlist_any_launcher, extra_parameters_string, ftesto, fteste):
    """Run all listed tests with the launcher(s) selected by the user.

    *launcher_choice* uses the user-facing numbering: 1 runs the nvshmem,
    mpirun and openshmem launchers in that order, 0 runs mpirun only, 2 runs
    openshmem only and 3 runs nvshmem only. Any other value prints a usage
    hint. Nothing happens when the test list is empty.
    """
    all_tests, skipped_tests = get_all_tests(ftestlist_any_launcher)
    if not all_tests:
        return

    # User-facing choice -> internal launcher constants, in execution order.
    selection_table = (
        (1, (NVSHMEM_LAUNCHER, MPI_LAUNCHER, SHMEM_LAUNCHER)),
        (2, (SHMEM_LAUNCHER,)),
        (3, (NVSHMEM_LAUNCHER,)),
        (0, (MPI_LAUNCHER,)),
    )
    for choice, launchers in selection_table:
        if launcher_choice == choice:
            for launcher in launchers:
                walk_dir_on_set(nvshmem_install_path, test_install_path, npe_start_end_step, max_pes, hosts, timeout, enable_skip, all_tests, skipped_tests, launcher, mpi_install_path, extra_parameters_string, ftesto, fteste)
            break
    else:
        print("Please select launcher use 0/1/2/3. [1: Three launchers, 0: mpirun, 2: openshmem, 3: nvshmem]")
    return