Skip to content
This repository has been archived by the owner on Dec 19, 2024. It is now read-only.

MLFlow: Lookup client_id and host_id from Kubernetes env variable #133

Merged
merged 13 commits into from
Feb 1, 2021
55 changes: 30 additions & 25 deletions datasetinsights/io/tracker/factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import threading

from mlflow.exceptions import MlflowException

from datasetinsights.io.exceptions import InvalidTrackerError
from datasetinsights.io.tracker.mlflow import MLFlowTracker

Expand All @@ -11,12 +13,11 @@ class TrackerFactory:
"""Factory: responsible for creating and holding singleton instance
of tracker classes"""

TRACKER = "tracker"
HOST_ID = "host"
MLFLOW_TRACKER = "mlflow"
__singleton_lock = threading.Lock()
__tracker_instance = None
RUN_FAILED = "FAILED"
TRACKER = "tracker"

@staticmethod
def create(config=None, tracker_type=None):
Expand All @@ -31,22 +32,30 @@ def create(config=None, tracker_type=None):
"""
if TrackerFactory.MLFLOW_TRACKER == tracker_type:

tracker = config.get(TrackerFactory.TRACKER, None)
if tracker and tracker.get(TrackerFactory.MLFLOW_TRACKER, None):
mlflow_config = tracker.get(TrackerFactory.MLFLOW_TRACKER)
if mlflow_config.get(TrackerFactory.HOST_ID, None):
try:
mlf_tracker = TrackerFactory._mlflow_tracker_instance(
mlflow_config
).get_mlflow()
logger.info("initializing mlflow_tracker")
return mlf_tracker
except Exception as e:
logger.warning(
"failed mlflow initialization, "
"starting null_tracker",
e,
)
try:
tracker = config.get(TrackerFactory.TRACKER, None)
if tracker and tracker.get(TrackerFactory.MLFLOW_TRACKER, None):
mlflow_config = tracker.get(TrackerFactory.MLFLOW_TRACKER)

mlf_tracker = TrackerFactory._mlflow_tracker_instance(
**mlflow_config
).get_mlflow()
else:
mlf_tracker = (
TrackerFactory._mlflow_tracker_instance().get_mlflow()
)
logger.info("initializing mlflow_tracker")
return mlf_tracker
except ValueError as e:
logger.warning(
"failed mlflow initialization, " "Host is not configured", e
)
except MlflowException as e:
logger.warning("failed mlflow initialization,", e)
except Exception as e:
logger.warning(
"failed mlflow initialization, " "starting null_tracker", e
)
86sanj marked this conversation as resolved.
Show resolved Hide resolved

logger.info("initializing null_tracker")
return TrackerFactory._null_tracker()
Expand All @@ -55,23 +64,19 @@ def create(config=None, tracker_type=None):
raise InvalidTrackerError

@staticmethod
def _mlflow_tracker_instance(mlflow_config):
def _mlflow_tracker_instance(**kwargs):

"""Static instance access method.

