From 9cc0eb0522e2a375d877a083b1559e8281fbfcf3 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Fri, 19 May 2023 17:25:25 +0900
Subject: [PATCH 01/10] change pipe to queue

---
 otx/hpo/hpo_runner.py | 109 +++++++++++++++++++++++++++++-------------
 1 file changed, 77 insertions(+), 32 deletions(-)

diff --git a/otx/hpo/hpo_runner.py b/otx/hpo/hpo_runner.py
index 855d92757e8..e949f11bf7e 100644
--- a/otx/hpo/hpo_runner.py
+++ b/otx/hpo/hpo_runner.py
@@ -17,9 +17,14 @@
 import logging
 import multiprocessing
 import os
+import queue
+import time
+import sys
+import signal
 from copy import deepcopy
 from functools import partial
 from typing import Any, Callable, Dict, Literal, Optional, Union
+from dataclasses import dataclass
 
 from otx.hpo.hpo_base import HpoBase, Trial, TrialStatus
 from otx.hpo.resource_manager import get_resource_manager
@@ -27,6 +32,14 @@
 logger = logging.getLogger(__name__)
 
 
+@dataclass
+class RunningTrial:
+    """Data class for a running trial"""
+    process: multiprocessing.Process
+    trial: Trial
+    queue: multiprocessing.Queue
+
+
 class HpoLoop:
     """HPO loop manager to run trials.
 
@@ -54,12 +67,17 @@ def __init__(
     ):
         self._hpo_algo = hpo_algo
         self._train_func = train_func
-        self._running_trials: Dict[int, Dict] = {}
+        self._running_trials: Dict[int, RunningTrial] = {}
         self._mp = multiprocessing.get_context("spawn")
+        self._report_queue = self._mp.Queue()
         self._uid_index = 0
         self._resource_manager = get_resource_manager(
             resource_type, num_parallel_trial, num_gpu_for_single_trial, available_gpu
         )
+        self._main_pid = os.getpid()
+
+        signal.signal(signal.SIGINT, self._terminate_signal_handler)
+        signal.signal(signal.SIGTERM, self._terminate_signal_handler)
 
     def run(self):
         """Run a HPO loop."""
@@ -73,6 +91,8 @@ def run(self):
             self._remove_finished_process()
             self._get_reports()
 
+            time.sleep(1)
+
         logger.info("HPO loop is done.")
         self._get_reports()
         self._join_all_processes()
@@ -90,56 +110,49 @@ def _start_trial_process(self, trial: Trial):
         for key, val in env.items():
             os.environ[key] = val
 
-        pipe1, pipe2 = self._mp.Pipe(True)
+        trial_queue = self._mp.Queue()
         process = self._mp.Process(
             target=_run_train,
             args=(
                 self._train_func,
                 trial.get_train_configuration(),
-                partial(_report_score, pipe=pipe2, trial_id=trial.id),
+                partial(_report_score, recv_queue=trial_queue, send_queue=self._report_queue, uid=uid),
             ),
         )
         os.environ = origin_env
-        self._running_trials[uid] = {"process": process, "trial": trial, "pipe": pipe1}
+        self._running_trials[uid] = RunningTrial(process, trial, trial_queue)
         process.start()
 
     def _remove_finished_process(self):
         trial_to_remove = []
-        for uid, val in self._running_trials.items():
-            process = val["process"]
-            if not process.is_alive():
-                val["pipe"].close()
-                process.join()
+        for uid, trial in self._running_trials.items():
+            if not trial.process.is_alive():
+                trial.queue.close()
+                trial.process.join()
                 trial_to_remove.append(uid)
 
         for uid in trial_to_remove:
-            trial = self._running_trials[uid]["trial"]
-            trial.status = TrialStatus.STOP
+            self._running_trials[uid].trial.status = TrialStatus.STOP
            self._resource_manager.release_resource(uid)
             del self._running_trials[uid]
 
     def _get_reports(self):
-        for trial in self._running_trials.values():
-            pipe = trial["pipe"]
-            if pipe.poll():
-                try:
-                    report = pipe.recv()
-                except EOFError:
-                    continue
-                trial_status = self._hpo_algo.report_score(
-                    report["score"], report["progress"], report["trial_id"], report["done"]
-                )
-                pipe.send(trial_status)
+        while not self._report_queue.empty():
+            report = self._report_queue.get_nowait()
+            trial = self._running_trials[report["uid"]]
+            trial_status = self._hpo_algo.report_score(
+                report["score"], report["progress"], trial.trial.id, report["done"]
+            )
+            trial.queue.put_nowait(trial_status)
 
         self._hpo_algo.save_results()
 
     def _join_all_processes(self):
         for val in self._running_trials.values():
-            val["pipe"].close()
+            val.queue.close()
 
         for val in self._running_trials.values():
-            process = val["process"]
-            process.join()
+            val.process.join()
 
         self._running_trials = {}
 
@@ -148,6 +161,25 @@ def _get_uid(self) -> int:
         self._uid_index += 1
         return uid
 
+    def _terminate_all_running_processes(self):
+        for trial in self._running_trials.values():
+            process = trial.process
+            if process.is_alive():
+                logger.warning(f"Kill child process {process.pid}")
+                process.kill()
+
+    def _terminate_signal_handler(self, signum, _frame):
+        # This code prevents child processes from being killed unintentionally by processes forked from the main process
+        if self._main_pid != os.getpid():
+            sys.exit()
+
+        self._terminate_all_running_processes()
+
+        signal_name = {2: "SIGINT", 15: "SIGTERM"}
+        logger.warning(f"{signal_name[signum]} is sent. Process exits.")
+
+        sys.exit(1)
+
 
 def _run_train(train_func: Callable, hp_config: Dict, report_func: Callable):
     # set multi process method as default
@@ -155,16 +187,29 @@ def _run_train(train_func: Callable, hp_config: Dict, report_func: Callable):
     train_func(hp_config, report_func)
 
 
-def _report_score(score: Union[int, float], progress: Union[int, float], pipe, trial_id: Any, done: bool = False):
-    logger.debug(f"score : {score}, progress : {progress}, trial_id : {trial_id}, pid : {os.getpid()}, done : {done}")
+def _report_score(
+    score: Union[int, float],
+    progress: Union[int, float],
+    recv_queue: multiprocessing.Queue,
+    send_queue: multiprocessing.Queue,
+    uid: Any,
+    done: bool = False
+):
+    logger.debug(f"score : {score}, progress : {progress}, uid : {uid}, pid : {os.getpid()}, done : {done}")
     try:
-        pipe.send({"score": score, "progress": progress, "trial_id": trial_id, "pid": os.getpid(), "done": done})
-    except BrokenPipeError:
+        send_queue.put_nowait(
+            {"score": score, "progress": progress, "uid": uid, "pid": os.getpid(), "done": done}
+        )
+    except ValueError:
         return TrialStatus.STOP
+
     try:
-        trial_status = pipe.recv()
-    except EOFError:
-        return TrialStatus.STOP
+        trial_status = recv_queue.get(timeout=3)
+    except queue.Empty:
+        pass
+
+    while not recv_queue.empty():
+        trial_status = recv_queue.get_nowait()
 
     logger.debug(f"trial_status : {trial_status}")
     return trial_status

From 620bcf6789967ee23e567e29a9657990e26e3543 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Mon, 22 May 2023 16:50:51 +0900
Subject: [PATCH 02/10] fix a bug that HPO in multi GPU doesn't work correctly

---
 otx/cli/utils/hpo.py             |   6 ++
 otx/hpo/hpo_base.py              |  18 ++---
 otx/hpo/hpo_runner.py            |   5 +-
 otx/hpo/hyperband.py             |   4 +-
 tests/unit/cli/utils/test_hpo.py | 121 +++++++++++++++++++++----------
 tests/unit/hpo/test_hpo_base.py  |   2 +-
 tests/unit/hpo/test_hyperband.py |   3 +-
 7 files changed, 104 insertions(+), 55 deletions(-)

diff --git a/otx/cli/utils/hpo.py b/otx/cli/utils/hpo.py
index b98b4118617..1d280181fd8 100644
--- a/otx/cli/utils/hpo.py
+++ b/otx/cli/utils/hpo.py
@@ -679,6 +679,12 @@ def run(self):
         need_to_save_initial_weight = False
         resume_weight_path = self._get_resume_weight_path()
         if resume_weight_path is not None:
+            ret = re.search(r"(\d+)\.pth", resume_weight_path)
+            if ret is not None:
+                resume_epoch = int(ret.group(1))
+                if self._epoch <= resume_epoch:  # given epoch is already done
+                    self._report_func(0, 0, done=True)
+                    return
             environment.resume_model_weight(resume_weight_path, dataset)
         else:
             initial_weight = self._load_fixed_initial_weight()
diff --git a/otx/hpo/hpo_base.py b/otx/hpo/hpo_base.py
index 8e22eb2169f..dc03f5cb501 100644
--- a/otx/hpo/hpo_base.py
+++ b/otx/hpo/hpo_base.py
@@ -183,6 +183,7 @@ def __init__(self, trial_id: Any, configuration: Dict, train_environment: Option
         self._train_environment = train_environment
         self._iteration = None
         self.status: TrialStatus = TrialStatus.READY
+        self._done = False
 
     @property
     def id(self):
@@ -204,6 +205,8 @@ def iteration(self, val):
         """Setter for iteration."""
         check_positive(val, "iteration")
         self._iteration = val
+        if self.get_progress() < val:
+            self._done = False
 
     @property
     def train_environment(self):
@@ -279,21 +282,16 @@ def save_results(self, save_path: str):
             json.dump(results, f)
 
     def finalize(self):
-        """Let the trial know that training is done.
-
-        If the trial isn't trained until given resource, then make it pretend to train until resouce.
-        """
-        if self.get_progress() < self.iteration:
-            best_score = self.get_best_score()
-            if best_score is None:
-                raise RuntimeError(f"Although {self.id} trial doesn't report any score but it's done")
-            self.register_score(best_score, self.iteration)
+        """Set done as True."""
+        if not self.score:
+            raise RuntimeError(f"Trial {self.id} didn't report any score but tries to be done.")
+        self._done = True
 
     def is_done(self):
         """Check the trial is done."""
         if self.iteration is None:
             raise ValueError("iteration isn't set yet.")
-        return self.get_progress() >= self.iteration
+        return self._done or self.get_progress() >= self.iteration
 
 
 class TrialStatus(IntEnum):
diff --git a/otx/hpo/hpo_runner.py b/otx/hpo/hpo_runner.py
index e949f11bf7e..0526ef32fc0 100644
--- a/otx/hpo/hpo_runner.py
+++ b/otx/hpo/hpo_runner.py
@@ -163,9 +163,10 @@ def _get_uid(self) -> int:
 
     def _terminate_all_running_processes(self):
         for trial in self._running_trials.values():
+            trial.queue.close()
             process = trial.process
             if process.is_alive():
-                logger.warning(f"Kill child process {process.pid}")
+                logger.info(f"Kill child process {process.pid}")
                 process.kill()
 
     def _terminate_signal_handler(self, signum, _frame):
@@ -206,7 +207,7 @@ def _report_score(
     try:
         trial_status = recv_queue.get(timeout=3)
     except queue.Empty:
-        pass
+        return TrialStatus.RUNNING
 
     while not recv_queue.empty():
         trial_status = recv_queue.get_nowait()
diff --git a/otx/hpo/hyperband.py b/otx/hpo/hyperband.py
index d1aa96e10eb..eea8bac0f57 100644
--- a/otx/hpo/hyperband.py
+++ b/otx/hpo/hyperband.py
@@ -53,6 +53,7 @@ def __init__(self, trial_id: Any, configuration: Dict, train_environment: Option
         super().__init__(trial_id, configuration, train_environment)
         self._rung: Optional[int] = None
         self._bracket: Optional[int] = None
+        self.estimating_max_resource: bool = False
 
     @property
     def rung(self):
@@ -708,6 +709,7 @@ def _make_trial_to_estimate_resource(self) -> AshaTrial:
         if len(self._trials) == 1:  # first trial to estimate
             trial.bracket = 0
             trial.iteration = self.num_full_iterations
+            trial.estimating_max_resource = True
         elif self._minimum_resource is not None:
             trial.iteration = self._minimum_resource
         else:
@@ -917,7 +919,7 @@ def report_score(
         """
         trial = self._trials[trial_id]
         if done:
-            if self.maximum_resource is None:
+            if self.maximum_resource is None and trial.estimating_max_resource:
                 self.maximum_resource = trial.get_progress()
                 self.num_full_iterations = self.maximum_resource
                 if not self._need_to_find_resource_value():
diff --git a/tests/unit/cli/utils/test_hpo.py b/tests/unit/cli/utils/test_hpo.py
index 80ea8061227..df2b81de8b7 100644
--- a/tests/unit/cli/utils/test_hpo.py
+++ b/tests/unit/cli/utils/test_hpo.py
@@ -476,6 +476,16 @@ def test_run_hpo_w_dataset_smaller_than_batch(self, mocker, cls_task_env):
 
 
 class TestTrainer:
+    @pytest.fixture(autouse=True)
+    def setup(self, tmp_dir):
+        self.weight_format = "epoch_{}.pth"
+        self.hpo_workdir = Path(tmp_dir) / "hpo_dir"
+
+    @pytest.fixture
+    def tmp_dir(self):
+        with TemporaryDirectory() as tmp_dir:
+            yield tmp_dir
+
     @e2e_pytest_unit
     def test_init(self, mocker, cls_template_path):
         Trainer(
@@ -489,49 +499,82 @@ def test_init(self, mocker, cls_template_path):
             metric="fake",
         )
 
+    @pytest.fixture
+    def mock_task(self, mocker, tmp_dir):
+        fake_project_path = Path(tmp_dir) / "fake_proejct"
+        fake_project_path.mkdir(parents=True)
+        for i in range(1, 5):
+            (fake_project_path / self.weight_format.format(i)).write_text("fake")
+
+        mock_get_train_task = mocker.patch.object(TaskEnvironmentManager, "get_train_task")
+        mock_task = mocker.MagicMock()
+        mock_task.project_path = str(fake_project_path)
+        mock_get_train_task.return_value = mock_task
+
+        return mock_task
+
     @e2e_pytest_unit
-    def test_run(self, mocker, cls_template_path):
-        with TemporaryDirectory() as tmp_dir:
-            # prepare
-            trial_id = "1"
-            weight_format = "epoch_{}.pth"
-            hpo_workdir = Path(tmp_dir) / "hpo_dir"
-            fake_project_path = Path(tmp_dir) / "fake_proejct"
-            fake_project_path.mkdir(parents=True)
-            for i in range(1, 5):
-                (fake_project_path / weight_format.format(i)).write_text("fake")
-
-            mock_get_train_task = mocker.patch.object(TaskEnvironmentManager, "get_train_task")
-            mock_task = mocker.MagicMock()
-            mock_task.project_path = str(fake_project_path)
-            mock_get_train_task.return_value = mock_task
-
-            mock_report_func = mocker.MagicMock()
-
-            mocker.patch("otx.cli.utils.hpo.get_dataset_adapter")
-            mocker.patch("otx.cli.utils.hpo.HpoDataset")
-
-            # run
-            trainer = Trainer(
-                hp_config={"configuration": {"iterations": 10}, "id": trial_id},
-                report_func=mock_report_func,
-                model_template=find_and_parse_model_template(cls_template_path),
-                data_roots=mocker.MagicMock(),
-                task_type=TaskType.CLASSIFICATION,
-                hpo_workdir=hpo_workdir,
-                initial_weight_name="fake",
-                metric="fake",
-            )
-            trainer.run()
-
-            # check
-            mock_report_func.assert_called_once_with(0, 0, done=True)  # finilize report
-            assert hpo_workdir.exists()  # make a directory to copy weight
-            for i in range(1, 5):  # check model weights are copied
-                assert (hpo_workdir / "weight" / trial_id / weight_format.format(i)).exists()
+    def test_run(self, mocker, cls_template_path, mock_task, tmp_dir):
+        # prepare
+        trial_id = "1"
+        mock_report_func = mocker.MagicMock()
+
+        mocker.patch("otx.cli.utils.hpo.get_dataset_adapter")
+        mocker.patch("otx.cli.utils.hpo.HpoDataset")
+
+        # run
+        trainer = Trainer(
+            hp_config={"configuration": {"iterations": 10}, "id": trial_id},
+            report_func=mock_report_func,
+            model_template=find_and_parse_model_template(cls_template_path),
+            data_roots=mocker.MagicMock(),
+            task_type=TaskType.CLASSIFICATION,
+            hpo_workdir=self.hpo_workdir,
+            initial_weight_name="fake",
+            metric="fake",
+        )
+        trainer.run()
+
+        # check
+        mock_report_func.assert_called_once_with(0, 0, done=True)  # finalize report
+        assert self.hpo_workdir.exists()  # make a directory to copy weight
+        for i in range(1, 5):  # check model weights are copied
+            assert (self.hpo_workdir / "weight" / trial_id / self.weight_format.format(i)).exists()
         mock_task.train.assert_called()  # check task.train() is called
 
+    @e2e_pytest_unit
+    def test_run_trial_already_done(self, mocker, cls_template_path, mock_task, tmp_dir):
+        """Test a case where the trial was already trained for the given epochs."""
+        # prepare
+        trial_id = "1"
+        epoch_to_run = 10
+        weight_dir = self.hpo_workdir / "weight" / trial_id
+        # prepare a weight trained more than given epoch
+        weight_dir.mkdir(parents=True)
+        (weight_dir / self.weight_format.format(epoch_to_run+1)).touch()
+        mock_report_func = mocker.MagicMock()
+
+        mocker.patch("otx.cli.utils.hpo.get_dataset_adapter")
+        mocker.patch("otx.cli.utils.hpo.HpoDataset")
+
+        # run
+        trainer = Trainer(
+            hp_config={"configuration": {"iterations": epoch_to_run}, "id": trial_id},
+            report_func=mock_report_func,
+            model_template=find_and_parse_model_template(cls_template_path),
+            data_roots=mocker.MagicMock(),
+            task_type=TaskType.CLASSIFICATION,
+            hpo_workdir=self.hpo_workdir,
+            initial_weight_name="fake",
+            metric="fake",
+        )
+        trainer.run()
+
+        # check
+        mock_report_func.assert_called_once_with(0, 0, done=True)  # finalize report
+        mock_task.train.assert_not_called()  # check task.train() is not called
+
 
 class TestHpoCallback:
     @e2e_pytest_unit
diff --git a/tests/unit/hpo/test_hpo_base.py b/tests/unit/hpo/test_hpo_base.py
index 2feadedecf5..38639e9b97d 100644
--- a/tests/unit/hpo/test_hpo_base.py
+++ b/tests/unit/hpo/test_hpo_base.py
@@ -139,7 +139,7 @@ def test_finalize(self, trial):
         trial.iteration = 10
         trial.register_score(10, 5)
         trial.finalize()
-        assert trial.iteration == trial.get_progress()
+        assert trial.is_done()
 
     @e2e_pytest_component
     def test_finalize_without_registered_score(self, trial):
diff --git a/tests/unit/hpo/test_hyperband.py b/tests/unit/hpo/test_hyperband.py
index e395ac472c5..6fee35059d5 100644
--- a/tests/unit/hpo/test_hyperband.py
+++ b/tests/unit/hpo/test_hyperband.py
@@ -715,7 +715,7 @@ def test_report_score_trial_done(self, hyper_band):
         trial = hyper_band.get_next_sample()
         hyper_band.report_score(100, 0.1, trial.id)
         hyper_band.report_score(0, 0, trial.id, done=True)
-        assert trial.get_progress() == trial.iteration
+        assert trial.is_done()
 
     @e2e_pytest_component
     def test_get_best_config(self, hyper_band):
@@ -1051,7 +1051,6 @@ def test_without_minimum_maximum_resource(self, good_hyperband_args, num_trial_t
             if hyper_band.report_score(score=1, resource=iter, trial_id=trial.id) == TrialStatus.STOP:
                 break
 
-        first_trial = trials_to_estimate[0]
         hyper_band.report_score(score=1, resource=max_validation, trial_id=first_trial.id)
         hyper_band.report_score(score=0, resource=0, trial_id=first_trial.id, done=True)
 

From eb8eac2edaaaa2072ee376426fee1e570fac98ff Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Mon, 22 May 2023 16:57:22 +0900
Subject: [PATCH 03/10] add unit test

---
 tests/unit/hpo/test_hyperband.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/tests/unit/hpo/test_hyperband.py b/tests/unit/hpo/test_hyperband.py
index 6fee35059d5..649a488d281 100644
--- a/tests/unit/hpo/test_hyperband.py
+++ b/tests/unit/hpo/test_hyperband.py
@@ -932,6 +932,7 @@ def test_without_maximum_resource(self, good_hyperband_args, num_trial_to_estima
         hyper_band.report_score(score=0, resource=0, trial_id=first_trial.id, done=True)
 
         assert hyper_band.maximum_resource == max_validation
+        assert first_trial.estimating_max_resource
 
     @e2e_pytest_component
     @pytest.mark.parametrize("num_trial_to_estimate", [10, 30, 100])
@@ -961,6 +962,8 @@ def test_auto_config_decrease_without_maximum_resource(self, good_hyperband_args
             break
 
         first_trial = trials_to_estimate[0]
+        assert first_trial.estimating_max_resource
+
         hyperband.report_score(score=1, resource=max_validation, trial_id=first_trial.id)
         hyperband.report_score(score=0, resource=0, trial_id=first_trial.id, done=True)
@@ -1006,6 +1009,8 @@ def test_auto_config_increase_without_maximum_resource(self, good_hyperband_args
             break
 
         first_trial = trials_to_estimate[0]
+        assert first_trial.estimating_max_resource
+
         hyperband.report_score(score=1, resource=max_validation, trial_id=first_trial.id)
         hyperband.report_score(score=0, resource=0, trial_id=first_trial.id, done=True)
@@ -1066,6 +1071,7 @@ def test_without_minimum_maximum_resource(self, good_hyperband_args, num_trial_t
 
         expected_min = hyper_band.maximum_resource * (good_hyperband_args["reduction_factor"] ** -s_max)
 
+        assert first_trial.estimating_max_resource
         assert min(iter_set) == expected_min
         assert hyper_band.maximum_resource == max_validation
 

From a2d03fe4191dc45a689c8436b1b3a9d5017a5f59 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Mon, 22 May 2023 18:00:27 +0900
Subject: [PATCH 04/10] remove unused hpo weight

---
 otx/cli/utils/hpo.py             | 8 ++++++++
 tests/unit/cli/utils/test_hpo.py | 6 ++++++
 2 files changed, 14 insertions(+)

diff --git a/otx/cli/utils/hpo.py b/otx/cli/utils/hpo.py
index 1d280181fd8..c5e7700eeef 100644
--- a/otx/cli/utils/hpo.py
+++ b/otx/cli/utils/hpo.py
@@ -582,9 +582,17 @@ def run_hpo(
         logger.debug(f"{best_hpo_weight} will be loaded as best HPO weight")
         env_manager.load_model_weight(best_hpo_weight, dataset)
+    _remove_unused_model_weights(hpo_save_path, best_hpo_weight)
 
     return env_manager.environment
 
 
+def _remove_unused_model_weights(hpo_save_path: Path, best_hpo_weight: Optional[str] = None):
+    for weight in hpo_save_path.rglob("*.pth"):
+        if best_hpo_weight is not None and str(weight) == best_hpo_weight:
+            continue
+        weight.unlink()
+
+
 def get_best_hpo_weight(hpo_dir: Union[str, Path], trial_id: Union[str, Path]) -> Optional[str]:
     """Get best model weight path of the HPO trial.
diff --git a/tests/unit/cli/utils/test_hpo.py b/tests/unit/cli/utils/test_hpo.py
index df2b81de8b7..afb0c52452b 100644
--- a/tests/unit/cli/utils/test_hpo.py
+++ b/tests/unit/cli/utils/test_hpo.py
@@ -660,6 +660,11 @@ def test_run_hpo(mocker, mock_environment):
         output = Path(tmp_dir) / "fake"
         mock_get_best_hpo_weight = mocker.patch("otx.cli.utils.hpo.get_best_hpo_weight")
         mock_get_best_hpo_weight.return_value = "mock_best_weight_path"
+        hpo_weight_dir = output / "hpo" / "weight"
+        for i in range(3):
+            trial_weight_dir = hpo_weight_dir / str(i)
+            trial_weight_dir.mkdir(parents=True)
+            (trial_weight_dir / "fake.pth").touch()
 
         def mock_run_hpo(*args, **kwargs):
             return {"config": {"a.b": 1, "c.d": 2}, "id": "1"}
@@ -686,6 +691,7 @@ def mock_read_model(args1, path, arg2):
         assert env_hp.a.b == 1
         assert env_hp.c.d == 2
         assert environment.model == "mock_best_weight_path"  # check that best model weight is used
+        assert not list(hpo_weight_dir.rglob("*.pth"))  # check unused weight is removed
 
 
 @e2e_pytest_unit

From eb62c07267b4d7094adf7e273e7c02160b8c53e8 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Tue, 23 May 2023 10:21:55 +0900
Subject: [PATCH 05/10] add try except statement

---
 otx/hpo/hpo_runner.py | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/otx/hpo/hpo_runner.py b/otx/hpo/hpo_runner.py
index 0526ef32fc0..0f16da7956a 100644
--- a/otx/hpo/hpo_runner.py
+++ b/otx/hpo/hpo_runner.py
@@ -82,18 +82,22 @@ def __init__(
 
     def run(self):
         """Run a HPO loop."""
         logger.info("HPO loop starts.")
-        while not self._hpo_algo.is_done():
-            if self._resource_manager.have_available_resource():
-                trial = self._hpo_algo.get_next_sample()
-                if trial is not None:
-                    self._start_trial_process(trial)
-
-            self._remove_finished_process()
-            self._get_reports()
-
-            time.sleep(1)
-
+        try:
+            while not self._hpo_algo.is_done():
+                if self._resource_manager.have_available_resource():
+                    trial = self._hpo_algo.get_next_sample()
+                    if trial is not None:
+                        self._start_trial_process(trial)
+
+                self._remove_finished_process()
+                self._get_reports()
+
+                time.sleep(1)
+        except Exception as e:
+            self._terminate_all_running_processes()
+            raise e
         logger.info("HPO loop is done.")
+
         self._get_reports()
         self._join_all_processes()

From 1685115cfa39b2e47e428dcdc1f63a079ef1b69c Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Tue, 23 May 2023 10:50:00 +0900
Subject: [PATCH 06/10] stop HPO if trials failed more than three times

---
 otx/cli/utils/hpo.py  | 24 ++++++++++++++----------
 otx/hpo/hpo_runner.py |  9 ++++++++-
 2 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/otx/cli/utils/hpo.py b/otx/cli/utils/hpo.py
index c5e7700eeef..f9c5e72c30f 100644
--- a/otx/cli/utils/hpo.py
+++ b/otx/cli/utils/hpo.py
@@ -447,7 +447,7 @@ def _align_batch_size_search_space_to_dataset_size(self):
             self._fixed_hp[batch_size_name] = self._train_dataset_size
             self._environment.set_hyper_parameter_using_str_key(self._fixed_hp)
 
-    def run_hpo(self, train_func: Callable, data_roots: Dict[str, Dict]) -> Dict[str, Any]:
+    def run_hpo(self, train_func: Callable, data_roots: Dict[str, Dict]) -> Union[Dict[str, Any], None]:
         """Run HPO and provides optimized hyper parameters.
 
         Args:
             train_func (Callable): training function
             data_roots (Dict[str, Dict]): dataset path of each dataset type
 
         Returns:
-            Dict[str, Any]: optimized hyper parameters
+            Union[Dict[str, Any], None]: Optimized hyper parameters. If there is no best hyper parameter, return None.
         """
         self._environment.save_initial_weight(self._get_initial_model_weight_path())
         hpo_algo = self._get_hpo_algo()
@@ -474,7 +474,8 @@ def run_hpo(self, train_func: Callable, data_roots: Dict[str, Dict]) -> Union[Di
             resource_type,  # type: ignore
         )
         best_config = hpo_algo.get_best_config()
-        self._restore_fixed_hp(best_config["config"])
+        if best_config is not None:
+            self._restore_fixed_hp(best_config["config"])
         hpo_algo.print_result()
 
         return best_config
@@ -574,13 +575,16 @@ def run_hpo(
     logger.info("completed hyper-parameter optimization")
 
     env_manager = TaskEnvironmentManager(environment)
-    env_manager.set_hyper_parameter_using_str_key(best_config["config"])
-    best_hpo_weight = get_best_hpo_weight(hpo_save_path, best_config["id"])
-    if best_hpo_weight is None:
-        logger.warning("Can not find the best HPO weight. Best HPO wegiht won't be used.")
-    else:
-        logger.debug(f"{best_hpo_weight} will be loaded as best HPO weight")
-        env_manager.load_model_weight(best_hpo_weight, dataset)
+    best_hpo_weight = None
+
+    if best_config is not None:
+        env_manager.set_hyper_parameter_using_str_key(best_config["config"])
+        best_hpo_weight = get_best_hpo_weight(hpo_save_path, best_config["id"])
+        if best_hpo_weight is None:
+            logger.warning("Can not find the best HPO weight. Best HPO weight won't be used.")
+        else:
+            logger.debug(f"{best_hpo_weight} will be loaded as best HPO weight")
+            env_manager.load_model_weight(best_hpo_weight, dataset)
     _remove_unused_model_weights(hpo_save_path, best_hpo_weight)
 
     return env_manager.environment
diff --git a/otx/hpo/hpo_runner.py b/otx/hpo/hpo_runner.py
index 0f16da7956a..c031eda6a90 100644
--- a/otx/hpo/hpo_runner.py
+++ b/otx/hpo/hpo_runner.py
@@ -71,6 +71,7 @@ def __init__(
         self._mp = multiprocessing.get_context("spawn")
         self._report_queue = self._mp.Queue()
         self._uid_index = 0
+        self._trial_fault_count = 0
         self._resource_manager = get_resource_manager(
             resource_type, num_parallel_trial, num_gpu_for_single_trial, available_gpu
         )
@@ -83,7 +84,7 @@ def run(self):
         """Run a HPO loop."""
         logger.info("HPO loop starts.")
         try:
-            while not self._hpo_algo.is_done():
+            while not self._hpo_algo.is_done() and self._trial_fault_count < 3:
                 if self._resource_manager.have_available_resource():
                     trial = self._hpo_algo.get_next_sample()
                     if trial is not None:
@@ -98,6 +99,9 @@ def run(self):
             raise e
         logger.info("HPO loop is done.")
 
+        if self._trial_fault_count >= 3:
+            logger.warning("HPO trials exited abnormally more than three times. HPO is suspended.")
+
         self._get_reports()
         self._join_all_processes()
@@ -131,6 +135,8 @@ def _start_trial_process(self, trial: Trial):
         trial_to_remove = []
         for uid, trial in self._running_trials.items():
             if not trial.process.is_alive():
+                if trial.process.exitcode != 0:
+                    self._trial_fault_count += 1
                 trial.queue.close()
                 trial.process.join()
                 trial_to_remove.append(uid)
@@ -188,6 +194,7 @@ def _terminate_signal_handler(self, signum, _frame):
 
 def _run_train(train_func: Callable, hp_config: Dict, report_func: Callable):
     # set multi process method as default
+    raise RuntimeError
     multiprocessing.set_start_method(None, True)  # type: ignore
     train_func(hp_config, report_func)

From f1bc2235defe0935a3b7e9649caf5141e570fcc3 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Tue, 23 May 2023 10:50:49 +0900
Subject: [PATCH 07/10] align with pre-commit

---
 otx/hpo/hpo_runner.py            | 15 +++++++--------
 tests/unit/cli/utils/test_hpo.py |  2 +-
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/otx/hpo/hpo_runner.py b/otx/hpo/hpo_runner.py
index c031eda6a90..92a358142b0 100644
--- a/otx/hpo/hpo_runner.py
+++ b/otx/hpo/hpo_runner.py
@@ -18,13 +18,13 @@
 import multiprocessing
 import os
 import queue
-import time
-import sys
 import signal
+import sys
+import time
 from copy import deepcopy
+from dataclasses import dataclass
 from functools import partial
 from typing import Any, Callable, Dict, Literal, Optional, Union
-from dataclasses import dataclass
 
 from otx.hpo.hpo_base import HpoBase, Trial, TrialStatus
 from otx.hpo.resource_manager import get_resource_manager
@@ -34,7 +34,8 @@
 
 @dataclass
 class RunningTrial:
-    """Data class for a running trial"""
+    """Data class for a running trial."""
+
     process: multiprocessing.Process
     trial: Trial
     queue: multiprocessing.Queue
@@ -205,13 +206,11 @@ def _report_score(
     recv_queue: multiprocessing.Queue,
     send_queue: multiprocessing.Queue,
     uid: Any,
-    done: bool = False
+    done: bool = False,
 ):
     logger.debug(f"score : {score}, progress : {progress}, uid : {uid}, pid : {os.getpid()}, done : {done}")
     try:
-        send_queue.put_nowait(
-            {"score": score, "progress": progress, "uid": uid, "pid": os.getpid(), "done": done}
-        )
+        send_queue.put_nowait({"score": score, "progress": progress, "uid": uid, "pid": os.getpid(), "done": done})
     except ValueError:
         return TrialStatus.STOP
 
diff --git a/tests/unit/cli/utils/test_hpo.py b/tests/unit/cli/utils/test_hpo.py
index afb0c52452b..55846bfdbad 100644
--- a/tests/unit/cli/utils/test_hpo.py
+++ b/tests/unit/cli/utils/test_hpo.py
@@ -552,7 +552,7 @@ def test_run_trial_already_done(self, mocker, cls_template_path, mock_task, tmp_
         weight_dir = self.hpo_workdir / "weight" / trial_id
         # prepare a weight trained more than given epoch
         weight_dir.mkdir(parents=True)
-        (weight_dir / self.weight_format.format(epoch_to_run+1)).touch()
+        (weight_dir / self.weight_format.format(epoch_to_run + 1)).touch()
         mock_report_func = mocker.MagicMock()
 
         mocker.patch("otx.cli.utils.hpo.get_dataset_adapter")
         mocker.patch("otx.cli.utils.hpo.HpoDataset")

From 2fd3b8d5ff9e420713c1831725593f16a93cf5b3 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Tue, 23 May 2023 10:55:33 +0900
Subject: [PATCH 08/10] fix mypy issue

---
 otx/hpo/hpo_runner.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/otx/hpo/hpo_runner.py b/otx/hpo/hpo_runner.py
index 92a358142b0..27565329625 100644
--- a/otx/hpo/hpo_runner.py
+++ b/otx/hpo/hpo_runner.py
@@ -129,7 +129,7 @@ def _start_trial_process(self, trial: Trial):
             ),
         )
         os.environ = origin_env
-        self._running_trials[uid] = RunningTrial(process, trial, trial_queue)
+        self._running_trials[uid] = RunningTrial(process, trial, trial_queue)  # type: ignore
         process.start()
 
     def _remove_finished_process(self):
@@ -195,7 +195,6 @@ def _terminate_signal_handler(self, signum, _frame):
 
 def _run_train(train_func: Callable, hp_config: Dict, report_func: Callable):
     # set multi process method as default
-    raise RuntimeError
     multiprocessing.set_start_method(None, True)  # type: ignore
     train_func(hp_config, report_func)

From 06790e887fe260d3ef76d971eac3b75f03af15d9 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Tue, 23 May 2023 11:10:33 +0900
Subject: [PATCH 09/10] update CHANGELOG.md

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f87748dcb61..e1bdfc06af3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -68,6 +68,7 @@ All notable changes to this project will be documented in this file.
 - Action task refactoring ()
 - Optimize data preprocessing time and enhance overall performance in semantic segmentation ()
 - Support automatic batch size decrease when there is no enough GPU memory ()
+- Refine HPO usability ()
 
 ### Bug fixes

From e755fb34b2cb7a33e61ccb692422b1d1a885af15 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Tue, 23 May 2023 14:06:30 +0900
Subject: [PATCH 10/10] align with mypy

---
 otx/api/configuration/helper/convert.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/otx/api/configuration/helper/convert.py b/otx/api/configuration/helper/convert.py
index 1c8a84ed8eb..b6f94e9ae7c 100644
--- a/otx/api/configuration/helper/convert.py
+++ b/otx/api/configuration/helper/convert.py
@@ -131,7 +131,7 @@ def convert(
     if target == str:
         result = yaml.dump(config_dict)
     elif target == dict:
-        result = config_dict
+        result = config_dict  # type: ignore
     elif target == DictConfig:
         result = OmegaConf.create(config_dict)
     else:
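
Note for readers unfamiliar with the queue-based reporting flow introduced in PATCH 01: the sketch below shows how a user-supplied training function is expected to call the bound report_func (a functools.partial of _report_score with the queues and uid already filled in). The trainer body, the placeholder score formula, and the name sample_train_func are hypothetical illustrations; only the report_func keyword arguments (score, progress, done) and the TrialStatus return value come from the patches above.

    from otx.hpo.hpo_base import TrialStatus

    def sample_train_func(hp_config, report_func):
        """Toy trainer used only to illustrate the reporting protocol (not part of the patches)."""
        iterations = hp_config["configuration"]["iterations"]
        score = 0.0
        for epoch in range(1, iterations + 1):
            score = 1 - 1 / (epoch + 1)  # placeholder metric, assumed for this sketch
            # _report_score puts the report on the send queue and briefly waits for
            # a TrialStatus answer produced by HpoLoop._get_reports().
            if report_func(score=score, progress=epoch) == TrialStatus.STOP:
                break
        report_func(score=score, progress=iterations, done=True)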