Feature/pipeline ml inputs #101

Merged (2 commits) on Oct 27, 2020
8 changes: 5 additions & 3 deletions CHANGELOG.md
@@ -6,13 +6,14 @@

- `kedro-mlflow` now supports kedro 0.16.5 (#62)
- `kedro-mlflow` hooks can now be declared in `.kedro.yml` or `pyproject.toml` by adding `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` into the hooks entry. _Only for kedro>=0.16.5_
- `pipeline_ml_factory` now accepts `inference` pipelines whose `inputs` are also `training` pipeline `inputs` (#71)

### Fixed

- `get_mlflow_config` now uses the Kedro `ProjectContext` `ConfigLoader` to get configs (#66). This indirectly solves the following issues:
- `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory (#30)
- kedro_mlflow now works fine with kedro jupyter notebook independently of the working directory (#64)
- You can use global variables in `mlflow.yml` which is now properly parsed if you use a `TemplatedConfigLoader` (#72)
- `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory (#30)
- `kedro-mlflow` now works fine with `kedro jupyter notebook` command independently of the working directory (#64)
- You can use global variables in `mlflow.yml` which is now properly parsed if you use a `TemplatedConfigLoader` (#72)
- `mlflow init` now gets the conf path from `context.CONF_ROOT` instead of a hardcoded `conf` folder. This makes the package robust to Kedro changes.
- `MlflowMetricsDataset` now saves in the specified `run_id` instead of the current one when the prefix is not specified (#102)

@@ -21,6 +22,7 @@
- Documentation reference to the plugin is now dynamic when necessary (#6).
- The test coverage now excludes `tests` and `setup.py` (#99).
- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the predicted value (#93).
- The `PipelineML.extract_pipeline_catalog` is renamed `PipelineML._extract_pipeline_catalog` to indicate it is a private method and is not intended to be used directly by end users (#100)

### Removed

22 changes: 11 additions & 11 deletions docs/source/05_python_objects/03_Pipelines.md
@@ -43,15 +43,15 @@ from kedro_mlflow.mlflow import KedroPipelineModel
catalog = load_context(".").io

# artifacts are all the inputs of the inference pipelines that are persisted in the catalog
pipeline_catalog = pipeline_training.extract_pipeline_catalog(catalog)
artifacts = {name: Path(dataset._filepath).resolve().as_uri()
for name, dataset in pipeline_catalog._data_sets.items()
if name != pipeline_training.model_input_name}


mlflow.pyfunc.log_model(artifact_path="model",
python_model=KedroPipelineModel(pipeline_ml=pipeline_training,
catalog=pipeline_catalog),
artifacts=artifacts,
conda_env={"python": "3.7.0"})
artifacts = pipeline_training.extract_pipeline_artifacts(catalog)

mlflow.pyfunc.log_model(
artifact_path="model",
python_model=KedroPipelineModel(
pipeline_ml=pipeline_training,
catalog=catalog
),
artifacts=artifacts,
conda_env={"python": "3.7.0"}
)
```
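For reference, a model logged this way can later be reloaded through the standard MLflow pyfunc API; a minimal sketch, where the run id and the input data are placeholders rather than part of this PR:

```python
import mlflow

# Placeholder run id and input data: reload the logged Kedro inference pipeline
# as a generic pyfunc model and run a prediction with it.
loaded_model = mlflow.pyfunc.load_model("runs:/<run_id>/model")
predictions = loaded_model.predict(data)
```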
2 changes: 1 addition & 1 deletion kedro_mlflow/framework/hooks/pipeline_hook.py
@@ -134,7 +134,7 @@ def after_pipeline_run(
"""

if isinstance(pipeline, PipelineML):
pipeline_catalog = pipeline.extract_pipeline_catalog(catalog)
pipeline_catalog = pipeline._extract_pipeline_catalog(catalog)
artifacts = pipeline.extract_pipeline_artifacts(pipeline_catalog)
mlflow.pyfunc.log_model(
artifact_path=pipeline.model_name,
2 changes: 1 addition & 1 deletion kedro_mlflow/mlflow/kedro_pipeline_model.py
@@ -12,7 +12,7 @@ class KedroPipelineModel(PythonModel):
def __init__(self, pipeline_ml: PipelineML, catalog: DataCatalog):

self.pipeline_ml = pipeline_ml
self.initial_catalog = pipeline_ml.extract_pipeline_catalog(catalog)
self.initial_catalog = pipeline_ml._extract_pipeline_catalog(catalog)
self.loaded_catalog = DataCatalog()
# we have the guarantee that there is only one output in inference
self.output_name = list(pipeline_ml.inference.outputs())[0]
75 changes: 47 additions & 28 deletions kedro_mlflow/pipeline/pipeline_ml.py
@@ -1,3 +1,4 @@
import logging
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, Optional, Union

@@ -85,13 +86,27 @@ def __init__(
self.model_name = model_name
self.input_name = input_name

self._check_consistency()

@property
def input_name(self) -> str:
return self._input_name

@property
def _logger(self) -> logging.Logger:
return logging.getLogger(__name__)

@input_name.setter
def input_name(self, name: str) -> None:
self._check_input_name(name)
allowed_names = self.inference.inputs()
pp_allowed_names = "\n - ".join(allowed_names)
if name not in allowed_names:
raise KedroMlflowPipelineMLInputsError(
(
f"input_name='{name}' but it must be an input of 'inference'"
f", i.e. one of: \n - {pp_allowed_names}"
)
)
self._input_name = name

@property
@@ -107,31 +122,6 @@ def inference(self, inference: Pipeline) -> None:
def training(self) -> Pipeline:
return Pipeline(self.nodes)

def _check_input_name(self, input_name: str) -> str:
allowed_names = self.inference.inputs()
pp_allowed_names = "\n - ".join(allowed_names)
if input_name not in allowed_names:
raise KedroMlflowPipelineMLInputsError(
f"input_name='{input_name}' but it must be an input of 'inference', i.e. one of: \n - {pp_allowed_names}"
)
else:
free_inputs_set = (
self.inference.inputs() - {input_name} - self.all_outputs()
)
if len(free_inputs_set) > 0:
raise KedroMlflowPipelineMLInputsError(
"""
The following inputs are free for the inference pipeline:
- {inputs}.
No free input is allowed.
Please make sure that 'inference.pipeline.inputs()' are all in 'training.pipeline.all_outputs()',
except eventually 'input_name'.""".format(
inputs="\n - ".join(free_inputs_set)
)
)

return None

def _check_inference(self, inference: Pipeline) -> None:
nb_outputs = len(inference.outputs())
outputs_txt = "\n - ".join(inference.outputs())
@@ -146,7 +136,11 @@ def _check_inference(self, inference: Pipeline) -> None:
)
)

def extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:
def _extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:
Collaborator:
Do we allow the use of parameters as inference / training inputs?
Kedro creates `params:xxx` inputs as `MemoryDataSet`s. The following `PipelineML` code excludes them from our inference pipelines:

if isinstance(data_set, MemoryDataSet):
     raise KedroMlflowPipelineMLDatasetsError(...)

Owner Author:
I hesitated to deal with parameters automatically, but:

- it is quite complicated: there are some edge cases to deal with, and we would have to decide how / when / where to persist them
- it is error prone: I don't want to persist parameters that were not explicitly intended to be persisted

On the other hand, it is very easy for a user to enforce a parameter simply by persisting it as an input or output of a "training" node, e.g. by declaring a `YAMLDataSet`, so I think we can leave it to the user to make sure it is voluntary.
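To make that concrete, here is a minimal sketch (the dataset name and filepath are made up for illustration) of persisting a parameter explicitly, so that it is no longer an ephemeral `MemoryDataSet` and gets picked up as an artifact:

```python
from kedro.extras.datasets.yaml import YAMLDataSet
from kedro.io import DataCatalog

# Hypothetical example: register "model_options" as a persisted YAMLDataSet
# instead of letting it live only as a params:... MemoryDataSet. Once persisted,
# _extract_pipeline_catalog / extract_pipeline_artifacts treat it like any
# other inference input.
catalog = DataCatalog(
    {"model_options": YAMLDataSet(filepath="data/06_models/model_options.yml")}
)
catalog.save("model_options", {"max_depth": 5, "n_estimators": 100})
```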


# check that the pipeline is consistent in case its attributes have been
self._check_consistency()

sub_catalog = DataCatalog()
for data_set_name in self.inference.inputs():
if data_set_name == self.input_name:
Expand All @@ -168,6 +162,9 @@ def extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:
data_set_name=data_set_name
)
)
self._logger.info(
f"The data_set '{data_set_name}' is added to the PipelineML catalog."
)
sub_catalog.add(data_set_name=data_set_name, data_set=data_set)
except KeyError:
raise KedroMlflowPipelineMLDatasetsError(
@@ -182,7 +179,7 @@ def extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:
return sub_catalog

def extract_pipeline_artifacts(self, catalog: DataCatalog):
pipeline_catalog = self.extract_pipeline_catalog(catalog)
pipeline_catalog = self._extract_pipeline_catalog(catalog)
artifacts = {
name: Path(dataset._filepath.as_posix())
.resolve()
@@ -192,6 +189,28 @@ def extract_pipeline_artifacts(self, catalog: DataCatalog):
}
return artifacts

def _check_consistency(self) -> None:
free_inputs_set = (
self.inference.inputs()
- {self.input_name}
- self.all_outputs()
- self.inputs()
)
if len(free_inputs_set) > 0:
input_set_txt = "\n - ".join(free_inputs_set)
raise KedroMlflowPipelineMLInputsError(
(
"The following inputs are free for the inference pipeline:"
f" - {input_set_txt}."
" No free input is allowed."
" Please make sure that 'inference.inputs()' are all"
" in 'training.all_outputs() + training.inputs()'"
"except 'input_name'."
)
)

return None

def _turn_pipeline_to_ml(self, pipeline: Pipeline):
return PipelineML(
nodes=pipeline.nodes, inference=self.inference, input_name=self.input_name
64 changes: 59 additions & 5 deletions tests/pipeline/test_pipeline_ml.py
@@ -43,6 +43,10 @@ def predict_fun_return_nothing(model, data):
pass


def remove_stopwords(data, stopwords):
return data


@pytest.fixture
def pipeline_with_tag():

@@ -125,12 +129,44 @@ def pipeline_ml_with_intermediary_artifacts():
),
]
)
pipeline_ml_with_tag = pipeline_ml_factory(
pipeline_ml_with_intermediary_artifacts = pipeline_ml_factory(
training=full_pipeline.only_nodes_with_tags("training"),
inference=full_pipeline.only_nodes_with_tags("inference"),
input_name="data",
)
return pipeline_ml_with_tag
return pipeline_ml_with_intermediary_artifacts


@pytest.fixture
def pipeline_ml_with_inputs_artifacts():
full_pipeline = Pipeline(
[
node(
func=remove_stopwords,
inputs=dict(data="data", stopwords="stopwords_from_nltk"),
outputs="cleaned_data",
tags=["training", "inference"],
),
node(
func=train_fun,
inputs="cleaned_data",
outputs="model",
tags=["training"],
),
node(
func=predict_fun,
inputs=["model", "cleaned_data"],
outputs="predictions",
tags=["inference"],
),
]
)
pipeline_ml_with_inputs_artifacts = pipeline_ml_factory(
training=full_pipeline.only_nodes_with_tags("training"),
inference=full_pipeline.only_nodes_with_tags("inference"),
input_name="data",
)
return pipeline_ml_with_inputs_artifacts


@pytest.fixture
@@ -175,6 +211,19 @@ def catalog_with_encoder():
return catalog_with_encoder


@pytest.fixture
def catalog_with_stopwords():
catalog_with_stopwords = DataCatalog(
{
"data": MemoryDataSet(),
"cleaned_data": MemoryDataSet(),
"stopwords_from_nltk": CSVDataSet("fake/path/to/stopwords.csv"),
"model": CSVDataSet("fake/path/to/model.csv"),
}
)
return catalog_with_stopwords


@pytest.mark.parametrize(
"tags,from_nodes,to_nodes,node_names,from_inputs",
[
@@ -296,10 +345,15 @@ def test_filtering_generate_invalid_pipeline_ml(
pytest.lazy_fixture("catalog_with_encoder"),
{"model", "data", "encoder"},
),
(
pytest.lazy_fixture("pipeline_ml_with_inputs_artifacts"),
pytest.lazy_fixture("catalog_with_stopwords"),
{"model", "data", "stopwords_from_nltk"},
),
],
)
def test_catalog_extraction(pipeline_ml_obj, catalog, result):
filtered_catalog = pipeline_ml_obj.extract_pipeline_catalog(catalog)
filtered_catalog = pipeline_ml_obj._extract_pipeline_catalog(catalog)
assert set(filtered_catalog.list()) == result


@@ -309,7 +363,7 @@ def test_catalog_extraction_missing_inference_input(pipeline_ml_with_tag):
KedroMlflowPipelineMLDatasetsError,
match="since it is an input for inference pipeline",
):
pipeline_ml_with_tag.extract_pipeline_catalog(catalog)
pipeline_ml_with_tag._extract_pipeline_catalog(catalog)


def test_catalog_extraction_unpersisted_inference_input(pipeline_ml_with_tag):
@@ -320,7 +374,7 @@ def test_catalog_extraction_unpersisted_inference_input(pipeline_ml_with_tag):
KedroMlflowPipelineMLDatasetsError,
match="The datasets of the training pipeline must be persisted locally",
):
pipeline_ml_with_tag.extract_pipeline_catalog(catalog)
pipeline_ml_with_tag._extract_pipeline_catalog(catalog)


def test_too_many_free_inputs():