fix(sdk): visualizations and metrics do not work with data_passing_methods #6882

Merged (8 commits) on Nov 16, 2021
7 changes: 6 additions & 1 deletion sdk/python/kfp/compiler/_data_passing_using_volume.py
@@ -128,7 +128,12 @@ def convert_artifact_reference_to_parameter_reference(
             'name': subpath_parameter_name,
             'value': output_subpath,  # Requires Argo 2.3.0+
         })
-        template.get('outputs', {}).pop('artifacts', None)
+        whitelist = ['mlpipeline-ui-metadata', 'mlpipeline-metrics']
+        output_artifacts = [artifact for artifact in output_artifacts if artifact['name'] in whitelist]
+        if not output_artifacts:
+            template.get('outputs', {}).pop('artifacts', None)
+        else:
+            template.get('outputs', {}).update({'artifacts': output_artifacts})
 
     # Rewrite DAG templates
     for template in templates:
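In plain terms, the hunk above changes the volume-based data-passing rewriter so that it no longer drops every output artifact after converting artifacts to parameters: the mlpipeline-ui-metadata and mlpipeline-metrics artifacts are now kept, which is what lets the KFP UI render visualizations and metrics for pipelines that use a custom data_passing_method. A minimal, self-contained sketch of the new filtering behavior (the template dict is hypothetical, not taken from the repo's test data):

# Sketch of the whitelist filtering added in this PR; the template
# contents are illustrative only.
template = {
    'outputs': {
        'artifacts': [
            {'name': 'processor-Output-1', 'path': '/tmp/outputs/Output_1/data'},
            {'name': 'mlpipeline-metrics', 'path': '/tmp/outputs/mlpipeline_metrics/data'},
        ]
    }
}

output_artifacts = template.get('outputs', {}).get('artifacts', [])
whitelist = ['mlpipeline-ui-metadata', 'mlpipeline-metrics']
output_artifacts = [a for a in output_artifacts if a['name'] in whitelist]
if not output_artifacts:
    # Nothing visualizable remains: drop the artifacts section, as before.
    template.get('outputs', {}).pop('artifacts', None)
else:
    # Keep only the UI metadata/metrics artifacts so the frontend still sees them.
    template.get('outputs', {}).update({'artifacts': output_artifacts})

assert [a['name'] for a in template['outputs']['artifacts']] == ['mlpipeline-metrics']

The next diff updates the compiler test pipeline that exercises volume-based data passing, adding a lightweight component that produces both artifact types: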
@@ -1,7 +1,8 @@
 from pathlib import Path
 
 import kfp
-from kfp.components import load_component_from_file
+from kfp.components import load_component_from_file, create_component_from_func
+from typing import NamedTuple
 
 test_data_dir = Path(__file__).parent / 'test_data'
 producer_op = load_component_from_file(
@@ -11,6 +12,33 @@
 consumer_op = load_component_from_file(
     str(test_data_dir / 'consume_2.component.yaml'))
 
+def metadata_and_metrics() -> NamedTuple(
+    "Outputs",
+    [("mlpipeline_ui_metadata", "UI_metadata"), ("mlpipeline_metrics", "Metrics")],
+):
+    metadata = {
+        "outputs": [
+            {"storage": "inline", "source": "*this should be bold*", "type": "markdown"}
+        ]
+    }
+    metrics = {
+        "metrics": [
+            {
+                "name": "train-accuracy",
+                "numberValue": 0.9,
+            },
+            {
+                "name": "test-accuracy",
+                "numberValue": 0.7,
+            },
+        ]
+    }
+    from collections import namedtuple
+    import json
+
+    return namedtuple("output", ["mlpipeline_ui_metadata", "mlpipeline_metrics"])(
+        json.dumps(metadata), json.dumps(metrics)
+    )
+
 @kfp.dsl.pipeline()
 def artifact_passing_pipeline():
@@ -20,6 +48,7 @@ def artifact_passing_pipeline():
     consumer_task = consumer_op(processor_task.outputs['output_1'],
                                 processor_task.outputs['output_2'])
 
+    markdown_task = create_component_from_func(func=metadata_and_metrics)()
     # This line is only needed for compiling using dsl-compile to work
     kfp.dsl.get_pipeline_conf(
     ).data_passing_method = volume_based_data_passing_method
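Two notes on the pipeline change above. First, function outputs named mlpipeline_ui_metadata and mlpipeline_metrics follow the KFP naming convention that maps them to the mlpipeline-ui-metadata and mlpipeline-metrics output artifacts, i.e. exactly the two names whitelisted by this fix. Second, volume_based_data_passing_method is defined elsewhere in the test module and is not part of this diff; judging from the volumes section at the bottom of the compiled YAML below (a data-storage volume backed by the data-volume PVC), it is presumably constructed along these lines, though the exact code is an assumption:

# Assumed definition of volume_based_data_passing_method; not shown in this
# diff. The names mirror the data-storage volume and data-volume PVC that
# appear in the compiled workflow.
from kfp.dsl import data_passing_methods
from kubernetes.client import V1PersistentVolumeClaimVolumeSource, V1Volume

volume_based_data_passing_method = data_passing_methods.KubernetesVolume(
    volume=V1Volume(
        name='data-storage',
        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
            claim_name='data-volume',
        ),
    ),
    path_prefix='artifact_data/',
)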
265 changes: 179 additions & 86 deletions sdk/python/tests/compiler/testdata/artifact_passing_using_volume.yaml
@@ -1,83 +1,159 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"name": "artifact_passing_pipeline"}'
generateName: artifact-passing-pipeline-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.9, pipelines.kubeflow.org/pipeline_compilation_time: '2021-11-15T11:23:42.469722',
pipelines.kubeflow.org/pipeline_spec: '{"name": "Artifact passing pipeline"}'}
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.9}
spec:
arguments:
parameters: []
entrypoint: artifact-passing-pipeline
serviceAccountName: pipeline-runner
templates:
- name: artifact-passing-pipeline
dag:
tasks:
- name: consumer
template: consumer
dependencies: [processor]
arguments:
parameters:
- name: processor-Output-1
value: '{{tasks.processor.outputs.parameters.processor-Output-1}}'
- name: processor-Output-2-subpath
value: '{{tasks.processor.outputs.parameters.processor-Output-2-subpath}}'
dependencies:
- processor
- {name: processor-Output-1, value: '{{tasks.processor.outputs.parameters.processor-Output-1}}'}
- {name: processor-Output-2-subpath, value: '{{tasks.processor.outputs.parameters.processor-Output-2-subpath}}'}
- {name: metadata-and-metrics, template: metadata-and-metrics}
- name: processor
template: processor
dependencies: [producer]
arguments:
parameters:
- name: producer-Output-1
value: '{{tasks.producer.outputs.parameters.producer-Output-1}}'
- name: producer-Output-2-subpath
value: '{{tasks.producer.outputs.parameters.producer-Output-2-subpath}}'
dependencies:
- producer
- name: producer
template: producer
- {name: producer-Output-1, value: '{{tasks.producer.outputs.parameters.producer-Output-1}}'}
- {name: producer-Output-2-subpath, value: '{{tasks.producer.outputs.parameters.producer-Output-2-subpath}}'}
- {name: producer, template: producer}
- name: consumer
metadata:
annotations:
pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "Input parameter"}, {"name": "Input artifact"}], "name": "Consumer"}'
inputs:
parameters:
- name: processor-Output-1
- name: processor-Output-2-subpath
container:
image: alpine
args: ['{{inputs.parameters.processor-Output-1}}', /tmp/inputs/Input_artifact/data]
command:
- sh
- -c
- |
echo "Input parameter = $0"
echo "Input artifact = " && cat "$1"
args:
- '{{inputs.parameters.processor-Output-1}}'
- /tmp/inputs/Input_artifact/data
image: alpine
volumeMounts:
- name: data-storage
mountPath: /tmp/inputs/Input_artifact
readOnly: true
subPath: '{{inputs.parameters.processor-Output-2-subpath}}'
- name: processor
metadata:
annotations:
pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "Input parameter"}, {"name": "Input artifact"}], "name": "Processor", "outputs": [{"name": "Output 1"}, {"name": "Output 2"}]}'
- {mountPath: /tmp/inputs/Input_artifact, name: data-storage, subPath: '{{inputs.parameters.processor-Output-2-subpath}}',
readOnly: true}
inputs:
parameters:
- name: producer-Output-1
- name: producer-Output-2-subpath
- {name: processor-Output-1}
- {name: processor-Output-2-subpath}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": [{"inputValue": "Input parameter"}, {"inputPath": "Input artifact"}],
"command": ["sh", "-c", "echo \"Input parameter = $0\"\necho \"Input artifact
= \" && cat \"$1\"\n"], "image": "alpine"}}, "inputs": [{"name": "Input
parameter"}, {"name": "Input artifact"}], "name": "Consumer"}', pipelines.kubeflow.org/component_ref: '{"digest":
"1a8ea3c29c7853bf63d9b4fbd76a66b273621d2229c3cfe08ed68620ebf02982", "url":
"testdata/test_data/consume_2.component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"Input parameter": "{{inputs.parameters.processor-Output-1}}"}'}
- name: metadata-and-metrics
container:
args: ['----output-paths', /tmp/outputs/mlpipeline_ui_metadata/data, /tmp/outputs/mlpipeline_metrics/data]
command:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def metadata_and_metrics():
metadata = {
"outputs": [
{"storage": "inline", "source": "*this should be bold*", "type": "markdown"}
]
}
metrics = {
"metrics": [
{
"name": "train-accuracy",
"numberValue": 0.9,
},
{
"name": "test-accuracy",
"numberValue": 0.7,
},
]
}
from collections import namedtuple
import json
return namedtuple("output", ["mlpipeline_ui_metadata", "mlpipeline_metrics"])(
json.dumps(metadata), json.dumps(metrics)
)
import argparse
_parser = argparse.ArgumentParser(prog='Metadata and metrics', description='')
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop("_output_paths", [])
_outputs = metadata_and_metrics(**_parsed_args)
_output_serializers = [
str,
str,
]
import os
for idx, output_file in enumerate(_output_files):
try:
os.makedirs(os.path.dirname(output_file))
except OSError:
pass
with open(output_file, 'w') as f:
f.write(_output_serializers[idx](_outputs[idx]))
image: python:3.7
volumeMounts:
- {mountPath: /tmp/outputs/mlpipeline_ui_metadata, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-ui-metadata'}
- {mountPath: /tmp/outputs/mlpipeline_metrics, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-metrics'}
outputs:
parameters:
- name: processor-Output-1
valueFrom:
path: /tmp/outputs/Output_1/data
- name: processor-Output-1-subpath
value: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1
- name: processor-Output-2-subpath
value: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2
- {name: mlpipeline-ui-metadata-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-ui-metadata'}
- {name: mlpipeline-metrics-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-metrics'}
artifacts:
- {name: mlpipeline-ui-metadata, path: /tmp/outputs/mlpipeline_ui_metadata/data}
- {name: mlpipeline-metrics, path: /tmp/outputs/mlpipeline_metrics/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": ["----output-paths", {"outputPath": "mlpipeline_ui_metadata"},
{"outputPath": "mlpipeline_metrics"}], "command": ["sh", "-ec", "program_path=$(mktemp)\nprintf
\"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
"def metadata_and_metrics():\n metadata = {\n \"outputs\": [\n {\"storage\":
\"inline\", \"source\": \"*this should be bold*\", \"type\": \"markdown\"}\n ]\n }\n metrics
= {\n \"metrics\": [\n {\n \"name\": \"train-accuracy\",\n \"numberValue\":
0.9,\n },\n {\n \"name\": \"test-accuracy\",\n \"numberValue\":
0.7,\n },\n ]\n }\n from collections import namedtuple\n import
json\n\n return namedtuple(\"output\", [\"mlpipeline_ui_metadata\", \"mlpipeline_metrics\"])(\n json.dumps(metadata),
json.dumps(metrics)\n )\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Metadata
and metrics'', description='''')\n_parser.add_argument(\"----output-paths\",
dest=\"_output_paths\", type=str, nargs=2)\n_parsed_args = vars(_parser.parse_args())\n_output_files
= _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = metadata_and_metrics(**_parsed_args)\n\n_output_serializers
= [\n str,\n str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except
OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"],
"image": "python:3.7"}}, "name": "Metadata and metrics", "outputs": [{"name":
"mlpipeline_ui_metadata", "type": "UI_metadata"}, {"name": "mlpipeline_metrics",
"type": "Metrics"}]}', pipelines.kubeflow.org/component_ref: '{}'}
- name: processor
container:
image: alpine
args: ['{{inputs.parameters.producer-Output-1}}', /tmp/inputs/Input_artifact/data,
/tmp/outputs/Output_1/data, /tmp/outputs/Output_2/data]
command:
- sh
- -c
@@ -86,37 +162,40 @@ spec:
mkdir -p "$(dirname "$3")"
echo "$0" > "$2"
cp "$1" "$3"
args:
- '{{inputs.parameters.producer-Output-1}}'
- /tmp/inputs/Input_artifact/data
- /tmp/outputs/Output_1/data
- /tmp/outputs/Output_2/data
image: alpine
volumeMounts:
- mountPath: /tmp/inputs/Input_artifact
name: data-storage
readOnly: true
subPath: '{{inputs.parameters.producer-Output-2-subpath}}'
- mountPath: /tmp/outputs/Output_1
name: data-storage
subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1
- mountPath: /tmp/outputs/Output_2
name: data-storage
subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2
- name: producer
metadata:
annotations:
pipelines.kubeflow.org/component_spec: '{"name": "Producer", "outputs": [{"name": "Output 1"}, {"name": "Output 2"}]}'
- {mountPath: /tmp/inputs/Input_artifact, name: data-storage, subPath: '{{inputs.parameters.producer-Output-2-subpath}}',
readOnly: true}
- {mountPath: /tmp/outputs/Output_1, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1'}
- {mountPath: /tmp/outputs/Output_2, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2'}
inputs:
parameters:
- {name: producer-Output-1}
- {name: producer-Output-2-subpath}
outputs:
parameters:
- name: producer-Output-1
valueFrom:
path: /tmp/outputs/Output_1/data
- name: producer-Output-1-subpath
value: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1
- name: producer-Output-2-subpath
value: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2
- name: processor-Output-1
valueFrom: {path: /tmp/outputs/Output_1/data}
- {name: processor-Output-1-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1'}
- {name: processor-Output-2-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2'}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": [{"inputValue": "Input parameter"}, {"inputPath": "Input artifact"},
{"outputPath": "Output 1"}, {"outputPath": "Output 2"}], "command": ["sh",
"-c", "mkdir -p \"$(dirname \"$2\")\"\nmkdir -p \"$(dirname \"$3\")\"\necho
\"$0\" > \"$2\"\ncp \"$1\" \"$3\"\n"], "image": "alpine"}}, "inputs": [{"name":
"Input parameter"}, {"name": "Input artifact"}], "name": "Processor", "outputs":
[{"name": "Output 1"}, {"name": "Output 2"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"f11a277f5c5cbc27a2e2cda412547b607671d88a4e7aa8a1665dadb836b592b3", "url":
"testdata/test_data/process_2_2.component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"Input parameter": "{{inputs.parameters.producer-Output-1}}"}'}
- name: producer
container:
image: alpine
args: [/tmp/outputs/Output_1/data, /tmp/outputs/Output_2/data]
command:
- sh
- -c
@@ -125,17 +204,31 @@
mkdir -p "$(dirname "$1")"
echo "Data 1" > $0
echo "Data 2" > $1
args:
- /tmp/outputs/Output_1/data
- /tmp/outputs/Output_2/data
image: alpine
volumeMounts:
- mountPath: /tmp/outputs/Output_1
name: data-storage
subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1
- mountPath: /tmp/outputs/Output_2
name: data-storage
subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2
- {mountPath: /tmp/outputs/Output_1, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1'}
- {mountPath: /tmp/outputs/Output_2, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2'}
outputs:
parameters:
- name: producer-Output-1
valueFrom: {path: /tmp/outputs/Output_1/data}
- {name: producer-Output-1-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1'}
- {name: producer-Output-2-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2'}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": [{"outputPath": "Output 1"}, {"outputPath": "Output 2"}], "command":
["sh", "-c", "mkdir -p \"$(dirname \"$0\")\"\nmkdir -p \"$(dirname \"$1\")\"\necho
\"Data 1\" > $0\necho \"Data 2\" > $1\n"], "image": "alpine"}}, "name":
"Producer", "outputs": [{"name": "Output 1"}, {"name": "Output 2"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"7399eb54ee94a95708fa9f8a47330a39258b22a319a48458e14a63dcedb87ea4", "url":
"testdata/test_data/produce_2.component.yaml"}'}
arguments:
parameters: []
serviceAccountName: pipeline-runner
volumes:
- name: data-storage
persistentVolumeClaim:
claimName: data-volume
persistentVolumeClaim: {claimName: data-volume}
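In the regenerated golden file above, the new metadata-and-metrics template keeps its two artifacts under outputs.artifacts while still emitting the -subpath output parameters used for volume-based passing, which is exactly the behavior the whitelist change enables. For completeness, a hedged sketch of regenerating the golden file (the output filename is illustrative; KFP SDK 1.8.x is assumed):

# Hypothetical compile step, not part of the PR. artifact_passing_pipeline is
# the test pipeline above; it sets data_passing_method inside its own body.
import kfp.compiler

kfp.compiler.Compiler().compile(
    artifact_passing_pipeline, 'artifact_passing_using_volume.yaml')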