From b77e258587d431e0d5f16d732c925d625973b559 Mon Sep 17 00:00:00 2001 From: tangjingqi Date: Thu, 6 Jul 2023 15:39:34 +0800 Subject: [PATCH 01/18] temp commit --- .../paddle/distributed/auto_tuner/search.py | 21 ++- python/paddle/distributed/auto_tuner/tuner.py | 7 +- python/paddle/distributed/auto_tuner/utils.py | 143 ++++++++++++++++++ python/paddle/distributed/launch/main.py | 32 ++++ 4 files changed, 201 insertions(+), 2 deletions(-) diff --git a/python/paddle/distributed/auto_tuner/search.py b/python/paddle/distributed/auto_tuner/search.py index 01029e7f3727b..8539a1a952fc0 100644 --- a/python/paddle/distributed/auto_tuner/search.py +++ b/python/paddle/distributed/auto_tuner/search.py @@ -16,7 +16,7 @@ from abc import ABC, abstractmethod from .prune import _PRUNE_FUNC -from .utils import search_all +from .utils import search_all, GBS_search_all class SearchAlgo(ABC): @@ -52,3 +52,22 @@ def search_once(self, history_cfgs): else: return None return new_cfg + +class GBSSearch(SearchAlgo): + def __init__(self, tuner_cfg): + super().__init__(tuner_cfg) + self.idx = 0 + self.all_tasks = GBS_search_all(tuner_cfg) + + def search_once(self, history_cfgs): + new_cfg = None + stop = False + while not stop: + if self.idx < len(self.all_tasks): + new_cfg = self.all_tasks[self.idx] + self.idx += 1 + stop = not self.prune(self.tuner_cfg, new_cfg, history_cfgs) + else: + return None + return new_cfg + \ No newline at end of file diff --git a/python/paddle/distributed/auto_tuner/tuner.py b/python/paddle/distributed/auto_tuner/tuner.py index 26831a2e8fcbd..981616254ad30 100644 --- a/python/paddle/distributed/auto_tuner/tuner.py +++ b/python/paddle/distributed/auto_tuner/tuner.py @@ -13,7 +13,7 @@ # limitations under the License. -from .utils import default_candidates +from .utils import default_candidates, GBS_default_candidates class AutoTuner: @@ -36,6 +36,11 @@ def __init__(self, tuner_cfg): from .search import GridSearch self.algo = GridSearch(tuner_cfg) + elif search_algo == "gbs": + from .search import GBSSearch + + tuner_cfg["candidates"] = default_candidates(tuner_cfg) + self.algo = GBSSearch(tuner_cfg) else: raise NotImplementedError() diff --git a/python/paddle/distributed/auto_tuner/utils.py b/python/paddle/distributed/auto_tuner/utils.py index 8db11df08c5a9..445f08ad8722a 100644 --- a/python/paddle/distributed/auto_tuner/utils.py +++ b/python/paddle/distributed/auto_tuner/utils.py @@ -355,3 +355,146 @@ def read_log( metric_ave = round(metric_ave, 5) res = metric_ave, flag return res + +def three_mul_combinations(target): + """Return the combinations of three numbers which product is target.""" + results = [] + for i in range(1, target // 3 + 1): + if target % i == 0: + for j in range(i, target // 2 + 1): + if (target // i) % j == 0: + results.append((i, j, target // i // j)) + return results + +def GBS_dp_mp_pp_candidates(tuner_cfg, num_gpus, num_nodes): + """Return middle candidates of dp, mp, pp""" + res = three_mul_combinations(num_gpus) + return res[len(res) // 2 ] + +def GBS_default_candidates(tuner_cfg): + """Return the default candidates of every hyper param which user defined auto""" + candidates = {} + assert num_gpus > 0 + if tuner_cfg["model_cfg"]["global_batch_size"] == "auto": + num_gpus = tuner_cfg["num_gpus"] + num_nodes = tuner_cfg["nodes"] + dp_candidate, mp_candidate, pp_candidate = GBS_dp_mp_pp_candidates(tuner_cfg, num_gpus, num_nodes) + candidates["dp_degree"] = [dp_candidate] + candidates["mp_degree"] = [mp_candidate] + candidates["pp_degree"] = [pp_candidate] + 
candidates["sharding_degree"] = [16] + candidates["sharding_stage"] = [1] + candidates["use_recompute"] = [True] + candidates["recompute_granularity"] = ["full"] + candidates["micro_batch_size"] = [i for i in range(1, 16)] + tuner_cfg["model_cfg"]["global_batch_size"] = [pp_candidate * dp_candidate * e for e in candidates["micro_batch_size"]] + else: + if tuner_cfg.get("dp_degree", None) == "auto": + candidates["dp_degree"] = dist_degree("dp", num_gpus, num_nodes) + elif tuner_cfg.get("dp_degree", None): + candidates["dp_degree"] = tuner_cfg.get("dp_degree") + else: + candidates["dp_degree"] = [1] + + if tuner_cfg.get("mp_degree", None) == "auto": + candidates["mp_degree"] = dist_degree("mp", num_gpus, num_nodes) + elif tuner_cfg.get("mp_degree", None): + candidates["mp_degree"] = tuner_cfg.get("mp_degree") + else: + candidates["mp_degree"] = [1] + + if tuner_cfg.get("pp_degree", None) == "auto": + candidates["pp_degree"] = dist_degree("pp", num_gpus, num_nodes) + elif tuner_cfg.get("pp_degree", None): + candidates["pp_degree"] = tuner_cfg.get("pp_degree") + else: + candidates["pp_degree"] = [1] + + if tuner_cfg.get("sharding_degree", None) == "auto": + candidates["sharding_degree"] = dist_degree( + "sharding", num_gpus, num_nodes + ) + elif tuner_cfg.get("sharding_degree", None): + candidates["sharding_degree"] = tuner_cfg.get("sharding_degree") + else: + candidates["sharding_degree"] = [1] + + if tuner_cfg.get("sharding_stage", None) == "auto": + candidates["sharding_stage"] = [1, 2, 3] + elif tuner_cfg.get("sharding_stage", None): + candidates["sharding_stage"] = tuner_cfg.get("sharding_stage") + else: + candidates["sharding_stage"] = [None] + + if tuner_cfg.get("use_recompute", None) == "auto": + candidates["use_recompute"] = [False, True] + elif tuner_cfg.get("use_recompute", None): + candidates["use_recompute"] = tuner_cfg.get("use_recompute") + else: + candidates["use_recompute"] = [None] + + if tuner_cfg.get("recompute_granularity", None) == "auto": + candidates["recompute_granularity"] = ["full_attn", "full"] + elif tuner_cfg.get("recompute_granularity", None): + candidates["recompute_granularity"] = tuner_cfg.get( + "recompute_granularity" + ) + else: + candidates["recompute_granularity"] = [None] + + if tuner_cfg.get("micro_batch_size", None) == "auto": + candidates["micro_batch_size"] = list( + range(tuner_cfg["model_cfg"]["global_batch_size"], 0, -1) + ) + elif tuner_cfg.get("micro_batch_size", None): + candidates["micro_batch_size"] = tuner_cfg.get("micro_batch_size") + else: + candidates["micro_batch_size"] = [ + tuner_cfg["model_cfg"]["global_batch_size"] + ] + return candidates + +def GBS_search_all(tuner_cfg): + """Permutate the candidates of all hyper params.""" + candidates = tuner_cfg["candidates"] + # Order: dp -> mp -> pp -> mbs -> sharding-> recompute + dp_degree_candidates = candidates["dp_degree"] + mp_degree_candidates = candidates["mp_degree"] + pp_degree_candidates = candidates["pp_degree"] + mbs_candidates = candidates["micro_batch_size"] + sharding_stage_candidates = candidates["sharding_stage"] + sharding_degree_candidates = candidates["sharding_degree"] + use_recompute_candidates = candidates["use_recompute"] + recompute_granularity_candidates = candidates["recompute_granularity"] + gbs_candidates = candidates["global_batch_size"] + all_cfgs = list( + itertools.product( + dp_degree_candidates, + mp_degree_candidates, + pp_degree_candidates, + mbs_candidates, + sharding_degree_candidates, + sharding_stage_candidates, + use_recompute_candidates, + 
recompute_granularity_candidates, + gbs_candidates, + ) + ) + mapping = { + 0: "dp_degree", + 1: "mp_degree", + 2: "pp_degree", + 3: "micro_batch_size", + 5: "sharding_stage", + 4: "sharding_degree", + 6: "use_recompute", + 7: "recompute_granularity", + 8: "global_batch_size", + } + new_all_cfgs = [] + for cfg in all_cfgs: + new_cfg = {} + for idx, val in enumerate(cfg): + new_cfg[mapping[idx]] = val + new_all_cfgs.append(new_cfg) + return new_all_cfgs \ No newline at end of file diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index 908c0af8cc18f..a159e309438d1 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -361,8 +361,31 @@ def launch(): recorder = History_recorder() job_id = 0 +<<<<<<< HEAD ctx.args.max_restart = -1 raw_ctx = copy.deepcopy(ctx) +======= + if tuner_cfg.get("global_batch_size") == "auto": + # search and set global batch size + # adjust micron batch with fixed dp mp pp + GBS_cur_cfg, GBS_cur_cfg = micron_search(cur_cfg, tuner_cfg) + # generate script args of task + new_args = gen_new_args(raw_args, cur_cfg, tuner_cfg) + ctx.args.training_script_args = new_args + # launch task + ctx.logger.info( + "Launch task from auto tuner: job_id {}, log_dir {}, config {}".format( + task_job_id, log_dir, cur_cfg + ) + ) + c = controllers.init(ctx) + # set per task timeout + signal.signal(signal.SIGALRM, c.not_exit_signal_handler) + signal.alarm(max_time_per_task) + c.run() + # Process generated result + +>>>>>>> a87d9dcc2e (temp commit) while cur_cfg: ctx = copy.deepcopy(raw_ctx) if is_first_task: @@ -409,6 +432,7 @@ def launch(): if err: ctx.logger.warning(f"Read log failed for parameters: {log_dir}") # for pruner use +<<<<<<< HEAD cur_cfg['time'] = -1 cur_cfg[tuner_cfg['metric_cfg']['name']] = None else: @@ -416,6 +440,14 @@ def launch(): cur_cfg['time'] = metric cur_cfg[tuner_cfg['metric_cfg']['name']] = metric +======= + cur_cfg['time'] = None + cur_cfg[tuner_cfg['metric_cfg']['name']] = None + else: + # for pruner use. 
+ cur_cfg['time'] = metric + cur_cfg[tuner_cfg['metric_cfg']['name']] = metric +>>>>>>> a87d9dcc2e (temp commit) # record history cur_cfg['job_id'] = job_id recorder.add_cfg(**cur_cfg) From 33178e8925499aa83d0dd6d3b3ff3f9c7ab2f341 Mon Sep 17 00:00:00 2001 From: caozhou Date: Sun, 25 Jun 2023 02:50:42 +0000 Subject: [PATCH 02/18] distribute best cfg --- python/paddle/distributed/launch/main.py | 29 ++++++++++++------------ 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index a159e309438d1..ce30d23572dbd 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -361,10 +361,6 @@ def launch(): recorder = History_recorder() job_id = 0 -<<<<<<< HEAD - ctx.args.max_restart = -1 - raw_ctx = copy.deepcopy(ctx) -======= if tuner_cfg.get("global_batch_size") == "auto": # search and set global batch size # adjust micron batch with fixed dp mp pp @@ -385,7 +381,6 @@ def launch(): c.run() # Process generated result ->>>>>>> a87d9dcc2e (temp commit) while cur_cfg: ctx = copy.deepcopy(raw_ctx) if is_first_task: @@ -432,7 +427,6 @@ def launch(): if err: ctx.logger.warning(f"Read log failed for parameters: {log_dir}") # for pruner use -<<<<<<< HEAD cur_cfg['time'] = -1 cur_cfg[tuner_cfg['metric_cfg']['name']] = None else: @@ -440,14 +434,6 @@ def launch(): cur_cfg['time'] = metric cur_cfg[tuner_cfg['metric_cfg']['name']] = metric -======= - cur_cfg['time'] = None - cur_cfg[tuner_cfg['metric_cfg']['name']] = None - else: - # for pruner use. - cur_cfg['time'] = metric - cur_cfg[tuner_cfg['metric_cfg']['name']] = metric ->>>>>>> a87d9dcc2e (temp commit) # record history cur_cfg['job_id'] = job_id recorder.add_cfg(**cur_cfg) @@ -469,7 +455,10 @@ def launch(): # generate a new config new_cfg = auto_tuner.search_once() cur_cfg = copy.deepcopy(new_cfg) +<<<<<<< HEAD auto_tuner.add_cfg(cur_cfg) +======= +>>>>>>> efdb222f61 (distribute best cfg) # per task launch interval time.sleep(3) @@ -477,7 +466,10 @@ def launch(): # get best config to run best_cfg = None +<<<<<<< HEAD ctx = copy.deepcopy(raw_ctx) +======= +>>>>>>> efdb222f61 (distribute best cfg) if nnodes > 1: import socket @@ -522,6 +514,7 @@ def launch(): ) assert best_cfg +<<<<<<< HEAD end_time = time.time() ctx.logger.info(f"AutoTuner ends in {end_time-start_time}s.") # launch best cfg @@ -532,6 +525,14 @@ def launch(): ctx.logger.info(f"Launch best cfg from auto tuner: {best_cfg}") ctx.args.log_dir = "best_cfg" # run best cfg +======= + # launch best cfg + ctx.status._current_status = None + new_args = gen_new_args(raw_args, best_cfg, tuner_cfg) + ctx.args.training_script_args = new_args + ctx.args.job_id = "best_cfg" + ctx.logger.info(f"Launch best cfg from auto tuner: {best_cfg}") +>>>>>>> efdb222f61 (distribute best cfg) c = controllers.init(ctx) c.run() c.finalize(exit=True) From 5ac9191373d21791b0f9802e04703df77ec66bdd Mon Sep 17 00:00:00 2001 From: caozhou Date: Sun, 25 Jun 2023 11:53:18 +0000 Subject: [PATCH 03/18] update metric extracting --- python/paddle/distributed/auto_tuner/utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/paddle/distributed/auto_tuner/utils.py b/python/paddle/distributed/auto_tuner/utils.py index 445f08ad8722a..d69b40fd9de1e 100644 --- a/python/paddle/distributed/auto_tuner/utils.py +++ b/python/paddle/distributed/auto_tuner/utils.py @@ -330,7 +330,11 @@ def read_log( with open(target_file, "r") as f: # read file re_metric_pattern = ( - 
target_metric + r":* *(\d+(\.\d*)?)|(\d+(\.\d*)?) *" + target_metric + r'(\d+(\.\d*)?) *' + + target_metric + + '|' + + target_metric + + r' *: *(\d+(\.\d*)?)' ) metric_list = [] From 046a774c36cd58a19ed611ef8b630557280d0be6 Mon Sep 17 00:00:00 2001 From: caozhou Date: Mon, 26 Jun 2023 03:40:56 +0000 Subject: [PATCH 04/18] fix bugs of prune and reading log --- python/paddle/distributed/auto_tuner/utils.py | 6 +----- python/paddle/distributed/launch/main.py | 18 +----------------- 2 files changed, 2 insertions(+), 22 deletions(-) diff --git a/python/paddle/distributed/auto_tuner/utils.py b/python/paddle/distributed/auto_tuner/utils.py index d69b40fd9de1e..445f08ad8722a 100644 --- a/python/paddle/distributed/auto_tuner/utils.py +++ b/python/paddle/distributed/auto_tuner/utils.py @@ -330,11 +330,7 @@ def read_log( with open(target_file, "r") as f: # read file re_metric_pattern = ( - r'(\d+(\.\d*)?) *' - + target_metric - + '|' - + target_metric - + r' *: *(\d+(\.\d*)?)' + target_metric + r":* *(\d+(\.\d*)?)|(\d+(\.\d*)?) *" + target_metric ) metric_list = [] diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index ce30d23572dbd..905dda77fa6d7 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -455,10 +455,6 @@ def launch(): # generate a new config new_cfg = auto_tuner.search_once() cur_cfg = copy.deepcopy(new_cfg) -<<<<<<< HEAD - auto_tuner.add_cfg(cur_cfg) -======= ->>>>>>> efdb222f61 (distribute best cfg) # per task launch interval time.sleep(3) @@ -466,10 +462,6 @@ def launch(): # get best config to run best_cfg = None -<<<<<<< HEAD - ctx = copy.deepcopy(raw_ctx) -======= ->>>>>>> efdb222f61 (distribute best cfg) if nnodes > 1: import socket @@ -514,7 +506,7 @@ def launch(): ) assert best_cfg -<<<<<<< HEAD + end_time = time.time() ctx.logger.info(f"AutoTuner ends in {end_time-start_time}s.") # launch best cfg @@ -525,14 +517,6 @@ def launch(): ctx.logger.info(f"Launch best cfg from auto tuner: {best_cfg}") ctx.args.log_dir = "best_cfg" # run best cfg -======= - # launch best cfg - ctx.status._current_status = None - new_args = gen_new_args(raw_args, best_cfg, tuner_cfg) - ctx.args.training_script_args = new_args - ctx.args.job_id = "best_cfg" - ctx.logger.info(f"Launch best cfg from auto tuner: {best_cfg}") ->>>>>>> efdb222f61 (distribute best cfg) c = controllers.init(ctx) c.run() c.finalize(exit=True) From b2b75dc60991ce8b8b04749d2ab4e62a1217193c Mon Sep 17 00:00:00 2001 From: caozhou Date: Tue, 27 Jun 2023 07:50:23 +0000 Subject: [PATCH 05/18] fix adding cfg bug --- python/paddle/distributed/launch/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index 905dda77fa6d7..5595f50e2d385 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -455,6 +455,7 @@ def launch(): # generate a new config new_cfg = auto_tuner.search_once() cur_cfg = copy.deepcopy(new_cfg) + auto_tuner.add_cfg(cur_cfg) # per task launch interval time.sleep(3) From 09491561c9d7dba198ed5a826a8a2ab893bf4635 Mon Sep 17 00:00:00 2001 From: caozhou Date: Wed, 28 Jun 2023 07:00:22 +0000 Subject: [PATCH 06/18] reset status --- python/paddle/distributed/launch/controllers/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py index 
9769ec9d6bf3f..f550bf5ae28d8 100644 --- a/python/paddle/distributed/launch/controllers/controller.py +++ b/python/paddle/distributed/launch/controllers/controller.py @@ -183,8 +183,8 @@ def not_exit_signal_handler(self, sigint, frame): self.ctx.logger.info(f"Terminating with signal {sigint}") self.sigint = sigint - self.ctx.status.done() self.stop(sigint=sigint) + self.ctx.status._current_status = None self.ctx.logger.info(f"Exit with signal {sigint}") From 2e0640c53fd6133a95ae1faf6b05516df4dd85e5 Mon Sep 17 00:00:00 2001 From: caozhou Date: Wed, 28 Jun 2023 08:58:11 +0000 Subject: [PATCH 07/18] remove alarm and set logdir --- python/paddle/distributed/launch/main.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index 5595f50e2d385..40e282efe7779 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -451,6 +451,7 @@ def launch(): "Get best config failed. Currently there are no appropriate configs." ) c.finalize(exit=False) + ctx.status._current_status = None # generate a new config new_cfg = auto_tuner.search_once() @@ -462,6 +463,7 @@ def launch(): recorder.store_history() # get best config to run + signal.alarm(0) best_cfg = None if nnodes > 1: import socket @@ -517,7 +519,10 @@ def launch(): ctx.args.job_id = "best_cfg" ctx.logger.info(f"Launch best cfg from auto tuner: {best_cfg}") ctx.args.log_dir = "best_cfg" +<<<<<<< HEAD # run best cfg +======= +>>>>>>> a8d316ae7c (remove alarm and set logdir) c = controllers.init(ctx) c.run() c.finalize(exit=True) From a489aea1331cd7afc552e3c88ec46ae811b3b352 Mon Sep 17 00:00:00 2001 From: caozhou Date: Thu, 29 Jun 2023 08:07:43 +0000 Subject: [PATCH 08/18] deepcopy ctx --- .../distributed/launch/controllers/controller.py | 1 - python/paddle/distributed/launch/main.py | 12 ++++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py index f550bf5ae28d8..b5b595520dd48 100644 --- a/python/paddle/distributed/launch/controllers/controller.py +++ b/python/paddle/distributed/launch/controllers/controller.py @@ -184,7 +184,6 @@ def not_exit_signal_handler(self, sigint, frame): self.sigint = sigint self.stop(sigint=sigint) - self.ctx.status._current_status = None self.ctx.logger.info(f"Exit with signal {sigint}") diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index 40e282efe7779..635d36fa722d7 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -361,6 +361,7 @@ def launch(): recorder = History_recorder() job_id = 0 +<<<<<<< HEAD if tuner_cfg.get("global_batch_size") == "auto": # search and set global batch size # adjust micron batch with fixed dp mp pp @@ -381,11 +382,17 @@ def launch(): c.run() # Process generated result +======= + raw_ctx = copy.deepcopy(ctx) +>>>>>>> ebd4d85b44 (deepcopy ctx) while cur_cfg: ctx = copy.deepcopy(raw_ctx) +<<<<<<< HEAD if is_first_task: ctx.max_time_per_task = warmup_time is_first_task = False +======= +>>>>>>> dda7aa8154 (deepcopy ctx) # auto tuner supports dp, mp, pp, micro batch size, sharding, recompute by default and every task has own log dir log_dir = "DP{}_MP{}_PP{}_Sharding_degree_{}_stage_{}_MBS_{}_Recompute_{}_granularity_{}".format( cur_cfg["dp_degree"], @@ -451,7 +458,6 @@ def launch(): "Get best config failed. 
Currently there are no appropriate configs." ) c.finalize(exit=False) - ctx.status._current_status = None # generate a new config new_cfg = auto_tuner.search_once() @@ -465,6 +471,7 @@ def launch(): # get best config to run signal.alarm(0) best_cfg = None + ctx = copy.deepcopy(raw_ctx) if nnodes > 1: import socket @@ -519,10 +526,7 @@ def launch(): ctx.args.job_id = "best_cfg" ctx.logger.info(f"Launch best cfg from auto tuner: {best_cfg}") ctx.args.log_dir = "best_cfg" -<<<<<<< HEAD # run best cfg -======= ->>>>>>> a8d316ae7c (remove alarm and set logdir) c = controllers.init(ctx) c.run() c.finalize(exit=True) From b2c953453c03edf0ffe7939d3329a29757d85910 Mon Sep 17 00:00:00 2001 From: caozhou Date: Thu, 29 Jun 2023 08:41:59 +0000 Subject: [PATCH 09/18] change alarm --- .../distributed/launch/controllers/controller.py | 11 ++++------- python/paddle/distributed/launch/main.py | 8 -------- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py index b5b595520dd48..8966a49cb8f5f 100644 --- a/python/paddle/distributed/launch/controllers/controller.py +++ b/python/paddle/distributed/launch/controllers/controller.py @@ -36,13 +36,10 @@ def __init__(self, ctx): signal.signal(signal.SIGTERM, self.signal_handler) signal.signal(signal.SIGABRT, self.signal_handler) signal.signal(signal.SIGINT, self.signal_handler) - if ctx.is_auto_tuner_mode(): - if not ctx.run_best: - # set per task timeout - signal.signal(signal.SIGALRM, self.not_exit_signal_handler) - signal.alarm(ctx.max_time_per_task) - else: - signal.alarm(0) + if ctx.is_auto_tuner_mode() and not ctx.run_best: + # set per task timeout + signal.signal(signal.SIGALRM, self.not_exit_signal_handler) + signal.alarm(ctx.max_time_per_task) self.ctx = ctx self.master = Master.factory(self.ctx) diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index 635d36fa722d7..f0453fa71df85 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -361,7 +361,6 @@ def launch(): recorder = History_recorder() job_id = 0 -<<<<<<< HEAD if tuner_cfg.get("global_batch_size") == "auto": # search and set global batch size # adjust micron batch with fixed dp mp pp @@ -382,17 +381,11 @@ def launch(): c.run() # Process generated result -======= - raw_ctx = copy.deepcopy(ctx) ->>>>>>> ebd4d85b44 (deepcopy ctx) while cur_cfg: ctx = copy.deepcopy(raw_ctx) -<<<<<<< HEAD if is_first_task: ctx.max_time_per_task = warmup_time is_first_task = False -======= ->>>>>>> dda7aa8154 (deepcopy ctx) # auto tuner supports dp, mp, pp, micro batch size, sharding, recompute by default and every task has own log dir log_dir = "DP{}_MP{}_PP{}_Sharding_degree_{}_stage_{}_MBS_{}_Recompute_{}_granularity_{}".format( cur_cfg["dp_degree"], @@ -469,7 +462,6 @@ def launch(): recorder.store_history() # get best config to run - signal.alarm(0) best_cfg = None ctx = copy.deepcopy(raw_ctx) if nnodes > 1: From fb75a01235a2f94c9aed3ee2a1cd483b7b8eff11 Mon Sep 17 00:00:00 2001 From: caozhou Date: Thu, 29 Jun 2023 12:11:52 +0000 Subject: [PATCH 10/18] fix restart bug --- .../paddle/distributed/launch/controllers/controller.py | 1 + python/paddle/distributed/launch/main.py | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py index 
8966a49cb8f5f..03870dc5b24ca 100644 --- a/python/paddle/distributed/launch/controllers/controller.py +++ b/python/paddle/distributed/launch/controllers/controller.py @@ -180,6 +180,7 @@ def not_exit_signal_handler(self, sigint, frame): self.ctx.logger.info(f"Terminating with signal {sigint}") self.sigint = sigint + self.ctx.status.done() self.stop(sigint=sigint) self.ctx.logger.info(f"Exit with signal {sigint}") diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index f0453fa71df85..901fc260df904 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -295,6 +295,7 @@ def launch(): elif ctx.is_auto_tuner_mode(): import copy import json + import signal import sys import time @@ -343,7 +344,6 @@ def launch(): # build AutoTuner to get new config auto_tuner = AutoTuner(tuner_cfg) cur_cfg = auto_tuner.search_once() - auto_tuner.add_cfg(cur_cfg) # get max time per task run max_time_per_task = tuner_cfg.get("max_time_per_task", 1800) @@ -416,6 +416,9 @@ def launch(): ) ) c = controllers.init(ctx) + # set per task timeout + signal.signal(signal.SIGALRM, c.not_exit_signal_handler) + signal.alarm(max_time_per_task) c.run() # process generated result @@ -455,7 +458,6 @@ def launch(): # generate a new config new_cfg = auto_tuner.search_once() cur_cfg = copy.deepcopy(new_cfg) - auto_tuner.add_cfg(cur_cfg) # per task launch interval time.sleep(3) @@ -463,7 +465,6 @@ def launch(): # get best config to run best_cfg = None - ctx = copy.deepcopy(raw_ctx) if nnodes > 1: import socket @@ -512,8 +513,8 @@ def launch(): end_time = time.time() ctx.logger.info(f"AutoTuner ends in {end_time-start_time}s.") # launch best cfg + ctx.status._current_status = None new_args = gen_new_args(raw_args, best_cfg, tuner_cfg) - ctx.run_best = True ctx.args.training_script_args = new_args ctx.args.job_id = "best_cfg" ctx.logger.info(f"Launch best cfg from auto tuner: {best_cfg}") From 475504c937e970ed16f8b066d249bbc1d6a60d0b Mon Sep 17 00:00:00 2001 From: caozhou Date: Fri, 30 Jun 2023 03:14:37 +0000 Subject: [PATCH 11/18] best no need alarm --- python/paddle/distributed/launch/controllers/controller.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py index 03870dc5b24ca..d6d8499433c79 100644 --- a/python/paddle/distributed/launch/controllers/controller.py +++ b/python/paddle/distributed/launch/controllers/controller.py @@ -40,6 +40,8 @@ def __init__(self, ctx): # set per task timeout signal.signal(signal.SIGALRM, self.not_exit_signal_handler) signal.alarm(ctx.max_time_per_task) + else: + signal.alarm(0) self.ctx = ctx self.master = Master.factory(self.ctx) From b320a8a48ce9a464487bdd4fb9f5dff42bfbaf53 Mon Sep 17 00:00:00 2001 From: tangjingqi Date: Mon, 17 Jul 2023 10:21:21 +0800 Subject: [PATCH 12/18] add gbs search, add gpu memory to history csv, add memory detect --- python/paddle/distributed/auto_tuner/prune.py | 2 + .../paddle/distributed/auto_tuner/recorder.py | 10 +- .../paddle/distributed/auto_tuner/search.py | 8 +- python/paddle/distributed/auto_tuner/tuner.py | 6 +- python/paddle/distributed/auto_tuner/utils.py | 169 +++++++++--------- python/paddle/distributed/launch/main.py | 154 ++++++++++++++-- 6 files changed, 245 insertions(+), 104 deletions(-) diff --git a/python/paddle/distributed/auto_tuner/prune.py b/python/paddle/distributed/auto_tuner/prune.py index 888b53ee6b2b6..6f6d549e50492 100644 --- 
a/python/paddle/distributed/auto_tuner/prune.py +++ b/python/paddle/distributed/auto_tuner/prune.py @@ -122,6 +122,8 @@ def prune_by_mbs(tuner_cfg, cur_cfg, history_cfgs=None): """ micro_batch_size = cur_cfg.get("micro_batch_size", None) global_batch_size = tuner_cfg["model_cfg"].get("global_batch_size", None) + if global_batch_size == "auto": + global_batch_size = cur_cfg["global_batch_size"] if global_batch_size: local_batch_size = ( global_batch_size diff --git a/python/paddle/distributed/auto_tuner/recorder.py b/python/paddle/distributed/auto_tuner/recorder.py index ad388a9bfe2f7..ad8847c8a0dc6 100644 --- a/python/paddle/distributed/auto_tuner/recorder.py +++ b/python/paddle/distributed/auto_tuner/recorder.py @@ -19,7 +19,7 @@ import pandas as pd -class History_recorder: +class HistoryRecorder: # NOTE increase extenable ablitity def __init__(self) -> None: self.history = [] @@ -63,7 +63,9 @@ def store_history(self, path="./history.csv"): cols = df.columns.tolist() cols.insert(0, cols.pop(cols.index('job_id'))) df = df.reindex(columns=cols) - df = df.drop(columns=['time']) + # check if 'time' exists + if 'time' in df.columns: + df = df.drop(columns=['time']) # write to csv df.to_csv(self.store_path, index=False) @@ -79,3 +81,7 @@ def load_history(self, path="./history.csv") -> Tuple[list, bool]: reader = csv.reader(f) self.history = list(reader) return (self.history, err) + + def clean_history(self) -> None: + """Clean history.""" + self.history = [] diff --git a/python/paddle/distributed/auto_tuner/search.py b/python/paddle/distributed/auto_tuner/search.py index 8539a1a952fc0..f3b4960da1c91 100644 --- a/python/paddle/distributed/auto_tuner/search.py +++ b/python/paddle/distributed/auto_tuner/search.py @@ -16,7 +16,7 @@ from abc import ABC, abstractmethod from .prune import _PRUNE_FUNC -from .utils import search_all, GBS_search_all +from .utils import gbs_search_all, search_all class SearchAlgo(ABC): @@ -53,6 +53,7 @@ def search_once(self, history_cfgs): return None return new_cfg + class GBSSearch(SearchAlgo): def __init__(self, tuner_cfg): super().__init__(tuner_cfg) @@ -66,8 +67,9 @@ def search_once(self, history_cfgs): if self.idx < len(self.all_tasks): new_cfg = self.all_tasks[self.idx] self.idx += 1 + glb = new_cfg.get("global_batch_size", None) + self.tuner_cfg["model_cfg"]["global_batch_size"] = glb stop = not self.prune(self.tuner_cfg, new_cfg, history_cfgs) else: return None - return new_cfg - \ No newline at end of file + return new_cfg diff --git a/python/paddle/distributed/auto_tuner/tuner.py b/python/paddle/distributed/auto_tuner/tuner.py index 981616254ad30..bdc6bed5c6a08 100644 --- a/python/paddle/distributed/auto_tuner/tuner.py +++ b/python/paddle/distributed/auto_tuner/tuner.py @@ -13,7 +13,7 @@ # limitations under the License. 
-from .utils import default_candidates, GBS_default_candidates +from .utils import default_candidates, gbs_default_candidates class AutoTuner: @@ -29,17 +29,17 @@ def __init__(self, tuner_cfg): self.cur_task_id = 1 self.task_limit = tuner_cfg.get("task_limit", 100) - tuner_cfg["candidates"] = default_candidates(tuner_cfg) search_algo = tuner_cfg.get("search_algo", "grid") if search_algo == "grid": from .search import GridSearch + tuner_cfg["candidates"] = default_candidates(tuner_cfg) self.algo = GridSearch(tuner_cfg) elif search_algo == "gbs": from .search import GBSSearch - tuner_cfg["candidates"] = default_candidates(tuner_cfg) + tuner_cfg["candidates"] = gbs_default_candidates(tuner_cfg) self.algo = GBSSearch(tuner_cfg) else: raise NotImplementedError() diff --git a/python/paddle/distributed/auto_tuner/utils.py b/python/paddle/distributed/auto_tuner/utils.py index 445f08ad8722a..9c70bc08070b8 100644 --- a/python/paddle/distributed/auto_tuner/utils.py +++ b/python/paddle/distributed/auto_tuner/utils.py @@ -13,6 +13,7 @@ # limitations under the License. import copy +import csv import itertools import os import re @@ -320,7 +321,7 @@ def gen_new_args(raw_args, cfg, tuner_cfg): return res_args -def read_log( +def read_metric_log( path, file="workerlog.0", target_metric='step/s' ) -> Tuple[float, bool]: """For extracting metric from log file.""" @@ -332,14 +333,19 @@ def read_log( re_metric_pattern = ( target_metric + r":* *(\d+(\.\d*)?)|(\d+(\.\d*)?) *" + target_metric ) - + re_out_of_memory_pattern = r"out of memory" + out_of_memory_flag = False metric_list = [] lines = f.readlines() for line in lines: metric = re.findall(re_metric_pattern, line) + out_of_memory = re.findall(re_out_of_memory_pattern, line) if metric: metric_list.append(float(metric[0][0])) - if not metric_list: + if out_of_memory: + out_of_memory_flag = True + + if not metric_list or out_of_memory_flag: metric_ave = 0.0 flag = True elif len(metric_list) < 10: @@ -356,6 +362,32 @@ def read_log( res = metric_ave, flag return res + +def read_memory_log(path, file) -> Tuple[float, bool]: + log_path = os.path.join(path, file) + if not os.path.exists(log_path): + return (0.0, True) + memory_used = [] + utilization_gpu = [] + indexs = [] + + with open(log_path, 'r') as f: + reader = csv.reader(f) + + # skip headers + while next(reader)[1] != 'utilization_gpu': + next(reader) + for row in reader: + # If row length is 6 then it's a utilization data row + # skip header + if len(row) == 6: + index, util_gpu, _, mem_used, _, _ = row + indexs.append(int(index)) + memory_used.append(int(mem_used)) + utilization_gpu.append(int(util_gpu)) + return max(memory_used), False + + def three_mul_combinations(target): """Return the combinations of three numbers which product is target.""" results = [] @@ -366,95 +398,55 @@ def three_mul_combinations(target): results.append((i, j, target // i // j)) return results -def GBS_dp_mp_pp_candidates(tuner_cfg, num_gpus, num_nodes): + +def gbs_dp_mp_pp_candidates(tuner_cfg, num_gpus, num_nodes): """Return middle candidates of dp, mp, pp""" - res = three_mul_combinations(num_gpus) - return res[len(res) // 2 ] - -def GBS_default_candidates(tuner_cfg): + + start = round(num_gpus ** (1 / 3)) + + # find factors that can be evenly distributed + for i in range(start, 0, -1): + if num_gpus % i == 0: + remaining = num_gpus // i + # find the square root as a factor for the remaining part + j = round(remaining**0.5) + while remaining % j != 0: + j -= 1 + return i, j, remaining // j + + raise ValueError("Cannot 
distribute GPUs equally") + + +def gbs_default_candidates(tuner_cfg): """Return the default candidates of every hyper param which user defined auto""" candidates = {} + num_gpus = tuner_cfg["num_gpus"] + num_nodes = tuner_cfg["nodes"] assert num_gpus > 0 - if tuner_cfg["model_cfg"]["global_batch_size"] == "auto": - num_gpus = tuner_cfg["num_gpus"] - num_nodes = tuner_cfg["nodes"] - dp_candidate, mp_candidate, pp_candidate = GBS_dp_mp_pp_candidates(tuner_cfg, num_gpus, num_nodes) - candidates["dp_degree"] = [dp_candidate] + global_batch_size = tuner_cfg.get("model_cfg", {}).get( + "global_batch_size", "auto" + ) + if global_batch_size == "auto": + dp_candidate, mp_candidate, pp_candidate = gbs_dp_mp_pp_candidates( + tuner_cfg, num_gpus, num_nodes + ) + sharding_dgree_candidate = dp_candidate + candidates["dp_degree"] = [1] candidates["mp_degree"] = [mp_candidate] candidates["pp_degree"] = [pp_candidate] - candidates["sharding_degree"] = [16] + candidates["sharding_degree"] = [sharding_dgree_candidate] candidates["sharding_stage"] = [1] candidates["use_recompute"] = [True] - candidates["recompute_granularity"] = ["full"] - candidates["micro_batch_size"] = [i for i in range(1, 16)] - tuner_cfg["model_cfg"]["global_batch_size"] = [pp_candidate * dp_candidate * e for e in candidates["micro_batch_size"]] - else: - if tuner_cfg.get("dp_degree", None) == "auto": - candidates["dp_degree"] = dist_degree("dp", num_gpus, num_nodes) - elif tuner_cfg.get("dp_degree", None): - candidates["dp_degree"] = tuner_cfg.get("dp_degree") - else: - candidates["dp_degree"] = [1] - - if tuner_cfg.get("mp_degree", None) == "auto": - candidates["mp_degree"] = dist_degree("mp", num_gpus, num_nodes) - elif tuner_cfg.get("mp_degree", None): - candidates["mp_degree"] = tuner_cfg.get("mp_degree") - else: - candidates["mp_degree"] = [1] - - if tuner_cfg.get("pp_degree", None) == "auto": - candidates["pp_degree"] = dist_degree("pp", num_gpus, num_nodes) - elif tuner_cfg.get("pp_degree", None): - candidates["pp_degree"] = tuner_cfg.get("pp_degree") - else: - candidates["pp_degree"] = [1] - - if tuner_cfg.get("sharding_degree", None) == "auto": - candidates["sharding_degree"] = dist_degree( - "sharding", num_gpus, num_nodes - ) - elif tuner_cfg.get("sharding_degree", None): - candidates["sharding_degree"] = tuner_cfg.get("sharding_degree") - else: - candidates["sharding_degree"] = [1] - - if tuner_cfg.get("sharding_stage", None) == "auto": - candidates["sharding_stage"] = [1, 2, 3] - elif tuner_cfg.get("sharding_stage", None): - candidates["sharding_stage"] = tuner_cfg.get("sharding_stage") - else: - candidates["sharding_stage"] = [None] - - if tuner_cfg.get("use_recompute", None) == "auto": - candidates["use_recompute"] = [False, True] - elif tuner_cfg.get("use_recompute", None): - candidates["use_recompute"] = tuner_cfg.get("use_recompute") - else: - candidates["use_recompute"] = [None] - - if tuner_cfg.get("recompute_granularity", None) == "auto": - candidates["recompute_granularity"] = ["full_attn", "full"] - elif tuner_cfg.get("recompute_granularity", None): - candidates["recompute_granularity"] = tuner_cfg.get( - "recompute_granularity" - ) - else: - candidates["recompute_granularity"] = [None] - - if tuner_cfg.get("micro_batch_size", None) == "auto": - candidates["micro_batch_size"] = list( - range(tuner_cfg["model_cfg"]["global_batch_size"], 0, -1) - ) - elif tuner_cfg.get("micro_batch_size", None): - candidates["micro_batch_size"] = tuner_cfg.get("micro_batch_size") - else: - candidates["micro_batch_size"] = [ - 
tuner_cfg["model_cfg"]["global_batch_size"] - ] + candidates["recompute_granularity"] = ["full"] + candidates["micro_batch_size"] = [2**i for i in range(0, 10)] + candidates["global_batch_size"] = [ + pp_candidate * dp_candidate * e + for e in candidates["micro_batch_size"] + ] return candidates -def GBS_search_all(tuner_cfg): + +def gbs_search_all(tuner_cfg): """Permutate the candidates of all hyper params.""" candidates = tuner_cfg["candidates"] # Order: dp -> mp -> pp -> mbs -> sharding-> recompute @@ -466,7 +458,7 @@ def GBS_search_all(tuner_cfg): sharding_degree_candidates = candidates["sharding_degree"] use_recompute_candidates = candidates["use_recompute"] recompute_granularity_candidates = candidates["recompute_granularity"] - gbs_candidates = candidates["global_batch_size"] + # gbs_candidates = candidates["global_batch_size"] all_cfgs = list( itertools.product( dp_degree_candidates, @@ -477,7 +469,7 @@ def GBS_search_all(tuner_cfg): sharding_stage_candidates, use_recompute_candidates, recompute_granularity_candidates, - gbs_candidates, + # gbs_candidates, ) ) mapping = { @@ -489,12 +481,17 @@ def GBS_search_all(tuner_cfg): 4: "sharding_degree", 6: "use_recompute", 7: "recompute_granularity", - 8: "global_batch_size", + # 8: "global_batch_size", } new_all_cfgs = [] for cfg in all_cfgs: new_cfg = {} for idx, val in enumerate(cfg): new_cfg[mapping[idx]] = val + new_cfg["global_batch_size"] = ( + new_cfg["pp_degree"] + * new_cfg["dp_degree"] + * new_cfg["micro_batch_size"] + ) new_all_cfgs.append(new_cfg) - return new_all_cfgs \ No newline at end of file + return new_all_cfgs diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index 901fc260df904..27fbcf70c2235 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -299,9 +299,13 @@ def launch(): import sys import time - from ..auto_tuner.recorder import History_recorder + from ..auto_tuner.recorder import HistoryRecorder from ..auto_tuner.tuner import AutoTuner - from ..auto_tuner.utils import gen_new_args, read_log + from ..auto_tuner.utils import ( + gen_new_args, + read_memory_log, + read_metric_log, + ) from . 
import controllers start_time = time.time() @@ -341,10 +345,13 @@ def launch(): client = etcd3.client(host=master_ip, port=port) client.delete("best_cfg") +<<<<<<< HEAD # build AutoTuner to get new config auto_tuner = AutoTuner(tuner_cfg) cur_cfg = auto_tuner.search_once() +======= +>>>>>>> 80c6b1768c (add gbs search, add gpu memory to history csv, add memory detect) # get max time per task run max_time_per_task = tuner_cfg.get("max_time_per_task", 1800) ctx.max_time_per_task = max_time_per_task @@ -358,9 +365,10 @@ def launch(): is_first_task = True # build history recorder - recorder = History_recorder() + recorder = HistoryRecorder() job_id = 0 +<<<<<<< HEAD if tuner_cfg.get("global_batch_size") == "auto": # search and set global batch size # adjust micron batch with fixed dp mp pp @@ -374,12 +382,127 @@ def launch(): task_job_id, log_dir, cur_cfg ) ) - c = controllers.init(ctx) - # set per task timeout - signal.signal(signal.SIGALRM, c.not_exit_signal_handler) - signal.alarm(max_time_per_task) - c.run() - # Process generated result +======= + ctx.args.max_restart = -1 + raw_ctx = copy.deepcopy(ctx) + + # gbs search + if ( + tuner_cfg.get('model_cfg', {}).get('global_batch_size', 'auto') + == "auto" + ): + # adjust micron batch size until out of memory to get best global batch size + gbs_tuner_cfg = copy.deepcopy(tuner_cfg) + gbs_tuner_cfg["search_algo"] = "gbs" + gbs_tuner = AutoTuner(gbs_tuner_cfg) + + gbs_cur_cfg = gbs_tuner.search_once() + best_gbs = None + while gbs_cur_cfg: + ctx = copy.deepcopy(raw_ctx) + log_dir = "GBSSearch/GBS{}_DP{}_MP{}_PP{}_Sharding_degree_{}_stage_{}_MBS_{}_Recompute_{}_granularity_{}".format( + gbs_cur_cfg["global_batch_size"], + gbs_cur_cfg["dp_degree"], + gbs_cur_cfg["mp_degree"], + gbs_cur_cfg["pp_degree"], + gbs_cur_cfg["sharding_degree"], + gbs_cur_cfg["sharding_stage"], + gbs_cur_cfg["micro_batch_size"], + gbs_cur_cfg["use_recompute"], + gbs_cur_cfg["recompute_granularity"], +>>>>>>> 80c6b1768c (add gbs search, add gpu memory to history csv, add memory detect) + ) + ctx.args.log_dir = log_dir + + # every task has own job id + job_id += 1 + task_job_id = "gbs_tuner_" + str(job_id) + ctx.args.job_id = task_job_id + + # generate script args of task + gbs_new_args = gen_new_args( + raw_args, gbs_cur_cfg, gbs_tuner_cfg + ) + ctx.args.training_script_args = gbs_new_args + + # launch task + ctx.logger.info( + "Launch task from auto tuner: job_id {}, log_dir {}, config {}".format( + task_job_id, log_dir, gbs_cur_cfg + ) + ) + c = controllers.init(ctx) + c.run() + + # process generated result + # TODO diffentiate out of memory and no loss(maybe over time) + # TODO integragte memory and metric read + mem, err = read_memory_log( + path=ctx.args.log_dir, file=f"{ctx.args.job_id}.gpu.log" + ) + if err: + ctx.logger.warning( + f"Read memory log failed for parameters: {log_dir}" + ) + gbs_cur_cfg["max_mem_usage"] = None + else: + gbs_cur_cfg["max_mem_usage"] = mem + + metric, err = read_metric_log( + path=ctx.args.log_dir, + file="workerlog.0", + target_metric=gbs_tuner_cfg["metric_cfg"]["name"], + ) + if err: + ctx.logger.warning( + f"Read metric log failed for parameters: {log_dir}, there may be over time or out of memory." 
+ ) + # for pruner use + gbs_cur_cfg['time'] = -1 + gbs_cur_cfg[gbs_tuner_cfg['metric_cfg']['name']] = None + gbs_cur_cfg["max_mem_usage"] = "out of memory or over time" + # end gbs search + break + else: + # for pruner use + gbs_cur_cfg['time'] = metric + gbs_cur_cfg[gbs_tuner_cfg['metric_cfg']['name']] = metric + + # store and update args for next round + gbs_cur_cfg["job_id"] = job_id + best_gbs = gbs_cur_cfg["global_batch_size"] + recorder.add_cfg(**gbs_cur_cfg) + c.finalize(exit=False) + recorder.store_history("./tuner_gbs_history.csv") + + # new cfgs for next round + gbs_new_cfg = gbs_tuner.search_once() + gbs_cur_cfg = copy.deepcopy(gbs_new_cfg) + gbs_tuner.add_cfg(gbs_cur_cfg) + + # per task launch interval + time.sleep(3) + # prevent no valid global batch size found + if best_gbs is None: + raise ValueError( + "No valid global batch size found, check memory or valid search time. cur_tuner_cfg{}".format( + gbs_tuner_cfg + ) + ) + # set best global batch size to tuner cfg + tuner_cfg["model_cfg"]["global_batch_size"] = best_gbs + + recorder.store_history("./tuner_gbs_history.csv") + recorder.clean_history() + + end_time = time.time() + ctx.logger.info( + f"AtuoTuner for GBS search ends in {end_time-start_time}s." + ) + # build AutoTuner to get new config + auto_tuner = AutoTuner(tuner_cfg) + cur_cfg = auto_tuner.search_once() + auto_tuner.add_cfg(cur_cfg) while cur_cfg: ctx = copy.deepcopy(raw_ctx) @@ -422,7 +545,7 @@ def launch(): c.run() # process generated result - metric, err = read_log( + metric, err = read_metric_log( path=ctx.args.log_dir, file="workerlog.0", target_metric=tuner_cfg["metric_cfg"]["name"], @@ -436,7 +559,18 @@ def launch(): # for pruner use cur_cfg['time'] = metric cur_cfg[tuner_cfg['metric_cfg']['name']] = metric +<<<<<<< HEAD +======= + mem, err = read_memory_log( + path=ctx.args.log_dir, file=f"{ctx.args.job_id}.gpu.log" + ) + if err: + ctx.logger.warning(f"Read log failed for parameters: {log_dir}") + break + else: + cur_cfg["max_mem_usage"] = mem +>>>>>>> 80c6b1768c (add gbs search, add gpu memory to history csv, add memory detect) # record history cur_cfg['job_id'] = job_id recorder.add_cfg(**cur_cfg) From 0c3e63736b3af460b680a831e068b99dc39de74c Mon Sep 17 00:00:00 2001 From: tangjingqi Date: Mon, 17 Jul 2023 10:44:28 +0800 Subject: [PATCH 13/18] fix bug --- .../paddle/distributed/auto_tuner/search.py | 4 +- python/paddle/distributed/auto_tuner/utils.py | 2 +- .../launch/controllers/controller.py | 13 ++++--- python/paddle/distributed/launch/main.py | 37 ++----------------- 4 files changed, 14 insertions(+), 42 deletions(-) diff --git a/python/paddle/distributed/auto_tuner/search.py b/python/paddle/distributed/auto_tuner/search.py index f3b4960da1c91..0e0114a5249f0 100644 --- a/python/paddle/distributed/auto_tuner/search.py +++ b/python/paddle/distributed/auto_tuner/search.py @@ -58,8 +58,8 @@ class GBSSearch(SearchAlgo): def __init__(self, tuner_cfg): super().__init__(tuner_cfg) self.idx = 0 - self.all_tasks = GBS_search_all(tuner_cfg) - + self.all_tasks = gbs_search_all(tuner_cfg) + def search_once(self, history_cfgs): new_cfg = None stop = False diff --git a/python/paddle/distributed/auto_tuner/utils.py b/python/paddle/distributed/auto_tuner/utils.py index 9c70bc08070b8..61ebbe4906855 100644 --- a/python/paddle/distributed/auto_tuner/utils.py +++ b/python/paddle/distributed/auto_tuner/utils.py @@ -437,7 +437,7 @@ def gbs_default_candidates(tuner_cfg): candidates["sharding_degree"] = [sharding_dgree_candidate] candidates["sharding_stage"] = [1] 
candidates["use_recompute"] = [True] - candidates["recompute_granularity"] = ["full"] + candidates["recompute_granularity"] = [None] candidates["micro_batch_size"] = [2**i for i in range(0, 10)] candidates["global_batch_size"] = [ pp_candidate * dp_candidate * e diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py index d6d8499433c79..9769ec9d6bf3f 100644 --- a/python/paddle/distributed/launch/controllers/controller.py +++ b/python/paddle/distributed/launch/controllers/controller.py @@ -36,12 +36,13 @@ def __init__(self, ctx): signal.signal(signal.SIGTERM, self.signal_handler) signal.signal(signal.SIGABRT, self.signal_handler) signal.signal(signal.SIGINT, self.signal_handler) - if ctx.is_auto_tuner_mode() and not ctx.run_best: - # set per task timeout - signal.signal(signal.SIGALRM, self.not_exit_signal_handler) - signal.alarm(ctx.max_time_per_task) - else: - signal.alarm(0) + if ctx.is_auto_tuner_mode(): + if not ctx.run_best: + # set per task timeout + signal.signal(signal.SIGALRM, self.not_exit_signal_handler) + signal.alarm(ctx.max_time_per_task) + else: + signal.alarm(0) self.ctx = ctx self.master = Master.factory(self.ctx) diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index 27fbcf70c2235..c7466dfe9557b 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -295,7 +295,6 @@ def launch(): elif ctx.is_auto_tuner_mode(): import copy import json - import signal import sys import time @@ -345,13 +344,6 @@ def launch(): client = etcd3.client(host=master_ip, port=port) client.delete("best_cfg") -<<<<<<< HEAD - # build AutoTuner to get new config - auto_tuner = AutoTuner(tuner_cfg) - cur_cfg = auto_tuner.search_once() - -======= ->>>>>>> 80c6b1768c (add gbs search, add gpu memory to history csv, add memory detect) # get max time per task run max_time_per_task = tuner_cfg.get("max_time_per_task", 1800) ctx.max_time_per_task = max_time_per_task @@ -368,21 +360,6 @@ def launch(): recorder = HistoryRecorder() job_id = 0 -<<<<<<< HEAD - if tuner_cfg.get("global_batch_size") == "auto": - # search and set global batch size - # adjust micron batch with fixed dp mp pp - GBS_cur_cfg, GBS_cur_cfg = micron_search(cur_cfg, tuner_cfg) - # generate script args of task - new_args = gen_new_args(raw_args, cur_cfg, tuner_cfg) - ctx.args.training_script_args = new_args - # launch task - ctx.logger.info( - "Launch task from auto tuner: job_id {}, log_dir {}, config {}".format( - task_job_id, log_dir, cur_cfg - ) - ) -======= ctx.args.max_restart = -1 raw_ctx = copy.deepcopy(ctx) @@ -410,7 +387,6 @@ def launch(): gbs_cur_cfg["micro_batch_size"], gbs_cur_cfg["use_recompute"], gbs_cur_cfg["recompute_granularity"], ->>>>>>> 80c6b1768c (add gbs search, add gpu memory to history csv, add memory detect) ) ctx.args.log_dir = log_dir @@ -455,7 +431,7 @@ def launch(): ) if err: ctx.logger.warning( - f"Read metric log failed for parameters: {log_dir}, there may be over time or out of memory." + f"Read metric log failed for parameters: {log_dir}, it may be over time or out of memory." 
) # for pruner use gbs_cur_cfg['time'] = -1 @@ -539,9 +515,6 @@ def launch(): ) ) c = controllers.init(ctx) - # set per task timeout - signal.signal(signal.SIGALRM, c.not_exit_signal_handler) - signal.alarm(max_time_per_task) c.run() # process generated result @@ -559,8 +532,6 @@ def launch(): # for pruner use cur_cfg['time'] = metric cur_cfg[tuner_cfg['metric_cfg']['name']] = metric -<<<<<<< HEAD -======= mem, err = read_memory_log( path=ctx.args.log_dir, file=f"{ctx.args.job_id}.gpu.log" @@ -570,7 +541,6 @@ def launch(): break else: cur_cfg["max_mem_usage"] = mem ->>>>>>> 80c6b1768c (add gbs search, add gpu memory to history csv, add memory detect) # record history cur_cfg['job_id'] = job_id recorder.add_cfg(**cur_cfg) @@ -592,6 +562,7 @@ def launch(): # generate a new config new_cfg = auto_tuner.search_once() cur_cfg = copy.deepcopy(new_cfg) + auto_tuner.add_cfg(cur_cfg) # per task launch interval time.sleep(3) @@ -599,6 +570,7 @@ def launch(): # get best config to run best_cfg = None + ctx = copy.deepcopy(raw_ctx) if nnodes > 1: import socket @@ -643,12 +615,11 @@ def launch(): ) assert best_cfg - end_time = time.time() ctx.logger.info(f"AutoTuner ends in {end_time-start_time}s.") # launch best cfg - ctx.status._current_status = None new_args = gen_new_args(raw_args, best_cfg, tuner_cfg) + ctx.run_best = True ctx.args.training_script_args = new_args ctx.args.job_id = "best_cfg" ctx.logger.info(f"Launch best cfg from auto tuner: {best_cfg}") From 3f0d76cb7527f1e781c8ca52446c453e4af7c381 Mon Sep 17 00:00:00 2001 From: tangjingqi Date: Tue, 25 Jul 2023 16:54:38 +0800 Subject: [PATCH 14/18] fix memory read bug; fix etcd connection bug --- python/paddle/distributed/auto_tuner/utils.py | 11 +++++++---- .../distributed/launch/controllers/master.py | 19 +++++++++++++++++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/python/paddle/distributed/auto_tuner/utils.py b/python/paddle/distributed/auto_tuner/utils.py index 61ebbe4906855..b4ac013bb68d5 100644 --- a/python/paddle/distributed/auto_tuner/utils.py +++ b/python/paddle/distributed/auto_tuner/utils.py @@ -373,10 +373,13 @@ def read_memory_log(path, file) -> Tuple[float, bool]: with open(log_path, 'r') as f: reader = csv.reader(f) - + flag = False # skip headers - while next(reader)[1] != 'utilization_gpu': - next(reader) + while not flag: + # show the first line of reader + row = next(reader) + if len(row) == 6 and 'memory_used' in row: + flag = True for row in reader: # If row length is 6 then it's a utilization data row # skip header @@ -436,7 +439,7 @@ def gbs_default_candidates(tuner_cfg): candidates["pp_degree"] = [pp_candidate] candidates["sharding_degree"] = [sharding_dgree_candidate] candidates["sharding_stage"] = [1] - candidates["use_recompute"] = [True] + candidates["use_recompute"] = [False] candidates["recompute_granularity"] = [None] candidates["micro_batch_size"] = [2**i for i in range(0, 10)] candidates["global_batch_size"] = [ diff --git a/python/paddle/distributed/launch/controllers/master.py b/python/paddle/distributed/launch/controllers/master.py index ccc54d4580d45..4707ae639aedd 100644 --- a/python/paddle/distributed/launch/controllers/master.py +++ b/python/paddle/distributed/launch/controllers/master.py @@ -252,8 +252,23 @@ def register_heartbeat(self, job_id, pod_id, ttl=10): self.job_prefix = f'/paddle/{job_id}' self.heartbeat_prefix = f'{self.job_prefix}/heartbeat' - - lease = self.client.lease(ttl) + if self.ctx.is_auto_tuner_mode(): + delete_success = False + while not delete_success: + try: + 
self.client.delete_prefix(self.job_prefix) + delete_success = True + except: + time.sleep(1) + + if self.ctx.is_auto_tuner_mode(): + lease_success = False + while not lease_success: + try: + lease = self.client.lease(ttl) + lease_success = True + except: + time.sleep(1) # self.client.delete_prefix(self.job_prefix) From e0d3c0e40f0745b9b916eab277d6d3fc6c537cf2 Mon Sep 17 00:00:00 2001 From: tangjingqi Date: Wed, 2 Aug 2023 14:06:14 +0800 Subject: [PATCH 15/18] fix memory read bug, add oom detection for all ranks --- python/paddle/distributed/auto_tuner/utils.py | 34 +++++++++++- python/paddle/distributed/launch/main.py | 53 +++++++++++-------- 2 files changed, 64 insertions(+), 23 deletions(-) diff --git a/python/paddle/distributed/auto_tuner/utils.py b/python/paddle/distributed/auto_tuner/utils.py index b4ac013bb68d5..75954aacffc27 100644 --- a/python/paddle/distributed/auto_tuner/utils.py +++ b/python/paddle/distributed/auto_tuner/utils.py @@ -333,13 +333,15 @@ def read_metric_log( re_metric_pattern = ( target_metric + r":* *(\d+(\.\d*)?)|(\d+(\.\d*)?) *" + target_metric ) - re_out_of_memory_pattern = r"out of memory" + re_out_of_memory_pattern = r"Out of memory" out_of_memory_flag = False metric_list = [] lines = f.readlines() for line in lines: metric = re.findall(re_metric_pattern, line) - out_of_memory = re.findall(re_out_of_memory_pattern, line) + out_of_memory = re.findall( + re_out_of_memory_pattern, line, re.IGNORECASE + ) if metric: metric_list.append(float(metric[0][0])) if out_of_memory: @@ -391,6 +393,34 @@ def read_memory_log(path, file) -> Tuple[float, bool]: return max(memory_used), False +def read_log( + path, + metric_file="workerlog.0", + target_metric='step/s', + memory_file="0.gpu.log", +) -> Tuple[float, float, int]: + """extract metric and out of memory from log file + return: + metric: average metric of last 10 steps + memory: max memory used + err_code: 00: no error, 01: no metric, 10: out of memory + """ + err_code = 0 + # check all workerlog files in target path + for root, dirs, files in os.walk(path): + for file in files: + metric, metric_flag = read_metric_log(path, file, target_metric) + if metric_flag: + err_code = metric_flag | err_code + if file == metric_file: + res_metric = metric + + res_memory, memory_flag = read_memory_log(path, memory_file) + if memory_flag: + err_code = (memory_flag << 1) | err_code + return res_metric, res_memory, err_code + + def three_mul_combinations(target): """Return the combinations of three numbers which product is target.""" results = [] diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index c7466dfe9557b..b9354ece09199 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -300,11 +300,7 @@ def launch(): from ..auto_tuner.recorder import HistoryRecorder from ..auto_tuner.tuner import AutoTuner - from ..auto_tuner.utils import ( - gen_new_args, - read_memory_log, - read_metric_log, - ) + from ..auto_tuner.utils import gen_new_args, read_log from . 
import controllers start_time = time.time() @@ -413,10 +409,17 @@ def launch(): # process generated result # TODO diffentiate out of memory and no loss(maybe over time) # TODO integragte memory and metric read - mem, err = read_memory_log( - path=ctx.args.log_dir, file=f"{ctx.args.job_id}.gpu.log" + metric, mem, err = read_log( + path=ctx.args.log_dir, + metric_file="workerlog.0", + target_metric=tuner_cfg["metric_cfg"]["name"], + memory_file=f"{ctx.args.job_id}.gpu.log", ) - if err: + + # mem, err = read_memory_log( + # path=ctx.args.log_dir, file=f"{ctx.args.job_id}.gpu.log" + # ) + if err & (1 << 0): ctx.logger.warning( f"Read memory log failed for parameters: {log_dir}" ) @@ -424,12 +427,12 @@ def launch(): else: gbs_cur_cfg["max_mem_usage"] = mem - metric, err = read_metric_log( - path=ctx.args.log_dir, - file="workerlog.0", - target_metric=gbs_tuner_cfg["metric_cfg"]["name"], - ) - if err: + # metric, err = read_metric_log( + # path=ctx.args.log_dir, + # file="workerlog.0", + # target_metric=gbs_tuner_cfg["metric_cfg"]["name"], + # ) + if err & (1 << 1): ctx.logger.warning( f"Read metric log failed for parameters: {log_dir}, it may be over time or out of memory." ) @@ -518,12 +521,20 @@ def launch(): c.run() # process generated result - metric, err = read_metric_log( + + metric, mem, err = read_log( path=ctx.args.log_dir, - file="workerlog.0", + metric_file="workerlog.0", target_metric=tuner_cfg["metric_cfg"]["name"], + memory_file=f"{ctx.args.job_id}.gpu.log", ) - if err: + + # metric, err = read_metric_log( + # path=ctx.args.log_dir, + # file="workerlog.0", + # target_metric=tuner_cfg["metric_cfg"]["name"], + # ) + if err & (1 << 0): ctx.logger.warning(f"Read log failed for parameters: {log_dir}") # for pruner use cur_cfg['time'] = -1 @@ -533,10 +544,10 @@ def launch(): cur_cfg['time'] = metric cur_cfg[tuner_cfg['metric_cfg']['name']] = metric - mem, err = read_memory_log( - path=ctx.args.log_dir, file=f"{ctx.args.job_id}.gpu.log" - ) - if err: + # mem, err = read_memory_log( + # path=ctx.args.log_dir, file=f"{ctx.args.job_id}.gpu.log" + # ) + if err & (1 << 1): ctx.logger.warning(f"Read log failed for parameters: {log_dir}") break else: From a934d4811dfa6fe41972ef1692263fb97bacb294 Mon Sep 17 00:00:00 2001 From: tangjingqi Date: Wed, 2 Aug 2023 21:57:38 +0800 Subject: [PATCH 16/18] fix read log and oom detaction, add error code for read log --- python/paddle/distributed/auto_tuner/utils.py | 46 +++++++---- .../distributed/launch/controllers/master.py | 2 + python/paddle/distributed/launch/main.py | 76 +++++++++++-------- 3 files changed, 75 insertions(+), 49 deletions(-) diff --git a/python/paddle/distributed/auto_tuner/utils.py b/python/paddle/distributed/auto_tuner/utils.py index 75954aacffc27..33a52c51ea767 100644 --- a/python/paddle/distributed/auto_tuner/utils.py +++ b/python/paddle/distributed/auto_tuner/utils.py @@ -323,8 +323,16 @@ def gen_new_args(raw_args, cfg, tuner_cfg): def read_metric_log( path, file="workerlog.0", target_metric='step/s' -) -> Tuple[float, bool]: +) -> Tuple[float, int]: """For extracting metric from log file.""" + """ + return: + metric: average metric of last 10 steps + err_code: + 00: no error + 01: no metric + 10: out of memory + """ target_file = path + "/" + file if not os.path.exists(target_file): return (0.0, True) @@ -334,7 +342,8 @@ def read_metric_log( target_metric + r":* *(\d+(\.\d*)?)|(\d+(\.\d*)?) 
*" + target_metric ) re_out_of_memory_pattern = r"Out of memory" - out_of_memory_flag = False + out_of_memory_flag = 0 + err_code = 0 metric_list = [] lines = f.readlines() for line in lines: @@ -345,23 +354,23 @@ def read_metric_log( if metric: metric_list.append(float(metric[0][0])) if out_of_memory: - out_of_memory_flag = True + out_of_memory_flag = 1 - if not metric_list or out_of_memory_flag: + if out_of_memory_flag: + metric_ave = 0.0 + err_code = err_code | (out_of_memory_flag << 1) + if not metric_list: metric_ave = 0.0 - flag = True + err_code = err_code | 1 elif len(metric_list) < 10: metric_ave = metric_list[-1] - flag = False elif len(metric_list) < 20: metric_ave = sum(metric_list[9:]) / (len(metric_list[9:])) - flag = False else: metric_ave = sum(metric_list[-10:]) / 10 - flag = False # round to 5 decimal places metric_ave = round(metric_ave, 5) - res = metric_ave, flag + res = metric_ave, err_code return res @@ -399,25 +408,30 @@ def read_log( target_metric='step/s', memory_file="0.gpu.log", ) -> Tuple[float, float, int]: - """extract metric and out of memory from log file + """ + extract metric and max memory usage from log file return: metric: average metric of last 10 steps memory: max memory used - err_code: 00: no error, 01: no metric, 10: out of memory + err_code: 00: no error, 01: no metric, 10: out of memory, 100: no memory log """ err_code = 0 - # check all workerlog files in target path + # check out of memory for root, dirs, files in os.walk(path): for file in files: + if not file.startswith("workerlog"): + continue metric, metric_flag = read_metric_log(path, file, target_metric) if metric_flag: - err_code = metric_flag | err_code - if file == metric_file: - res_metric = metric + err_code = (metric_flag & 2) | err_code + # read metric + res_metric, metric_flag = read_metric_log(path, metric_file, target_metric) + err_code = metric_flag | err_code + # check max memory usage res_memory, memory_flag = read_memory_log(path, memory_file) if memory_flag: - err_code = (memory_flag << 1) | err_code + err_code = (memory_flag << 2) | err_code return res_metric, res_memory, err_code diff --git a/python/paddle/distributed/launch/controllers/master.py b/python/paddle/distributed/launch/controllers/master.py index 4707ae639aedd..e04ee59b24285 100644 --- a/python/paddle/distributed/launch/controllers/master.py +++ b/python/paddle/distributed/launch/controllers/master.py @@ -269,6 +269,8 @@ def register_heartbeat(self, job_id, pod_id, ttl=10): lease_success = True except: time.sleep(1) + else: + lease = self.client.lease(ttl) # self.client.delete_prefix(self.job_prefix) diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py index b9354ece09199..bd77c84bbcc87 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -416,36 +416,40 @@ def launch(): memory_file=f"{ctx.args.job_id}.gpu.log", ) - # mem, err = read_memory_log( - # path=ctx.args.log_dir, file=f"{ctx.args.job_id}.gpu.log" - # ) if err & (1 << 0): ctx.logger.warning( - f"Read memory log failed for parameters: {log_dir}" + f"Read metric failed for parameters: {log_dir}" ) - gbs_cur_cfg["max_mem_usage"] = None - else: + # for pruner use + gbs_cur_cfg['time'] = -1 + gbs_cur_cfg[tuner_cfg['metric_cfg']['name']] = None gbs_cur_cfg["max_mem_usage"] = mem - # metric, err = read_metric_log( - # path=ctx.args.log_dir, - # file="workerlog.0", - # target_metric=gbs_tuner_cfg["metric_cfg"]["name"], - # ) if err & (1 << 1): ctx.logger.warning( - 
f"Read metric log failed for parameters: {log_dir}, it may be over time or out of memory." + f"Out of memory for parameters: {log_dir}" ) # for pruner use gbs_cur_cfg['time'] = -1 - gbs_cur_cfg[gbs_tuner_cfg['metric_cfg']['name']] = None - gbs_cur_cfg["max_mem_usage"] = "out of memory or over time" - # end gbs search - break - else: + gbs_cur_cfg[tuner_cfg['metric_cfg']['name']] = None + gbs_cur_cfg["max_mem_usage"] = "OOM" + + # not err & (1 << 1): do not record memory usage when out of memory + if err & (1 << 2) and not err & (1 << 1): + ctx.logger.warning( + f"Read memory usage failed for parameters: {log_dir}" + ) + gbs_cur_cfg["max_mem_usage"] = None + + if not err: # for pruner use gbs_cur_cfg['time'] = metric - gbs_cur_cfg[gbs_tuner_cfg['metric_cfg']['name']] = metric + gbs_cur_cfg[tuner_cfg['metric_cfg']['name']] = metric + gbs_cur_cfg["max_mem_usage"] = mem + + if err & (1 << 0) or err & (1 << 1): + # no metric or out of memory, end gbs search + break # store and update args for next round gbs_cur_cfg["job_id"] = job_id @@ -529,29 +533,35 @@ def launch(): memory_file=f"{ctx.args.job_id}.gpu.log", ) - # metric, err = read_metric_log( - # path=ctx.args.log_dir, - # file="workerlog.0", - # target_metric=tuner_cfg["metric_cfg"]["name"], - # ) if err & (1 << 0): - ctx.logger.warning(f"Read log failed for parameters: {log_dir}") + ctx.logger.warning( + f"Read metric failed for parameters: {log_dir}" + ) # for pruner use cur_cfg['time'] = -1 cur_cfg[tuner_cfg['metric_cfg']['name']] = None - else: + cur_cfg["max_mem_usage"] = mem + + if err & (1 << 1): + ctx.logger.warning(f"Out of memory for parameters: {log_dir}") + # for pruner use + cur_cfg['time'] = -1 + cur_cfg[tuner_cfg['metric_cfg']['name']] = None + cur_cfg["max_mem_usage"] = "OOM" + + # not err & (1 << 1): do not record memory usage when out of memory + if err & (1 << 2) and not err & (1 << 1): + ctx.logger.warning( + f"Read memory usage failed for parameters: {log_dir}" + ) + cur_cfg["max_mem_usage"] = None + + if not err: # for pruner use cur_cfg['time'] = metric cur_cfg[tuner_cfg['metric_cfg']['name']] = metric - - # mem, err = read_memory_log( - # path=ctx.args.log_dir, file=f"{ctx.args.job_id}.gpu.log" - # ) - if err & (1 << 1): - ctx.logger.warning(f"Read log failed for parameters: {log_dir}") - break - else: cur_cfg["max_mem_usage"] = mem + # record history cur_cfg['job_id'] = job_id recorder.add_cfg(**cur_cfg) From 6ffa8ce3a4d6a4fd08b68418df26d187f116025c Mon Sep 17 00:00:00 2001 From: tangjingqi Date: Sun, 6 Aug 2023 22:57:15 +0800 Subject: [PATCH 17/18] add unit test --- python/paddle/distributed/auto_tuner/utils.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/python/paddle/distributed/auto_tuner/utils.py b/python/paddle/distributed/auto_tuner/utils.py index 33a52c51ea767..43ac4bddf48e6 100644 --- a/python/paddle/distributed/auto_tuner/utils.py +++ b/python/paddle/distributed/auto_tuner/utils.py @@ -333,9 +333,10 @@ def read_metric_log( 01: no metric 10: out of memory """ + err_code = 0 target_file = path + "/" + file if not os.path.exists(target_file): - return (0.0, True) + return (0.0, 1) with open(target_file, "r") as f: # read file re_metric_pattern = ( @@ -343,7 +344,6 @@ def read_metric_log( ) re_out_of_memory_pattern = r"Out of memory" out_of_memory_flag = 0 - err_code = 0 metric_list = [] lines = f.readlines() for line in lines: @@ -429,9 +429,12 @@ def read_log( res_metric, metric_flag = read_metric_log(path, metric_file, target_metric) err_code = metric_flag | err_code # 
check max memory usage - res_memory, memory_flag = read_memory_log(path, memory_file) - if memory_flag: + try: + res_memory, memory_flag = read_memory_log(path, memory_file) err_code = (memory_flag << 2) | err_code + except: + res_memory = 0.0 + err_code = (1 << 2) | err_code return res_metric, res_memory, err_code From ed0a75110eac583f2be1384c46206c1e33bb1306 Mon Sep 17 00:00:00 2001 From: Azure <50126533+Azure-Tang@users.noreply.github.com> Date: Wed, 9 Aug 2023 10:56:16 +0800 Subject: [PATCH 18/18] Update master.py --- python/paddle/distributed/launch/controllers/master.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/paddle/distributed/launch/controllers/master.py b/python/paddle/distributed/launch/controllers/master.py index e0bccce06193b..e04ee59b24285 100644 --- a/python/paddle/distributed/launch/controllers/master.py +++ b/python/paddle/distributed/launch/controllers/master.py @@ -272,7 +272,6 @@ def register_heartbeat(self, job_id, pod_id, ttl=10): else: lease = self.client.lease(ttl) - # self.client.delete_prefix(self.job_prefix) beat_path = f"{self.heartbeat_prefix}/{pod_id}"
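
Note on the error code introduced in PATCH 16/17: `read_log()` folds its failure modes into a small bitmask (bit 0: target metric not found, bit 1: "Out of memory" seen in a workerlog, bit 2: GPU memory log missing or unreadable), and `launch()` tests these bits with `err & (1 << n)`. The sketch below is purely illustrative and not part of the patch series; the helper name `decode_err_code` is an assumption added here to show how a caller might translate the mask into readable reasons.

```python
def decode_err_code(err_code: int) -> list:
    """Translate the read_log() bitmask into human-readable reasons.

    Bit layout as described in the PATCH 16/17 docstrings:
      bit 0 -> target metric not found in the worker log
      bit 1 -> "Out of memory" detected in any worker log
      bit 2 -> GPU memory log missing or unreadable
    """
    reasons = []
    if err_code & (1 << 0):
        reasons.append("no metric")
    if err_code & (1 << 1):
        reasons.append("out of memory")
    if err_code & (1 << 2):
        reasons.append("no memory log")
    return reasons or ["ok"]


# Example: OOM was detected and the GPU memory log could not be read.
print(decode_err_code(0b110))  # ['out of memory', 'no memory log']
```

This mirrors the branches added in `launch()`: when bit 1 is set the metric is discarded and `max_mem_usage` is recorded as the string "OOM", and a memory-log failure (bit 2) is only reported when the run did not already go out of memory.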