FIX #70 - Add an input schema in PipelineML for data validation at inference time
Galileo-Galilei committed Oct 30, 2020
1 parent a0f5aa3 commit 468b274
Showing 7 changed files with 200 additions and 56 deletions.
27 changes: 14 additions & 13 deletions CHANGELOG.md
```diff
@@ -4,29 +4,30 @@

 ### Added

-- `kedro-mlflow` now supports kedro 0.16.5 (#62)
-- `kedro-mlflow` hooks can now be declared in `.kedro.yml` or `pyproject.toml` by adding `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` into the hooks entry. _Only for kedro>=0.16.5_
-- `pipeline_ml_factory` now accepts that `inference` pipeline `inputs` may be in `training` pipeline `inputs` (#71)
+- `kedro-mlflow` now supports `kedro>=0.16.5` [#62](https://github.com/Galileo-Galilei/kedro-mlflow/issues/62)
+- `kedro-mlflow` hooks can now be declared in `.kedro.yml` or `pyproject.toml` by adding `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` into the hooks entry. _Only for kedro>=0.16.5_ [#96](https://github.com/Galileo-Galilei/kedro-mlflow/issues/96)
+- `pipeline_ml_factory` now accepts that `inference` pipeline `inputs` may be in `training` pipeline `inputs` [#71](https://github.com/Galileo-Galilei/kedro-mlflow/issues/71)
+- `pipeline_ml_factory` now automatically infers the schema of the input dataset to validate data at inference time. The schema can also be declared manually in the `model_signature` argument [#70](https://github.com/Galileo-Galilei/kedro-mlflow/issues/70)

 ### Fixed

-- `get_mlflow_config` now uses the Kedro `ProjectContext` `ConfigLoader` to get configs (#66). This indirectly solves the following issues:
-- `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory (#30)
-- `kedro-mlflow` now works fine with `kedro jupyter notebook` command independently of the working directory (#64)
-- You can use global variables in `mlflow.yml` which is now properly parsed if you use a `TemplatedConfigLoader` (#72)
+- `get_mlflow_config` now uses the Kedro `ProjectContext` `ConfigLoader` to get configs [#66](https://github.com/Galileo-Galilei/kedro-mlflow/issues/66). This indirectly solves the following issues:
+- `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory [#30](https://github.com/Galileo-Galilei/kedro-mlflow/issues/30)
+- `kedro-mlflow` now works fine with the `kedro jupyter notebook` command independently of the working directory [#64](https://github.com/Galileo-Galilei/kedro-mlflow/issues/64)
+- You can use global variables in `mlflow.yml`, which is now properly parsed if you use a `TemplatedConfigLoader` [#72](https://github.com/Galileo-Galilei/kedro-mlflow/issues/72)
 - `mlflow init` now gets the conf path from `context.CONF_ROOT` instead of a hardcoded conf folder. This makes the package robust to Kedro changes.
-- `MlflowMetricsDataset` now saves in the specified `run_id` instead of the current one when the prefix is not specified (#102)
+- `MlflowMetricsDataset` now saves in the specified `run_id` instead of the current one when the prefix is not specified [#102](https://github.com/Galileo-Galilei/kedro-mlflow/issues/102)

 ### Changed

-- Documentation reference to the plugin is now dynamic when necessary (#6).
-- The test coverage now excludes `tests` and `setup.py` (#99).
-- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the predicted value (#93).
-- The `PipelineML.extract_pipeline_catalog` is renamed `PipelineML._extract_pipeline_catalog` to indicate it is a private method and is not intended to be used directly by end users (#100)
+- Documentation references to the plugin are now dynamic when necessary [#6](https://github.com/Galileo-Galilei/kedro-mlflow/issues/6)
+- The test coverage now excludes `tests` and `setup.py` [#99](https://github.com/Galileo-Galilei/kedro-mlflow/issues/99)
+- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the predicted value [#93](https://github.com/Galileo-Galilei/kedro-mlflow/issues/93)
+- `PipelineML.extract_pipeline_catalog` is renamed `PipelineML._extract_pipeline_catalog` to indicate it is a private method and is not intended to be used directly by end users [#100](https://github.com/Galileo-Galilei/kedro-mlflow/issues/100)

 ### Removed

-- `kedro mlflow init` command is no longer declaring hooks in `run.py`. You must now [register your hooks manually](docs/source/03_tutorial/02_setup.md#declaring-kedro-mlflow-hooks) in the ``run.py`` (kedro > 0.16.0), ``.kedro.yml`` (kedro >= 0.16.5) or ``pyproject.toml`` (kedro >= 0.16.5)
+- The `kedro mlflow init` command no longer declares hooks in `run.py`. You must now [register your hooks manually](docs/source/03_tutorial/02_setup.md#declaring-kedro-mlflow-hooks) in ``run.py`` (kedro > 0.16.0), ``.kedro.yml`` (kedro >= 0.16.5) or ``pyproject.toml`` (kedro >= 0.16.5) [#70](https://github.com/Galileo-Galilei/kedro-mlflow/issues/70)

 ## [0.3.0] - 2020-10-11
```
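The new `model_signature` entry in the `Added` section is the heart of this commit: once a signature is logged with the model, mlflow can enforce the input schema whenever the model is reloaded or served. A hedged sketch of the resulting behavior; the run id and column names are hypothetical:

```python
import mlflow
import pandas as pd

# Hypothetical run id; "model" is the default artifact folder name
# used by pipeline_ml_factory.
model = mlflow.pyfunc.load_model("runs:/<run_id>/model")

# A frame matching the logged input schema is accepted...
model.predict(pd.DataFrame({"age": [42.0], "country": ["fr"]}))

# ...while a frame missing an expected column is rejected by mlflow's
# schema enforcement before the Kedro inference pipeline even runs.
model.predict(pd.DataFrame({"wrong_column": [1]}))  # raises MlflowException
```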
27 changes: 11 additions & 16 deletions kedro_mlflow/framework/cli/cli.py
```diff
@@ -2,18 +2,18 @@
 from pathlib import Path
 
 import click
 from kedro import __version__ as kedro_version
 from kedro.framework.context import load_context
 from packaging import version
 
 from kedro_mlflow.framework.cli.cli_utils import write_jinja_template
 from kedro_mlflow.framework.context import get_mlflow_config
 from kedro_mlflow.utils import _already_updated, _is_kedro_project
 
 try:
     from kedro.framework.context import get_static_project_data
-except ImportError:  # pragma: no cover
-    from kedro_mlflow.utils import _get_project_globals as get_static_project_data  # pragma: no cover
+except ImportError:  # pragma: no cover
+    from kedro_mlflow.utils import (
+        _get_project_globals as get_static_project_data,  # pragma: no cover
+    )
 
 
 TEMPLATE_FOLDER_PATH = Path(__file__).parent.parent.parent / "template" / "project"
@@ -49,15 +49,13 @@ def get_command(self, ctx, cmd_name):
 
 @click.group(name="Mlflow")
 def commands():
-    """Kedro plugin for interactions with mlflow.
-    """
+    """Kedro plugin for interactions with mlflow."""
     pass  # pragma: no cover
 
 
 @commands.command(name="mlflow", cls=KedroClickGroup)
 def mlflow_commands():
-    """Use mlflow-specific commands inside kedro project.
-    """
+    """Use mlflow-specific commands inside kedro project."""
     pass
 
@@ -128,8 +126,8 @@ def init(force, silent):
 )
 def ui(project_path, env):
     """Opens the mlflow user interface with the
-    project-specific settings of mlflow.yml. This interface
-    enables to browse and compares runs.
+    project-specific settings of mlflow.yml. This interface
+    enables you to browse and compare runs.
     """
@@ -148,8 +146,7 @@ def ui(project_path, env):
 
 @mlflow_commands.command()
 def run():
-    """Re-run an old run with mlflow-logged info.
-    """
+    """Re-run an old run with mlflow-logged info."""
 
     # TODO (HARD): define general assumptions to check whether a run
     # is reproducible or not
@@ -163,13 +160,11 @@ def run():
 
 @mlflow_commands.command()
 def new():
-    """Create a new kedro project with updated template.
-    """
+    """Create a new kedro project with an updated template."""
     raise NotImplementedError  # pragma: no cover
 
 
 class KedroMlflowCliError(Exception):
-    """ kedro-mlflow cli specific error
-    """
+    """kedro-mlflow cli specific error"""
 
     pass
```
11 changes: 11 additions & 0 deletions kedro_mlflow/framework/hooks/pipeline_hook.py
```diff
@@ -9,6 +9,7 @@
 from kedro.io import DataCatalog
 from kedro.pipeline import Pipeline
 from kedro.versioning.journal import _git_sha
+from mlflow.models import infer_signature
 
 from kedro_mlflow.framework.context import get_mlflow_config
 from kedro_mlflow.io import MlflowMetricsDataSet
@@ -136,13 +137,23 @@ def after_pipeline_run(
         if isinstance(pipeline, PipelineML):
             pipeline_catalog = pipeline._extract_pipeline_catalog(catalog)
             artifacts = pipeline.extract_pipeline_artifacts(pipeline_catalog)
+
+            if pipeline.model_signature == "auto":
+                input_data = pipeline_catalog.load(pipeline.input_name)
+                model_signature = infer_signature(model_input=input_data)
+            else:
+                model_signature = pipeline.model_signature
+
             mlflow.pyfunc.log_model(
                 artifact_path=pipeline.model_name,
                 python_model=KedroPipelineModel(
                     pipeline_ml=pipeline, catalog=pipeline_catalog
                 ),
                 artifacts=artifacts,
                 conda_env=_format_conda_env(pipeline.conda_env),
+                signature=model_signature,
             )
         # Close the mlflow active run at the end of the pipeline to avoid interactions with further runs
         mlflow.end_run()
```
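For context, `infer_signature` builds a `ModelSignature` from a data sample; a minimal standalone sketch of what the `"auto"` branch above does, with invented sample columns:

```python
import pandas as pd
from mlflow.models import infer_signature

# Stand-in for the dataset registered under pipeline.input_name.
input_data = pd.DataFrame({"age": [42, 35], "country": ["fr", "de"]})

# Inspects the sample and records column names and types; no output
# schema is inferred here since only model_input is given.
signature = infer_signature(model_input=input_data)
print(signature)  # roughly: inputs ['age': long, 'country': string], outputs None
```

Because the hook passes `signature=model_signature` to `mlflow.pyfunc.log_model`, the schema ends up in the model's MLmodel file alongside the artifacts.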
63 changes: 42 additions & 21 deletions kedro_mlflow/pipeline/pipeline_ml.py
```diff
@@ -5,6 +5,7 @@
 from kedro.io import DataCatalog, MemoryDataSet
 from kedro.pipeline import Pipeline
 from kedro.pipeline.node import Node
+from mlflow.models import ModelSignature
 
 MSG_NOT_IMPLEMENTED = (
     "This method is not implemented because it does"
@@ -43,6 +44,7 @@ def __init__(
         input_name: str,
         conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
         model_name: Optional[str] = "model",
+        model_signature: Union[ModelSignature, str, None] = "auto",
     ):
 
         """Store all necessary information for calling mlflow.log_model in the pipeline.
@@ -77,6 +79,13 @@ def __init__(
             model_name (Union[str, None], optional): The name of
                 the folder where the model will be stored in
                 remote mlflow. Defaults to "model".
+            model_signature (Union[ModelSignature, str, None]): The mlflow
+                signature of the input dataframe common to training
+                and inference.
+                    - If "auto", it is inferred automatically
+                    - If None, no signature is used
+                    - If a `ModelSignature` instance, it is passed
+                        as-is to the underlying model
         """
 
         super().__init__(nodes, *args, tags=tags)
@@ -85,17 +94,31 @@ def __init__(
         self.conda_env = conda_env
         self.model_name = model_name
         self.input_name = input_name
+        self.model_signature = model_signature
 
         self._check_consistency()
 
     @property
-    def input_name(self) -> str:
-        return self._input_name
+    def _logger(self) -> logging.Logger:
+        return logging.getLogger(__name__)
+
+    @property
+    def training(self) -> Pipeline:
+        return Pipeline(self.nodes)
+
+    @property
+    def inference(self) -> str:
+        return self._inference
+
+    @inference.setter
+    def inference(self, inference: Pipeline) -> None:
+        self._check_inference(inference)
+        self._inference = inference
+
+    @property
+    def input_name(self) -> str:
+        return self._input_name
 
     @input_name.setter
     def input_name(self, name: str) -> None:
         allowed_names = self.inference.inputs()
@@ -110,17 +133,18 @@ def input_name(self, name: str) -> None:
         self._input_name = name
 
     @property
-    def inference(self) -> str:
-        return self._inference
-
-    @inference.setter
-    def inference(self, inference: Pipeline) -> None:
-        self._check_inference(inference)
-        self._inference = inference
-
-    @property
-    def training(self) -> Pipeline:
-        return Pipeline(self.nodes)
+    def model_signature(self) -> str:
+        return self._model_signature
+
+    @model_signature.setter
+    def model_signature(self, model_signature: ModelSignature) -> None:
+        if model_signature is not None:
+            if not isinstance(model_signature, ModelSignature):
+                if model_signature != "auto":
+                    raise ValueError(
+                        f"model_signature must be one of None, 'auto', or a ModelSignature object, got '{type(model_signature)}'"
+                    )
+        self._model_signature = model_signature
 
     def _check_inference(self, inference: Pipeline) -> None:
         nb_outputs = len(inference.outputs())
@@ -276,15 +300,12 @@ def __or__(self, other):  # pragma: no cover
 
 
 class KedroMlflowPipelineMLInputsError(Exception):
-    """Error raised when the inputs of KedroPipelineModel are invalid
-    """
+    """Error raised when the inputs of KedroPipelineModel are invalid"""
 
 
 class KedroMlflowPipelineMLDatasetsError(Exception):
-    """Error raised when the inputs of KedroPipelineMoel are invalid
-    """
+    """Error raised when the datasets of KedroPipelineModel are invalid"""
 
 
 class KedroMlflowPipelineMLOutputsError(Exception):
-    """Error raised when the outputs of KedroPipelineModel are invalid
-    """
+    """Error raised when the outputs of KedroPipelineModel are invalid"""
```
10 changes: 10 additions & 0 deletions kedro_mlflow/pipeline/pipeline_ml_factory.py
```diff
@@ -3,6 +3,7 @@
 from warnings import warn
 
 from kedro.pipeline import Pipeline
+from mlflow.models import ModelSignature
 
 from kedro_mlflow.pipeline.pipeline_ml import PipelineML
 
@@ -13,6 +14,7 @@ def pipeline_ml_factory(
     input_name: str = None,
     conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
     model_name: Optional[str] = "model",
+    model_signature: Union[ModelSignature, str, None] = "auto",
 ) -> PipelineML:
     """This function is a helper to create a `PipelineML`
     object directly from two Kedro `Pipelines` (one of
@@ -47,6 +49,13 @@ def pipeline_ml_factory(
         model_name (Union[str, None], optional): The name of
             the folder where the model will be stored in
             remote mlflow. Defaults to "model".
+        model_signature (Union[ModelSignature, str, None]): The mlflow
+            signature of the input dataframe common to training
+            and inference.
+                - If "auto", it is inferred automatically
+                - If None, no signature is used
+                - If a `ModelSignature` instance, it is passed
+                    as-is to the underlying model
 
     Returns:
         PipelineML: A `PipelineML` which is automatically
@@ -61,6 +70,7 @@ def pipeline_ml_factory(
         input_name=input_name,
         conda_env=conda_env,
         model_name=model_name,
+        model_signature=model_signature,
     )
     return pipeline
```
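Putting it together, a hedged usage sketch: the node functions and the `instances` dataset name are invented, and the import path follows the plugin's documented public API:

```python
from kedro.pipeline import Pipeline, node
from kedro_mlflow.pipeline import pipeline_ml_factory


def train_model(instances):
    """Placeholder training node."""
    return "fitted-model"


def predict(model, instances):
    """Placeholder inference node."""
    return instances


pipeline_training = pipeline_ml_factory(
    training=Pipeline([node(train_model, inputs="instances", outputs="model")]),
    inference=Pipeline(
        [node(predict, inputs=["model", "instances"], outputs="predictions")]
    ),
    input_name="instances",
    model_signature="auto",  # new in this commit; None or a ModelSignature also work
)
```

With `model_signature="auto"`, the pipeline hook loads the dataset behind `input_name` after a training run and logs the inferred schema together with the model.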