Args:
host_id: MlTracker server host
client_id: MLFlow tracking server client id
exp_name: name of the experiment
kwargs : key-value pairs of mlflow parameters
Returns:
tracker singleton instance.
"""
if not TrackerFactory.__tracker_instance:
with TrackerFactory.__singleton_lock:
if not TrackerFactory.__tracker_instance:
TrackerFactory.__tracker_instance = MLFlowTracker(
mlflow_config
)
TrackerFactory.__tracker_instance = MLFlowTracker(**kwargs)
logger.info("getting tracker instance")
return TrackerFactory.__tracker_instance

Expand Down
82 changes: 60 additions & 22 deletions datasetinsights/io/tracker/mlflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,22 @@


class MLFlowTracker:
""" MlFlow tracker class, responsible for setting host, client_id and return
""" MlFlow tracker class:
Responsible for setting host, client_id and return
initialized mlflow. It also refreshes the access token through daemon
thread.
thread. To start mlflow, host is required either through config YAML or
Kubernetes secrets.
Possible cases:
Case 1: Host and client_id are not configured:
Null tracker will be initiated.
Case 2: Host and client_id both are configured:
mlflow will initiate background thread to refresh token and
will start mlflow tracker.
Case 3: Only Host id is configured and client_id is None:
mlflow tracker will start without initiating background thread.
Order of lookup:
If host and client_id are configured in YAML then that
will be used else it will lookup in Kubernetes env variable.
Examples:
# Set MLTracking server UI, default is local file
>>> mlflow.set_tracking_uri(TRACKING_URI)
Expand All @@ -31,6 +44,13 @@ class MLFlowTracker:
>>> mlflow.log_artifact("output.txt", "run1/output/")
# ends the run launched under the current experiment
>>> mlflow.end_run()
YAML_Config:
>>> tracker:
>>> mlflow:
>>> experiment:
>>> run:
>>> client_id:
>>> host:
Attributes:
REFRESH_INTERVAL: default refresh token interval
__mlflow: holds initialized mlflow
Expand All @@ -40,37 +60,49 @@ class MLFlowTracker:
3000 # every 50 minutes refresh token. Token expires in 1 hour
)
__mlflow = None
CLIENT_ID = "client_id"
HOST_ID = "host"
EXP_NAME = "experiment"
RUN_NAME = "run"
DEFAULT_RUN_NAME = "run-" + TIMESTAMP_SUFFIX
DEFAULT_RUN = "run-" + TIMESTAMP_SUFFIX
DEFAULT_EXP = "datasetinsights"

def __init__(self, mlflow_config):
def __init__(
self,
*,
client_id=None,
host=None,
86sanj marked this conversation as resolved.
Show resolved Hide resolved
run=DEFAULT_RUN,
experiment=DEFAULT_EXP,
):
"""constructor.
Args:
mlflow_config:map of mlflow configuration
client_id(str, optional): MLFlow tracking server client id
host(str, optional): MLFlow tracking server host name
run(str, optional): MLFlow tracking run name
experiment(str, optional): MLFlow tracking experiment name
Raises:
ValueError: If `host_id` is not available in both YAML config
and env variable.
"""
host_id = mlflow_config.get(MLFlowTracker.HOST_ID)
client_id = mlflow_config.get(MLFlowTracker.CLIENT_ID, None)
exp_name = mlflow_config.get(MLFlowTracker.EXP_NAME, None)
run_name = mlflow_config.get(MLFlowTracker.RUN_NAME, None)
if not run_name:
run_name = MLFlowTracker.DEFAULT_RUN_NAME
logger.info(f"setting default mlflow run name: {run_name}")
host = host or os.environ.get("MLFLOW_HOST_ID", None)
MLFlowTracker._validate(host)
client_id = client_id or os.environ.get("MLFLOW_CLIENT_ID", None)
logger.info(
f"client_id:{client_id} and host_id:{host} connecting to mlflow"
)
if client_id:
_refresh_token(client_id)
thread = RefreshTokenThread(client_id)
thread.daemon = True
thread.start()
mlflow.set_tracking_uri(host_id)
if exp_name:
mlflow.set_experiment(experiment_name=exp_name)
logger.info(f"setting mlflow experiment name: {exp_name}")
mlflow.set_tracking_uri(host)
mlflow.set_experiment(experiment_name=experiment)
experiment_id = mlflow.get_experiment_by_name(experiment).experiment_id
logger.info(
f"Starting mlflow: experiment name: {experiment} "
f"and experiment id: {experiment_id}"
)

self.__mlflow = mlflow
self.__mlflow.start_run(run_name=run_name)
logger.info("instantiated mlflow")
active_run = self.__mlflow.start_run(run_name=run)
logger.info(f"Instantiated mlflow: run id: {active_run.info.run_id}")

def get_mlflow(self):
""" method to access initialized mlflow
Expand All @@ -80,6 +112,12 @@ def get_mlflow(self):
logger.info("get mlflow")
return self.__mlflow

@staticmethod
def _validate(host):
    """Ensure a tracking-server host was resolved.

    Args:
        host (str or None): host resolved from YAML config or the
            MLFLOW_HOST_ID environment variable.

    Raises:
        ValueError: If `host` is falsy (not configured anywhere).
    """
    if not host:
        # Plain string: no placeholders, so an f-string is unnecessary (F541).
        logger.warning("host_id not found")
        raise ValueError("host_id not configured")


