diff --git a/CHANGELOG.md b/CHANGELOG.md
index 17395752..fa34bf93 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,29 +4,30 @@

 ### Added

-- `kedro-mlflow` now supports kedro 0.16.5 (#62)
-- `kedro-mlflow` hooks can now be declared in `.kedro.yml` or `pyproject.toml` by adding `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` into the hooks entry. _Only for kedro>=0.16.5_
-- `pipeline_ml_factory` now accepts that `inference` pipeline `inputs` may be in `training` pipeline `inputs` (#71)
+- `kedro-mlflow` now supports `kedro>=0.16.5` [#62](https://github.com/Galileo-Galilei/kedro-mlflow/issues/62)
+- `kedro-mlflow` hooks can now be declared in `.kedro.yml` or `pyproject.toml` by adding `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` into the hooks entry. _Only for kedro>=0.16.5_ [#96](https://github.com/Galileo-Galilei/kedro-mlflow/issues/96)
+- `pipeline_ml_factory` now allows the `inference` pipeline `inputs` to be in the `training` pipeline `inputs` [#71](https://github.com/Galileo-Galilei/kedro-mlflow/issues/71)
+- `pipeline_ml_factory` now automatically infers the schema of the input dataset to validate data at inference time. The signature can also be declared manually through the `model_signature` argument [#70](https://github.com/Galileo-Galilei/kedro-mlflow/issues/70)

 ### Fixed

-- `get_mlflow_config` now uses the Kedro `ProjectContext` `ConfigLoader` to get configs (#66). This indirectly solves the following issues:
-  - `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory (#30)
-  - `kedro-mlflow` now works fine with `kedro jupyter notebook` command independently of the working directory (#64)
-  - You can use global variables in `mlflow.yml` which is now properly parsed if you use a `TemplatedConfigLoader` (#72)
+- `get_mlflow_config` now uses the Kedro `ProjectContext` `ConfigLoader` to get configs [#66](https://github.com/Galileo-Galilei/kedro-mlflow/issues/66). This indirectly solves the following issues:
+  - `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory [#30](https://github.com/Galileo-Galilei/kedro-mlflow/issues/30)
+  - `kedro-mlflow` now works fine with the `kedro jupyter notebook` command independently of the working directory [#64](https://github.com/Galileo-Galilei/kedro-mlflow/issues/64)
+  - You can use global variables in `mlflow.yml` which is now properly parsed if you use a `TemplatedConfigLoader` [#72](https://github.com/Galileo-Galilei/kedro-mlflow/issues/72)
 - `mlflow init` is now getting conf path from context.CONF_ROOT instead of hardcoded conf folder. This makes the package robust to Kedro changes.
-- `MlflowMetricsDataset` now saves in the specified `run_id` instead of the current one when the prefix is not specified (#102)
+- `MlflowMetricsDataset` now saves in the specified `run_id` instead of the current one when the prefix is not specified [#102](https://github.com/Galileo-Galilei/kedro-mlflow/issues/102)

 ### Changed

-- Documentation reference to the plugin is now dynamic when necessary (#6).
-- The test coverage now excludes `tests` and `setup.py` (#99).
-- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the predicted value (#93).
-- The `PipelineML.extract_pipeline_catalog` is renamed `PipelineML._extract_pipeline_catalog` to indicate it is a private method and is not intended to be used directly by end users (#100)
+- Documentation reference to the plugin is now dynamic when necessary [#6](https://github.com/Galileo-Galilei/kedro-mlflow/issues/6)
+- The test coverage now excludes `tests` and `setup.py` [#99](https://github.com/Galileo-Galilei/kedro-mlflow/issues/99)
+- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the predicted value [#93](https://github.com/Galileo-Galilei/kedro-mlflow/issues/93)
+- The `PipelineML.extract_pipeline_catalog` is renamed `PipelineML._extract_pipeline_catalog` to indicate it is a private method and is not intended to be used directly by end users [#100](https://github.com/Galileo-Galilei/kedro-mlflow/issues/100)

 ### Removed

-- `kedro mlflow init` command is no longer declaring hooks in `run.py`. You must now [register your hooks manually](docs/source/03_tutorial/02_setup.md#declaring-kedro-mlflow-hooks) in the ``run.py`` (kedro > 0.16.0), ``.kedro.yml`` (kedro >= 0.16.5) or ``pyproject.toml`` (kedro >= 0.16.5)
+- `kedro mlflow init` command no longer declares hooks in `run.py`. You must now [register your hooks manually](docs/source/03_tutorial/02_setup.md#declaring-kedro-mlflow-hooks) in the ``run.py`` (kedro > 0.16.0), ``.kedro.yml`` (kedro >= 0.16.5) or ``pyproject.toml`` (kedro >= 0.16.5)

 ## [0.3.0] - 2020-10-11
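As a quick illustration of the new `model_signature` entry above, here is a minimal, hypothetical usage sketch. The node functions, dataset names and tags are invented for the example; only `pipeline_ml_factory` and its `model_signature` argument come from this changeset, and the import path assumes the factory is exposed from `kedro_mlflow.pipeline` as in the package's public API:

```python
# Hypothetical sketch: wiring `model_signature` through `pipeline_ml_factory`.
# `train_fun`, `predict_fun`, "raw_data" and the "training"/"inference" tags
# are invented for illustration.
from kedro.pipeline import Pipeline, node

from kedro_mlflow.pipeline import pipeline_ml_factory


def train_fun(data):
    return 2  # dummy "model"


def predict_fun(model, data):
    return data * model


full_pipeline = Pipeline(
    [
        node(train_fun, inputs="raw_data", outputs="model", tags=["training"]),
        node(
            predict_fun,
            inputs=["model", "raw_data"],
            outputs="predictions",
            tags=["inference"],
        ),
    ]
)

pipeline_ml = pipeline_ml_factory(
    training=full_pipeline.only_nodes_with_tags("training"),
    inference=full_pipeline.only_nodes_with_tags("inference"),
    input_name="raw_data",
    model_signature="auto",  # default: infer the schema from `input_name` at logging time
)
```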
diff --git a/kedro_mlflow/framework/cli/cli.py b/kedro_mlflow/framework/cli/cli.py
index a6dc6031..4c4b5418 100644
--- a/kedro_mlflow/framework/cli/cli.py
+++ b/kedro_mlflow/framework/cli/cli.py
@@ -2,9 +2,7 @@
 from pathlib import Path

 import click
-from kedro import __version__ as kedro_version
 from kedro.framework.context import load_context
-from packaging import version

 from kedro_mlflow.framework.cli.cli_utils import write_jinja_template
 from kedro_mlflow.framework.context import get_mlflow_config
@@ -12,8 +10,10 @@

 try:
     from kedro.framework.context import get_static_project_data
-except ImportError:  # pragma: no cover
-    from kedro_mlflow.utils import _get_project_globals as get_static_project_data  # pragma: no cover
+except ImportError:  # pragma: no cover
+    from kedro_mlflow.utils import (
+        _get_project_globals as get_static_project_data,  # pragma: no cover
+    )

 TEMPLATE_FOLDER_PATH = Path(__file__).parent.parent.parent / "template" / "project"

@@ -49,15 +49,13 @@ def get_command(self, ctx, cmd_name):

 @click.group(name="Mlflow")
 def commands():
-    """Kedro plugin for interactions with mlflow.
-    """
+    """Kedro plugin for interactions with mlflow."""
     pass  # pragma: no cover


 @commands.command(name="mlflow", cls=KedroClickGroup)
 def mlflow_commands():
-    """Use mlflow-specific commands inside kedro project.
-    """
+    """Use mlflow-specific commands inside kedro project."""
     pass


@@ -128,8 +126,8 @@ def init(force, silent):
 )
 def ui(project_path, env):
     """Opens the mlflow user interface with the
-    project-specific settings of mlflow.yml. This interface
-    enables to browse and compares runs.
+    project-specific settings of mlflow.yml. This interface
+    enables you to browse and compare runs.
     """


@@ -148,8 +146,7 @@ def ui(project_path, env):

 @mlflow_commands.command()
 def run():
-    """Re-run an old run with mlflow-logged info.
-    """
+    """Re-run an old run with mlflow-logged info."""

     # TODO (HARD) : define general assumptions to check whether a run
     #  is reproductible or not
@@ -163,13 +160,11 @@ def run():

 @mlflow_commands.command()
 def new():
-    """Create a new kedro project with updated template.
-    """
+    """Create a new kedro project with updated template."""
     raise NotImplementedError  # pragma: no cover


 class KedroMlflowCliError(Exception):
-    """ kedro-mlflow cli specific error
-    """
+    """kedro-mlflow cli specific error"""

     pass
- """ + """Re-run an old run with mlflow-logged info.""" # TODO (HARD) : define general assumptions to check whether a run # is reproductible or not @@ -163,13 +160,11 @@ def run(): @mlflow_commands.command() def new(): - """Create a new kedro project with updated template. - """ + """Create a new kedro project with updated template.""" raise NotImplementedError # pragma: no cover class KedroMlflowCliError(Exception): - """ kedro-mlflow cli specific error - """ + """kedro-mlflow cli specific error""" pass diff --git a/kedro_mlflow/framework/hooks/pipeline_hook.py b/kedro_mlflow/framework/hooks/pipeline_hook.py index 70c7c49c..f3e4e6a4 100644 --- a/kedro_mlflow/framework/hooks/pipeline_hook.py +++ b/kedro_mlflow/framework/hooks/pipeline_hook.py @@ -9,6 +9,7 @@ from kedro.io import DataCatalog from kedro.pipeline import Pipeline from kedro.versioning.journal import _git_sha +from mlflow.models import infer_signature from kedro_mlflow.framework.context import get_mlflow_config from kedro_mlflow.io import MlflowMetricsDataSet @@ -136,6 +137,15 @@ def after_pipeline_run( if isinstance(pipeline, PipelineML): pipeline_catalog = pipeline._extract_pipeline_catalog(catalog) artifacts = pipeline.extract_pipeline_artifacts(pipeline_catalog) + + if pipeline.model_signature == "auto": + input_data = pipeline_catalog.load(pipeline.input_name) + model_signature = infer_signature(model_input=input_data) + else: + model_signature = pipeline.model_signature + print("OHE") + print(model_signature) + mlflow.pyfunc.log_model( artifact_path=pipeline.model_name, python_model=KedroPipelineModel( @@ -143,6 +153,7 @@ def after_pipeline_run( ), artifacts=artifacts, conda_env=_format_conda_env(pipeline.conda_env), + signature=model_signature, ) # Close the mlflow active run at the end of the pipeline to avoid interactions with further runs mlflow.end_run() diff --git a/kedro_mlflow/pipeline/pipeline_ml.py b/kedro_mlflow/pipeline/pipeline_ml.py index 7dbb405c..9f08ddbb 100644 --- a/kedro_mlflow/pipeline/pipeline_ml.py +++ b/kedro_mlflow/pipeline/pipeline_ml.py @@ -5,6 +5,7 @@ from kedro.io import DataCatalog, MemoryDataSet from kedro.pipeline import Pipeline from kedro.pipeline.node import Node +from mlflow.models import ModelSignature MSG_NOT_IMPLEMENTED = ( "This method is not implemented because it does" @@ -43,6 +44,7 @@ def __init__( input_name: str, conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None, model_name: Optional[str] = "model", + model_signature: Union[ModelSignature, str, None] = "auto", ): """Store all necessary information for calling mlflow.log_model in the pipeline. @@ -77,6 +79,13 @@ def __init__( model_name (Union[str, None], optional): The name of the folder where the model will be stored in remote mlflow. Defaults to "model". + model_signature (Union[ModelSignature, bool]): The mlflow + signature of the input dataframe common to training + and inference. 
diff --git a/kedro_mlflow/pipeline/pipeline_ml.py b/kedro_mlflow/pipeline/pipeline_ml.py
index 7dbb405c..9f08ddbb 100644
--- a/kedro_mlflow/pipeline/pipeline_ml.py
+++ b/kedro_mlflow/pipeline/pipeline_ml.py
@@ -5,6 +5,7 @@
 from kedro.io import DataCatalog, MemoryDataSet
 from kedro.pipeline import Pipeline
 from kedro.pipeline.node import Node
+from mlflow.models import ModelSignature

 MSG_NOT_IMPLEMENTED = (
     "This method is not implemented because it does"
@@ -43,6 +44,7 @@ def __init__(
         input_name: str,
         conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
         model_name: Optional[str] = "model",
+        model_signature: Union[ModelSignature, str, None] = "auto",
     ):
         """Store all necessary information for calling
         mlflow.log_model in the pipeline.
@@ -77,6 +79,13 @@ def __init__(
             model_name (Union[str, None], optional): The name of
                 the folder where the model will be stored in
                 remote mlflow. Defaults to "model".
+            model_signature (Union[ModelSignature, str, None]): The mlflow
+                signature of the input dataframe common to training
+                and inference.
+                    - If 'auto', it is inferred automatically
+                    - If None, no signature is used
+                    - If a `ModelSignature` instance, it is passed
+                    unchanged to mlflow when logging the model
         """

         super().__init__(nodes, *args, tags=tags)
@@ -85,17 +94,31 @@ def __init__(
         self.conda_env = conda_env
         self.model_name = model_name
         self.input_name = input_name
+        self.model_signature = model_signature

         self._check_consistency()

-    @property
-    def input_name(self) -> str:
-        return self._input_name
-
     @property
     def _logger(self) -> logging.Logger:
         return logging.getLogger(__name__)

+    @property
+    def training(self) -> Pipeline:
+        return Pipeline(self.nodes)
+
+    @property
+    def inference(self) -> Pipeline:
+        return self._inference
+
+    @inference.setter
+    def inference(self, inference: Pipeline) -> None:
+        self._check_inference(inference)
+        self._inference = inference
+
+    @property
+    def input_name(self) -> str:
+        return self._input_name
+
     @input_name.setter
     def input_name(self, name: str) -> None:
         allowed_names = self.inference.inputs()
@@ -110,17 +133,18 @@ def input_name(self, name: str) -> None:
         self._input_name = name

     @property
-    def inference(self) -> str:
-        return self._inference
-
-    @inference.setter
-    def inference(self, inference: Pipeline) -> None:
-        self._check_inference(inference)
-        self._inference = inference
-
-    @property
-    def training(self) -> Pipeline:
-        return Pipeline(self.nodes)
+    def model_signature(self) -> Union[ModelSignature, str, None]:
+        return self._model_signature
+
+    @model_signature.setter
+    def model_signature(self, model_signature: Union[ModelSignature, str, None]) -> None:
+        if model_signature is not None:
+            if not isinstance(model_signature, ModelSignature):
+                if model_signature != "auto":
+                    raise ValueError(
+                        f"model_signature must be one of 'None', 'auto', or a 'ModelSignature' object, got '{type(model_signature)}'"
+                    )
+        self._model_signature = model_signature

     def _check_inference(self, inference: Pipeline) -> None:
         nb_outputs = len(inference.outputs())
@@ -276,15 +300,12 @@ def __or__(self, other):  # pragma: no cover


 class KedroMlflowPipelineMLInputsError(Exception):
-    """Error raised when the inputs of KedroPipelineModel are invalid
-    """
+    """Error raised when the inputs of KedroPipelineModel are invalid"""


 class KedroMlflowPipelineMLDatasetsError(Exception):
-    """Error raised when the inputs of KedroPipelineMoel are invalid
-    """
+    """Error raised when the datasets of KedroPipelineModel are invalid"""


 class KedroMlflowPipelineMLOutputsError(Exception):
-    """Error raised when the outputs of KedroPipelineModel are invalid
-    """
+    """Error raised when the outputs of KedroPipelineModel are invalid"""
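The setter above accepts exactly three shapes of value. Here is a small sketch of passing an explicit `ModelSignature` instead of relying on `'auto'`; the `Schema`/`ColSpec` construction uses the public mlflow API rather than code from this changeset, and `pipeline_ml` stands in for any `PipelineML` instance:

```python
# Sketch: the three values accepted by the `model_signature` setter.
from mlflow.models import ModelSignature
from mlflow.types.schema import ColSpec, Schema

manual_signature = ModelSignature(inputs=Schema([ColSpec("long", "a")]))

# pipeline_ml.model_signature = None               # no signature is logged
# pipeline_ml.model_signature = "auto"             # inferred at logging time (default)
# pipeline_ml.model_signature = manual_signature   # used as is
# pipeline_ml.model_signature = "wrong_type"       # raises ValueError
```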
diff --git a/kedro_mlflow/pipeline/pipeline_ml_factory.py b/kedro_mlflow/pipeline/pipeline_ml_factory.py
index 425a7911..d094a1dc 100644
--- a/kedro_mlflow/pipeline/pipeline_ml_factory.py
+++ b/kedro_mlflow/pipeline/pipeline_ml_factory.py
@@ -3,6 +3,7 @@
 from warnings import warn

 from kedro.pipeline import Pipeline
+from mlflow.models import ModelSignature

 from kedro_mlflow.pipeline.pipeline_ml import PipelineML

@@ -13,6 +14,7 @@ def pipeline_ml_factory(
     input_name: str = None,
     conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
     model_name: Optional[str] = "model",
+    model_signature: Union[ModelSignature, str, None] = "auto",
 ) -> PipelineML:
     """This function is a helper to create `PipelineML`
     object directly from two Kedro `Pipelines` (one of
@@ -47,6 +49,13 @@ def pipeline_ml_factory(
         model_name (Union[str, None], optional): The name of
             the folder where the model will be stored in
             remote mlflow. Defaults to "model".
+        model_signature (Union[ModelSignature, str, None]): The mlflow
+            signature of the input dataframe common to training
+            and inference.
+                - If 'auto', it is inferred automatically
+                - If None, no signature is used
+                - If a `ModelSignature` instance, it is passed
+                unchanged to mlflow when logging the model

     Returns:
         PipelineML: A `PipelineML` which is automatically
@@ -61,6 +70,7 @@ def pipeline_ml_factory(
         input_name=input_name,
         conda_env=conda_env,
         model_name=model_name,
+        model_signature=model_signature,
     )
     return pipeline
diff --git a/tests/framework/hooks/test_pipeline_hook.py b/tests/framework/hooks/test_pipeline_hook.py
index 76666802..dd6342fd 100644
--- a/tests/framework/hooks/test_pipeline_hook.py
+++ b/tests/framework/hooks/test_pipeline_hook.py
@@ -1,6 +1,7 @@
 import sys

 import mlflow
+import pandas as pd
 import pytest
 import yaml
 from kedro.extras.datasets.pickle import PickleDataSet
@@ -8,6 +9,7 @@
 from kedro.io import DataCatalog, MemoryDataSet
 from kedro.pipeline import Pipeline, node
 from kedro.runner import SequentialRunner
+from mlflow.models import infer_signature
 from mlflow.tracking import MlflowClient

 from kedro_mlflow.framework.context import get_mlflow_config
@@ -79,7 +81,9 @@ def env_from_environment(environment_path, env_from_dict):
     with open(environment_path, mode="w") as file_handler:
         yaml.dump(env_from_dict, file_handler)

-    return env_from_dict
+    env_from_environment = env_from_dict
+
+    return env_from_environment


 @pytest.mark.parametrize(
@@ -183,7 +187,7 @@ def dummy_pipeline_ml(dummy_pipeline, env_from_dict):
 def dummy_catalog(tmp_path):
     dummy_catalog = DataCatalog(
         {
-            "raw_data": MemoryDataSet(1),
+            "raw_data": MemoryDataSet(pd.DataFrame(data=[1], columns=["a"])),
             "params:unused_param": MemoryDataSet("blah"),
             "data": MemoryDataSet(),
             "model": PickleDataSet((tmp_path / "model.csv").as_posix()),
@@ -194,6 +198,13 @@ def dummy_catalog(tmp_path):
     return dummy_catalog


+@pytest.fixture
+def dummy_signature(dummy_catalog, dummy_pipeline_ml):
+    input_data = dummy_catalog.load(dummy_pipeline_ml.input_name)
+    dummy_signature = infer_signature(input_data)
+    return dummy_signature
+
+
 @pytest.fixture
 def dummy_run_params(tmp_path):
     dummy_run_params = {
@@ -278,10 +289,12 @@ def test_mlflow_pipeline_hook_with_different_pipeline_types(
     mlflow_conf = get_mlflow_config(context)
     mlflow_client = MlflowClient(mlflow_conf.mlflow_tracking_uri)
     run_data = mlflow_client.get_run(run_id).data
+
     # all run_params are recorded as tags
     for k, v in dummy_run_params.items():
         if v:
             assert run_data.tags[k] == str(v)
+
     # params are not recorded because we don't have MlflowNodeHook here
     # and the model should not be logged when it is not a PipelineML
     nb_artifacts = len(mlflow_client.list_artifacts(run_id))
@@ -289,11 +302,19 @@ def test_mlflow_pipeline_hook_with_different_pipeline_types(
         assert nb_artifacts == 1
     else:
         assert nb_artifacts == 0
+
+    # Check that metrics datasets are prefixed with their names
     # for metric
     assert dummy_catalog._data_sets["my_metrics"]._prefix == "my_metrics"
     assert dummy_catalog._data_sets["another_metrics"]._prefix == "foo"

+    if isinstance(pipeline_to_run, PipelineML):
+        trained_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
+        assert trained_model.metadata.signature.to_dict() == {
+            "inputs": '[{"name": "a", "type": "long"}]',
+            "outputs": None,
+        }


 def test_mlflow_pipeline_hook_metrics_with_run_id(
     mocker,
@@ -318,7 +339,7 @@ def test_mlflow_pipeline_hook_metrics_with_run_id(

     dummy_catalog_with_run_id = DataCatalog(
         {
-            "raw_data": MemoryDataSet(1),
+            "raw_data": MemoryDataSet(pd.DataFrame(data=[1], columns=["a"])),
             "params:unused_param": MemoryDataSet("blah"),
             "data": MemoryDataSet(),
             "model": PickleDataSet((tmp_path / "model.csv").as_posix()),
@@ -373,6 +394,70 @@ def test_mlflow_pipeline_hook_metrics_with_run_id(
     assert run_data.metrics["foo.metric_key"] == 1.1


+@pytest.mark.parametrize(
+    "model_signature,expected_signature",
+    (
+        [None, None],
+        ["auto", pytest.lazy_fixture("dummy_signature")],
+        [
+            pytest.lazy_fixture("dummy_signature"),
+            pytest.lazy_fixture("dummy_signature"),
+        ],
+    ),
+)
+def test_mlflow_pipeline_hook_with_pipeline_ml_signature(
+    mocker,
+    monkeypatch,
+    tmp_path,
+    config_dir,
+    env_from_dict,
+    dummy_pipeline,
+    dummy_catalog,
+    dummy_run_params,
+    dummy_mlflow_conf,
+    model_signature,
+    expected_signature,
+):
+    # config_with_base_mlflow_conf is a conftest fixture
+    mocker.patch("kedro_mlflow.utils._is_kedro_project", return_value=True)
+    monkeypatch.chdir(tmp_path)
+    pipeline_hook = MlflowPipelineHook()
+    runner = SequentialRunner()
+
+    pipeline_to_run = pipeline_ml_factory(
+        training=dummy_pipeline.only_nodes_with_tags("training"),
+        inference=dummy_pipeline.only_nodes_with_tags("inference"),
+        input_name="raw_data",
+        conda_env=env_from_dict,
+        model_name="model",
+        model_signature=model_signature,
+    )
+
+    pipeline_hook.after_catalog_created(
+        catalog=dummy_catalog,
+        # `after_catalog_created` is not using any of the arguments below,
+        # so we are setting them to empty values.
+        conf_catalog={},
+        conf_creds={},
+        feed_dict={},
+        save_version="",
+        load_versions="",
+        run_id=dummy_run_params["run_id"],
+    )
+    pipeline_hook.before_pipeline_run(
+        run_params=dummy_run_params, pipeline=pipeline_to_run, catalog=dummy_catalog
+    )
+    runner.run(pipeline_to_run, dummy_catalog)
+    run_id = mlflow.active_run().info.run_id
+    pipeline_hook.after_pipeline_run(
+        run_params=dummy_run_params, pipeline=pipeline_to_run, catalog=dummy_catalog
+    )
+
+    # test: the logged model signature should match the expected one
+    trained_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
+    assert trained_model.metadata.signature == expected_signature
+
+
 def test_generate_kedro_commands():
     # TODO : add a better test because the formatting of record_data is subject to change
     # We could check that the command is recored and then rerun properly
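Note that the parametrization above passes fixtures as parameters through `pytest.lazy_fixture`, which comes from the third-party `pytest-lazy-fixture` plugin (it must be installed for these tests to run). A minimal, self-contained sketch of the pattern, with invented fixture and test names:

```python
# Minimal sketch of the pytest-lazy-fixture pattern used above
# (requires the `pytest-lazy-fixture` plugin).
import pytest


@pytest.fixture
def dummy_value():
    return "a-fixture-value"


@pytest.mark.parametrize(
    "param,expected",
    (
        [None, None],
        [pytest.lazy_fixture("dummy_value"), "a-fixture-value"],
    ),
)
def test_lazy_fixture_pattern(param, expected):
    # the fixture is only resolved when the test actually runs
    assert param == expected
```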
diff --git a/tests/pipeline/test_pipeline_ml.py b/tests/pipeline/test_pipeline_ml.py
index 44833772..f0b8c6a2 100644
--- a/tests/pipeline/test_pipeline_ml.py
+++ b/tests/pipeline/test_pipeline_ml.py
@@ -248,9 +248,9 @@ def test_filtering_pipeline_ml(
     from_inputs,
 ):
     """When the pipeline is filtered by the context (e.g calling only_nodes_with_tags,
-    from_inputs...), it must return a PipelineML instance with unmodified inference.
-    We loop dynamically on the arguments of the function in case of kedro
-    modify the filters.
+    from_inputs...), it must return a PipelineML instance with unmodified inference.
+    We loop dynamically over the arguments of the function in case
+    kedro modifies the filters.
     """

     # dummy_context, pipeline_with_tag, pipeline_ml_with_tag are fixture in conftest
@@ -463,3 +463,24 @@ def test_not_enough_inference_outputs():
             ),
             input_name="data",
         )
+
+
+def test_wrong_pipeline_ml_signature_type(pipeline_with_tag):
+    with pytest.raises(
+        ValueError,
+        match="model_signature must be one of 'None', 'auto', or a 'ModelSignature'",
+    ):
+        pipeline_ml_factory(
+            training=pipeline_with_tag,
+            inference=Pipeline(
+                [
+                    node(
+                        func=predict_fun,
+                        inputs=["model", "data"],
+                        outputs="predictions",
+                    )
+                ]
+            ),
+            input_name="data",
+            model_signature="wrong_type",
+        )