FIX #93 - Enforce inference pipeline output unpacking
Galileo-Galilei committed Oct 25, 2020
1 parent 7398526 commit cd16633
Showing 7 changed files with 135 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,8 @@

- Documentation reference to the plugin is now dynamic when necessary (#6).
- The test coverage now excludes `tests` and `setup.py` (#99).
- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline: instead of a dictionary keyed by the output name in the `DataCatalog`, `predict` returns only the predicted value (#93).


### Removed

8 changes: 7 additions & 1 deletion docs/source/05_python_objects/03_Pipelines.md
@@ -24,7 +24,13 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
}

```
Now each time you run ``kedro run --pipeline=training`` (provided you registered ``MlflowPipelineHook`` in your ``run.py``), the full inference pipeline will be registered as an mlflow model (with all the outputs produced by training as artifacts: the machine learning, but also the *scaler*, *vectorizer*, *imputer*, or whatever object fitted on data that you create in ``training`` and use in ``inference``).
Now each time you run ``kedro run --pipeline=training`` (provided you registered ``MlflowPipelineHook`` in your ``run.py``), the full inference pipeline will be registered as an mlflow model (with all the outputs produced by training as artifacts: the machine learning model, but also the *scaler*, *vectorizer*, *imputer*, or whatever object fitted on data that you create in ``training`` and use in ``inference``).

Note that:
- the `inference` pipeline `input_name` can be a `MemoryDataSet` and it must belong to the `inference` pipeline `inputs`
- apart from `input_name`, all other `inference` pipeline `inputs` must be persisted locally on disk (i.e. they must not be `MemoryDataSet` and must have a local `filepath`)
- the `inference` pipeline `inputs` must belong to the `training` pipeline `outputs` (vectorizer, binarizer, machine learning model...)
- the `inference` pipeline must have one and only one `output` (a minimal example satisfying these constraints is sketched just below)
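
The node functions and dataset names in this sketch (`train_model`, `predict`, `training_data`, `instances`, `predictions`) are illustrative, not part of the plugin:

```python
from kedro.pipeline import Pipeline, node

from kedro_mlflow.pipeline import pipeline_ml_factory


def train_model(training_data):
    """Fit and return the model (placeholder)."""
    ...


def predict(model, instances):
    """Return the predictions for new instances (placeholder)."""
    ...


def create_pipelines(**kwargs):
    training = Pipeline(
        [node(func=train_model, inputs="training_data", outputs="model")]
    )
    inference = Pipeline(
        # exactly one output, as required
        [node(func=predict, inputs=["model", "instances"], outputs="predictions")]
    )
    return {
        "training": pipeline_ml_factory(
            training=training,
            inference=inference,
            # "instances" may stay a MemoryDataSet; "model" is a training output
            # and must be persisted on disk (e.g. a PickleDataSet with a local
            # filepath declared in catalog.yml)
            input_name="instances",
        ),
        "inference": inference,
        "__default__": training + inference,
    }
```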

*Note: If you want to log a ``PipelineML`` object in ``mlflow`` programmatically, you can use the following code snippet:*
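
*For illustration, a rough sketch of such a snippet; the import paths, the ``load_context`` call and the use of ``pipeline_ml.conda_env`` are assumptions and may need adapting to your project:*

```python
import mlflow
from kedro.framework.context import load_context  # kedro 0.16.x style context loading

from kedro_mlflow.mlflow import KedroPipelineModel  # import path assumed

context = load_context(".")
catalog = context.catalog
pipeline_ml = context.pipelines["training"]  # the PipelineML built by pipeline_ml_factory

# gather the artifacts (model, scaler, encoder, ...) persisted by the training run
artifacts = pipeline_ml.extract_pipeline_artifacts(catalog)

with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=KedroPipelineModel(pipeline_ml=pipeline_ml, catalog=catalog),
        artifacts=artifacts,
        conda_env=pipeline_ml.conda_env,
    )
```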

12 changes: 10 additions & 2 deletions kedro_mlflow/mlflow/kedro_pipeline_model.py
@@ -14,6 +14,8 @@ def __init__(self, pipeline_ml: PipelineML, catalog: DataCatalog):
self.pipeline_ml = pipeline_ml
self.initial_catalog = pipeline_ml.extract_pipeline_catalog(catalog)
self.loaded_catalog = DataCatalog()
# we have the guarantee that there is only one output in inference
self.output_name = list(pipeline_ml.inference.outputs())[0]

def load_context(self, context):

@@ -33,7 +35,11 @@ def load_context(self, context):
kedro_artifacts_keys - mlflow_artifacts_keys
)
raise ValueError(
f"Provided artifacts do not match catalog entries:\n- 'artifacts - inference.inputs()' = : {in_artifacts_but_not_inference}'\n- 'inference.inputs() - artifacts' = : {in_inference_but_not_artifacts}'"
(
"Provided artifacts do not match catalog entries:"
f"\n - 'artifacts - inference.inputs()' = : {in_artifacts_but_not_inference}"
f"\n - 'inference.inputs() - artifacts' = : {in_inference_but_not_artifacts}"
)
)

self.loaded_catalog = deepcopy(self.initial_catalog)
Expand All @@ -53,4 +59,6 @@ def predict(self, context, model_input):
run_outputs = runner.run(
pipeline=self.pipeline_ml.inference, catalog=self.loaded_catalog
)
return run_outputs
return run_outputs[
self.output_name
] # unpack the result to avoid messing the json output
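
A small usage sketch of the new behaviour (the run id is a placeholder): the loaded model now returns the prediction itself rather than a dictionary keyed by the catalog output name.

```python
import mlflow

# "<run_id>" is a placeholder for the id of the training run
loaded_model = mlflow.pyfunc.load_model("runs:/<run_id>/model")

# previously: loaded_model.predict(1) == {"predictions": 2}
# now the value is unpacked:
prediction = loaded_model.predict(1)  # == 2 for the toy pipeline used in the tests
```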
1 change: 1 addition & 0 deletions kedro_mlflow/pipeline/__init__.py
@@ -1,5 +1,6 @@
from .pipeline_ml import (
KedroMlflowPipelineMLDatasetsError,
KedroMlflowPipelineMLInputsError,
KedroMlflowPipelineMLOutputsError,
)
from .pipeline_ml_factory import pipeline_ml, pipeline_ml_factory
98 changes: 65 additions & 33 deletions kedro_mlflow/pipeline/pipeline_ml.py
@@ -5,7 +5,12 @@
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node

MSG_NOT_IMPLEMENTED = "This method is not implemented because it does not make sens for 'PipelineML'. Manipulate directly the training pipeline and recreate the 'PipelineML' with 'pipeline_ml_factory' factory"
MSG_NOT_IMPLEMENTED = (
"This method is not implemented because it does"
"not make sense for 'PipelineML'."
"Manipulate directly the training pipeline and"
"recreate the 'PipelineML' with 'pipeline_ml_factory' factory."
)


class PipelineML(Pipeline):
@@ -78,7 +83,6 @@ def __init__(
self.inference = inference
self.conda_env = conda_env
self.model_name = model_name

self.input_name = input_name

@property
@@ -90,6 +94,58 @@ def input_name(self, name: str) -> None:
self._check_input_name(name)
self._input_name = name

@property
    def inference(self) -> Pipeline:
return self._inference

@inference.setter
def inference(self, inference: Pipeline) -> None:
self._check_inference(inference)
self._inference = inference

@property
def training(self) -> Pipeline:
return Pipeline(self.nodes)

def _check_input_name(self, input_name: str) -> str:
allowed_names = self.inference.inputs()
pp_allowed_names = "\n - ".join(allowed_names)
if input_name not in allowed_names:
raise KedroMlflowPipelineMLInputsError(
f"input_name='{input_name}' but it must be an input of 'inference', i.e. one of: \n - {pp_allowed_names}"
)
else:
free_inputs_set = (
self.inference.inputs() - {input_name} - self.all_outputs()
)
if len(free_inputs_set) > 0:
raise KedroMlflowPipelineMLInputsError(
"""
The following inputs are free for the inference pipeline:
- {inputs}.
No free input is allowed.
Please make sure that 'inference.pipeline.inputs()' are all in 'training.pipeline.all_outputs()',
                except possibly 'input_name'.""".format(
inputs="\n - ".join(free_inputs_set)
)
)

return None

def _check_inference(self, inference: Pipeline) -> None:
nb_outputs = len(inference.outputs())
outputs_txt = "\n - ".join(inference.outputs())
if len(inference.outputs()) != 1:
raise KedroMlflowPipelineMLOutputsError(
(
"The inference pipeline must have one"
" and only one output. You are trying"
" to set a inference pipeline with"
f" '{nb_outputs}' output(s): \n - {outputs_txt}"
" "
)
)

def extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:
sub_catalog = DataCatalog()
for data_set_name in self.inference.inputs():
@@ -136,36 +192,7 @@ def extract_pipeline_artifacts(self, catalog: DataCatalog):
}
return artifacts

@property
def training(self):
return Pipeline(self.nodes)

def _check_input_name(self, input_name: str) -> str:
allowed_names = self.inference.inputs()
pp_allowed_names = "\n - ".join(allowed_names)
if input_name not in allowed_names:
raise KedroMlflowPipelineMLInputsError(
f"input_name='{input_name}' but it must be an input of 'inference', i.e. one of: \n - {pp_allowed_names}"
)
else:
free_inputs_set = (
self.inference.inputs() - {input_name} - self.all_outputs()
)
if len(free_inputs_set) > 0:
raise KedroMlflowPipelineMLInputsError(
"""
The following inputs are free for the inference pipeline:
- {inputs}.
No free input is allowed.
Please make sure that 'inference.pipeline.inputs()' are all in 'training.pipeline.all_outputs()',
except eventually 'input_name'.""".format(
inputs="\n - ".join(free_inputs_set)
)
)

return None

def _turn_pipeline_to_ml(self, pipeline):
def _turn_pipeline_to_ml(self, pipeline: Pipeline):
return PipelineML(
nodes=pipeline.nodes, inference=self.inference, input_name=self.input_name
)
@@ -230,10 +257,15 @@ def __or__(self, other): # pragma: no cover


class KedroMlflowPipelineMLInputsError(Exception):
"""Error raised when the inputs of KedroPipelineMoel are invalid
"""Error raised when the inputs of KedroPipelineModel are invalid
"""


class KedroMlflowPipelineMLDatasetsError(Exception):
"""Error raised when the inputs of KedroPipelineMoel are invalid
"""


class KedroMlflowPipelineMLOutputsError(Exception):
"""Error raised when the outputs of KedroPipelineModel are invalid
"""
2 changes: 1 addition & 1 deletion tests/mlflow/test_kedro_pipeline_model.py
@@ -83,7 +83,7 @@ def test_model_packaging(tmp_path, pipeline_ml_obj):
loaded_model = mlflow.pyfunc.load_model(
model_uri=(Path(r"runs:/") / run_id / "model").as_posix()
)
assert loaded_model.predict(1) == {"predictions": 2}
assert loaded_model.predict(1) == 2


# should very likely add tests to see what happens when the artifacts
49 changes: 49 additions & 0 deletions tests/pipeline/test_pipeline_ml.py
@@ -8,6 +8,7 @@
from kedro_mlflow.pipeline import (
KedroMlflowPipelineMLDatasetsError,
KedroMlflowPipelineMLInputsError,
KedroMlflowPipelineMLOutputsError,
pipeline_ml,
pipeline_ml_factory,
)
@@ -34,6 +35,14 @@ def predict_fun(model, data):
return data * model


def predict_fun_with_metric(model, data):
return data * model, "super_metric"


def predict_fun_return_nothing(model, data):
pass


@pytest.fixture
def pipeline_with_tag():

@@ -360,3 +369,43 @@ def test_invalid_input_name(pipeline_ml_with_tag):
match="input_name='whoops_bad_name' but it must be an input of 'inference'",
):
pipeline_ml_with_tag.input_name = "whoops_bad_name"


def test_too_many_inference_outputs():
with pytest.raises(
KedroMlflowPipelineMLOutputsError,
match="The inference pipeline must have one and only one output",
):
pipeline_ml_factory(
training=Pipeline([node(func=train_fun, inputs="data", outputs="model",)]),
inference=Pipeline(
[
node(
func=predict_fun_with_metric,
inputs=["model", "data"],
outputs=["predictions", "metric"],
)
]
),
input_name="data",
)


def test_not_enough_inference_outputs():
with pytest.raises(
KedroMlflowPipelineMLOutputsError,
match="The inference pipeline must have one and only one output",
):
pipeline_ml_factory(
training=Pipeline([node(func=train_fun, inputs="data", outputs="model",)]),
inference=Pipeline(
[
node(
func=predict_fun_return_nothing,
inputs=["model", "data"],
outputs=None,
)
]
),
input_name="data",
)
