Skip to content

Commit

Permalink
More defensive approach to value exporting
Browse files Browse the repository at this point in the history
  • Loading branch information
AleksanderWWW committed Dec 4, 2023
1 parent 561be44 commit 7b4a98a
Showing 1 changed file with 30 additions and 21 deletions.
51 changes: 30 additions & 21 deletions src/neptune_mlflow_exporter/impl/components/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import mlflow
from mlflow.entities import Experiment
from mlflow.entities import Run as MlflowRun
from neptune.utils import stringify_unsupported

from neptune_mlflow_exporter.impl.artifact_strategy import choose_upload_strategy

Expand All @@ -42,39 +43,47 @@ def export_experiment_metadata(neptune_run: NeptuneRun, experiment: Experiment)
neptune_run["experiment/name"] = experiment.name

# https://stackoverflow.com/questions/9744775/how-to-convert-integer-timestamp-into-a-datetime
neptune_run["experiment/creation_time"] = datetime.fromtimestamp(experiment.creation_time / 1e3)
neptune_run["experiment/last_update_time"] = datetime.fromtimestamp(experiment.last_update_time / 1e3)
if experiment.creation_time is not None:
neptune_run["experiment/creation_time"] = datetime.fromtimestamp(experiment.creation_time / 1e3)

if experiment.last_update_time is not None:
neptune_run["experiment/last_update_time"] = datetime.fromtimestamp(experiment.last_update_time / 1e3)

@staticmethod
def export_run_info(neptune_run: NeptuneRun, mlflow_run: MlflowRun) -> None:
info = dict(mlflow_run.info)

info["start_time"] = datetime.fromtimestamp(mlflow_run.info.start_time / 1e3)
info["end_time"] = datetime.fromtimestamp(mlflow_run.info.end_time / 1e3)
if mlflow_run.info.start_time is not None:
info["start_time"] = datetime.fromtimestamp(mlflow_run.info.start_time / 1e3)

if mlflow_run.info.end_time is not None:
info["end_time"] = datetime.fromtimestamp(mlflow_run.info.end_time / 1e3)

neptune_run["run_info"] = info
neptune_run["run_info"] = stringify_unsupported(info)
neptune_run["sys/name"] = info["run_name"]

def export_run_data(self, neptune_run: NeptuneRun, mlflow_run: MlflowRun) -> None:
data_dict = mlflow_run.data.to_dictionary()
metric_keys = data_dict["metrics"].keys()
del data_dict["metrics"]

if "metrics" in data_dict:
metric_keys = data_dict["metrics"].keys()
del data_dict["metrics"]

for key in metric_keys:
metrics = self.mlflow_client.get_metric_history(
run_id=mlflow_run.info.run_id,
key=key,
)
metric_values = list(map(lambda metric: metric.value, metrics))
metric_timestamps = list(
map(lambda metric: metric.timestamp / 1e3 if metric.timestamp else None, metrics)
)
metric_steps = list(map(lambda metric: metric.step, metrics))

neptune_run[f"run_data/metrics/{key}"].extend(
metric_values, steps=metric_steps, timestamps=metric_timestamps
)
neptune_run["run_data"] = data_dict

for key in metric_keys:
metrics = self.mlflow_client.get_metric_history(
run_id=mlflow_run.info.run_id,
key=key,
)
metric_values = list(map(lambda metric: metric.value, metrics))
metric_timestamps = list(map(lambda metric: metric.timestamp / 1e3, metrics))
metric_steps = list(map(lambda metric: metric.step, metrics))

neptune_run[f"run_data/metrics/{key}"].extend(
metric_values, steps=metric_steps, timestamps=metric_timestamps
)

def export_artifacts(
self, neptune_run: NeptuneRun, mlflow_run: MlflowRun, max_artifact_size: int, tracking_uri: Optional[str]
) -> None:
Expand Down

0 comments on commit 7b4a98a

Please sign in to comment.