From a0f5aa3f3c74e54e9638e0be074c4746bbd4efcb Mon Sep 17 00:00:00 2001
From: Galileo Galilei
Date: Thu, 29 Oct 2020 00:47:46 +0100
Subject: [PATCH 1/2] FIX #106 - Update isort to 5.0.0 to fix conflicts with black

---
 .pre-commit-config.yaml            | 7 +------
 requirements/test_requirements.txt | 2 +-
 2 files changed, 2 insertions(+), 7 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 1cc8690f..2a8f524a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -5,13 +5,8 @@ repos:
     hooks:
       - id: black
         language_version: python3.7
-  - repo: https://github.com/asottile/seed-isort-config
-    rev: v2.1.1
-    hooks:
-      - id: seed-isort-config
-        args: [--exclude=kedro_mlflow/template/project/run.py]
   - repo: https://github.com/timothycrosley/isort
-    rev: e762d48fce82492b9c092653fc2f5cbf00197dcc
+    rev: 5.6.4
     hooks:
       - id: isort
   - repo: https://gitlab.com/pycqa/flake8
diff --git a/requirements/test_requirements.txt b/requirements/test_requirements.txt
index a74c50bd..7c09d50f 100644
--- a/requirements/test_requirements.txt
+++ b/requirements/test_requirements.txt
@@ -4,4 +4,4 @@ pytest-lazy-fixture>=0.6.0, <1.0.0
 pytest-mock>=3.1.0, <4.0.0
 flake8>=3.0.0, <4.0.0
 black==19.10b0 # pin black version because it is not compatible with a pip range (because of non semver version number)
-isort>=4.0.0, <5.0.0
+isort>=5.0.0, <6.0.0

From 95d59807aa366f0a7e920b8db1fe27048f3a9f61 Mon Sep 17 00:00:00 2001
From: Galileo Galilei
Date: Thu, 29 Oct 2020 00:48:19 +0100
Subject: [PATCH 2/2] FIX #70 - Add an input schema in PipelineML for data validation at inference time

---
 CHANGELOG.md                                  | 27 +++---
 docs/source/05_python_objects/02_Hooks.md     |  7 +-
 docs/source/05_python_objects/03_Pipelines.md | 11 ++-
 kedro_mlflow/framework/cli/cli.py             | 27 +++---
 kedro_mlflow/framework/hooks/pipeline_hook.py |  9 ++
 kedro_mlflow/pipeline/pipeline_ml.py          | 63 ++++++++-----
 kedro_mlflow/pipeline/pipeline_ml_factory.py  | 10 ++
 tests/framework/hooks/test_pipeline_hook.py   | 91 ++++++++++++++++++-
 tests/pipeline/test_pipeline_ml.py            | 27 +++++-
 9 files changed, 214 insertions(+), 58 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 17395752..fa34bf93 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,29 +4,30 @@
 
 ### Added
 
-- `kedro-mlflow` now supports kedro 0.16.5 (#62)
-- `kedro-mlflow` hooks can now be declared in `.kedro.yml` or `pyproject.toml` by adding `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` into the hooks entry. _Only for kedro>=0.16.5_
-- `pipeline_ml_factory` now accepts that `inference` pipeline `inputs` may be in `training` pipeline `inputs` (#71)
+- `kedro-mlflow` now supports `kedro>=0.16.5` [#62](https://github.com/Galileo-Galilei/kedro-mlflow/issues/62)
+- `kedro-mlflow` hooks can now be declared in `.kedro.yml` or `pyproject.toml` by adding `kedro_mlflow.framework.hooks.mlflow_pipeline_hook` and `kedro_mlflow.framework.hooks.mlflow_node_hook` into the hooks entry. _Only for kedro>=0.16.5_ [#96](https://github.com/Galileo-Galilei/kedro-mlflow/issues/96)
+- `pipeline_ml_factory` now accepts that `inference` pipeline `inputs` may be in `training` pipeline `inputs` [#71](https://github.com/Galileo-Galilei/kedro-mlflow/issues/71)
+- `pipeline_ml_factory` now infers the schema of the input dataset automatically to validate data at inference time. The output schema can be declared manually in the `model_signature` argument [#70](https://github.com/Galileo-Galilei/kedro-mlflow/issues/70)
 
 ### Fixed
 
-- `get_mlflow_config` now uses the Kedro `ProjectContext` `ConfigLoader` to get configs (#66). This indirectly solves the following issues:
-  - `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory (#30)
-  - `kedro-mlflow` now works fine with `kedro jupyter notebook` command independently of the working directory (#64)
-  - You can use global variables in `mlflow.yml` which is now properly parsed if you use a `TemplatedConfigLoader` (#72)
+- `get_mlflow_config` now uses the Kedro `ProjectContext` `ConfigLoader` to get configs [#66](https://github.com/Galileo-Galilei/kedro-mlflow/issues/66). This indirectly solves the following issues:
+  - `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory [#30](https://github.com/Galileo-Galilei/kedro-mlflow/issues/30)
+  - `kedro-mlflow` now works fine with `kedro jupyter notebook` independently of the working directory [#64](https://github.com/Galileo-Galilei/kedro-mlflow/issues/64)
+  - You can use global variables in `mlflow.yml` which is now properly parsed if you use a `TemplatedConfigLoader` [#72](https://github.com/Galileo-Galilei/kedro-mlflow/issues/72)
 - `mlflow init` is now getting conf path from context.CONF_ROOT instead of hardcoded conf folder. This makes the package robust to Kedro changes.
-- `MlflowMetricsDataset` now saves in the specified `run_id` instead of the current one when the prefix is not specified (#102)
+- `MlflowMetricsDataset` now saves in the specified `run_id` instead of the current one when the prefix is not specified [#102](https://github.com/Galileo-Galilei/kedro-mlflow/issues/102)
 
 ### Changed
 
-- Documentation reference to the plugin is now dynamic when necessary (#6).
-- The test coverage now excludes `tests` and `setup.py` (#99).
-- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the predicted value (#93).
-- The `PipelineML.extract_pipeline_catalog` is renamed `PipelineML._extract_pipeline_catalog` to indicate it is a private method and is not intended to be used directly by end users (#100)
+- Documentation reference to the plugin is now dynamic when necessary [#6](https://github.com/Galileo-Galilei/kedro-mlflow/issues/6)
+- The test coverage now excludes `tests` and `setup.py` [#99](https://github.com/Galileo-Galilei/kedro-mlflow/issues/99)
+- The `KedroPipelineModel` now unpacks the result of the `inference` pipeline and no longer returns a dictionary with the name in the `DataCatalog` but only the predicted value [#93](https://github.com/Galileo-Galilei/kedro-mlflow/issues/93)
+- The `PipelineML.extract_pipeline_catalog` is renamed `PipelineML._extract_pipeline_catalog` to indicate it is a private method and is not intended to be used directly by end users [#100](https://github.com/Galileo-Galilei/kedro-mlflow/issues/100)
 
 ### Removed
 
-- `kedro mlflow init` command is no longer declaring hooks in `run.py`. You must now [register your hooks manually](docs/source/03_tutorial/02_setup.md#declaring-kedro-mlflow-hooks) in the ``run.py`` (kedro > 0.16.0), ``.kedro.yml`` (kedro >= 0.16.5) or ``pyproject.toml`` (kedro >= 0.16.5)
+- `kedro mlflow init` command is no longer declaring hooks in `run.py`. You must now [register your hooks manually](docs/source/03_tutorial/02_setup.md#declaring-kedro-mlflow-hooks) in the ``run.py`` (kedro > 0.16.0), ``.kedro.yml`` (kedro >= 0.16.5) or ``pyproject.toml`` (kedro >= 0.16.5) [#70](https://github.com/Galileo-Galilei/kedro-mlflow/issues/70)
 
 ## [0.3.0] - 2020-10-11

diff --git a/docs/source/05_python_objects/02_Hooks.md b/docs/source/05_python_objects/02_Hooks.md
index c3b3c39b..d3d90a9e 100644
--- a/docs/source/05_python_objects/02_Hooks.md
+++ b/docs/source/05_python_objects/02_Hooks.md
@@ -1,13 +1,18 @@
 # ``Hooks``
+
 This package provides 2 new hooks.
 
 ## ``MlflowPipelineHook``
+
 This hook :
+
 1. manages mlflow settings at the beginning and the end of the run (run start / end).
 2. log useful informations for reproducibility as ``mlflow tags`` (including kedro ``Journal`` information and the commands used to launch the run).
 3. register the pipeline as a valid ``mlflow model`` if [it is a ``PipelineML`` instance](#new-pipeline)
 
 ## ``MlflowNodeHook``
-This hook :
+
+This hook:
+
 1. must be used with the ``MlflowPipelineHook``
 2. autolog nodes parameters each time the pipeline is run (with ``kedro run`` or programatically).
diff --git a/docs/source/05_python_objects/03_Pipelines.md b/docs/source/05_python_objects/03_Pipelines.md
index 88f8bcc3..de8fb561 100644
--- a/docs/source/05_python_objects/03_Pipelines.md
+++ b/docs/source/05_python_objects/03_Pipelines.md
@@ -1,4 +1,5 @@
 # Pipelines
+
 ## ``PipelineML`` and ``pipeline_ml_factory``
 
 ``PipelineML`` is a new class which extends ``Pipeline`` and enable to bind two pipelines (one of training, one of inference) together. This class comes with a ``KedroPipelineModel`` class for logging it in mlflow. A pipeline logged as a mlflow model can be served using ``mlflow models serve`` and ``mlflow models predict`` command.
@@ -24,9 +25,11 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
     }
 ```
 
+
 Now each time you will run ``kedro run --pipeline=training`` (provided you registered ``MlflowPipelineHook`` in you ``run.py``), the full inference pipeline will be registered as a mlflow model (with all the outputs produced by training as artifacts : the machine learning model, but also the *scaler*, *vectorizer*, *imputer*, or whatever object fitted on data you create in ``training`` and that is used in ``inference``).
 
 Note that:
+
 - the `inference` pipeline `input_name` can be a `MemoryDataSet` and it belongs to inference pipeline `inputs`
 - Apart form `input_name`, all other `inference` pipeline `inputs` must be persisted locally on disk (i.e. it must not be `MemoryDataSet` and must have a local `filepath`)
 - the `inference` pipeline `inputs` must belong to training `outputs` (vectorizer, binarizer, machine learning model...)
@@ -38,6 +41,7 @@ Note that:
 from pathlib import Path
 from kedro.framework.context import load_context
 from kedro_mlflow.mlflow import KedroPipelineModel
+from mlflow.models import infer_signature
 
 # pipeline_training is your PipelineML object, created as previsously
 catalog = load_context(".").io
@@ -45,6 +49,10 @@ catalog = load_context(".").io
 # artifacts are all the inputs of the inference pipelines that are persisted in the catalog
 artifacts = pipeline_training.extract_pipeline_artifacts(catalog)
 
+# get the schema of the input dataset
+input_data = catalog.load(pipeline_training.input_name)
+model_signature = infer_signature(model_input=input_data)
+
 mlflow.pyfunc.log_model(
     artifact_path="model",
     python_model=KedroPipelineModel(
@@ -52,6 +60,7 @@ mlflow.pyfunc.log_model(
         catalog=catalog
     ),
     artifacts=artifacts,
-    conda_env={"python": "3.7.0"}
+    conda_env={"python": "3.7.0"},
+    signature=model_signature
 )
 ```
diff --git a/kedro_mlflow/framework/cli/cli.py b/kedro_mlflow/framework/cli/cli.py
index a6dc6031..4c4b5418 100644
--- a/kedro_mlflow/framework/cli/cli.py
+++ b/kedro_mlflow/framework/cli/cli.py
@@ -2,9 +2,7 @@
 from pathlib import Path
 
 import click
-from kedro import __version__ as kedro_version
 from kedro.framework.context import load_context
-from packaging import version
 
 from kedro_mlflow.framework.cli.cli_utils import write_jinja_template
 from kedro_mlflow.framework.context import get_mlflow_config
@@ -12,8 +10,10 @@
 
 try:
     from kedro.framework.context import get_static_project_data
-except ImportError:  # pragma: no cover
-    from kedro_mlflow.utils import _get_project_globals as get_static_project_data  # pragma: no cover
+except ImportError:  # pragma: no cover
+    from kedro_mlflow.utils import (
+        _get_project_globals as get_static_project_data,  # pragma: no cover
+    )
 
 TEMPLATE_FOLDER_PATH = Path(__file__).parent.parent.parent / "template" / "project"
 
@@ -49,15 +49,13 @@ def get_command(self, ctx, cmd_name):
 
 @click.group(name="Mlflow")
 def commands():
-    """Kedro plugin for interactions with mlflow.
-    """
+    """Kedro plugin for interactions with mlflow."""
     pass  # pragma: no cover
 
 
 @commands.command(name="mlflow", cls=KedroClickGroup)
 def mlflow_commands():
-    """Use mlflow-specific commands inside kedro project.
-    """
+    """Use mlflow-specific commands inside kedro project."""
     pass
 
 
@@ -128,8 +126,8 @@ def init(force, silent):
 )
 def ui(project_path, env):
     """Opens the mlflow user interface with the
-        project-specific settings of mlflow.yml. This interface
-        enables to browse and compares runs.
+    project-specific settings of mlflow.yml. This interface
+    enables to browse and compares runs.
 
     """
 
@@ -148,8 +146,7 @@ def ui(project_path, env):
 
 @mlflow_commands.command()
 def run():
-    """Re-run an old run with mlflow-logged info.
-    """
+    """Re-run an old run with mlflow-logged info."""
 
     # TODO (HARD) : define general assumptions to check whether a run
     #  is reproductible or not
@@ -163,13 +160,11 @@ def run():
 
 @mlflow_commands.command()
 def new():
-    """Create a new kedro project with updated template.
- """ + """Create a new kedro project with updated template.""" raise NotImplementedError # pragma: no cover class KedroMlflowCliError(Exception): - """ kedro-mlflow cli specific error - """ + """kedro-mlflow cli specific error""" pass diff --git a/kedro_mlflow/framework/hooks/pipeline_hook.py b/kedro_mlflow/framework/hooks/pipeline_hook.py index 70c7c49c..8b2244e3 100644 --- a/kedro_mlflow/framework/hooks/pipeline_hook.py +++ b/kedro_mlflow/framework/hooks/pipeline_hook.py @@ -9,6 +9,7 @@ from kedro.io import DataCatalog from kedro.pipeline import Pipeline from kedro.versioning.journal import _git_sha +from mlflow.models import infer_signature from kedro_mlflow.framework.context import get_mlflow_config from kedro_mlflow.io import MlflowMetricsDataSet @@ -136,6 +137,13 @@ def after_pipeline_run( if isinstance(pipeline, PipelineML): pipeline_catalog = pipeline._extract_pipeline_catalog(catalog) artifacts = pipeline.extract_pipeline_artifacts(pipeline_catalog) + + if pipeline.model_signature == "auto": + input_data = pipeline_catalog.load(pipeline.input_name) + model_signature = infer_signature(model_input=input_data) + else: + model_signature = pipeline.model_signature + mlflow.pyfunc.log_model( artifact_path=pipeline.model_name, python_model=KedroPipelineModel( @@ -143,6 +151,7 @@ def after_pipeline_run( ), artifacts=artifacts, conda_env=_format_conda_env(pipeline.conda_env), + signature=model_signature, ) # Close the mlflow active run at the end of the pipeline to avoid interactions with further runs mlflow.end_run() diff --git a/kedro_mlflow/pipeline/pipeline_ml.py b/kedro_mlflow/pipeline/pipeline_ml.py index 7dbb405c..9f08ddbb 100644 --- a/kedro_mlflow/pipeline/pipeline_ml.py +++ b/kedro_mlflow/pipeline/pipeline_ml.py @@ -5,6 +5,7 @@ from kedro.io import DataCatalog, MemoryDataSet from kedro.pipeline import Pipeline from kedro.pipeline.node import Node +from mlflow.models import ModelSignature MSG_NOT_IMPLEMENTED = ( "This method is not implemented because it does" @@ -43,6 +44,7 @@ def __init__( input_name: str, conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None, model_name: Optional[str] = "model", + model_signature: Union[ModelSignature, str, None] = "auto", ): """Store all necessary information for calling mlflow.log_model in the pipeline. @@ -77,6 +79,13 @@ def __init__( model_name (Union[str, None], optional): The name of the folder where the model will be stored in remote mlflow. Defaults to "model". + model_signature (Union[ModelSignature, bool]): The mlflow + signature of the input dataframe common to training + and inference. 
+ - If 'auto', it is infered automatically + - If None, no signature is used + - if a `ModelSignature` instance, passed + to the underlying dataframe """ super().__init__(nodes, *args, tags=tags) @@ -85,17 +94,31 @@ def __init__( self.conda_env = conda_env self.model_name = model_name self.input_name = input_name + self.model_signature = model_signature self._check_consistency() - @property - def input_name(self) -> str: - return self._input_name - @property def _logger(self) -> logging.Logger: return logging.getLogger(__name__) + @property + def training(self) -> Pipeline: + return Pipeline(self.nodes) + + @property + def inference(self) -> str: + return self._inference + + @inference.setter + def inference(self, inference: Pipeline) -> None: + self._check_inference(inference) + self._inference = inference + + @property + def input_name(self) -> str: + return self._input_name + @input_name.setter def input_name(self, name: str) -> None: allowed_names = self.inference.inputs() @@ -110,17 +133,18 @@ def input_name(self, name: str) -> None: self._input_name = name @property - def inference(self) -> str: - return self._inference - - @inference.setter - def inference(self, inference: Pipeline) -> None: - self._check_inference(inference) - self._inference = inference - - @property - def training(self) -> Pipeline: - return Pipeline(self.nodes) + def model_signature(self) -> str: + return self._model_signature + + @model_signature.setter + def model_signature(self, model_signature: ModelSignature) -> None: + if model_signature is not None: + if not isinstance(model_signature, ModelSignature): + if model_signature != "auto": + raise ValueError( + f"model_signature must be one of 'None', 'auto', or a 'ModelSignature' Object, got '{type(model_signature)}'" + ) + self._model_signature = model_signature def _check_inference(self, inference: Pipeline) -> None: nb_outputs = len(inference.outputs()) @@ -276,15 +300,12 @@ def __or__(self, other): # pragma: no cover class KedroMlflowPipelineMLInputsError(Exception): - """Error raised when the inputs of KedroPipelineModel are invalid - """ + """Error raised when the inputs of KedroPipelineModel are invalid""" class KedroMlflowPipelineMLDatasetsError(Exception): - """Error raised when the inputs of KedroPipelineMoel are invalid - """ + """Error raised when the inputs of KedroPipelineMoel are invalid""" class KedroMlflowPipelineMLOutputsError(Exception): - """Error raised when the outputs of KedroPipelineModel are invalid - """ + """Error raised when the outputs of KedroPipelineModel are invalid""" diff --git a/kedro_mlflow/pipeline/pipeline_ml_factory.py b/kedro_mlflow/pipeline/pipeline_ml_factory.py index 425a7911..d094a1dc 100644 --- a/kedro_mlflow/pipeline/pipeline_ml_factory.py +++ b/kedro_mlflow/pipeline/pipeline_ml_factory.py @@ -3,6 +3,7 @@ from warnings import warn from kedro.pipeline import Pipeline +from mlflow.models import ModelSignature from kedro_mlflow.pipeline.pipeline_ml import PipelineML @@ -13,6 +14,7 @@ def pipeline_ml_factory( input_name: str = None, conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None, model_name: Optional[str] = "model", + model_signature: Union[ModelSignature, str, None] = "auto", ) -> PipelineML: """This function is a helper to create `PipelineML` object directly from two Kedro `Pipelines` (one of @@ -47,6 +49,13 @@ def pipeline_ml_factory( model_name (Union[str, None], optional): The name of the folder where the model will be stored in remote mlflow. Defaults to "model". 
+ model_signature (Union[ModelSignature, bool]): The mlflow + signature of the input dataframe common to training + and inference. + - If 'auto', it is infered automatically + - If None, no signature is used + - if a `ModelSignature` instance, passed + to the underlying dataframe Returns: PipelineML: A `PipelineML` which is automatically @@ -61,6 +70,7 @@ def pipeline_ml_factory( input_name=input_name, conda_env=conda_env, model_name=model_name, + model_signature=model_signature, ) return pipeline diff --git a/tests/framework/hooks/test_pipeline_hook.py b/tests/framework/hooks/test_pipeline_hook.py index 76666802..dd6342fd 100644 --- a/tests/framework/hooks/test_pipeline_hook.py +++ b/tests/framework/hooks/test_pipeline_hook.py @@ -1,6 +1,7 @@ import sys import mlflow +import pandas as pd import pytest import yaml from kedro.extras.datasets.pickle import PickleDataSet @@ -8,6 +9,7 @@ from kedro.io import DataCatalog, MemoryDataSet from kedro.pipeline import Pipeline, node from kedro.runner import SequentialRunner +from mlflow.models import infer_signature from mlflow.tracking import MlflowClient from kedro_mlflow.framework.context import get_mlflow_config @@ -79,7 +81,9 @@ def env_from_environment(environment_path, env_from_dict): with open(environment_path, mode="w") as file_handler: yaml.dump(env_from_dict, file_handler) - return env_from_dict + env_from_environment = env_from_dict + + return env_from_environment @pytest.mark.parametrize( @@ -183,7 +187,7 @@ def dummy_pipeline_ml(dummy_pipeline, env_from_dict): def dummy_catalog(tmp_path): dummy_catalog = DataCatalog( { - "raw_data": MemoryDataSet(1), + "raw_data": MemoryDataSet(pd.DataFrame(data=[1], columns=["a"])), "params:unused_param": MemoryDataSet("blah"), "data": MemoryDataSet(), "model": PickleDataSet((tmp_path / "model.csv").as_posix()), @@ -194,6 +198,13 @@ def dummy_catalog(tmp_path): return dummy_catalog +@pytest.fixture +def dummy_signature(dummy_catalog, dummy_pipeline_ml): + input_data = dummy_catalog.load(dummy_pipeline_ml.input_name) + dummy_signature = infer_signature(input_data) + return dummy_signature + + @pytest.fixture def dummy_run_params(tmp_path): dummy_run_params = { @@ -278,10 +289,12 @@ def test_mlflow_pipeline_hook_with_different_pipeline_types( mlflow_conf = get_mlflow_config(context) mlflow_client = MlflowClient(mlflow_conf.mlflow_tracking_uri) run_data = mlflow_client.get_run(run_id).data + # all run_params are recorded as tags for k, v in dummy_run_params.items(): if v: assert run_data.tags[k] == str(v) + # params are not recorded because we don't have MlflowNodeHook here # and the model should not be logged when it is not a PipelineML nb_artifacts = len(mlflow_client.list_artifacts(run_id)) @@ -289,11 +302,19 @@ def test_mlflow_pipeline_hook_with_different_pipeline_types( assert nb_artifacts == 1 else: assert nb_artifacts == 0 + # Check if metrics datasets have prefix with its names. 
# for metric assert dummy_catalog._data_sets["my_metrics"]._prefix == "my_metrics" assert dummy_catalog._data_sets["another_metrics"]._prefix == "foo" + if isinstance(pipeline_to_run, PipelineML): + trained_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + assert trained_model.metadata.signature.to_dict() == { + "inputs": '[{"name": "a", "type": "long"}]', + "outputs": None, + } + def test_mlflow_pipeline_hook_metrics_with_run_id( mocker, @@ -318,7 +339,7 @@ def test_mlflow_pipeline_hook_metrics_with_run_id( dummy_catalog_with_run_id = DataCatalog( { - "raw_data": MemoryDataSet(1), + "raw_data": MemoryDataSet(pd.DataFrame(data=[1], columns=["a"])), "params:unused_param": MemoryDataSet("blah"), "data": MemoryDataSet(), "model": PickleDataSet((tmp_path / "model.csv").as_posix()), @@ -373,6 +394,70 @@ def test_mlflow_pipeline_hook_metrics_with_run_id( assert run_data.metrics["foo.metric_key"] == 1.1 +@pytest.mark.parametrize( + "model_signature,expected_signature", + ( + [None, None], + ["auto", pytest.lazy_fixture("dummy_signature")], + [ + pytest.lazy_fixture("dummy_signature"), + pytest.lazy_fixture("dummy_signature"), + ], + ), +) +def test_mlflow_pipeline_hook_with_pipeline_ml_signature( + mocker, + monkeypatch, + tmp_path, + config_dir, + env_from_dict, + dummy_pipeline, + dummy_catalog, + dummy_run_params, + dummy_mlflow_conf, + model_signature, + expected_signature, +): + # config_with_base_mlflow_conf is a conftest fixture + mocker.patch("kedro_mlflow.utils._is_kedro_project", return_value=True) + monkeypatch.chdir(tmp_path) + pipeline_hook = MlflowPipelineHook() + runner = SequentialRunner() + + pipeline_to_run = pipeline_ml_factory( + training=dummy_pipeline.only_nodes_with_tags("training"), + inference=dummy_pipeline.only_nodes_with_tags("inference"), + input_name="raw_data", + conda_env=env_from_dict, + model_name="model", + model_signature=model_signature, + ) + + pipeline_hook.after_catalog_created( + catalog=dummy_catalog, + # `after_catalog_created` is not using any of arguments bellow, + # so we are setting them to empty values. + conf_catalog={}, + conf_creds={}, + feed_dict={}, + save_version="", + load_versions="", + run_id=dummy_run_params["run_id"], + ) + pipeline_hook.before_pipeline_run( + run_params=dummy_run_params, pipeline=pipeline_to_run, catalog=dummy_catalog + ) + runner.run(pipeline_to_run, dummy_catalog) + run_id = mlflow.active_run().info.run_id + pipeline_hook.after_pipeline_run( + run_params=dummy_run_params, pipeline=pipeline_to_run, catalog=dummy_catalog + ) + + # test : parameters should have been logged + trained_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + assert trained_model.metadata.signature == expected_signature + + def test_generate_kedro_commands(): # TODO : add a better test because the formatting of record_data is subject to change # We could check that the command is recored and then rerun properly diff --git a/tests/pipeline/test_pipeline_ml.py b/tests/pipeline/test_pipeline_ml.py index 44833772..f0b8c6a2 100644 --- a/tests/pipeline/test_pipeline_ml.py +++ b/tests/pipeline/test_pipeline_ml.py @@ -248,9 +248,9 @@ def test_filtering_pipeline_ml( from_inputs, ): """When the pipeline is filtered by the context (e.g calling only_nodes_with_tags, - from_inputs...), it must return a PipelineML instance with unmodified inference. - We loop dynamically on the arguments of the function in case of kedro - modify the filters. + from_inputs...), it must return a PipelineML instance with unmodified inference. 
+ We loop dynamically on the arguments of the function in case of kedro + modify the filters. """ # dummy_context, pipeline_with_tag, pipeline_ml_with_tag are fixture in conftest @@ -463,3 +463,24 @@ def test_not_enough_inference_outputs(): ), input_name="data", ) + + +def test_wrong_pipeline_ml_signature_type(pipeline_with_tag): + with pytest.raises( + ValueError, + match="model_signature must be one of 'None', 'auto', or a 'ModelSignature'", + ): + pipeline_ml_factory( + training=pipeline_with_tag, + inference=Pipeline( + [ + node( + func=predict_fun, + inputs=["model", "data"], + outputs="predictions", + ) + ] + ), + input_name="data", + model_signature="wrong_type", + )
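
For reference, the sketch below shows how a project could opt into the `model_signature` argument that this patch adds to `pipeline_ml_factory`. It is a minimal, illustrative example, not part of the diff: the `create_pipelines` function, the `train_fun`/`predict_fun` nodes, the `instances` dataset name, and the direct import from `kedro_mlflow.pipeline.pipeline_ml_factory` are all assumptions chosen for the sketch.

```python
from typing import Dict

import pandas as pd
from kedro.pipeline import Pipeline, node
from mlflow.models import infer_signature

# direct module import; a package-level re-export is not shown in this diff
from kedro_mlflow.pipeline.pipeline_ml_factory import pipeline_ml_factory


def train_fun(data: pd.DataFrame):
    # stand-in for any fitting step (model, scaler, vectorizer, ...)
    return 2


def predict_fun(model, data: pd.DataFrame) -> pd.DataFrame:
    # stand-in for the inference step reusing the trained artifact
    return data * model


def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    training = Pipeline(
        [node(train_fun, inputs="instances", outputs="model", tags=["training"])]
    )
    inference = Pipeline(
        [
            node(
                predict_fun,
                inputs=["model", "instances"],
                outputs="predictions",
                tags=["inference"],
            )
        ]
    )

    # Default behaviour: "auto" lets MlflowPipelineHook infer the signature
    # from the dataset registered under `input_name` when the model is logged.
    training_auto = pipeline_ml_factory(
        training=training,
        inference=inference,
        input_name="instances",
        model_signature="auto",  # None would disable the signature entirely
    )

    # Explicit alternative: build a ModelSignature once from a sample frame
    # and pass it, so nothing is inferred at logging time.
    sample = pd.DataFrame({"a": [1, 2]})
    training_explicit = pipeline_ml_factory(
        training=training,
        inference=inference,
        input_name="instances",
        model_signature=infer_signature(model_input=sample),
    )

    return {"training": training_auto, "training_explicit": training_explicit}
```

With `model_signature="auto"` (the default), `MlflowPipelineHook.after_pipeline_run` infers the signature from the dataset registered under `input_name` before calling `mlflow.pyfunc.log_model`, as implemented in the hook changes above; passing a `ModelSignature` instance skips that inference, and `None` logs the model without a signature.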