FIX #158 - Auto-pickle parameters of the inference pipeline of a PipelineML
Galileo-Galilei committed Feb 21, 2021
1 parent 5d5b6fe commit 7864f49
Showing 8 changed files with 295 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

- A new `long_parameters_strategy` key is added in the `mlflow.yml` (under the hook/node section). You can specify different strategies (`fail`, `truncate` or `tag`) to handle parameters over 250 characters, which cause crashes for some mlflow backends. ([#69](https://github.com/Galileo-Galilei/kedro-mlflow/issues/69))
- Add an `env` parameter to `kedro mlflow init` command to specify under which `conf/` subfolder the `mlflow.yml` should be created. ([#159](https://github.com/Galileo-Galilei/kedro-mlflow/issues/159))
- The input parameters of the `inference` pipeline of a `PipelineML` object are now automatically pickled and logged as artifacts. ([#158](https://github.com/Galileo-Galilei/kedro-mlflow/issues/158))

### Fixed

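Below is a minimal, hypothetical sketch of the behaviour described in the last changelog entry, modelled on the test fixture added later in this commit (it is not part of the diff). The `inference` pipeline declares `params:` inputs; with this change, the pipeline hook pickles them and logs them as artifacts next to the model. Function bodies and dataset names are purely illustrative, and the `pipeline_ml_factory` import path is assumed to match the version at this commit.

```python
# Assumes a kedro project with kedro-mlflow installed and `mlflow.yml` configured.
from kedro.pipeline import Pipeline, node
from kedro_mlflow.pipeline import pipeline_ml_factory


def train_fun(data, hyperparam):
    return 2  # dummy "model", as in the test fixture of this commit


def predict_fun(model, data):
    return data * model


def convert_probs_to_pred(data, threshold):
    return (data > threshold) * 1


full_pipeline = Pipeline(
    [
        # "params:penalty" is only used at training time: it is not persisted
        node(train_fun, ["data", "params:penalty"], "model", tags=["training"]),
        node(predict_fun, ["model", "data"], "predicted_probs", tags=["inference"]),
        # "params:threshold" is an inference input: it is now pickled and logged
        node(
            convert_probs_to_pred,
            ["predicted_probs", "params:threshold"],
            "predictions",
            tags=["inference"],
        ),
    ]
)

pipeline_ml = pipeline_ml_factory(
    training=full_pipeline.only_nodes_with_tags("training"),
    inference=full_pipeline.only_nodes_with_tags("inference"),
    input_name="data",
)
```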
40 changes: 24 additions & 16 deletions kedro_mlflow/framework/hooks/pipeline_hook.py
@@ -1,5 +1,6 @@
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Dict, Union

import mlflow
@@ -139,24 +140,31 @@ def after_pipeline_run(
"""

if isinstance(pipeline, PipelineML):
pipeline_catalog = pipeline._extract_pipeline_catalog(catalog)
artifacts = pipeline.extract_pipeline_artifacts(pipeline_catalog)
with TemporaryDirectory() as tmp_dir:
# This will be removed at the end of the context manager,
# but we need to log in mlflow before removing the folder
pipeline_catalog = pipeline._extract_pipeline_catalog(catalog)
artifacts = pipeline.extract_pipeline_artifacts(
pipeline_catalog, temp_folder=Path(tmp_dir)
)

if pipeline.model_signature == "auto":
input_data = pipeline_catalog.load(pipeline.input_name)
model_signature = infer_signature(model_input=input_data)
else:
model_signature = pipeline.model_signature
if pipeline.model_signature == "auto":
input_data = pipeline_catalog.load(pipeline.input_name)
model_signature = infer_signature(model_input=input_data)
else:
model_signature = pipeline.model_signature

mlflow.pyfunc.log_model(
artifact_path=pipeline.model_name,
python_model=KedroPipelineModel(
pipeline_ml=pipeline, catalog=pipeline_catalog, **pipeline.kwargs
),
artifacts=artifacts,
conda_env=_format_conda_env(pipeline.conda_env),
signature=model_signature,
)
mlflow.pyfunc.log_model(
artifact_path=pipeline.model_name,
python_model=KedroPipelineModel(
pipeline_ml=pipeline,
catalog=pipeline_catalog,
**pipeline.kwargs,
),
artifacts=artifacts,
conda_env=_format_conda_env(pipeline.conda_env),
signature=model_signature,
)
# Close the mlflow active run at the end of the pipeline to avoid interactions with further runs
mlflow.end_run()

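The main subtlety in the hook change above is ordering: the pickled parameters are written into a `TemporaryDirectory`, so `mlflow.pyfunc.log_model` has to be called inside the `with` block, before the folder is deleted. A standalone sketch of that pattern (not taken from the commit; the file name and parameter value are illustrative):

```python
import pickle
from pathlib import Path
from tempfile import TemporaryDirectory

import mlflow

with TemporaryDirectory() as tmp_dir:
    # persist an in-memory parameter so that mlflow can upload it as a file
    param_path = Path(tmp_dir) / "params_threshold.pkl"
    param_path.write_bytes(pickle.dumps(0.5))
    # must be called before the context manager removes the folder
    mlflow.log_artifact(param_path.as_posix())
```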
49 changes: 37 additions & 12 deletions kedro_mlflow/pipeline/pipeline_ml.py
@@ -2,6 +2,7 @@
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, Optional, Union

from kedro.extras.datasets.pickle import PickleDataSet
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node
@@ -171,6 +172,7 @@ def _check_inference(self, inference: Pipeline) -> None:
def _extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:

# check that the pipeline is consistent in case its attributes have been
# modified manually
self._check_consistency()

sub_catalog = DataCatalog()
@@ -183,7 +185,9 @@ def _extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:
else:
try:
data_set = catalog._data_sets[data_set_name]
if isinstance(data_set, MemoryDataSet):
if isinstance(
data_set, MemoryDataSet
) and not data_set_name.startswith("params:"):
raise KedroMlflowPipelineMLDatasetsError(
"""
The datasets of the training pipeline must be persisted locally
@@ -210,34 +214,55 @@ def _extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:

return sub_catalog

def extract_pipeline_artifacts(self, catalog: DataCatalog):
def extract_pipeline_artifacts(self, catalog: DataCatalog, temp_folder: Path):
pipeline_catalog = self._extract_pipeline_catalog(catalog)
artifacts = {
name: Path(dataset._filepath.as_posix())
.resolve()
.as_uri() # weird bug when directly converting PurePosixPath to windows: it is considered as relative
for name, dataset in pipeline_catalog._data_sets.items()
if name != self.input_name
}

artifacts = {}
for name, dataset in pipeline_catalog._data_sets.items():
if name != self.input_name:
if name.startswith("params:"):
# we need to persist it locally for mlflow access
absolute_param_path = temp_folder / f"params_{name[7:]}.pkl"
persisted_dataset = PickleDataSet(
filepath=absolute_param_path.as_posix()
)
persisted_dataset.save(dataset.load())
artifact_path = absolute_param_path.as_uri()
else:
# In this second case, we know it cannot be a MemoryDataSet
# weird bug when directly converting PurePosixPath to Windows: it is considered relative
artifact_path = (
Path(dataset._filepath.as_posix()).resolve().as_uri()
)

artifacts[name] = artifact_path

return artifacts

def _check_consistency(self) -> None:

inference_parameters = {
input for input in self.inference.inputs() if input.startswith("params:")
}

free_inputs_set = (
self.inference.inputs()
- {self.input_name}
- self.all_outputs()
- self.inputs()
- inference_parameters # it is allowed to pass parameters: they will be automatically persisted by the hook
)

if len(free_inputs_set) > 0:
input_set_txt = "\n - ".join(free_inputs_set)
raise KedroMlflowPipelineMLInputsError(
(
"The following inputs are free for the inference pipeline:"
"The following inputs are free for the inference pipeline:\n"
f" - {input_set_txt}."
" No free input is allowed."
" \nNo free input is allowed."
" Please make sure that 'inference.inputs()' are all"
" in 'training.all_outputs() + training.inputs()'"
"except 'input_name'."
"except 'input_name' and parameters which starts with 'params:'."
)
)

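After this change, `extract_pipeline_artifacts` takes a `temp_folder` argument and returns a dict mapping each inference input (except `input_name`) to a local file URI; `params:` entries are pickled into `temp_folder` first so that mlflow can upload them. A hedged usage sketch, assuming a `PipelineML` object and a `DataCatalog` like the ones sketched after the changelog above:

```python
from pathlib import Path
from tempfile import TemporaryDirectory

# `pipeline_ml` is a PipelineML object and `catalog` a DataCatalog holding its
# persisted datasets and parameters (illustrative names, not from the commit).
with TemporaryDirectory() as tmp_dir:
    artifacts = pipeline_ml.extract_pipeline_artifacts(catalog, temp_folder=Path(tmp_dir))
    # e.g. {"model": "file:///.../model.pkl",
    #       "params:threshold": "file:///.../params_threshold.pkl"}
    print(artifacts)
```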
123 changes: 122 additions & 1 deletion tests/framework/hooks/test_pipeline_hook.py
@@ -199,6 +199,60 @@ def dummy_catalog(tmp_path):
return dummy_catalog


@pytest.fixture
def pipeline_ml_with_parameters():
def remove_stopwords(data, stopwords):
return data

def train_fun_hyperparam(data, hyperparam):
return 2

def predict_fun(model, data):
return data * model

def convert_probs_to_pred(data, threshold):
return (data > threshold) * 1

full_pipeline = Pipeline(
[
# almost the same as previously, but stopwords are now parameters
# this is a shared parameter between inference and training
node(
func=remove_stopwords,
inputs=dict(data="data", stopwords="params:stopwords"),
outputs="cleaned_data",
tags=["training", "inference"],
),
# parameters in training pipeline, should not be persisted
node(
func=train_fun_hyperparam,
inputs=["cleaned_data", "params:penalty"],
outputs="model",
tags=["training"],
),
node(
func=predict_fun,
inputs=["model", "cleaned_data"],
outputs="predicted_probs",
tags=["inference"],
),
# this time, there is a parameter only for the inference pipeline
node(
func=convert_probs_to_pred,
inputs=["predicted_probs", "params:threshold"],
outputs="predictions",
tags=["inference"],
),
]
)
pipeline_ml_with_parameters = pipeline_ml_factory(
training=full_pipeline.only_nodes_with_tags("training"),
inference=full_pipeline.only_nodes_with_tags("inference"),
input_name="data",
)
return pipeline_ml_with_parameters


@pytest.fixture
def dummy_signature(dummy_catalog, dummy_pipeline_ml):
input_data = dummy_catalog.load(dummy_pipeline_ml.input_name)
@@ -391,7 +445,6 @@ def test_mlflow_pipeline_hook_metrics_with_run_id(
monkeypatch,
tmp_path,
config_dir,
env_from_dict,
dummy_pipeline_ml,
dummy_run_params,
dummy_mlflow_conf,
@@ -464,6 +517,74 @@ def test_mlflow_pipeline_hook_metrics_with_run_id(
assert run_data.metrics["foo.metric_key"] == 1.1


def test_mlflow_pipeline_hook_save_pipeline_ml_with_parameters(
mocker,
monkeypatch,
config_dir, # a fixture to be in a kedro project
dummy_mlflow_conf, # a fixture to setup mlflow configuration
tmp_path,
pipeline_ml_with_parameters,
dummy_run_params,
):
# config_with_base_mlflow_conf is a conftest fixture
monkeypatch.chdir(tmp_path)

context = load_context(tmp_path)
mlflow_conf = get_mlflow_config(context)
mlflow.set_tracking_uri(mlflow_conf.mlflow_tracking_uri)

catalog_with_parameters = DataCatalog(
{
"data": MemoryDataSet(pd.DataFrame(data=[1], columns=["a"])),
"cleaned_data": MemoryDataSet(),
"params:stopwords": MemoryDataSet(["Hello", "Hi"]),
"params:penalty": MemoryDataSet(0.1),
"model": PickleDataSet((tmp_path / "model.csv").as_posix()),
"params:threshold": MemoryDataSet(0.5),
}
)

pipeline_hook = MlflowPipelineHook()

runner = SequentialRunner()
pipeline_hook.after_catalog_created(
catalog=catalog_with_parameters,
# `after_catalog_created` does not use any of the arguments below,
# so we set them to empty values.
conf_catalog={},
conf_creds={},
feed_dict={},
save_version="",
load_versions="",
run_id=dummy_run_params["run_id"],
)
pipeline_hook.before_pipeline_run(
run_params=dummy_run_params,
pipeline=pipeline_ml_with_parameters,
catalog=catalog_with_parameters,
)
runner.run(pipeline_ml_with_parameters, catalog_with_parameters)

current_run_id = mlflow.active_run().info.run_id

# This is what we want to test: the model must be saved and the parameters automatically persisted on disk
pipeline_hook.after_pipeline_run(
run_params=dummy_run_params,
pipeline=pipeline_ml_with_parameters,
catalog=catalog_with_parameters,
)

# the 2 parameters which are inputs of the inference pipeline
# must have been persisted and logged inside the model's artifacts
model = mlflow.pyfunc.load_model(f"runs:/{current_run_id}/model")
assert set(
model.metadata.to_dict()["flavors"]["python_function"]["artifacts"].keys()
) == {"model", "params:stopwords", "params:threshold"}

# the model should be loadable and predict() should work (this tests KedroPipelineModel)
assert model.predict(pd.DataFrame(data=[1], columns=["a"])).values[0][0] == 1


@pytest.mark.parametrize(
"model_signature,expected_signature",
(
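As the new test asserts, the pickled parameters end up in the pyfunc flavor's `artifacts` mapping of the logged model. A short sketch of how to inspect them after a run (the run id placeholder stands for whichever run the hook logged to; the artifact path "model" matches the test above):

```python
import mlflow

run_id = "<run id logged by the hook>"  # e.g. mlflow.active_run().info.run_id during the run
model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
flavor = model.metadata.to_dict()["flavors"]["python_function"]
# expected to contain "model", "params:stopwords" and "params:threshold"
print(sorted(flavor["artifacts"]))
```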
17 changes: 15 additions & 2 deletions tests/io/models/test_mlflow_model_logger_dataset.py
@@ -1,3 +1,5 @@
from tempfile import TemporaryDirectory

import mlflow
import pandas as pd
import pytest
@@ -29,6 +31,12 @@ def tracking_uri(tmp_path):
return tracking_uri


@pytest.fixture
def tmp_folder():
tmp_folder = TemporaryDirectory()
return tmp_folder


@pytest.fixture
def mlflow_client(tracking_uri):
mlflow_client = MlflowClient(tracking_uri=tracking_uri)
@@ -275,10 +283,15 @@ def test_load_without_run_id_nor_active_run():


def test_pyfunc_flavor_python_model_save_and_load(
tmp_path, tracking_uri, pipeline_ml_obj, dummy_catalog, kedro_pipeline_model
tmp_path,
tmp_folder,
tracking_uri,
pipeline_ml_obj,
dummy_catalog,
kedro_pipeline_model,
):

artifacts = pipeline_ml_obj.extract_pipeline_artifacts(dummy_catalog)
artifacts = pipeline_ml_obj.extract_pipeline_artifacts(dummy_catalog, tmp_folder)

model_config = {
"name": "kedro_pipeline_model",
12 changes: 10 additions & 2 deletions tests/io/models/test_mlflow_model_saver_dataset.py
@@ -1,3 +1,5 @@
from tempfile import TemporaryDirectory

import mlflow
import pandas as pd
import pytest
@@ -17,6 +19,12 @@ def linreg_model():
return linreg_model


@pytest.fixture
def tmp_folder():
tmp_folder = TemporaryDirectory()
return tmp_folder


@pytest.fixture
def linreg_path(tmp_path):
linreg_path = tmp_path / "data" / "06_models" / "linreg"
@@ -134,10 +142,10 @@ def test_save_load_local(linreg_path, linreg_model, versioned):


def test_pyfunc_flavor_python_model_save_and_load(
tmp_path, pipeline_ml_obj, dummy_catalog, kedro_pipeline_model
tmp_path, tmp_folder, pipeline_ml_obj, dummy_catalog, kedro_pipeline_model
):

artifacts = pipeline_ml_obj.extract_pipeline_artifacts(dummy_catalog)
artifacts = pipeline_ml_obj.extract_pipeline_artifacts(dummy_catalog, tmp_folder)

model_config = {
"name": "kedro_pipeline_model",
