FIX #70 - Enforce input schema when logging a PipelineML #107

Merged (2 commits) on Oct 31, 2020
7 changes: 1 addition & 6 deletions .pre-commit-config.yaml
@@ -5,13 +5,8 @@ repos:
hooks:
- id: black
language_version: python3.7
- repo: https://github.com/asottile/seed-isort-config
rev: v2.1.1
hooks:
- id: seed-isort-config
args: [--exclude=kedro_mlflow/template/project/run.py]
- repo: https://github.com/timothycrosley/isort
rev: e762d48fce82492b9c092653fc2f5cbf00197dcc
rev: 5.6.4
hooks:
- id: isort
- repo: https://gitlab.com/pycqa/flake8
27 changes: 14 additions & 13 deletions CHANGELOG.md
@@ -4,29 +4,30 @@

### Added

- `kedro-mlflow` now supports kedro 0.16.5 (#62)
- `kedro-mlflow` hooks can now be declared in `.kedro.yml` or `pyproject.toml` by adding `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` into the hooks entry. _Only for kedro>=0.16.5_
- `pipeline_ml_factory` now accepts that `inference` pipeline `inputs` may be in `training` pipeline `inputs` (#71)
- `kedro-mlflow` now supports `kedro>=0.16.5` [#62](https://github.com/Galileo-Galilei/kedro-mlflow/issues/62)
- `kedro-mlflow` hooks can now be declared in `.kedro.yml` or `pyproject.toml` by adding `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` into the hooks entry. _Only for kedro>=0.16.5_ [#96](https://github.com/Galileo-Galilei/kedro-mlflow/issues/96)
- `pipeline_ml_factory` now accepts that `inference` pipeline `inputs` may be in `training` pipeline `inputs` [#71](https://github.com/Galileo-Galilei/kedro-mlflow/issues/71)
- `pipeline_ml_factory` now infer automatically the schema of the input dataset to validate data automatically at inference time. The output schema can be declared manually in `model_signature` argument [#70](https://github.com/Galileo-Galilei/kedro-mlflow/issues/70)

### Fixed

- `get_mlflow_config` now uses the Kedro `ProjectContext` `ConfigLoader` to get configs (#66). This indirectly solves the following issues:
- `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory (#30)
- `kedro-mlflow` now works fine with `kedro jupyter notebook` command independently of the working directory (#64)
- You can use global variables in `mlflow.yml` which is now properly parsed if you use a `TemplatedConfigLoader` (#72)
- `get_mlflow_config` now uses the Kedro `ProjectContext` `ConfigLoader` to get configs [#66](https://github.com/Galileo-Galilei/kedro-mlflow/issues/66). This indirectly solves the following issues:
- `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory [#30](https://github.com/Galileo-Galilei/kedro-mlflow/issues/30)
- kedro_mlflow now works fine with kedro jupyter notebook independently of the working directory [#64](https://github.com/Galileo-Galilei/kedro-mlflow/issues/64)
- You can use global variables in `mlflow.yml` which is now properly parsed if you use a `TemplatedConfigLoader` [#72](https://github.com/Galileo-Galilei/kedro-mlflow/issues/72)
- `mlflow init` is now getting conf path from context.CONF_ROOT instead of hardcoded conf folder. This makes the package robust to Kedro changes.
- `MlflowMetricsDataset` now saves in the specified `run_id` instead of the current one when the prefix is not specified (#102)
- `MlflowMetricsDataset` now saves in the specified `run_id` instead of the current one when the prefix is not specified [#62](https://github.com/Galileo-Galilei/kedro-mlflow/issues/62)

### Changed

- Documentation reference to the plugin is now dynamic when necessary (#6).
- The test coverage now excludes `tests` and `setup.py` (#99).
- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the predicted value (#93).
- The `PipelineML.extract_pipeline_catalog` is renamed `PipelineML._extract_pipeline_catalog` to indicate it is a private method and is not intended to be used directly by end users (#100)
- Documentation reference to the plugin is now dynamic when necessary [#6](https://github.com/Galileo-Galilei/kedro-mlflow/issues/6)
- The test coverage now excludes `tests` and `setup.py` [#99](https://github.com/Galileo-Galilei/kedro-mlflow/issues/99)
- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the predicted value [#93](https://github.com/Galileo-Galilei/kedro-mlflow/issues/93)
- The `PipelineML.extract_pipeline_catalog` is renamed `PipelineML._extract_pipeline_catalog` to indicate it is a private method and is not intended to be used directly by end users [#100](https://github.com/Galileo-Galilei/kedro-mlflow/issues/100)

### Removed

- `kedro mlflow init` command is no longer declaring hooks in `run.py`. You must now [register your hooks manually](docs/source/03_tutorial/02_setup.md#declaring-kedro-mlflow-hooks) in the ``run.py`` (kedro > 0.16.0), ``.kedro.yml`` (kedro >= 0.16.5) or ``pyproject.toml`` (kedro >= 0.16.5)
- `kedro mlflow init` command is no longer declaring hooks in `run.py`. You must now [register your hooks manually](docs/source/03_tutorial/02_setup.md#declaring-kedro-mlflow-hooks) in the ``run.py`` (kedro > 0.16.0), ``.kedro.yml`` (kedro >= 0.16.5) or ``pyproject.toml`` (kedro >= 0.16.5) [#70](https://github.com/Galileo-Galilei/kedro-mlflow/issues/70)

## [0.3.0] - 2020-10-11

7 changes: 6 additions & 1 deletion docs/source/05_python_objects/02_Hooks.md
@@ -1,13 +1,18 @@
# ``Hooks``

This package provides 2 new hooks.

## ``MlflowPipelineHook``

This hook:

1. manages mlflow settings at the beginning and the end of the run (run start / end).
2. logs useful information for reproducibility as ``mlflow tags`` (including kedro ``Journal`` information and the commands used to launch the run).
3. registers the pipeline as a valid ``mlflow model`` if [it is a ``PipelineML`` instance](#new-pipeline)

## ``MlflowNodeHook``
This hook :

This hook:

1. must be used with the ``MlflowPipelineHook``
2. autologs node parameters each time the pipeline is run (with ``kedro run`` or programmatically).
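
As a reminder of how these hooks end up attached to a run, here is a minimal, hedged sketch of a manual registration in ``run.py`` for a kedro 0.16.x project. The ``ProjectContext`` attributes and the import path ``kedro_mlflow.framework.hooks`` are assumptions based on the changelog entry, not an excerpt from this repository:

```python
# run.py -- illustrative sketch only (kedro 0.16.x style registration)
from kedro.framework.context import KedroContext

from kedro_mlflow.framework.hooks import MlflowNodeHook, MlflowPipelineHook


class ProjectContext(KedroContext):
    project_name = "my-kedro-project"  # hypothetical project name
    project_version = "0.16.5"         # kedro version used to create the project

    # MlflowNodeHook must be used together with MlflowPipelineHook (see above)
    hooks = (MlflowPipelineHook(), MlflowNodeHook())
```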
11 changes: 10 additions & 1 deletion docs/source/05_python_objects/03_Pipelines.md
@@ -1,4 +1,5 @@
# Pipelines

## ``PipelineML`` and ``pipeline_ml_factory``

``PipelineML`` is a new class which extends ``Pipeline`` and enables binding two pipelines (one for training, one for inference) together. This class comes with a ``KedroPipelineModel`` class for logging it in mlflow. A pipeline logged as an mlflow model can be served using the ``mlflow models serve`` and ``mlflow models predict`` commands.
@@ -24,9 +25,11 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
}

```

Now, each time you run ``kedro run --pipeline=training`` (provided you registered ``MlflowPipelineHook`` in your ``run.py``), the full inference pipeline will be registered as an mlflow model (with all the outputs produced by training as artifacts: the machine learning model, but also the *scaler*, *vectorizer*, *imputer*, or any other object fitted on data that you create in ``training`` and use in ``inference``).

Note that:

- the `inference` pipeline `input_name` can be a `MemoryDataSet` and it belongs to the inference pipeline `inputs`
- apart from `input_name`, all other `inference` pipeline `inputs` must be persisted locally on disk (i.e. they must not be `MemoryDataSet`s and must have a local `filepath`)
- the `inference` pipeline `inputs` must belong to training `outputs` (vectorizer, binarizer, machine learning model...)
@@ -38,20 +41,26 @@ Note that:
from pathlib import Path
from kedro.framework.context import load_context
from kedro_mlflow.mlflow import KedroPipelineModel
import mlflow
from mlflow.models import infer_signature

# pipeline_training is your PipelineML object, created as previously
catalog = load_context(".").io

# artifacts are all the inputs of the inference pipelines that are persisted in the catalog
artifacts = pipeline_training.extract_pipeline_artifacts(catalog)

# get the schema of the input dataset
input_data = catalog.load(pipeline_training.input_name)
model_signature = infer_signature(model_input=input_data)

mlflow.pyfunc.log_model(
artifact_path="model",
python_model=KedroPipelineModel(
pipeline_ml=pipeline_training,
catalog=catalog
),
artifacts=artifacts,
conda_env={"python": "3.7.0"}
conda_env={"python": "3.7.0"},
signature=model_signature
)
```
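
If you prefer to declare the signature by hand instead of calling ``infer_signature``, a small sketch using mlflow's ``Schema``/``ColSpec`` objects could look like this; the column names and types are purely illustrative assumptions:

```python
# hand-written signature, as an alternative to infer_signature (illustrative columns)
from mlflow.models import ModelSignature
from mlflow.types.schema import ColSpec, Schema

model_signature = ModelSignature(
    inputs=Schema(
        [
            ColSpec(type="double", name="feature_1"),  # assumed numeric column
            ColSpec(type="long", name="feature_2"),    # assumed integer column
        ]
    )
)
```

The resulting object can be passed to ``mlflow.pyfunc.log_model(signature=...)`` as in the snippet above, or to ``pipeline_ml_factory`` through its ``model_signature`` argument.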
27 changes: 11 additions & 16 deletions kedro_mlflow/framework/cli/cli.py
@@ -2,18 +2,18 @@
from pathlib import Path

import click
from kedro import __version__ as kedro_version
from kedro.framework.context import load_context
from packaging import version

from kedro_mlflow.framework.cli.cli_utils import write_jinja_template
from kedro_mlflow.framework.context import get_mlflow_config
from kedro_mlflow.utils import _already_updated, _is_kedro_project

try:
from kedro.framework.context import get_static_project_data
except ImportError: # pragma: no cover
from kedro_mlflow.utils import _get_project_globals as get_static_project_data # pragma: no cover
except ImportError: # pragma: no cover
from kedro_mlflow.utils import (
_get_project_globals as get_static_project_data, # pragma: no cover
)


TEMPLATE_FOLDER_PATH = Path(__file__).parent.parent.parent / "template" / "project"
@@ -49,15 +49,13 @@ def get_command(self, ctx, cmd_name):

@click.group(name="Mlflow")
def commands():
"""Kedro plugin for interactions with mlflow.
"""
"""Kedro plugin for interactions with mlflow."""
pass # pragma: no cover


@commands.command(name="mlflow", cls=KedroClickGroup)
def mlflow_commands():
"""Use mlflow-specific commands inside kedro project.
"""
"""Use mlflow-specific commands inside kedro project."""
pass


Expand Down Expand Up @@ -128,8 +126,8 @@ def init(force, silent):
)
def ui(project_path, env):
"""Opens the mlflow user interface with the
project-specific settings of mlflow.yml. This interface
enables to browse and compares runs.
project-specific settings of mlflow.yml. This interface
enables to browse and compares runs.
"""

@@ -148,8 +146,7 @@ def ui(project_path, env):

@mlflow_commands.command()
def run():
"""Re-run an old run with mlflow-logged info.
"""
"""Re-run an old run with mlflow-logged info."""

# TODO (HARD) : define general assumptions to check whether a run
# is reproductible or not
@@ -163,13 +160,11 @@ def run():

@mlflow_commands.command()
def new():
"""Create a new kedro project with updated template.
"""
"""Create a new kedro project with updated template."""
raise NotImplementedError # pragma: no cover


class KedroMlflowCliError(Exception):
""" kedro-mlflow cli specific error
"""
"""kedro-mlflow cli specific error"""

pass
9 changes: 9 additions & 0 deletions kedro_mlflow/framework/hooks/pipeline_hook.py
@@ -9,6 +9,7 @@
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.versioning.journal import _git_sha
from mlflow.models import infer_signature

from kedro_mlflow.framework.context import get_mlflow_config
from kedro_mlflow.io import MlflowMetricsDataSet
@@ -136,13 +137,21 @@ def after_pipeline_run(
if isinstance(pipeline, PipelineML):
pipeline_catalog = pipeline._extract_pipeline_catalog(catalog)
artifacts = pipeline.extract_pipeline_artifacts(pipeline_catalog)

if pipeline.model_signature == "auto":
input_data = pipeline_catalog.load(pipeline.input_name)
model_signature = infer_signature(model_input=input_data)
else:
model_signature = pipeline.model_signature

mlflow.pyfunc.log_model(
artifact_path=pipeline.model_name,
python_model=KedroPipelineModel(
pipeline_ml=pipeline, catalog=pipeline_catalog
),
artifacts=artifacts,
conda_env=_format_conda_env(pipeline.conda_env),
signature=model_signature,
)
# Close the mlflow active run at the end of the pipeline to avoid interactions with further runs
mlflow.end_run()
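
For reference, a minimal sketch of what the ``'auto'`` branch above does: ``infer_signature`` derives the column names and types from the loaded input dataset. The DataFrame below is a made-up stand-in for ``pipeline_catalog.load(pipeline.input_name)``:

```python
# illustrative sketch of signature inference on a hypothetical input dataset
import pandas as pd
from mlflow.models import infer_signature

input_data = pd.DataFrame({"age": [25, 32], "income": [1200.0, 3400.0]})  # assumed columns
model_signature = infer_signature(model_input=input_data)

# the inferred schema lists one column spec per input column (e.g. age: long, income: double)
print(model_signature.inputs)
```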
63 changes: 42 additions & 21 deletions kedro_mlflow/pipeline/pipeline_ml.py
@@ -5,6 +5,7 @@
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node
from mlflow.models import ModelSignature

MSG_NOT_IMPLEMENTED = (
"This method is not implemented because it does"
@@ -43,6 +44,7 @@ def __init__(
input_name: str,
conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
model_name: Optional[str] = "model",
model_signature: Union[ModelSignature, str, None] = "auto",
):

"""Store all necessary information for calling mlflow.log_model in the pipeline.
@@ -77,6 +79,13 @@ def __init__(
model_name (Union[str, None], optional): The name of
the folder where the model will be stored in
remote mlflow. Defaults to "model".
model_signature (Union[ModelSignature, str, None]): The mlflow
signature of the input dataframe common to training
and inference.
- If 'auto', the signature is inferred automatically
- If None, no signature is used
- If a `ModelSignature` instance, it is used as is when logging the model
"""

super().__init__(nodes, *args, tags=tags)
@@ -85,17 +94,31 @@ def __init__(
self.conda_env = conda_env
self.model_name = model_name
self.input_name = input_name
self.model_signature = model_signature

self._check_consistency()

@property
def input_name(self) -> str:
return self._input_name

@property
def _logger(self) -> logging.Logger:
return logging.getLogger(__name__)

@property
def training(self) -> Pipeline:
return Pipeline(self.nodes)

@property
def inference(self) -> str:
return self._inference

@inference.setter
def inference(self, inference: Pipeline) -> None:
self._check_inference(inference)
self._inference = inference

@property
def input_name(self) -> str:
return self._input_name

@input_name.setter
def input_name(self, name: str) -> None:
allowed_names = self.inference.inputs()
@@ -110,17 +133,18 @@ def input_name(self, name: str) -> None:
self._input_name = name

@property
def inference(self) -> str:
return self._inference

@inference.setter
def inference(self, inference: Pipeline) -> None:
self._check_inference(inference)
self._inference = inference

@property
def training(self) -> Pipeline:
return Pipeline(self.nodes)
def model_signature(self) -> str:
return self._model_signature

@model_signature.setter
def model_signature(self, model_signature: ModelSignature) -> None:
if model_signature is not None:
if not isinstance(model_signature, ModelSignature):
if model_signature != "auto":
raise ValueError(
f"model_signature must be one of 'None', 'auto', or a 'ModelSignature' Object, got '{type(model_signature)}'"
)
self._model_signature = model_signature

def _check_inference(self, inference: Pipeline) -> None:
nb_outputs = len(inference.outputs())
@@ -276,15 +300,12 @@ def __or__(self, other): # pragma: no cover


class KedroMlflowPipelineMLInputsError(Exception):
"""Error raised when the inputs of KedroPipelineModel are invalid
"""
"""Error raised when the inputs of KedroPipelineModel are invalid"""


class KedroMlflowPipelineMLDatasetsError(Exception):
"""Error raised when the inputs of KedroPipelineMoel are invalid
"""
"""Error raised when the inputs of KedroPipelineMoel are invalid"""


class KedroMlflowPipelineMLOutputsError(Exception):
"""Error raised when the outputs of KedroPipelineModel are invalid
"""
"""Error raised when the outputs of KedroPipelineModel are invalid"""
10 changes: 10 additions & 0 deletions kedro_mlflow/pipeline/pipeline_ml_factory.py
@@ -3,6 +3,7 @@
from warnings import warn

from kedro.pipeline import Pipeline
from mlflow.models import ModelSignature

from kedro_mlflow.pipeline.pipeline_ml import PipelineML

Expand All @@ -13,6 +14,7 @@ def pipeline_ml_factory(
input_name: str = None,
conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
model_name: Optional[str] = "model",
model_signature: Union[ModelSignature, str, None] = "auto",
) -> PipelineML:
"""This function is a helper to create `PipelineML`
object directly from two Kedro `Pipelines` (one of
@@ -47,6 +49,13 @@ model_name (Union[str, None], optional): The name of
model_name (Union[str, None], optional): The name of
the folder where the model will be stored in
remote mlflow. Defaults to "model".
model_signature (Union[ModelSignature, str, None]): The mlflow
signature of the input dataframe common to training
and inference.
- If 'auto', the signature is inferred automatically
- If None, no signature is used
- If a `ModelSignature` instance, it is used as is when logging the model

Returns:
PipelineML: A `PipelineML` which is automatically
@@ -61,6 +70,7 @@
input_name=input_name,
conda_env=conda_env,
model_name=model_name,
model_signature=model_signature,
)
return pipeline

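A hedged usage sketch of the new argument; ``create_training_pipeline`` and ``create_inference_pipeline`` are hypothetical helpers, and ``instances`` is an assumed dataset name:

```python
# illustrative wiring of model_signature into pipeline_ml_factory
from kedro_mlflow.pipeline import pipeline_ml_factory

training_pipeline = pipeline_ml_factory(
    training=create_training_pipeline(),    # hypothetical helper returning a Pipeline
    inference=create_inference_pipeline(),  # hypothetical helper returning a Pipeline
    input_name="instances",                 # assumed name of the inference input dataset
    model_name="model",
    model_signature="auto",  # default: infer the schema from the input dataset at logging time
    # model_signature=None,                 # disable signature validation
    # model_signature=my_signature,         # or pass an explicit ModelSignature instance
)
```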
2 changes: 1 addition & 1 deletion requirements/test_requirements.txt
@@ -4,4 +4,4 @@ pytest-lazy-fixture>=0.6.0, <1.0.0
pytest-mock>=3.1.0, <4.0.0
flake8>=3.0.0, <4.0.0
black==19.10b0 # pin black version because it is not compatible with a pip range (because of non semver version number)
isort>=4.0.0, <5.0.0
isort>=5.0.0, <6.0.0