FIX #70 - Add an input schema in PipelineML for data validation at inference time
Galileo-Galilei committed Oct 30, 2020
1 parent a0f5aa3 commit 95d5980
Showing 9 changed files with 214 additions and 58 deletions.
27 changes: 14 additions & 13 deletions CHANGELOG.md
@@ -4,29 +4,30 @@

### Added

- `kedro-mlflow` now supports `kedro>=0.16.5` [#62](https://github.com/Galileo-Galilei/kedro-mlflow/issues/62)
- `kedro-mlflow` hooks can now be declared in `.kedro.yml` or `pyproject.toml` by adding `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` into the hooks entry. _Only for kedro>=0.16.5_ [#96](https://github.com/Galileo-Galilei/kedro-mlflow/issues/96)
- `pipeline_ml_factory` now accepts that `inference` pipeline `inputs` may be in `training` pipeline `inputs` [#71](https://github.com/Galileo-Galilei/kedro-mlflow/issues/71)
- `pipeline_ml_factory` now infers the schema of the input dataset automatically to validate data at inference time. The signature can also be declared manually via the `model_signature` argument [#70](https://github.com/Galileo-Galilei/kedro-mlflow/issues/70)

### Fixed

- `get_mlflow_config` now uses the Kedro `ProjectContext` `ConfigLoader` to get configs [#66](https://github.com/Galileo-Galilei/kedro-mlflow/issues/66). This indirectly solves the following issues:
  - `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory [#30](https://github.com/Galileo-Galilei/kedro-mlflow/issues/30)
  - `kedro-mlflow` now works fine with the `kedro jupyter notebook` command independently of the working directory [#64](https://github.com/Galileo-Galilei/kedro-mlflow/issues/64)
  - You can use global variables in `mlflow.yml`, which is now properly parsed if you use a `TemplatedConfigLoader` [#72](https://github.com/Galileo-Galilei/kedro-mlflow/issues/72)
- `mlflow init` now gets the conf path from `context.CONF_ROOT` instead of a hardcoded conf folder. This makes the package robust to Kedro changes.
- `MlflowMetricsDataSet` now saves in the specified `run_id` instead of the current one when the prefix is not specified [#102](https://github.com/Galileo-Galilei/kedro-mlflow/issues/102)

### Changed

- Documentation reference to the plugin is now dynamic when necessary [#6](https://github.com/Galileo-Galilei/kedro-mlflow/issues/6)
- The test coverage now excludes `tests` and `setup.py` [#99](https://github.com/Galileo-Galilei/kedro-mlflow/issues/99)
- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the predicted value [#93](https://github.com/Galileo-Galilei/kedro-mlflow/issues/93)
- The `PipelineML.extract_pipeline_catalog` is renamed `PipelineML._extract_pipeline_catalog` to indicate it is a private method and is not intended to be used directly by end users [#100](https://github.com/Galileo-Galilei/kedro-mlflow/issues/100)

### Removed

- The `kedro mlflow init` command no longer declares hooks in `run.py`. You must now [register your hooks manually](docs/source/03_tutorial/02_setup.md#declaring-kedro-mlflow-hooks) in ``run.py`` (kedro > 0.16.0), ``.kedro.yml`` (kedro >= 0.16.5) or ``pyproject.toml`` (kedro >= 0.16.5) [#70](https://github.com/Galileo-Galilei/kedro-mlflow/issues/70)

## [0.3.0] - 2020-10-11

7 changes: 6 additions & 1 deletion docs/source/05_python_objects/02_Hooks.md
@@ -1,13 +1,18 @@
# ``Hooks``

This package provides 2 new hooks.

## ``MlflowPipelineHook``

This hook:

1. manages mlflow settings at the beginning and the end of the run (run start / end).
2. logs useful information for reproducibility as ``mlflow tags`` (including kedro ``Journal`` information and the commands used to launch the run).
3. registers the pipeline as a valid ``mlflow model`` if [it is a ``PipelineML`` instance](#new-pipeline).

## ``MlflowNodeHook``

This hook:

1. must be used together with the ``MlflowPipelineHook``
2. autologs node parameters each time the pipeline is run (with ``kedro run`` or programmatically).
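
Since both hooks work together, here is a minimal, hedged sketch of manual registration in ``run.py`` for the kedro 0.16.x project template. The project and package names are hypothetical placeholders, and it assumes both hook classes are importable from `kedro_mlflow.framework.hooks`:

```
# run.py -- minimal sketch of manual hook registration (kedro>=0.16.0)
from kedro.framework.context import KedroContext

from kedro_mlflow.framework.hooks import MlflowNodeHook, MlflowPipelineHook


class ProjectContext(KedroContext):
    project_name = "my_project"  # hypothetical project name
    project_version = "0.16.5"   # kedro version targeted by the project
    package_name = "my_project"  # hypothetical package name

    # Register both hooks together: MlflowPipelineHook opens/closes the
    # mlflow run and logs the PipelineML model, MlflowNodeHook logs node
    # parameters into that run.
    hooks = (MlflowPipelineHook(), MlflowNodeHook())
```

For `kedro>=0.16.5` you can instead list `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` in the hooks entry of `.kedro.yml` or `pyproject.toml`, as described in the changelog above.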
11 changes: 10 additions & 1 deletion docs/source/05_python_objects/03_Pipelines.md
@@ -1,4 +1,5 @@
# Pipelines

## ``PipelineML`` and ``pipeline_ml_factory``

``PipelineML`` is a new class which extends ``Pipeline`` and enables binding two pipelines (one of training, one of inference) together. This class comes with a ``KedroPipelineModel`` class for logging it in mlflow. A pipeline logged as an mlflow model can be served using the ``mlflow models serve`` and ``mlflow models predict`` commands.
@@ -24,9 +25,11 @@

```
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    return {
        # ... (pipeline definitions elided in this diff view)
    }
```

Now each time you run ``kedro run --pipeline=training`` (provided you registered ``MlflowPipelineHook`` in your ``run.py``), the full inference pipeline will be registered as an mlflow model (with all the outputs produced by training as artifacts: the machine learning model, but also the *scaler*, *vectorizer*, *imputer*, or whatever object fitted on data that you create in ``training`` and use in ``inference``).

Note that:

- the `inference` pipeline `input_name` can be a `MemoryDataSet`, and it must belong to the `inference` pipeline `inputs`
- apart from `input_name`, all other `inference` pipeline `inputs` must be persisted locally on disk (i.e. they must not be `MemoryDataSet`s and must have a local `filepath`)
- the `inference` pipeline `inputs` must belong to the `training` pipeline `outputs` (vectorizer, binarizer, machine learning model...)
@@ -38,20 +41,26 @@ Note that:

```
from pathlib import Path

import mlflow
from kedro.framework.context import load_context
from kedro_mlflow.mlflow import KedroPipelineModel
from mlflow.models import infer_signature

# pipeline_training is your PipelineML object, created as previously
catalog = load_context(".").io

# artifacts are all the inputs of the inference pipelines that are persisted in the catalog
artifacts = pipeline_training.extract_pipeline_artifacts(catalog)

# infer the schema of the input dataset to validate data at inference time
input_data = catalog.load(pipeline_training.input_name)
model_signature = infer_signature(model_input=input_data)

mlflow.pyfunc.log_model(
    artifact_path="model",
    python_model=KedroPipelineModel(
        pipeline_ml=pipeline_training,
        catalog=catalog,
    ),
    artifacts=artifacts,
    conda_env={"python": "3.7.0"},
    signature=model_signature,
)
```
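
Once logged, the model can be reloaded through the standard pyfunc API, and recent mlflow versions enforce the stored signature on the input data. A short hedged sketch (the run id is a hypothetical placeholder):

```
import mlflow

# hypothetical run id: replace with the run that actually logged the model
model_uri = "runs:/<run_id>/model"

# the loaded pyfunc model checks input data against the logged signature
loaded_model = mlflow.pyfunc.load_model(model_uri)
predictions = loaded_model.predict(input_data)  # same schema as the training input
```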
27 changes: 11 additions & 16 deletions kedro_mlflow/framework/cli/cli.py
@@ -2,18 +2,18 @@
from pathlib import Path

import click
from kedro import __version__ as kedro_version
from kedro.framework.context import load_context
from packaging import version

from kedro_mlflow.framework.cli.cli_utils import write_jinja_template
from kedro_mlflow.framework.context import get_mlflow_config
from kedro_mlflow.utils import _already_updated, _is_kedro_project

try:
    from kedro.framework.context import get_static_project_data
except ImportError:  # pragma: no cover
    from kedro_mlflow.utils import (
        _get_project_globals as get_static_project_data,  # pragma: no cover
    )


TEMPLATE_FOLDER_PATH = Path(__file__).parent.parent.parent / "template" / "project"
@@ -49,15 +49,13 @@ def get_command(self, ctx, cmd_name):

@click.group(name="Mlflow")
def commands():
"""Kedro plugin for interactions with mlflow.
"""
"""Kedro plugin for interactions with mlflow."""
pass # pragma: no cover


@commands.command(name="mlflow", cls=KedroClickGroup)
def mlflow_commands():
"""Use mlflow-specific commands inside kedro project.
"""
"""Use mlflow-specific commands inside kedro project."""
pass


@@ -128,8 +126,8 @@ def init(force, silent):
)
def ui(project_path, env):
"""Opens the mlflow user interface with the
project-specific settings of mlflow.yml. This interface
enables to browse and compares runs.
project-specific settings of mlflow.yml. This interface
enables to browse and compares runs.
"""

@@ -148,8 +146,7 @@ def ui(project_path, env):

@mlflow_commands.command()
def run():
"""Re-run an old run with mlflow-logged info.
"""
"""Re-run an old run with mlflow-logged info."""

    # TODO (HARD): define general assumptions to check whether a run
    # is reproducible or not
@@ -163,13 +160,11 @@ def run():

@mlflow_commands.command()
def new():
"""Create a new kedro project with updated template.
"""
"""Create a new kedro project with updated template."""
raise NotImplementedError # pragma: no cover


class KedroMlflowCliError(Exception):
    """kedro-mlflow cli specific error"""

pass
9 changes: 9 additions & 0 deletions kedro_mlflow/framework/hooks/pipeline_hook.py
@@ -9,6 +9,7 @@
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.versioning.journal import _git_sha
from mlflow.models import infer_signature

from kedro_mlflow.framework.context import get_mlflow_config
from kedro_mlflow.io import MlflowMetricsDataSet
@@ -136,13 +137,21 @@ def after_pipeline_run(
if isinstance(pipeline, PipelineML):
pipeline_catalog = pipeline._extract_pipeline_catalog(catalog)
artifacts = pipeline.extract_pipeline_artifacts(pipeline_catalog)

if pipeline.model_signature == "auto":
input_data = pipeline_catalog.load(pipeline.input_name)
model_signature = infer_signature(model_input=input_data)
else:
model_signature = pipeline.model_signature

mlflow.pyfunc.log_model(
artifact_path=pipeline.model_name,
python_model=KedroPipelineModel(
pipeline_ml=pipeline, catalog=pipeline_catalog
),
artifacts=artifacts,
conda_env=_format_conda_env(pipeline.conda_env),
signature=model_signature,
)
# Close the mlflow active run at the end of the pipeline to avoid interactions with further runs
mlflow.end_run()
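
For intuition about the `"auto"` branch above: `infer_signature` derives a column-level schema from the training data itself. A small hedged sketch (the dataframe is a hypothetical stand-in for the dataset registered under `input_name`):

```
import pandas as pd
from mlflow.models import infer_signature

# hypothetical stand-in for the `input_name` dataset
input_data = pd.DataFrame({"feature_1": [0.1, 0.2], "category": ["a", "b"]})

# same call as in the hook above
model_signature = infer_signature(model_input=input_data)

# model_signature.inputs is a Schema with one ColSpec per column,
# e.g. roughly [feature_1: double, category: string]
```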
63 changes: 42 additions & 21 deletions kedro_mlflow/pipeline/pipeline_ml.py
@@ -5,6 +5,7 @@
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node
from mlflow.models import ModelSignature

MSG_NOT_IMPLEMENTED = (
"This method is not implemented because it does"
@@ -43,6 +44,7 @@ def __init__(
input_name: str,
conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
model_name: Optional[str] = "model",
model_signature: Union[ModelSignature, str, None] = "auto",
):

"""Store all necessary information for calling mlflow.log_model in the pipeline.
@@ -77,6 +79,13 @@ def __init__(
model_name (Union[str, None], optional): The name of
the folder where the model will be stored in
remote mlflow. Defaults to "model".
model_signature (Union[ModelSignature, str, None]): The mlflow
    signature of the input dataframe common to training
    and inference.
    - If 'auto', it is inferred automatically
    - If None, no signature is used
    - If a `ModelSignature` instance, it is passed
      as-is to the underlying mlflow model
"""

super().__init__(nodes, *args, tags=tags)
@@ -85,17 +94,31 @@ def __init__(
self.conda_env = conda_env
self.model_name = model_name
self.input_name = input_name
self.model_signature = model_signature

self._check_consistency()

@property
def _logger(self) -> logging.Logger:
return logging.getLogger(__name__)

@property
def training(self) -> Pipeline:
return Pipeline(self.nodes)

@property
def inference(self) -> str:
return self._inference

@inference.setter
def inference(self, inference: Pipeline) -> None:
self._check_inference(inference)
self._inference = inference

@property
def input_name(self) -> str:
return self._input_name

@input_name.setter
def input_name(self, name: str) -> None:
allowed_names = self.inference.inputs()
@@ -110,17 +133,18 @@ def input_name(self, name: str) -> None:
self._input_name = name

@property
def model_signature(self) -> str:
return self._model_signature

@model_signature.setter
def model_signature(self, model_signature: ModelSignature) -> None:
if model_signature is not None:
if not isinstance(model_signature, ModelSignature):
if model_signature != "auto":
raise ValueError(
f"model_signature must be one of 'None', 'auto', or a 'ModelSignature' Object, got '{type(model_signature)}'"
)
self._model_signature = model_signature

def _check_inference(self, inference: Pipeline) -> None:
nb_outputs = len(inference.outputs())
@@ -276,15 +300,12 @@ def __or__(self, other):  # pragma: no cover


class KedroMlflowPipelineMLInputsError(Exception):
    """Error raised when the inputs of KedroPipelineModel are invalid"""


class KedroMlflowPipelineMLDatasetsError(Exception):
    """Error raised when the inputs of KedroPipelineModel are invalid"""


class KedroMlflowPipelineMLOutputsError(Exception):
    """Error raised when the outputs of KedroPipelineModel are invalid"""
10 changes: 10 additions & 0 deletions kedro_mlflow/pipeline/pipeline_ml_factory.py
@@ -3,6 +3,7 @@
from warnings import warn

from kedro.pipeline import Pipeline
from mlflow.models import ModelSignature

from kedro_mlflow.pipeline.pipeline_ml import PipelineML

@@ -13,6 +14,7 @@ def pipeline_ml_factory(
input_name: str = None,
conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
model_name: Optional[str] = "model",
model_signature: Union[ModelSignature, str, None] = "auto",
) -> PipelineML:
"""This function is a helper to create `PipelineML`
object directly from two Kedro `Pipelines` (one of
@@ -47,6 +49,13 @@ def pipeline_ml_factory(
model_name (Union[str, None], optional): The name of
the folder where the model will be stored in
remote mlflow. Defaults to "model".
model_signature (Union[ModelSignature, str, None]): The mlflow
    signature of the input dataframe common to training
    and inference.
    - If 'auto', it is inferred automatically
    - If None, no signature is used
    - If a `ModelSignature` instance, it is passed
      as-is to the underlying mlflow model
Returns:
PipelineML: A `PipelineML` which is automatically
@@ -61,6 +70,7 @@
input_name=input_name,
conda_env=conda_env,
model_name=model_name,
model_signature=model_signature,
)
return pipeline
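
As an end-to-end illustration of the new argument, a minimal hedged sketch (the pipeline objects and dataset name are hypothetical placeholders, and it assumes `pipeline_ml_factory` is importable from `kedro_mlflow.pipeline`):

```
from kedro_mlflow.pipeline import pipeline_ml_factory
from mlflow.models import ModelSignature
from mlflow.types.schema import ColSpec, Schema

# hypothetical input schema -- adapt the columns to your own dataset
signature = ModelSignature(
    inputs=Schema(
        [
            ColSpec("double", "feature_1"),
            ColSpec("string", "category"),
        ]
    )
)

training_pipeline_ml = pipeline_ml_factory(
    training=training_pipeline,    # your training Pipeline
    inference=inference_pipeline,  # your inference Pipeline
    input_name="instances",        # hypothetical dataset name
    model_signature=signature,     # pinned instead of the "auto" default
)
```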
