✨ Create an MlflowMetricDataSet to simplify metric logging (#73)
Galileo-Galilei committed Aug 29, 2021
1 parent d626c8e commit 1eecda2
Showing 10 changed files with 517 additions and 15 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

## [Unreleased]

### Added

- :sparkles: Create an ``MlflowMetricDataSet`` to simplify the existing metric API. It enables logging a single float as a metric, optionally increasing the ``step`` automatically if the metric is updated over time ([#73](https://github.com/Galileo-Galilei/kedro-mlflow/issues/73))

### Fixed

- :bug: Dictionary parameters with integer keys are now properly logged in mlflow when ``flatten_dict_params`` is set to ``True`` in the ``mlflow.yml``, instead of raising a ``TypeError`` ([#224](https://github.com/Galileo-Galilei/kedro-mlflow/discussions/224))
62 changes: 60 additions & 2 deletions docs/source/04_experimentation_tracking/05_version_metrics.md
@@ -6,9 +6,67 @@ MLflow defines a metric as "a (key, value) pair, where the value is numeric". Each

## How to version metrics in a kedro project?

`kedro-mlflow` introduces a new ``AbstractDataSet`` called ``MlflowMetricsDataSet``. It is a wrapper around a dictionary of metrics returned by a node, and it logs these metrics in MLflow.
`kedro-mlflow` introduces 2 new ``AbstractDataSet``:
- ``MlflowMetricDataSet`` which can log a float as a metric
- ``MlflowMetricsDataSet`` which is a wrapper around a dictionary of metrics returned by a node, and logs them in MLflow.

Since it is an ``AbstractDataSet``, it can be used with the YAML API. You can define it as:
### Saving a single float as a metric with ``MlflowMetricDataSet``

The ``MlflowMetricDataSet`` is an ``AbstractDataSet`` which enables saving or loading a ``float`` as an mlflow metric. You must specify the ``key`` (i.e. the name to display in mlflow) when creating the dataset. Some examples follow:

- The most basic usage is to create the dataset and save a value:

```python
import mlflow

from kedro_mlflow.io.metrics import MlflowMetricDataSet

metric_ds = MlflowMetricDataSet(key="my_metric")
with mlflow.start_run():
    metric_ds.save(0.3)  # creates a "my_metric=0.3" value in the "metric" field of the mlflow UI
```

**Beware: unlike mlflow's default behaviour, if there is no active run (and no ``run_id`` is passed), no run is created: saving raises an error instead.**
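
For instance, here is a minimal sketch of this behaviour (the exact exception type surfaced may vary, since the dataset's ``ValueError`` can be wrapped by kedro's ``AbstractDataSet`` machinery):

```python
from kedro_mlflow.io.metrics import MlflowMetricDataSet

metric_ds = MlflowMetricDataSet(key="my_metric")

# No run_id was given and no run is active: instead of silently creating a new
# run (as ``mlflow.log_metric`` would), the dataset refuses to log.
try:
    metric_ds.save(0.3)
except Exception as err:
    print(err)  # asks to specify a run_id or to open a run with mlflow.start_run()
```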

- You can also specify a ``run_id`` instead of logging in the active run:

```python
import mlflow

from kedro_mlflow.io.metrics import MlflowMetricDataSet

metric_ds = MlflowMetricDataSet(key="my_metric", run_id="123456789")
with mlflow.start_run():
    metric_ds.save(0.3)  # creates a "my_metric=0.3" value in the "metric" field of run "123456789"
```

It is also possible to pass ``load_args`` and ``save_args`` to control which step should be loaded or logged (in case you have logged several steps for the same metric). ``save_args`` accepts a ``mode`` key which can be set to ``overwrite`` (mlflow default) or ``append``. In append mode, if no step is specified, saving the metric will "bump" the last existing step to create a linear history. **This is very useful if you have a monitoring pipeline which calculates a metric frequently to check the performance of a deployed model.**

```python
import mlflow

from kedro_mlflow.io.metrics import MlflowMetricDataSet

metric_ds = MlflowMetricDataSet(
    key="my_metric", load_args={"step": 1}, save_args={"mode": "append"}
)

with mlflow.start_run():
    metric_ds.save(0)  # step 0 stored for "my_metric"
    metric_ds.save(0.1)  # step 1 stored for "my_metric"
    metric_ds.save(0.2)  # step 2 stored for "my_metric"

    my_metric = metric_ds.load()  # value=0.1 (step number 1)
```

Since it is an ``AbstractDataSet``, it can be used with the YAML API in your ``catalog.yml``, e.g.:

```yaml
my_model_metric:
  type: kedro_mlflow.io.metrics.MlflowMetricDataSet
  run_id: 123456  # OPTIONAL: you should likely leave it empty to log in the current run
  key: my_awesome_name  # OPTIONAL: if not provided, the dataset name will be used (here "my_model_metric")
  load_args:
    step: ...  # OPTIONAL: likely not provided, unless you have a very good reason to do so
  save_args:
    step: ...  # OPTIONAL: likely not provided, unless you have a very good reason to do so
    mode: append  # OPTIONAL: likely better than the default "overwrite". Will be ignored if "step" is provided.
```
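
As a sketch of how such an entry is typically consumed (the node function ``compute_accuracy`` and the pipeline below are illustrative, not part of the plugin), a node simply returns the float and maps it to the dataset name:

```python
from kedro.pipeline import Pipeline, node


def compute_accuracy() -> float:
    # hypothetical metric computation
    return 0.93


def create_pipeline() -> Pipeline:
    return Pipeline(
        [
            node(
                func=compute_accuracy,
                inputs=None,
                outputs="my_model_metric",  # logged to mlflow through the catalog entry above
            )
        ]
    )
```
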
### Saving several metrics with their entire history with ``MlflowMetricsDataSet``

Since it is an ``AbstractDataSet``, it can be used with the YAML API. You can define it in your ``catalog.yml`` as:
```yaml
my_model_metrics:
17 changes: 16 additions & 1 deletion kedro_mlflow/framework/hooks/pipeline_hook.py
@@ -16,7 +16,7 @@
from kedro_mlflow.framework.context import get_mlflow_config
from kedro_mlflow.framework.hooks.utils import _assert_mlflow_enabled
from kedro_mlflow.io.catalog.switch_catalog_logging import switch_catalog_logging
from kedro_mlflow.io.metrics import MlflowMetricsDataSet
from kedro_mlflow.io.metrics import MlflowMetricDataSet, MlflowMetricsDataSet
from kedro_mlflow.mlflow import KedroPipelineModel
from kedro_mlflow.pipeline.pipeline_ml import PipelineML
from kedro_mlflow.utils import _parse_requirements
@@ -47,6 +47,21 @@ def after_catalog_created(
                else:
                    catalog._data_sets[name] = MlflowMetricsDataSet(prefix=name)

            # if no ``key`` was declared in the catalog entry, default it to the
            # dataset name so the metric has a meaningful name in mlflow
            if isinstance(dataset, MlflowMetricDataSet) and dataset.key is None:
                if dataset._run_id is not None:
                    catalog._data_sets[name] = MlflowMetricDataSet(
                        run_id=dataset._run_id,
                        key=name,
                        load_args=dataset._load_args,
                        save_args=dataset._save_args,
                    )
                else:
                    catalog._data_sets[name] = MlflowMetricDataSet(
                        key=name,
                        load_args=dataset._load_args,
                        save_args=dataset._save_args,
                    )

    @hook_impl
    def before_pipeline_run(
        self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog
1 change: 1 addition & 0 deletions kedro_mlflow/io/metrics/__init__.py
@@ -1 +1,2 @@
from .mlflow_metric_dataset import MlflowMetricDataSet
from .mlflow_metrics_dataset import MlflowMetricsDataSet
91 changes: 91 additions & 0 deletions kedro_mlflow/io/metrics/mlflow_abstract_metric_dataset.py
@@ -0,0 +1,91 @@
from typing import Any, Dict, Union

import mlflow
from kedro.io import AbstractDataSet
from mlflow.tracking import MlflowClient


class MlflowAbstractMetricDataSet(AbstractDataSet):
    def __init__(
        self,
        run_id: str = None,
        key: str = None,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
    ):
        """Initialise MlflowAbstractMetricDataSet.
        Args:
            run_id (str): The ID of the mlflow run where the metric should be logged
        """

        self.key = key
        self.run_id = run_id
        self._load_args = load_args or {}
        self._save_args = save_args or {}
        self._logging_activated = True  # by default, logging is activated!

    @property
    def run_id(self) -> Union[str, None]:
        """Get run id."""

        run = mlflow.active_run()
        if (self._run_id is None) and (run is not None):
            # if no run_id is specified, we try to retrieve the current run
            # this is useful because during a kedro run, we want to be able to retrieve
            # the metric from the active run to be able to reload a metric
            # without specifying the (unknown) run id
            return run.info.run_id

        # else we return the _run_id which can eventually be None.
        # In this case, saving will work (a new run will be created)
        # but loading will fail,
        # according to mlflow's behaviour
        return self._run_id

    @run_id.setter
    def run_id(self, run_id: str):
        self._run_id = run_id

    # we want to be able to turn logging off for an entire pipeline run
    # To avoid that a single call to a dataset in the catalog creates a new run automatically
    # we want to be able to turn everything off
    @property
    def _logging_activated(self):
        return self.__logging_activated

    @_logging_activated.setter
    def _logging_activated(self, flag):
        if not isinstance(flag, bool):
            raise ValueError(
                f"_logging_activated must be a boolean, got {type(flag)}"
            )
        self.__logging_activated = flag

    def _validate_run_id(self):
        if self.run_id is None:
            raise ValueError(
                "You must either specify a run_id or have a mlflow active run opened. Use mlflow.start_run() if necessary."
            )

    def _exists(self) -> bool:
        """Check if the metric exists in the remote mlflow storage.
        Returns:
            bool: Does the metric name exist in the given run_id?
        """
        mlflow_client = MlflowClient()
        run_id = self.run_id  # will get the active run if nothing is specified
        run = mlflow_client.get_run(run_id) if run_id else mlflow.active_run()

        flag_exist = self.key in run.data.metrics.keys() if run else False
        return flag_exist

    def _describe(self) -> Dict[str, Any]:
        """Describe MLflow metrics dataset.
        Returns:
            Dict[str, Any]: Dictionary with MLflow metrics dataset description.
        """
        return {
            "key": self.key,
            "run_id": self.run_id,
        }
98 changes: 98 additions & 0 deletions kedro_mlflow/io/metrics/mlflow_metric_dataset.py
@@ -0,0 +1,98 @@
from copy import deepcopy
from typing import Any, Dict

from mlflow.tracking import MlflowClient

from kedro_mlflow.io.metrics.mlflow_abstract_metric_dataset import (
    MlflowAbstractMetricDataSet,
)


class MlflowMetricDataSet(MlflowAbstractMetricDataSet):
    SUPPORTED_SAVE_MODES = {"overwrite", "append"}
    DEFAULT_SAVE_MODE = "overwrite"

    def __init__(
        self,
        run_id: str = None,
        key: str = None,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
    ):
        """Initialise MlflowMetricDataSet.
        Args:
            run_id (str): The ID of the mlflow run where the metric should be logged
        """

        super().__init__(run_id, key, load_args, save_args)

        # We add an extra argument mode="overwrite" / "append" to enable updating an existing metric.
        # This is not an official mlflow argument for log_metric, so we separate it from the others.
        # "overwrite" corresponds to the default mlflow behaviour.
        self.mode = self._save_args.pop("mode", self.DEFAULT_SAVE_MODE)

    def _load(self):
        self._validate_run_id()
        mlflow_client = MlflowClient()
        metric_history = mlflow_client.get_metric_history(
            run_id=self.run_id, key=self.key
        )  # gets active run if no run_id was given

        # the metric history is always a list of mlflow.entities.metric.Metric
        # we want the value of the last one stored because this dataset only deals with a single metric
        step = self._load_args.get("step")

        if step is None:
            # we take the last value recorded
            metric_value = metric_history[-1].value
        else:
            # we should take the last historical value with the given step
            # (it is possible to have several values with the same step)
            metric_value = next(
                metric.value
                for metric in reversed(metric_history)
                if metric.step == step
            )

        return metric_value

    def _save(self, data: float):
        if self._logging_activated:
            self._validate_run_id()
            run_id = (
                self.run_id
            )  # we access it once instead of calling self.run_id everywhere, to avoid looking for an active run each time

            mlflow_client = MlflowClient()

            # get the metric history if it has been saved previously to ensure
            # to retrieve the right data
            # reminder: this is True even if no run_id was originally specified but a run is active
            metric_history = (
                mlflow_client.get_metric_history(run_id=run_id, key=self.key)
                if self._exists()
                else []
            )

            save_args = deepcopy(self._save_args)
            step = save_args.pop("step", None)
            if step is None:
                if self.mode == "overwrite":
                    step = max([metric.step for metric in metric_history], default=0)
                elif self.mode == "append":
                    # max([]) defaults to -1 so that the first "step" logged equals 0
                    step = (
                        max([metric.step for metric in metric_history], default=-1) + 1
                    )
                else:
                    raise ValueError(
                        f"save_args['mode'] must be one of {self.SUPPORTED_SAVE_MODES}, got '{self.mode}' instead."
                    )

            mlflow_client.log_metric(
                run_id=run_id,
                key=self.key,
                value=data,
                step=step,
                **save_args,
            )
19 changes: 13 additions & 6 deletions tests/framework/hooks/test_all_hooks.py
@@ -28,12 +28,13 @@

def fake_fun(input):
    artifact = input
    metric = {
    metrics = {
        "metric1": {"value": 1.1, "step": 1},
        "metric2": [{"value": 1.1, "step": 1}, {"value": 1.2, "step": 2}],
    }
    metric = 1
    model = 3
    return artifact, metric, model
    return artifact, metrics, metric, metric, model


@pytest.fixture
@@ -87,6 +88,9 @@ def catalog_config(kedro_project_path):
"metrics_data": {
"type": "kedro_mlflow.io.metrics.MlflowMetricsDataSet",
},
"metric_data": {
"type": "kedro_mlflow.io.metrics.MlflowMetricDataSet",
},
"model": {
"type": "kedro_mlflow.io.models.MlflowModelLoggerDataSet",
"flavor": "mlflow.sklearn",
@@ -212,7 +216,13 @@ def dummy_pipeline():
            node(
                func=fake_fun,
                inputs=["params:a"],
                outputs=["artifact_data", "metrics_data", "model"],
                outputs=[
                    "artifact_data",
                    "metrics_data",
                    "metric_data",
                    "metric_data_with_run_id",
                    "model",
                ],
            )
        ]
    )
@@ -273,9 +283,6 @@ def test_deactivated_tracking_but_not_for_given_pipeline(
]
)

# context = session.load_context()
# context.run(pipeline_name="pipeline_on") # this is a pipeline should be tracked

mock_session.run(pipeline_name="pipeline_on")

all_runs_id_end = set(
