diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6cc05d94..e6ce5f78 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,8 @@
 - Documentation reference to the plugin is now dynamic when necessary (#6).
 - The test coverage now excludes `tests` and `setup.py` (#99).
+- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary keyed by the dataset name in the `DataCatalog`, but only the predicted value (#93).
+
 ### Removed
diff --git a/docs/source/05_python_objects/03_Pipelines.md b/docs/source/05_python_objects/03_Pipelines.md
index de60a62c..53bf0d1a 100644
--- a/docs/source/05_python_objects/03_Pipelines.md
+++ b/docs/source/05_python_objects/03_Pipelines.md
@@ -24,7 +24,13 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
     }
 ```
 
-Now each time you will run ``kedro run --pipeline=training`` (provided you registered ``MlflowPipelineHook`` in you ``run.py``), the full inference pipeline will be registered as a mlflow model (with all the outputs produced by training as artifacts : the machine learning, but also the *scaler*, *vectorizer*, *imputer*, or whatever object fitted on data you create in ``training`` and that is used in ``inference``).
+Now each time you run ``kedro run --pipeline=training`` (provided you registered ``MlflowPipelineHook`` in your ``run.py``), the full inference pipeline will be registered as an mlflow model (with all the outputs produced by training as artifacts: the machine learning model, but also the *scaler*, *vectorizer*, *imputer*, or whatever object fitted on data that you create in ``training`` and that is used in ``inference``).
+
+Note that:
+- the `inference` pipeline `input_name` can be a `MemoryDataSet` and must belong to the `inference` pipeline `inputs`
+- Apart from `input_name`, all other `inference` pipeline `inputs` must be persisted locally on disk (i.e. they must not be `MemoryDataSet`s and must have a local `filepath`)
+- the `inference` pipeline `inputs` must belong to the training `outputs` (vectorizer, binarizer, machine learning model...)
+- the `inference` pipeline must have one and only one `output`
 
 *Note: If you want to log a ``PipelineML`` object in ``mlflow`` programatically, you can use the following code snippet:*
 
diff --git a/kedro_mlflow/mlflow/kedro_pipeline_model.py b/kedro_mlflow/mlflow/kedro_pipeline_model.py
index 4a1989cf..b0c9475f 100644
--- a/kedro_mlflow/mlflow/kedro_pipeline_model.py
+++ b/kedro_mlflow/mlflow/kedro_pipeline_model.py
@@ -14,6 +14,8 @@ def __init__(self, pipeline_ml: PipelineML, catalog: DataCatalog):
         self.pipeline_ml = pipeline_ml
         self.initial_catalog = pipeline_ml.extract_pipeline_catalog(catalog)
         self.loaded_catalog = DataCatalog()
+        # we have the guarantee that there is only one output in inference
+        self.output_name = list(pipeline_ml.inference.outputs())[0]
 
     def load_context(self, context):
 
@@ -33,7 +35,11 @@ def load_context(self, context):
                 kedro_artifacts_keys - mlflow_artifacts_keys
             )
             raise ValueError(
-                f"Provided artifacts do not match catalog entries:\n- 'artifacts - inference.inputs()' = : {in_artifacts_but_not_inference}'\n- 'inference.inputs() - artifacts' = : {in_inference_but_not_artifacts}'"
+                (
+                    "Provided artifacts do not match catalog entries:"
+                    f"\n - 'artifacts - inference.inputs()' = : {in_artifacts_but_not_inference}"
+                    f"\n - 'inference.inputs() - artifacts' = : {in_inference_but_not_artifacts}"
+                )
             )
 
         self.loaded_catalog = deepcopy(self.initial_catalog)
@@ -53,4 +59,6 @@ def predict(self, context, model_input):
         run_outputs = runner.run(
             pipeline=self.pipeline_ml.inference, catalog=self.loaded_catalog
         )
-        return run_outputs
+        return run_outputs[
+            self.output_name
+        ]  # unpack the result to avoid messing up the JSON output
diff --git a/kedro_mlflow/pipeline/__init__.py b/kedro_mlflow/pipeline/__init__.py
index d0640b0e..cbe59521 100644
--- a/kedro_mlflow/pipeline/__init__.py
+++ b/kedro_mlflow/pipeline/__init__.py
@@ -1,5 +1,6 @@
 from .pipeline_ml import (
     KedroMlflowPipelineMLDatasetsError,
     KedroMlflowPipelineMLInputsError,
+    KedroMlflowPipelineMLOutputsError,
 )
 from .pipeline_ml_factory import pipeline_ml, pipeline_ml_factory
diff --git a/kedro_mlflow/pipeline/pipeline_ml.py b/kedro_mlflow/pipeline/pipeline_ml.py
index bb61e526..466bf57c 100644
--- a/kedro_mlflow/pipeline/pipeline_ml.py
+++ b/kedro_mlflow/pipeline/pipeline_ml.py
@@ -5,7 +5,12 @@
 from kedro.pipeline import Pipeline
 from kedro.pipeline.node import Node
 
-MSG_NOT_IMPLEMENTED = "This method is not implemented because it does not make sens for 'PipelineML'. Manipulate directly the training pipeline and recreate the 'PipelineML' with 'pipeline_ml_factory' factory"
+MSG_NOT_IMPLEMENTED = (
+    "This method is not implemented because it does "
+    "not make sense for 'PipelineML'. "
+    "Manipulate directly the training pipeline and "
+    "recreate the 'PipelineML' with 'pipeline_ml_factory' factory."
+)
 
 
 class PipelineML(Pipeline):
@@ -78,7 +83,6 @@ def __init__(
 
         self.inference = inference
         self.conda_env = conda_env
         self.model_name = model_name
-        self.input_name = input_name
 
     @property
@@ -90,6 +94,58 @@ def input_name(self, name: str) -> None:
         self._check_input_name(name)
         self._input_name = name
 
+    @property
+    def inference(self) -> Pipeline:
+        return self._inference
+
+    @inference.setter
+    def inference(self, inference: Pipeline) -> None:
+        self._check_inference(inference)
+        self._inference = inference
+
+    @property
+    def training(self) -> Pipeline:
+        return Pipeline(self.nodes)
+
+    def _check_input_name(self, input_name: str) -> str:
+        allowed_names = self.inference.inputs()
+        pp_allowed_names = "\n - ".join(allowed_names)
+        if input_name not in allowed_names:
+            raise KedroMlflowPipelineMLInputsError(
+                f"input_name='{input_name}' but it must be an input of 'inference', i.e. one of: \n - {pp_allowed_names}"
+            )
+        else:
+            free_inputs_set = (
+                self.inference.inputs() - {input_name} - self.all_outputs()
+            )
+            if len(free_inputs_set) > 0:
+                raise KedroMlflowPipelineMLInputsError(
+                    """
+                    The following inputs are free for the inference pipeline:
+                    - {inputs}.
+                    No free input is allowed.
+                    Please make sure that 'inference.pipeline.inputs()' are all in 'training.pipeline.all_outputs()',
+                    except eventually 'input_name'.""".format(
+                        inputs="\n - ".join(free_inputs_set)
+                    )
+                )
+
+        return None
+
+    def _check_inference(self, inference: Pipeline) -> None:
+        nb_outputs = len(inference.outputs())
+        outputs_txt = "\n - ".join(inference.outputs())
+        if len(inference.outputs()) != 1:
+            raise KedroMlflowPipelineMLOutputsError(
+                (
+                    "The inference pipeline must have one"
+                    " and only one output. You are trying"
+                    " to set an inference pipeline with"
+                    f" '{nb_outputs}' output(s): \n - {outputs_txt}"
+                    " "
+                )
+            )
+
     def extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:
         sub_catalog = DataCatalog()
         for data_set_name in self.inference.inputs():
@@ -136,36 +192,7 @@ def extract_pipeline_artifacts(self, catalog: DataCatalog):
         }
         return artifacts
 
-    @property
-    def training(self):
-        return Pipeline(self.nodes)
-
-    def _check_input_name(self, input_name: str) -> str:
-        allowed_names = self.inference.inputs()
-        pp_allowed_names = "\n - ".join(allowed_names)
-        if input_name not in allowed_names:
-            raise KedroMlflowPipelineMLInputsError(
-                f"input_name='{input_name}' but it must be an input of 'inference', i.e. one of: \n - {pp_allowed_names}"
-            )
-        else:
-            free_inputs_set = (
-                self.inference.inputs() - {input_name} - self.all_outputs()
-            )
-            if len(free_inputs_set) > 0:
-                raise KedroMlflowPipelineMLInputsError(
-                    """
-                    The following inputs are free for the inference pipeline:
-                    - {inputs}.
-                    No free input is allowed.
-                    Please make sure that 'inference.pipeline.inputs()' are all in 'training.pipeline.all_outputs()',
-                    except eventually 'input_name'.""".format(
-                        inputs="\n - ".join(free_inputs_set)
-                    )
-                )
-
-        return None
-
-    def _turn_pipeline_to_ml(self, pipeline):
+    def _turn_pipeline_to_ml(self, pipeline: Pipeline):
         return PipelineML(
             nodes=pipeline.nodes, inference=self.inference, input_name=self.input_name
         )
@@ -230,10 +257,15 @@ def __or__(self, other):  # pragma: no cover
 
 
 class KedroMlflowPipelineMLInputsError(Exception):
-    """Error raised when the inputs of KedroPipelineMoel are invalid
+    """Error raised when the inputs of KedroPipelineModel are invalid
     """
 
 
 class KedroMlflowPipelineMLDatasetsError(Exception):
     """Error raised when the inputs of KedroPipelineMoel are invalid
     """
+
+
+class KedroMlflowPipelineMLOutputsError(Exception):
+    """Error raised when the outputs of KedroPipelineModel are invalid
+    """
diff --git a/tests/mlflow/test_kedro_pipeline_model.py b/tests/mlflow/test_kedro_pipeline_model.py
index 65be7c69..4e7e3d25 100644
--- a/tests/mlflow/test_kedro_pipeline_model.py
+++ b/tests/mlflow/test_kedro_pipeline_model.py
@@ -83,7 +83,7 @@ def test_model_packaging(tmp_path, pipeline_ml_obj):
     loaded_model = mlflow.pyfunc.load_model(
         model_uri=(Path(r"runs:/") / run_id / "model").as_posix()
     )
-    assert loaded_model.predict(1) == {"predictions": 2}
+    assert loaded_model.predict(1) == 2
 
 
 # should very likely add tests to see what happens when the artifacts
diff --git a/tests/pipeline/test_pipeline_ml.py b/tests/pipeline/test_pipeline_ml.py
index 9016b2d9..4f6479f8 100644
--- a/tests/pipeline/test_pipeline_ml.py
+++ b/tests/pipeline/test_pipeline_ml.py
@@ -8,6 +8,7 @@
 from kedro_mlflow.pipeline import (
     KedroMlflowPipelineMLDatasetsError,
     KedroMlflowPipelineMLInputsError,
+    KedroMlflowPipelineMLOutputsError,
     pipeline_ml,
     pipeline_ml_factory,
 )
@@ -34,6 +35,14 @@ def predict_fun(model, data):
     return data * model
 
 
+def predict_fun_with_metric(model, data):
+    return data * model, "super_metric"
+
+
+def predict_fun_return_nothing(model, data):
+    pass
+
+
 @pytest.fixture
 def pipeline_with_tag():
@@ -360,3 +369,43 @@ def test_invalid_input_name(pipeline_ml_with_tag):
         match="input_name='whoops_bad_name' but it must be an input of 'inference'",
     ):
         pipeline_ml_with_tag.input_name = "whoops_bad_name"
+
+
+def test_too_many_inference_outputs():
+    with pytest.raises(
+        KedroMlflowPipelineMLOutputsError,
+        match="The inference pipeline must have one and only one output",
+    ):
+        pipeline_ml_factory(
+            training=Pipeline([node(func=train_fun, inputs="data", outputs="model",)]),
+            inference=Pipeline(
+                [
+                    node(
+                        func=predict_fun_with_metric,
+                        inputs=["model", "data"],
+                        outputs=["predictions", "metric"],
+                    )
+                ]
+            ),
+            input_name="data",
+        )
+
+
+def test_not_enough_inference_outputs():
+    with pytest.raises(
+        KedroMlflowPipelineMLOutputsError,
+        match="The inference pipeline must have one and only one output",
+    ):
+        pipeline_ml_factory(
+            training=Pipeline([node(func=train_fun, inputs="data", outputs="model",)]),
+            inference=Pipeline(
+                [
+                    node(
+                        func=predict_fun_return_nothing,
+                        inputs=["model", "data"],
+                        outputs=None,
+                    )
+                ]
+            ),
+            input_name="data",
+        )
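
For reference, below is a minimal sketch of a `PipelineML` declaration that satisfies the new single-output constraint enforced by `_check_inference`. It mirrors the fixtures used in the tests above; `train_fun` and `predict_fun` are toy placeholders, not the library's own code.

```python
# Minimal sketch (assumed toy node functions): a training pipeline producing a
# "model" artifact, and an inference pipeline with exactly one output.
from kedro.pipeline import Pipeline, node

from kedro_mlflow.pipeline import pipeline_ml_factory


def train_fun(data):
    # toy "training" step: the fitted model is just the constant 2
    return 2


def predict_fun(model, data):
    # single prediction output, as required by the new check
    return data * model


pipeline_ml_obj = pipeline_ml_factory(
    training=Pipeline([node(func=train_fun, inputs="data", outputs="model")]),
    inference=Pipeline(
        [
            node(
                func=predict_fun,
                inputs=["model", "data"],
                outputs="predictions",  # exactly one output
            )
        ]
    ),
    input_name="data",
)

# Once such a pipeline is logged and reloaded through mlflow.pyfunc, predict()
# now returns the unpacked value (e.g. 2) instead of {"predictions": 2}, per #93.
```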