class RefreshTokenThread(threading.Thread):
""" Its service thread which keeps running till main thread runs
Expand Down
8 changes: 7 additions & 1 deletion kubeflow/compiled/evaluate_the_model.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: evaluate-the-model-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.1, pipelines.kubeflow.org/pipeline_compilation_time: '2020-11-04T17:16:55.837308',
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.1, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-04T15:53:39.220225',
pipelines.kubeflow.org/pipeline_spec: '{"description": "Evaluate the model", "inputs":
[{"default": "unitytechnologies/datasetinsights:latest", "name": "docker", "optional":
true, "type": "String"}, {"default": "https://storage.googleapis.com/datasetinsights/data/groceries/v3.zip",
Expand Down Expand Up @@ -47,6 +47,12 @@ spec:
--kfp-ui-metadata-filename=kfp_ui_metadata.json, --kfp-metrics-filename=kfp_metrics.json]
command: [datasetinsights, evaluate]
env:
- name: MLFLOW_HOST_ID
valueFrom:
secretKeyRef: {name: dev-mlflow-secret, key: MLFLOW_HOST_ID}
- name: MLFLOW_CLIENT_ID
valueFrom:
secretKeyRef: {name: dev-mlflow-secret, key: MLFLOW_CLIENT_ID}
- {name: GOOGLE_APPLICATION_CREDENTIALS, value: /secret/gcp-credentials/user-gcp-sa.json}
- {name: CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE, value: /secret/gcp-credentials/user-gcp-sa.json}
image: '{{inputs.parameters.docker}}'
Expand Down
8 changes: 7 additions & 1 deletion kubeflow/compiled/train_on_real_world_dataset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: train-on-real-world-dataset-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.1, pipelines.kubeflow.org/pipeline_compilation_time: '2020-11-04T17:16:53.753331',
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.1, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-04T15:53:36.873123',
pipelines.kubeflow.org/pipeline_spec: '{"description": "Train on real world dataset",
"inputs": [{"default": "unitytechnologies/datasetinsights:latest", "name": "docker",
"optional": true, "type": "String"}, {"default": "https://storage.googleapis.com/datasetinsights/data/groceries/v3.zip",
Expand Down Expand Up @@ -72,6 +72,12 @@ spec:
--kfp-ui-metadata-filename=kfp_ui_metadata.json, '--checkpoint-dir={{inputs.parameters.checkpoint_dir}}']
command: [datasetinsights, train]
env:
- name: MLFLOW_HOST_ID
valueFrom:
secretKeyRef: {name: dev-mlflow-secret, key: MLFLOW_HOST_ID}
- name: MLFLOW_CLIENT_ID
valueFrom:
secretKeyRef: {name: dev-mlflow-secret, key: MLFLOW_CLIENT_ID}
- {name: GOOGLE_APPLICATION_CREDENTIALS, value: /secret/gcp-credentials/user-gcp-sa.json}
- {name: CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE, value: /secret/gcp-credentials/user-gcp-sa.json}
image: '{{inputs.parameters.docker}}'
Expand Down
8 changes: 7 additions & 1 deletion kubeflow/compiled/train_on_synthdet_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: train-on-the-synthdet-sample-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.1, pipelines.kubeflow.org/pipeline_compilation_time: '2020-11-04T17:16:52.999713',
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.1, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-04T15:53:36.094994',
pipelines.kubeflow.org/pipeline_spec: '{"description": "Train on the SynthDet
sample", "inputs": [{"default": "unitytechnologies/datasetinsights:latest",
"name": "docker", "optional": true, "type": "String"}, {"default": "https://storage.googleapis.com/datasetinsights/data/synthetic/SynthDet.zip",
Expand Down Expand Up @@ -73,6 +73,12 @@ spec:
command: [python, -m, torch.distributed.launch, --nproc_per_node=8, --use_env,
datasetinsights, train]
env:
- name: MLFLOW_HOST_ID
valueFrom:
secretKeyRef: {name: dev-mlflow-secret, key: MLFLOW_HOST_ID}
- name: MLFLOW_CLIENT_ID
valueFrom:
secretKeyRef: {name: dev-mlflow-secret, key: MLFLOW_CLIENT_ID}
- {name: GOOGLE_APPLICATION_CREDENTIALS, value: /secret/gcp-credentials/user-gcp-sa.json}
- {name: CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE, value: /secret/gcp-credentials/user-gcp-sa.json}
image: '{{inputs.parameters.docker}}'
Expand Down
8 changes: 7 additions & 1 deletion kubeflow/compiled/train_on_synthetic_and_real_dataset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: train-on-synthetic-real-world-dataset-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.1, pipelines.kubeflow.org/pipeline_compilation_time: '2020-11-04T17:16:54.469730',
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.1, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-04T15:53:37.650746',
pipelines.kubeflow.org/pipeline_spec: '{"description": "Train on Synthetic + Real
World Dataset", "inputs": [{"default": "unitytechnologies/datasetinsights:latest",
"name": "docker", "optional": true, "type": "String"}, {"default": "https://storage.googleapis.com/datasetinsights/data/groceries/v3.zip",
Expand Down Expand Up @@ -75,6 +75,12 @@ spec:
'--checkpoint-file={{inputs.parameters.checkpoint_file}}']
command: [datasetinsights, train]
env:
- name: MLFLOW_HOST_ID
valueFrom:
secretKeyRef: {name: dev-mlflow-secret, key: MLFLOW_HOST_ID}
- name: MLFLOW_CLIENT_ID
valueFrom:
secretKeyRef: {name: dev-mlflow-secret, key: MLFLOW_CLIENT_ID}
- {name: GOOGLE_APPLICATION_CREDENTIALS, value: /secret/gcp-credentials/user-gcp-sa.json}
- {name: CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE, value: /secret/gcp-credentials/user-gcp-sa.json}
image: '{{inputs.parameters.docker}}'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: train-on-synthetic-dataset-unity-simulation-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.1, pipelines.kubeflow.org/pipeline_compilation_time: '2020-11-04T17:16:55.152591',
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.0.1, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-04T15:53:38.437907',
pipelines.kubeflow.org/pipeline_spec: '{"description": "Train on synthetic dataset
Unity Simulation", "inputs": [{"default": "unitytechnologies/datasetinsights:latest",
"name": "docker", "optional": true, "type": "String"}, {"default": "<unity-project-id>",
Expand Down Expand Up @@ -79,6 +79,12 @@ spec:
command: [python, -m, torch.distributed.launch, --nproc_per_node=8, --use_env,
datasetinsights, train]
env:
- name: MLFLOW_HOST_ID
valueFrom:
secretKeyRef: {name: dev-mlflow-secret, key: MLFLOW_HOST_ID}
- name: MLFLOW_CLIENT_ID
valueFrom:
secretKeyRef: {name: dev-mlflow-secret, key: MLFLOW_CLIENT_ID}
- {name: GOOGLE_APPLICATION_CREDENTIALS, value: /secret/gcp-credentials/user-gcp-sa.json}
- {name: CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE, value: /secret/gcp-credentials/user-gcp-sa.json}
image: '{{inputs.parameters.docker}}'
Expand Down
15 changes: 15 additions & 0 deletions kubeflow/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@
KFP_UI_METADATA_FILENAME = "kfp_ui_metadata.json"
KFP_METRICS_FILENAME = "kfp_metrics.json"

mlflow_host_env = {
"name": "MLFLOW_HOST_ID",
"valueFrom": {
"secretKeyRef": {"name": "dev-mlflow-secret", "key": "MLFLOW_HOST_ID"}
},
}
mlflow_client_env = {
"name": "MLFLOW_CLIENT_ID",
"valueFrom": {
"secretKeyRef": {"name": "dev-mlflow-secret", "key": "MLFLOW_CLIENT_ID"}
},
}


def volume_op(*, volume_size):
""" Create Kubernetes persistant volume to store data.
Expand Down Expand Up @@ -134,6 +147,7 @@ def train_op(
command=command,
arguments=arguments,
pvolumes={DATA_PATH: volume},
container_kwargs={"env": [mlflow_host_env, mlflow_client_env]},
file_outputs={
"mlpipeline-ui-metadata": os.path.join(
KFP_LOG_DIR, KFP_UI_METADATA_FILENAME
Expand Down Expand Up @@ -198,6 +212,7 @@ def evaluate_op(
f"--kfp-ui-metadata-filename={KFP_UI_METADATA_FILENAME}",
f"--kfp-metrics-filename={KFP_METRICS_FILENAME}",
],
container_kwargs={"env": [mlflow_host_env, mlflow_client_env]},
file_outputs={
"mlpipeline-metrics": os.path.join(
KFP_LOG_DIR, KFP_METRICS_FILENAME
Expand Down
6 changes: 3 additions & 3 deletions tests/configs/faster_rcnn_groceries_real_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ val_interval: 1
tracker:
mlflow:
experiment: datasetinsights
run:
client_id:
host:
run: test
client_id: test
host: test
Loading