From f1cc9c847573b73e42144712613375d3376faa14 Mon Sep 17 00:00:00 2001
From: Steffen Vogler
Date: Thu, 7 Sep 2023 19:28:15 +0000
Subject: [PATCH] HPO logger handler

---
 examples/mnist/mnist_vertex_hpo_logger.py | 194 ++++++++++++++++++++++
 ignite/contrib/handlers/hpo_logger.py     | 137 +++++++++++++++
 2 files changed, 331 insertions(+)
 create mode 100644 examples/mnist/mnist_vertex_hpo_logger.py
 create mode 100644 ignite/contrib/handlers/hpo_logger.py

diff --git a/examples/mnist/mnist_vertex_hpo_logger.py b/examples/mnist/mnist_vertex_hpo_logger.py
new file mode 100644
index 00000000000..6a1e2186e84
--- /dev/null
+++ b/examples/mnist/mnist_vertex_hpo_logger.py
@@ -0,0 +1,194 @@
+"""
+MNIST example that uses a logger which can report metrics back to the hyperparameter optimization
+service in Vertex AI (GCP).
+
+Ref: https://cloud.google.com/vertex-ai/docs/training/using-hyperparameter-tuning
+
+Requirements:
+    Since a Vertex tuning job is a serverless HPO orchestration service, the training script needs
+    to (1) be dockerized and (2) accept the relevant hyperparameters as input arguments (via argparse).
+    Otherwise, communication between the outside of the container (i.e. the Vertex platform) and the
+    inside of the container (your script with its current loss and accuracy values) is not possible.
+    Assuming this very script is dockerized and called as the ENTRYPOINT of the Dockerfile, only an HPO
+    config file is needed. There the user can specify the number of parallel runs, the target metric,
+    the tunable hyperparameters, the search algorithm and the hardware spec.
+
+    Exemplary Dockerfile:
+    ```
+    FROM pytorch/pytorch:1.11.0-cuda11.3-cudnn8-runtime
+    COPY mnist/ /mnist/
+
+    RUN pip install pytorch-ignite pyyaml torchvision pynvml
+
+    ENTRYPOINT ["python", "/mnist/mnist_vertex_hpo_logger.py"]
+    ```
+
+    See this example of `config_HPO.yaml`:
+    ```
+    studySpec:
+      metrics:
+        - metricId: nll
+          goal: MINIMIZE
+      parameters:
+        - parameterId: lr
+          scaleType: UNIT_LINEAR_SCALE
+          doubleValueSpec:
+            minValue: 0.000001
+            maxValue: 0.001
+        - parameterId: hp-tune
+          categoricalValueSpec:
+            values:
+              - "y"
+      algorithm: RANDOM_SEARCH
+    trialJobSpec:
+      workerPoolSpecs:
+        - machineSpec:
+            machineType: n1-standard-8
+            acceleratorType: NVIDIA_TESLA_T4
+            acceleratorCount: 1
+          replicaCount: 1
+          containerSpec:
+            imageUri: gcr.io/<project>/<image>:<tag>
+    ```
+
+Usage:
+
+    Submit this script as a Vertex tuning job via:
+    ```bash
+    gcloud alpha ai hp-tuning-jobs create --display-name=my_tuning_job \
+        --project=$PROJECT --region=$REGION --config=config_HPO.yaml \
+        --max-trial-count=12 --parallel-trial-count=4
+    ```
+
+    For testing, run this script on a regular machine and check whether
+    `/tmp/hypertune/output.metrics` has been created:
+    ```bash
+    python mnist_vertex_hpo_logger.py
+    ```
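+
+    The handler rewrites that file on every report; each line is one JSON record with
+    alphabetically sorted keys (see `HPOLogger._dump_metrics_to_file`). The record below is
+    illustrative only -- the actual values depend on the run:
+    ```
+    {"checkpoint_path": "", "global_step": "938", "nll": "0.31", "timestamp": 1694110095.5, "trial": "0"}
+    ```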
+"""
+
+from argparse import ArgumentParser
+
+import torch
+import torch.nn.functional as F
+from torch import nn
+from torch.optim import SGD
+from torch.utils.data import DataLoader
+from torchvision.datasets import MNIST
+from torchvision.transforms import Compose, Normalize, ToTensor
+from tqdm import tqdm
+
+from ignite.contrib.handlers.hpo_logger import HPOLogger
+
+from ignite.engine import create_supervised_evaluator, create_supervised_trainer, Events
+from ignite.metrics import Accuracy, Loss
+from ignite.utils import setup_logger
+
+
+class Net(nn.Module):
+    def __init__(self):
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
+        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
+        self.conv2_drop = nn.Dropout2d()
+        self.fc1 = nn.Linear(320, 50)
+        self.fc2 = nn.Linear(50, 10)
+
+    def forward(self, x):
+        x = F.relu(F.max_pool2d(self.conv1(x), 2))
+        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
+        x = x.view(-1, 320)
+        x = F.relu(self.fc1(x))
+        x = F.dropout(x, training=self.training)
+        x = self.fc2(x)
+        return F.log_softmax(x, dim=-1)
+
+
+def get_data_loaders(train_batch_size, val_batch_size):
+    data_transform = Compose([ToTensor(), Normalize((0.1307,), (0.3081,))])
+
+    train_loader = DataLoader(
+        MNIST(download=True, root=".", transform=data_transform, train=True), batch_size=train_batch_size, shuffle=True
+    )
+
+    val_loader = DataLoader(
+        MNIST(download=False, root=".", transform=data_transform, train=False), batch_size=val_batch_size, shuffle=False
+    )
+    return train_loader, val_loader
+
+
+def run(train_batch_size, val_batch_size, epochs, lr, momentum, log_interval):
+    train_loader, val_loader = get_data_loaders(train_batch_size, val_batch_size)
+    model = Net()
+    device = "cpu"
+
+    if torch.cuda.is_available():
+        device = "cuda"
+
+    model.to(device)  # Move model before creating optimizer
+    optimizer = SGD(model.parameters(), lr=lr, momentum=momentum)
+    criterion = nn.NLLLoss()
+    trainer = create_supervised_trainer(model, optimizer, criterion, device=device)
+    trainer.logger = setup_logger("trainer")
+
+    val_metrics = {"accuracy": Accuracy(), "nll": Loss(criterion)}
+    evaluator = create_supervised_evaluator(model, metrics=val_metrics, device=device)
+    evaluator.logger = setup_logger("evaluator")
+
+    pbar = tqdm(initial=0, leave=False, total=len(train_loader), desc=f"ITERATION - loss: {0:.2f}")
+
+    @trainer.on(Events.ITERATION_COMPLETED(every=log_interval))
+    def log_training_loss(engine):
+        pbar.desc = f"ITERATION - loss: {engine.state.output:.2f}"
+        pbar.update(log_interval)
+
+    @trainer.on(Events.EPOCH_COMPLETED)
+    def log_training_results(engine):
+        pbar.refresh()
+        evaluator.run(train_loader)
+        metrics = evaluator.state.metrics
+        avg_accuracy = metrics["accuracy"]
+        avg_nll = metrics["nll"]
+        tqdm.write(
+            f"Training Results - Epoch: {engine.state.epoch} Avg accuracy: {avg_accuracy:.2f} Avg loss: {avg_nll:.2f}"
+        )
+
+    @trainer.on(Events.EPOCH_COMPLETED)
+    def log_validation_results(engine):
+        evaluator.run(val_loader)
+        metrics = evaluator.state.metrics
+        avg_accuracy = metrics["accuracy"]
+        avg_nll = metrics["nll"]
+        tqdm.write(
+            f"Validation Results - Epoch: {engine.state.epoch} Avg accuracy: {avg_accuracy:.2f} Avg loss: {avg_nll:.2f}"
+        )
+
+        pbar.n = pbar.last_print_n = 0
+
+    @trainer.on(Events.EPOCH_COMPLETED | Events.COMPLETED)
+    def log_time(engine):
+        tqdm.write(f"{trainer.last_event_name.name} took {trainer.state.times[trainer.last_event_name.name]} seconds")
+
+    # instantiate the Vertex HPO logger; the metric_tag must match the metricId in config_HPO.yaml,
+    # and attaching it as the last EPOCH_COMPLETED handler ensures the evaluator metrics are up to date
+    hpo_logger = HPOLogger(evaluator=evaluator, metric_tag="nll")
+    trainer.add_event_handler(Events.EPOCH_COMPLETED, hpo_logger)
+
+    trainer.run(train_loader, max_epochs=epochs)
+    pbar.close()
+
+
+if __name__ == "__main__":
+    parser = ArgumentParser()
+    parser.add_argument("--batch_size", type=int, default=64, help="input batch size for training (default: 64)")
+    parser.add_argument(
+        "--val_batch_size", type=int, default=1000, help="input batch size for validation (default: 1000)"
+    )
+    parser.add_argument("--epochs", type=int, default=10, help="number of epochs to train (default: 10)")
+    parser.add_argument("--lr", type=float, default=0.01, help="learning rate (default: 0.01)")
+    parser.add_argument("--momentum", type=float, default=0.5, help="SGD momentum (default: 0.5)")
+    parser.add_argument(
+        "--log_interval", type=int, default=10, help="how many batches to wait before logging training status"
+    )
+    # accepted but unused; Vertex passes one command-line argument per parameterId in config_HPO.yaml
+    parser.add_argument("--hp-tune", type=str, default="n", help="set to 'y' by the Vertex HPO service")
+
+    args = parser.parse_args()
+
+    run(args.batch_size, args.val_batch_size, args.epochs, args.lr, args.momentum, args.log_interval)
"--log_interval", type=int, default=10, help="how many batches to wait before logging training status" + ) + + args = parser.parse_args() + + run(args.batch_size, args.val_batch_size, args.epochs, args.lr, args.momentum, args.log_interval) diff --git a/ignite/contrib/handlers/hpo_logger.py b/ignite/contrib/handlers/hpo_logger.py new file mode 100644 index 00000000000..c4840277355 --- /dev/null +++ b/ignite/contrib/handlers/hpo_logger.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +import collections +from collections import defaultdict, UserString +from collections.abc import Mapping, Sequence +from enum import Enum +import json +import os +from threading import RLock +import time +from typing import Any + +from ignite.engine import Engine, Events + +_DEFAULT_METRIC_PATH = '/tmp/hypertune/output.metrics' + +_MAX_NUM_METRIC_ENTRIES_TO_PRESERVE = 100 + +def _get_loss_from_output(output: Sequence[Mapping[str, Any]]) -> Any: + return output[0]["loss"] + +class MetricLoggerKeys(Enum): + METRICS = "Metrics" + LOSS = "Loss" + + +class HPOLogger(object): + """ + Makes selected metric accessible for use by GCP Vertex AI hyperparameter tuning jobs. By adding only this + single HPO-logger it unlocks hyperparam screening for Ignite + + Example:: + # construct an evaluator saving metric values + val_metrics = {"accuracy": Accuracy(), "nll": Loss(criterion)} + evaluator = create_supervised_evaluator(model, metrics=val_metrics, device=device) + evaluator.logger = setup_logger("evaluator") + + # construct the logger and associate with evaluator to extract metric values from + hpo_logger = HPOLogger(evaluator = evaluator, metric_tag= 'nll') + + # construct the trainer with the logger passed in as a handler so that it logs loss values + trainer.add_event_handler(Events.EPOCH_COMPLETED, hpo_logger) + + # run training + trainer.run(train_loader, max_epochs=epochs) + + Args: + evaluator: Evaluator to consume metric results from at the end of its evaluation run + metric_tag: Converts the metric value coming from the trainer/evaluator's state into a storable value + """ + + def __init__( + self, + evaluator: Engine | None = None, + metric_tag: UserString = 'training/hptuning/metric' + ) -> None: + self.loss: list = [] + self.metrics: defaultdict = defaultdict(list) + self.iteration = 0 + self.lock = RLock() + self.metric_tag = metric_tag + + if evaluator is not None: + self.attach_evaluator(evaluator) + + """Constructor.""" + self.metric_path = os.environ.get('CLOUD_ML_HP_METRIC_FILE', + _DEFAULT_METRIC_PATH) + if not os.path.exists(os.path.dirname(self.metric_path)): + os.makedirs(os.path.dirname(self.metric_path)) + + self.trial_id = os.environ.get('CLOUD_ML_TRIAL_ID', 0) + self.metrics_queue = collections.deque( + maxlen=_MAX_NUM_METRIC_ENTRIES_TO_PRESERVE) + + def _dump_metrics_to_file(self): + with open(self.metric_path, 'w') as metric_file: + for metric in self.metrics_queue: + metric_file.write(json.dumps(metric, sort_keys=True) + '\n') + + def attach(self, engine: Engine) -> None: + """ + Args: + engine: Ignite Engine, it can be a trainer, validator or evaluator. + """ + engine.add_event_handler(Events.ITERATION_COMPLETED, self) + + def attach_evaluator(self, evaluator: Engine) -> None: + """ + Attach event handlers to the given evaluator to log metric values from it. 
+
+    Args:
+        evaluator: evaluator to consume metric results from at the end of its evaluation run
+        metric_tag: tag under which the metric value is reported; it must match the ``metricId``
+            of the target metric in the ``studySpec`` of the tuning job config
+    """
+
+    def __init__(self, evaluator: Engine | None = None, metric_tag: str = "training/hptuning/metric") -> None:
+        self.loss: list = []
+        self.metrics: defaultdict = defaultdict(list)
+        self.iteration = 0
+        self.lock = RLock()
+        self.metric_tag = metric_tag
+
+        if evaluator is not None:
+            self.attach_evaluator(evaluator)
+
+        self.metric_path = os.environ.get("CLOUD_ML_HP_METRIC_FILE", _DEFAULT_METRIC_PATH)
+        os.makedirs(os.path.dirname(self.metric_path), exist_ok=True)
+
+        self.trial_id = os.environ.get("CLOUD_ML_TRIAL_ID", 0)
+        self.metrics_queue = collections.deque(maxlen=_MAX_NUM_METRIC_ENTRIES_TO_PRESERVE)
+
+    def _dump_metrics_to_file(self) -> None:
+        # rewrite the metrics file with the most recent entries, one JSON record per line
+        with open(self.metric_path, "w") as metric_file:
+            for metric in self.metrics_queue:
+                metric_file.write(json.dumps(metric, sort_keys=True) + "\n")
+
+    def attach(self, engine: Engine) -> None:
+        """
+        Attach the logger to the given engine to report a value after every iteration.
+
+        Args:
+            engine: Ignite Engine, it can be a trainer, validator or evaluator.
+        """
+        engine.add_event_handler(Events.ITERATION_COMPLETED, self)
+
+    def attach_evaluator(self, evaluator: Engine) -> None:
+        """
+        Attach event handlers to the given evaluator to log metric values from it.
+
+        Args:
+            evaluator: Ignite Engine implementing network evaluation
+        """
+        evaluator.add_event_handler(Events.EPOCH_COMPLETED, self.log_metrics)
+
+    def __call__(self, engine: Engine) -> None:
+        """
+        Report the current value of the tracked metric to the tuning service.
+
+        Args:
+            engine: Ignite Engine, it can be a trainer, validator or evaluator.
+        """
+        with self.lock:
+            self.iteration = engine.state.iteration
+
+            # prefer the latest value logged for the tracked evaluator metric; fall back to the
+            # engine output (i.e. the loss) when no evaluator metric has been recorded yet
+            if len(self.metrics[self.metric_tag]) > 0:
+                metric_value = float(self.metrics[self.metric_tag][-1][1])
+            else:
+                metric_value = float(engine.state.output)
+
+            # record format expected by the Vertex AI hypertune service
+            metric_body = {
+                "timestamp": time.time(),
+                "trial": str(self.trial_id),
+                self.metric_tag: str(metric_value),
+                "global_step": str(int(self.iteration) if self.iteration else 0),
+                "checkpoint_path": "",
+            }
+            self.metrics_queue.append(metric_body)
+            self._dump_metrics_to_file()
+
+    def log_metrics(self, engine: Engine) -> None:
+        """
+        Log metrics from the given Engine's state member.
+
+        Args:
+            engine: Ignite Engine to log from
+        """
+        with self.lock:
+            for m, v in engine.state.metrics.items():
+                self.metrics[m].append((self.iteration, v))
+
+    def state_dict(self):
+        return {MetricLoggerKeys.LOSS: self.loss, MetricLoggerKeys.METRICS: self.metrics}
+
+    def load_state_dict(self, state_dict):
+        self.loss[:] = state_dict[MetricLoggerKeys.LOSS]
+        self.metrics.clear()
+        self.metrics.update(state_dict[MetricLoggerKeys.METRICS])