fix(sdk): visualizations and metrics do not work with data_passing_methods (#6882)

* Update _data_passing_using_volume.py

* Update _data_passing_using_volume.py

* Update _data_passing_using_volume.py

* Update _data_passing_using_volume.py

* Update _data_passing_using_volume.py

* Update artifact_passing_using_volume.yaml

* Update artifact_passing_using_volume.py

* Update _data_passing_using_volume.py
juliusvonkohout authored Nov 16, 2021
1 parent 9ee4534 commit b482ba8
Showing 3 changed files with 215 additions and 88 deletions.
7 changes: 6 additions & 1 deletion sdk/python/kfp/compiler/_data_passing_using_volume.py
@@ -128,7 +128,12 @@ def convert_artifact_reference_to_parameter_reference(
                'name': subpath_parameter_name,
                'value': output_subpath, # Requires Argo 2.3.0+
            })
-        template.get('outputs', {}).pop('artifacts', None)
+        whitelist = ['mlpipeline-ui-metadata', 'mlpipeline-metrics']
+        output_artifacts = [artifact for artifact in output_artifacts if artifact['name'] in whitelist]
+        if not output_artifacts:
+            template.get('outputs', {}).pop('artifacts', None)
+        else:
+            template.get('outputs', {}).update({'artifacts': output_artifacts})

    # Rewrite DAG templates
    for template in templates:
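The whitelist above is the core of the fix: with a volume-based data passing method, ordinary output artifacts are rewritten into output parameters plus subPath volume mounts, but the reserved mlpipeline-ui-metadata and mlpipeline-metrics artifacts must stay real Argo output artifacts, otherwise the KFP frontend has nothing to read for visualizations and metrics. A standalone sketch of the filtering on a toy template dict (illustrative only, not the compiler's actual code path):

# Illustrative sketch of the whitelist filtering; the 'some-other-output'
# artifact is hypothetical and only there to show what gets dropped.
template = {
    'name': 'metadata-and-metrics',
    'outputs': {
        'artifacts': [
            {'name': 'mlpipeline-ui-metadata', 'path': '/tmp/outputs/mlpipeline_ui_metadata/data'},
            {'name': 'mlpipeline-metrics', 'path': '/tmp/outputs/mlpipeline_metrics/data'},
            {'name': 'some-other-output', 'path': '/tmp/outputs/other/data'},
        ],
    },
}

whitelist = ['mlpipeline-ui-metadata', 'mlpipeline-metrics']
output_artifacts = template.get('outputs', {}).get('artifacts', [])
output_artifacts = [artifact for artifact in output_artifacts if artifact['name'] in whitelist]

if not output_artifacts:
    # No UI artifacts remain: drop the artifacts section entirely,
    # since regular outputs are passed through the volume as parameters.
    template.get('outputs', {}).pop('artifacts', None)
else:
    # Keep only the metadata/metrics artifacts so the UI can still render them.
    template.get('outputs', {}).update({'artifacts': output_artifacts})

print([artifact['name'] for artifact in template['outputs']['artifacts']])
# Expected output: ['mlpipeline-ui-metadata', 'mlpipeline-metrics']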
31 changes: 30 additions & 1 deletion sdk/python/tests/compiler/testdata/artifact_passing_using_volume.py
@@ -1,7 +1,8 @@
from pathlib import Path

import kfp
-from kfp.components import load_component_from_file
+from kfp.components import load_component_from_file, create_component_from_func
+from typing import NamedTuple

test_data_dir = Path(__file__).parent / 'test_data'
producer_op = load_component_from_file(
@@ -11,6 +12,33 @@
consumer_op = load_component_from_file(
    str(test_data_dir / 'consume_2.component.yaml'))

+def metadata_and_metrics() -> NamedTuple(
+    "Outputs",
+    [("mlpipeline_ui_metadata", "UI_metadata"), ("mlpipeline_metrics", "Metrics")],
+):
+    metadata = {
+        "outputs": [
+            {"storage": "inline", "source": "*this should be bold*", "type": "markdown"}
+        ]
+    }
+    metrics = {
+        "metrics": [
+            {
+                "name": "train-accuracy",
+                "numberValue": 0.9,
+            },
+            {
+                "name": "test-accuracy",
+                "numberValue": 0.7,
+            },
+        ]
+    }
+    from collections import namedtuple
+    import json
+
+    return namedtuple("output", ["mlpipeline_ui_metadata", "mlpipeline_metrics"])(
+        json.dumps(metadata), json.dumps(metrics)
+    )

@kfp.dsl.pipeline()
def artifact_passing_pipeline():
@@ -20,6 +48,7 @@ def artifact_passing_pipeline():
    consumer_task = consumer_op(processor_task.outputs['output_1'],
                                processor_task.outputs['output_2'])

+    markdown_task = create_component_from_func(func=metadata_and_metrics)()
    # This line is only needed for compiling using dsl-compile to work
    kfp.dsl.get_pipeline_conf(
    ).data_passing_method = volume_based_data_passing_method
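The volume_based_data_passing_method referenced above is defined elsewhere in this test file and is not part of this hunk. A rough sketch of how such a method can be built and the pipeline compiled, assuming the KFP v1 SDK's kfp.dsl.data_passing_methods.KubernetesVolume helper and the data-volume claim that appears in the compiled YAML below (the exact definition in the test file may differ):

from kubernetes.client.models import V1PersistentVolumeClaimVolumeSource, V1Volume
import kfp
from kfp.dsl import data_passing_methods

# Hedged reconstruction: the volume and claim names are taken from the compiled YAML.
volume_based_data_passing_method = data_passing_methods.KubernetesVolume(
    volume=V1Volume(
        name='data-storage',
        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
            claim_name='data-volume')),
    path_prefix='artifact_data/',
)

if __name__ == '__main__':
    # artifact_passing_pipeline is the pipeline function defined above.
    pipeline_conf = kfp.dsl.PipelineConf()
    pipeline_conf.data_passing_method = volume_based_data_passing_method
    kfp.compiler.Compiler().compile(
        artifact_passing_pipeline,
        'artifact_passing_using_volume.yaml',
        pipeline_conf=pipeline_conf)

Compiled this way, the workflow should match the regenerated testdata YAML below, with the metadata-and-metrics template keeping its two output artifacts.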
265 changes: 179 additions & 86 deletions sdk/python/tests/compiler/testdata/artifact_passing_using_volume.yaml
@@ -1,83 +1,159 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"name": "artifact_passing_pipeline"}'
generateName: artifact-passing-pipeline-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.9, pipelines.kubeflow.org/pipeline_compilation_time: '2021-11-15T11:23:42.469722',
pipelines.kubeflow.org/pipeline_spec: '{"name": "Artifact passing pipeline"}'}
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.9}
spec:
arguments:
parameters: []
entrypoint: artifact-passing-pipeline
serviceAccountName: pipeline-runner
templates:
- name: artifact-passing-pipeline
dag:
tasks:
- name: consumer
template: consumer
dependencies: [processor]
arguments:
parameters:
- name: processor-Output-1
value: '{{tasks.processor.outputs.parameters.processor-Output-1}}'
- name: processor-Output-2-subpath
value: '{{tasks.processor.outputs.parameters.processor-Output-2-subpath}}'
dependencies:
- processor
- {name: processor-Output-1, value: '{{tasks.processor.outputs.parameters.processor-Output-1}}'}
- {name: processor-Output-2-subpath, value: '{{tasks.processor.outputs.parameters.processor-Output-2-subpath}}'}
- {name: metadata-and-metrics, template: metadata-and-metrics}
- name: processor
template: processor
dependencies: [producer]
arguments:
parameters:
- name: producer-Output-1
value: '{{tasks.producer.outputs.parameters.producer-Output-1}}'
- name: producer-Output-2-subpath
value: '{{tasks.producer.outputs.parameters.producer-Output-2-subpath}}'
dependencies:
- producer
- name: producer
template: producer
- {name: producer-Output-1, value: '{{tasks.producer.outputs.parameters.producer-Output-1}}'}
- {name: producer-Output-2-subpath, value: '{{tasks.producer.outputs.parameters.producer-Output-2-subpath}}'}
- {name: producer, template: producer}
- name: consumer
metadata:
annotations:
pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "Input parameter"}, {"name": "Input artifact"}], "name": "Consumer"}'
inputs:
parameters:
- name: processor-Output-1
- name: processor-Output-2-subpath
container:
image: alpine
args: ['{{inputs.parameters.processor-Output-1}}', /tmp/inputs/Input_artifact/data]
command:
- sh
- -c
- |
echo "Input parameter = $0"
echo "Input artifact = " && cat "$1"
args:
- '{{inputs.parameters.processor-Output-1}}'
- /tmp/inputs/Input_artifact/data
image: alpine
volumeMounts:
- name: data-storage
mountPath: /tmp/inputs/Input_artifact
readOnly: true
subPath: '{{inputs.parameters.processor-Output-2-subpath}}'
- name: processor
metadata:
annotations:
pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "Input parameter"}, {"name": "Input artifact"}], "name": "Processor", "outputs": [{"name": "Output 1"}, {"name": "Output 2"}]}'
- {mountPath: /tmp/inputs/Input_artifact, name: data-storage, subPath: '{{inputs.parameters.processor-Output-2-subpath}}',
readOnly: true}
inputs:
parameters:
- name: producer-Output-1
- name: producer-Output-2-subpath
- {name: processor-Output-1}
- {name: processor-Output-2-subpath}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": [{"inputValue": "Input parameter"}, {"inputPath": "Input artifact"}],
"command": ["sh", "-c", "echo \"Input parameter = $0\"\necho \"Input artifact
= \" && cat \"$1\"\n"], "image": "alpine"}}, "inputs": [{"name": "Input
parameter"}, {"name": "Input artifact"}], "name": "Consumer"}', pipelines.kubeflow.org/component_ref: '{"digest":
"1a8ea3c29c7853bf63d9b4fbd76a66b273621d2229c3cfe08ed68620ebf02982", "url":
"testdata/test_data/consume_2.component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"Input parameter": "{{inputs.parameters.processor-Output-1}}"}'}
- name: metadata-and-metrics
container:
args: ['----output-paths', /tmp/outputs/mlpipeline_ui_metadata/data, /tmp/outputs/mlpipeline_metrics/data]
command:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def metadata_and_metrics():
    metadata = {
        "outputs": [
            {"storage": "inline", "source": "*this should be bold*", "type": "markdown"}
        ]
    }
    metrics = {
        "metrics": [
            {
                "name": "train-accuracy",
                "numberValue": 0.9,
            },
            {
                "name": "test-accuracy",
                "numberValue": 0.7,
            },
        ]
    }
    from collections import namedtuple
    import json
    return namedtuple("output", ["mlpipeline_ui_metadata", "mlpipeline_metrics"])(
        json.dumps(metadata), json.dumps(metrics)
    )
import argparse
_parser = argparse.ArgumentParser(prog='Metadata and metrics', description='')
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop("_output_paths", [])
_outputs = metadata_and_metrics(**_parsed_args)
_output_serializers = [
    str,
    str,
]
import os
for idx, output_file in enumerate(_output_files):
    try:
        os.makedirs(os.path.dirname(output_file))
    except OSError:
        pass
    with open(output_file, 'w') as f:
        f.write(_output_serializers[idx](_outputs[idx]))
image: python:3.7
volumeMounts:
- {mountPath: /tmp/outputs/mlpipeline_ui_metadata, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-ui-metadata'}
- {mountPath: /tmp/outputs/mlpipeline_metrics, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-metrics'}
outputs:
parameters:
- name: processor-Output-1
valueFrom:
path: /tmp/outputs/Output_1/data
- name: processor-Output-1-subpath
value: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1
- name: processor-Output-2-subpath
value: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2
- {name: mlpipeline-ui-metadata-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-ui-metadata'}
- {name: mlpipeline-metrics-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-metrics'}
artifacts:
- {name: mlpipeline-ui-metadata, path: /tmp/outputs/mlpipeline_ui_metadata/data}
- {name: mlpipeline-metrics, path: /tmp/outputs/mlpipeline_metrics/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": ["----output-paths", {"outputPath": "mlpipeline_ui_metadata"},
{"outputPath": "mlpipeline_metrics"}], "command": ["sh", "-ec", "program_path=$(mktemp)\nprintf
\"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
"def metadata_and_metrics():\n metadata = {\n \"outputs\": [\n {\"storage\":
\"inline\", \"source\": \"*this should be bold*\", \"type\": \"markdown\"}\n ]\n }\n metrics
= {\n \"metrics\": [\n {\n \"name\": \"train-accuracy\",\n \"numberValue\":
0.9,\n },\n {\n \"name\": \"test-accuracy\",\n \"numberValue\":
0.7,\n },\n ]\n }\n from collections import namedtuple\n import
json\n\n return namedtuple(\"output\", [\"mlpipeline_ui_metadata\", \"mlpipeline_metrics\"])(\n json.dumps(metadata),
json.dumps(metrics)\n )\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Metadata
and metrics'', description='''')\n_parser.add_argument(\"----output-paths\",
dest=\"_output_paths\", type=str, nargs=2)\n_parsed_args = vars(_parser.parse_args())\n_output_files
= _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = metadata_and_metrics(**_parsed_args)\n\n_output_serializers
= [\n str,\n str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except
OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"],
"image": "python:3.7"}}, "name": "Metadata and metrics", "outputs": [{"name":
"mlpipeline_ui_metadata", "type": "UI_metadata"}, {"name": "mlpipeline_metrics",
"type": "Metrics"}]}', pipelines.kubeflow.org/component_ref: '{}'}
- name: processor
container:
image: alpine
args: ['{{inputs.parameters.producer-Output-1}}', /tmp/inputs/Input_artifact/data,
/tmp/outputs/Output_1/data, /tmp/outputs/Output_2/data]
command:
- sh
- -c
@@ -86,37 +162,40 @@ spec:
mkdir -p "$(dirname "$3")"
echo "$0" > "$2"
cp "$1" "$3"
args:
- '{{inputs.parameters.producer-Output-1}}'
- /tmp/inputs/Input_artifact/data
- /tmp/outputs/Output_1/data
- /tmp/outputs/Output_2/data
image: alpine
volumeMounts:
- mountPath: /tmp/inputs/Input_artifact
name: data-storage
readOnly: true
subPath: '{{inputs.parameters.producer-Output-2-subpath}}'
- mountPath: /tmp/outputs/Output_1
name: data-storage
subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1
- mountPath: /tmp/outputs/Output_2
name: data-storage
subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2
- name: producer
metadata:
annotations:
pipelines.kubeflow.org/component_spec: '{"name": "Producer", "outputs": [{"name": "Output 1"}, {"name": "Output 2"}]}'
- {mountPath: /tmp/inputs/Input_artifact, name: data-storage, subPath: '{{inputs.parameters.producer-Output-2-subpath}}',
readOnly: true}
- {mountPath: /tmp/outputs/Output_1, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1'}
- {mountPath: /tmp/outputs/Output_2, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2'}
inputs:
parameters:
- {name: producer-Output-1}
- {name: producer-Output-2-subpath}
outputs:
parameters:
- name: producer-Output-1
valueFrom:
path: /tmp/outputs/Output_1/data
- name: producer-Output-1-subpath
value: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1
- name: producer-Output-2-subpath
value: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2
- name: processor-Output-1
valueFrom: {path: /tmp/outputs/Output_1/data}
- {name: processor-Output-1-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1'}
- {name: processor-Output-2-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2'}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": [{"inputValue": "Input parameter"}, {"inputPath": "Input artifact"},
{"outputPath": "Output 1"}, {"outputPath": "Output 2"}], "command": ["sh",
"-c", "mkdir -p \"$(dirname \"$2\")\"\nmkdir -p \"$(dirname \"$3\")\"\necho
\"$0\" > \"$2\"\ncp \"$1\" \"$3\"\n"], "image": "alpine"}}, "inputs": [{"name":
"Input parameter"}, {"name": "Input artifact"}], "name": "Processor", "outputs":
[{"name": "Output 1"}, {"name": "Output 2"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"f11a277f5c5cbc27a2e2cda412547b607671d88a4e7aa8a1665dadb836b592b3", "url":
"testdata/test_data/process_2_2.component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"Input parameter": "{{inputs.parameters.producer-Output-1}}"}'}
- name: producer
container:
image: alpine
args: [/tmp/outputs/Output_1/data, /tmp/outputs/Output_2/data]
command:
- sh
- -c
@@ -125,17 +204,31 @@ spec:
mkdir -p "$(dirname "$1")"
echo "Data 1" > $0
echo "Data 2" > $1
args:
- /tmp/outputs/Output_1/data
- /tmp/outputs/Output_2/data
image: alpine
volumeMounts:
- mountPath: /tmp/outputs/Output_1
name: data-storage
subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1
- mountPath: /tmp/outputs/Output_2
name: data-storage
subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2
- {mountPath: /tmp/outputs/Output_1, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1'}
- {mountPath: /tmp/outputs/Output_2, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2'}
outputs:
parameters:
- name: producer-Output-1
valueFrom: {path: /tmp/outputs/Output_1/data}
- {name: producer-Output-1-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1'}
- {name: producer-Output-2-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2'}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": [{"outputPath": "Output 1"}, {"outputPath": "Output 2"}], "command":
["sh", "-c", "mkdir -p \"$(dirname \"$0\")\"\nmkdir -p \"$(dirname \"$1\")\"\necho
\"Data 1\" > $0\necho \"Data 2\" > $1\n"], "image": "alpine"}}, "name":
"Producer", "outputs": [{"name": "Output 1"}, {"name": "Output 2"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"7399eb54ee94a95708fa9f8a47330a39258b22a319a48458e14a63dcedb87ea4", "url":
"testdata/test_data/produce_2.component.yaml"}'}
arguments:
parameters: []
serviceAccountName: pipeline-runner
volumes:
- name: data-storage
persistentVolumeClaim:
claimName: data-volume
persistentVolumeClaim: {claimName: data-volume}

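A quick way to sanity-check a compiled workflow like the one above is to load it and confirm that the only remaining Argo output artifacts are the reserved UI ones. A standalone sketch, assuming PyYAML and a local copy of the compiled testdata file:

import yaml

# Hedged check: every template should either have no output artifacts at all
# or only the whitelisted mlpipeline-ui-metadata / mlpipeline-metrics ones.
with open('artifact_passing_using_volume.yaml') as f:
    workflow = yaml.safe_load(f)

for template in workflow['spec']['templates']:
    artifact_names = [a['name'] for a in template.get('outputs', {}).get('artifacts', [])]
    assert all(name in ('mlpipeline-ui-metadata', 'mlpipeline-metrics')
               for name in artifact_names), artifact_names
    print(template['name'], artifact_names)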