Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log dataset info #4

Open
wants to merge 3 commits into
base: feat/getml_mlflow
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 46 additions & 16 deletions mlflow/getml/autologging.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import json

Check failure on line 1 in mlflow/getml/autologging.py

View workflow job for this annotation

GitHub Actions / lint

Unformatted file. Run `ruff format .` or comment `/autoformat` to format.
import logging
import threading
from dataclasses import dataclass, field
from typing import Any

import mlflow
from mlflow.data.pandas_dataset import PandasDataset
from mlflow.entities.dataset_input import DatasetInput
from mlflow.entities.input_tag import InputTag
from mlflow.utils.autologging_utils import safe_patch
from mlflow.utils.autologging_utils.client import MlflowAutologgingQueueingClient
from mlflow.utils.mlflow_tags import MLFLOW_DATASET_CONTEXT

_logger = logging.getLogger(__name__)

@dataclass
class LogInfo:
Expand Down Expand Up @@ -148,24 +154,12 @@
step += 1
stop_event.wait(1)

def patched_fit_mlflow(original, self: getml.Pipeline, *args, **kwargs):
def patched_fit_mlflow(original, self: getml.Pipeline, *args, **kwargs) -> getml.pipeline.Pipeline:

Check failure on line 157 in mlflow/getml/autologging.py

View workflow job for this annotation

GitHub Actions / lint

Line too long (103 > 100). See https://docs.astral.sh/ruff/rules/E501 for how to fix this error.
autologging_client = MlflowAutologgingQueueingClient()
assert (active_run := mlflow.active_run())
run_id = active_run.info.run_id
pipeline_log_info = _extract_pipeline_informations(self)
# with open("my_dict.json", "w") as f:
# json.dump(pipeline_log_info.params, f)
# mlflow.log_artifact("my_dict.json")
# mlflow.log_dict(pipeline_log_info.params, 'params.json')
autologging_client.log_params(
run_id=run_id,
params=pipeline_log_info.params,
)
if tags := pipeline_log_info.tags:
autologging_client.set_tags(run_id=run_id, tags=tags)

engine_metrics_to_be_tracked = _collect_available_engine_metrics()

engine_metrics_to_be_tracked = _log_pretraining_metadata(autologging_client, self, run_id, *args)

Check failure on line 162 in mlflow/getml/autologging.py

View workflow job for this annotation

GitHub Actions / lint

Line too long (105 > 100). See https://docs.astral.sh/ruff/rules/E501 for how to fix this error.
if engine_metrics_to_be_tracked:
stop_event = threading.Event()
metrics_thread = threading.Thread(
Expand All @@ -174,7 +168,7 @@
)
metrics_thread.start()
else:
print(

Check failure on line 171 in mlflow/getml/autologging.py

View workflow job for this annotation

GitHub Actions / lint

`print` found. See https://docs.astral.sh/ruff/rules/T201 for how to fix this error.
"Engine metrics are not available. Please upgrade to the Enterprise edition. "
)

Expand All @@ -193,7 +187,7 @@
autologging_client.flush(synchronous=True)
return fit_output

def patched_score_method(original, self: getml.Pipeline, *args, **kwargs):
def patched_score_method(original, self: getml.Pipeline, *args, **kwargs) -> getml.pipeline.Scores:

Check failure on line 190 in mlflow/getml/autologging.py

View workflow job for this annotation

GitHub Actions / lint

Line too long (103 > 100). See https://docs.astral.sh/ruff/rules/E501 for how to fix this error.

target = self.data_model.population.roles.target[0]
pop_df = args[0].population.to_pandas()
Expand All @@ -208,9 +202,45 @@
model_type=["regressor" if self.is_regression else "classifier"][0],
evaluators=["default"],
)

return original(self, *args, **kwargs)

def _log_pretraining_metadata(
    autologging_client: MlflowAutologgingQueueingClient,
    self: getml.Pipeline,
    run_id: str,
    *args,
) -> dict:
    """Log pipeline params, tags and, if enabled, the training datasets for a run.

    Args:
        autologging_client: Queueing client used to record params, tags and inputs.
        self: The pipeline whose information is extracted and logged.
        run_id: ID of the MLflow run that receives the logged data.
        *args: Positional arguments of the patched call. When dataset logging is
            enabled, ``args[0]`` must expose ``population`` and ``peripheral``.

    Returns:
        Whatever ``_collect_available_engine_metrics()`` returns (annotated as a
        dict); the caller uses its truthiness to decide whether to track metrics.
    """
    pipeline_log_info = _extract_pipeline_informations(self)
    autologging_client.log_params(
        run_id=run_id,
        params=pipeline_log_info.params,
    )
    if pipeline_tags := pipeline_log_info.tags:
        autologging_client.set_tags(run_id=run_id, tags=pipeline_tags)

    engine_metrics_to_be_tracked = _collect_available_engine_metrics()

    if log_datasets:
        # Dataset logging is best effort: any failure (including a missing
        # positional argument) is reported as a warning and does not abort the fit.
        try:
            container = args[0]
            population = container.population
            population_dataset: PandasDataset = mlflow.data.from_pandas(
                population.to_pandas(),
                name=population.base.name,
            )
            datasets = [
                DatasetInput(
                    dataset=population_dataset._to_mlflow_entity(),
                    tags=[InputTag(key=MLFLOW_DATASET_CONTEXT, value="Population")],
                )
            ]

            for name, peripheral in container.peripheral.items():
                peripheral_dataset: PandasDataset = mlflow.data.from_pandas(
                    peripheral.to_pandas(),
                    name=name,
                )
                datasets.append(
                    DatasetInput(
                        dataset=peripheral_dataset._to_mlflow_entity(),
                        tags=[InputTag(key=MLFLOW_DATASET_CONTEXT, value="Peripheral")],
                    )
                )

            autologging_client.log_inputs(run_id=run_id, datasets=datasets)
        except Exception as e:
            _logger.warning(
                "Failed to log training dataset information to MLflow Tracking. Reason: %s", e
            )
    return engine_metrics_to_be_tracked

_patch_pipeline_method(
flavor_name=flavor_name,
Expand Down
Loading