DSL refactor #619

Merged
14 commits merged on Jan 9, 2019
Changes from 5 commits
225 changes: 110 additions & 115 deletions sdk/python/kfp/compiler/compiler.py
@@ -38,125 +38,16 @@ def my_pipeline(a: dsl.PipelineParam, b: dsl.PipelineParam):
```
"""

def _sanitize_name(self, name):
"""From _make_kubernetes_name
_sanitize_name cleans and converts the names in the workflow.
"""
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')

def _pipelineparam_full_name(self, param):
"""_pipelineparam_full_name
"""_pipelineparam_full_name converts the names of pipeline parameters
to unique names in the argo yaml

Args:
param(PipelineParam): pipeline parameter
"""
if param.op_name:
return param.op_name + '-' + param.name
return self._sanitize_name(param.name)

def _build_conventional_artifact(self, name):
return {
'name': name,
'path': '/' + name + '.json',
's3': {
# TODO: parameterize namespace for minio service
'endpoint': 'minio-service.kubeflow:9000',
'bucket': 'mlpipeline',
'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz',
'insecure': True,
'accessKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'accesskey',
},
'secretKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'secretkey'
}
},
}

def _op_to_template(self, op):
"""Generate template given an operator inherited from dsl.ContainerOp."""

processed_args = None
if op.arguments:
processed_args = list(map(str, op.arguments))
for i, _ in enumerate(processed_args):
if op.argument_inputs:
for param in op.argument_inputs:
full_name = self._pipelineparam_full_name(param)
processed_args[i] = re.sub(str(param), '{{inputs.parameters.%s}}' % full_name,
processed_args[i])
input_parameters = []
for param in op.inputs:
one_parameter = {'name': self._pipelineparam_full_name(param)}
if param.value:
one_parameter['value'] = str(param.value)
input_parameters.append(one_parameter)
# Sort to make the results deterministic.
input_parameters.sort(key=lambda x: x['name'])

output_parameters = []
for param in op.outputs.values():
output_parameters.append({
'name': self._pipelineparam_full_name(param),
'valueFrom': {'path': op.file_outputs[param.name]}
})
output_parameters.sort(key=lambda x: x['name'])

template = {
'name': op.name,
'container': {
'image': op.image,
}
}
if processed_args:
template['container']['args'] = processed_args
if input_parameters:
template['inputs'] = {'parameters': input_parameters}

template['outputs'] = {}
if output_parameters:
template['outputs'] = {'parameters': output_parameters}

# Generate artifact for metadata output
# The motivation of appending the minio info in the yaml
# is to specify a unique path for the metadata.
# TODO: after argo addresses the issue that configures a unique path
# for the artifact output when default artifact repository is configured,
# this part needs to be updated to use the default artifact repository.
output_artifacts = []
output_artifacts.append(self._build_conventional_artifact('mlpipeline-ui-metadata'))
output_artifacts.append(self._build_conventional_artifact('mlpipeline-metrics'))
template['outputs']['artifacts'] = output_artifacts
if op.command:
template['container']['command'] = op.command

# Set resources.
if op.resource_limits or op.resource_requests:
template['container']['resources'] = {}
if op.resource_limits:
template['container']['resources']['limits'] = op.resource_limits
if op.resource_requests:
template['container']['resources']['requests'] = op.resource_requests

# Set nodeSelector.
if op.node_selector:
template['nodeSelector'] = op.node_selector

if op.env_variables:
template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables))
if op.volume_mounts:
template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts))

if op.pod_annotations or op.pod_labels:
template['metadata'] = {}
if op.pod_annotations:
template['metadata']['annotations'] = op.pod_annotations
if op.pod_labels:
template['metadata']['labels'] = op.pod_labels

return template
return dsl._utils._sanitize_k8s_name(param.name)

def _get_groups_for_ops(self, root_group):
"""Helper function to get belonging groups for each op.
@@ -320,6 +211,110 @@ def _resolve_value_or_reference(self, value_or_reference, potential_references):
else:
return str(value_or_reference)

def _op_to_template(self, op):
"""Generate template given an operator inherited from dsl.ContainerOp."""

def _build_conventional_artifact(name):
return {
'name': name,
'path': '/' + name + '.json',
's3': {
# TODO: parameterize namespace for minio service
'endpoint': 'minio-service.kubeflow:9000',
'bucket': 'mlpipeline',
'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz',
'insecure': True,
'accessKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'accesskey',
},
'secretKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'secretkey'
}
},
}

processed_args = None
if op.arguments:
processed_args = list(map(str, op.arguments))
for i, _ in enumerate(processed_args):
if op.argument_inputs:
for param in op.argument_inputs:
full_name = self._pipelineparam_full_name(param)
processed_args[i] = re.sub(str(param), '{{inputs.parameters.%s}}' % full_name,
processed_args[i])
input_parameters = []
for param in op.inputs:
one_parameter = {'name': self._pipelineparam_full_name(param)}
if param.value:
one_parameter['value'] = str(param.value)
input_parameters.append(one_parameter)
# Sort to make the results deterministic.
input_parameters.sort(key=lambda x: x['name'])

output_parameters = []
for param in op.outputs.values():
output_parameters.append({
'name': self._pipelineparam_full_name(param),
'valueFrom': {'path': op.file_outputs[param.name]}
})
output_parameters.sort(key=lambda x: x['name'])

template = {
'name': op.name,
'container': {
'image': op.image,
}
}
if processed_args:
template['container']['args'] = processed_args
if input_parameters:
template['inputs'] = {'parameters': input_parameters}

template['outputs'] = {}
if output_parameters:
template['outputs'] = {'parameters': output_parameters}

# Generate artifact for metadata output
# The motivation of appending the minio info in the yaml
# is to specify a unique path for the metadata.
# TODO: after argo addresses the issue that configures a unique path
# for the artifact output when default artifact repository is configured,
# this part needs to be updated to use the default artifact repository.
output_artifacts = []
output_artifacts.append(_build_conventional_artifact('mlpipeline-ui-metadata'))
output_artifacts.append(_build_conventional_artifact('mlpipeline-metrics'))
template['outputs']['artifacts'] = output_artifacts
if op.command:
template['container']['command'] = op.command

# Set resources.
if op.resource_limits or op.resource_requests:
template['container']['resources'] = {}
if op.resource_limits:
template['container']['resources']['limits'] = op.resource_limits
if op.resource_requests:
template['container']['resources']['requests'] = op.resource_requests

# Set nodeSelector.
if op.node_selector:
template['nodeSelector'] = op.node_selector

if op.env_variables:
template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables))
if op.volume_mounts:
template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts))

if op.pod_annotations or op.pod_labels:
template['metadata'] = {}
if op.pod_annotations:
template['metadata']['annotations'] = op.pod_annotations
if op.pod_labels:
template['metadata']['labels'] = op.pod_labels

return template

def _group_to_template(self, group, inputs, outputs, dependencies):
"""Generate template given an OpsGroup.

@@ -498,10 +493,10 @@ def _compile(self, pipeline_func):
raise ValueError('Please use a function with @dsl.pipeline decorator.')

pipeline_name, _ = dsl.Pipeline.get_pipeline_functions()[pipeline_func]
pipeline_name = self._sanitize_name(pipeline_name)
pipeline_name = dsl._utils._sanitize_k8s_name(pipeline_name)

# Create the arg list with no default values and call pipeline function.
args_list = [dsl.PipelineParam(self._sanitize_name(arg_name))
args_list = [dsl.PipelineParam(dsl._utils._sanitize_k8s_name(arg_name))
for arg_name in argspec.args]
with dsl.Pipeline(pipeline_name) as p:
pipeline_func(*args_list)
@@ -510,7 +505,7 @@ def _compile(self, pipeline_func):
self._validate_exit_handler(p)

# Fill in the default values.
args_list_with_defaults = [dsl.PipelineParam(self._sanitize_name(arg_name))
args_list_with_defaults = [dsl.PipelineParam(dsl._utils._sanitize_k8s_name(arg_name))
for arg_name in argspec.args]
if argspec.defaults:
for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
3 changes: 2 additions & 1 deletion sdk/python/kfp/dsl/_container_op.py
@@ -28,7 +28,8 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
"""Create a new instance of ContainerOp.

Args:
name: the name of the op. Has to be unique within a pipeline.
name: the name of the op. It does not have to be unique within a pipeline
because the pipeline will generate a unique new name in case of conflicts.
image: the container image name, such as 'python:3.5-jessie'
command: the command to run in the container.
If None, uses default CMD in defined in container.
7 changes: 2 additions & 5 deletions sdk/python/kfp/dsl/_pipeline.py
@@ -15,7 +15,7 @@

from . import _container_op
from . import _ops_group
import re
from ._utils import _sanitize_k8s_name
import sys


@@ -38,9 +38,6 @@ def _pipeline(func):

return _pipeline

def _make_kubernetes_name(name):
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')

class Pipeline():
"""A pipeline contains a list of operators.

@@ -108,7 +105,7 @@ def add_op(self, op: _container_op.ContainerOp, define_only: bool):
op: An operator of ContainerOp or its inherited type.
"""

kubernetes_name = _make_kubernetes_name(op.human_name)
kubernetes_name = _sanitize_k8s_name(op.human_name)
Contributor:
I missed the previous change, so I am adding my thoughts here: one design goal is to hide k8s as much as possible in the "dsl" layer and push the k8s stuff to the compiler (I used to call it the "argo compiler"). This way the DSL layer is more generic, and that's why there are separate "dsl" and "compiler" directories.

I feel like we don't have to sanitize the pipeline name here; we can store it as-is (respecting the user's choice) and sanitize it in the compiler at https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/compiler/compiler.py#L457. That way, we can move the util to the compiler, since it is very k8s-specific.

WDYT?

Contributor Author:
I'm all for the DSL hiding k8s. However, the pipeline sanitizes the names of operators, which will be part of the final argo yaml (in the Pipeline.add_op() function).
I can sanitize all the op names in the compiler code instead, though.

Contributor:
Ah yes. Sorry, I mixed up the pipeline name with the step name. I think we can sanitize the name in the compiler too, as you mentioned.

That way, a different "compiler" may choose to sanitize it in a different way, or even not sanitize it at all.

Contributor Author:
Done

Contributor @Ark-kun, Jan 5, 2019:
The Pipeline class (not to be confused with the @pipeline decorator) is a compiler helper class. It can probably be moved to the DSL compiler. It's only used during compilation, and it makes sense to sanitize the names/ids at that point. The original name remains untouched in op.human_name.

Contributor Author:
The Pipeline class is used in the @pipeline decorator, and it might not be a good idea to move the Pipeline class to the Compiler, because the dsl would then depend on the compiler. That would create a dependency loop, since the compiler depends on the dsl library. Right?

Contributor:
> The Pipeline class is used in the @pipeline decorator

This is an implementation detail. We've talked with @qimingj and AFAIK he agreed that it was a good idea to unlink them, like we did with @python_component. I've prepared the code for the most common case, but there was a slight problem with multi-pipeline file compilation. If we deprecate that feature (it's not currently used anywhere), we can unlink them more easily.

Contributor:
That way, a different "compiler" may choose to sanitize it in a different way, or even not sanitize it at all.

I agree with that. That's why I moved the sanitization code from the ContainerOp to the compiler-related Pipeline class. As you remember, I tried to detangle the ContainerOp from the Pipeline even more, by making it not required.

Contributor:
Here is a proposal: Let's break the dependency between the DSL classes and the compiler by adding generic events/hooks for events like ContainerOp creation or @pipeline application. The compiler can set the handlers for those hooks to execute some compiler-specific code. This way, the DSL does not depend on the compiler.
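
For concreteness, a minimal sketch of what such a hook mechanism could look like. All names here (e.g. `register_op_created_handler`) are hypothetical and not part of this PR:

```python
# Hypothetical sketch: the DSL fires a hook on ContainerOp creation;
# a compiler registers a handler instead of the DSL importing compiler code.
_op_created_handlers = []

def register_op_created_handler(handler):
    """A compiler registers a callback here (hypothetical API)."""
    _op_created_handlers.append(handler)

class ContainerOp:
    def __init__(self, name, image):
        self.human_name = name   # the DSL keeps the user's original name untouched
        self.image = image
        for handler in _op_created_handlers:
            handler(self)        # e.g. an Argo compiler sanitizes/records the op here

# Example: the compiler side hooks in without the DSL knowing about it.
created_ops = []
register_op_created_handler(created_ops.append)
ContainerOp('My Step', 'python:3.5-jessie')
print(created_ops[0].human_name)  # My Step
```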

Contributor:
@Ark-kun can you send a separate change and we can discuss more details? I feel it should be a separate change from this one. Small changes are more manageable.

The reason we had a "Pipeline" class are: 1) we need a "scope" that can record all ContainerOp objects. Hence the global Pipeline._default_pipeline variable. 2) Someday we can expose the Pipeline class to support dynamic pipeline construction in DSL (we don't expose that now because we want to limit the surfacing area and promote pipeline function).

Events/hooks are one option. We can compare the approaches by whether they 1) keep the DSL as simple as it is now, 2) favor simplicity for compiler providers, and 3) reduce or remove global variables.
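
A compressed sketch of the "scope" idea from point 1 above. This is simplified: the real class also generates unique step ids, tracks op groups, and hands its state to the compiler.

```python
# Simplified sketch: the active Pipeline records every op registered while it
# is the current (default) pipeline. Conflict handling and groups are omitted.
class Pipeline:
    _default_pipeline = None  # the global "current scope"

    def __init__(self, name):
        self.name = name
        self.ops = {}

    def __enter__(self):
        Pipeline._default_pipeline = self
        return self

    def __exit__(self, *exc):
        Pipeline._default_pipeline = None

    def add_op(self, op, step_id):
        self.ops[step_id] = op

# Usage: ops added while the pipeline is active are recorded in its scope.
with Pipeline('my-pipeline') as p:
    p.add_op(object(), 'step-1')
print(list(p.ops))  # ['step-1']
```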

step_id = kubernetes_name
#If there is an existing op with this name then generate a new name.
if step_id in self.ops:
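The conflict-handling branch of add_op is truncated in this view; below is a hedged sketch of how such renaming might work. The helper name and suffix scheme are illustrative only and may differ from the actual code.

```python
# Illustrative only: if a step id already exists, append an increasing counter
# until the id is unique. The real scheme lives in Pipeline.add_op.
def _unique_step_id(step_id, existing_ids):
    if step_id not in existing_ids:
        return step_id
    suffix = 2
    while '%s-%s' % (step_id, suffix) in existing_ids:
        suffix += 1
    return '%s-%s' % (step_id, suffix)

print(_unique_step_id('train', {'train'}))  # train-2
```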
20 changes: 20 additions & 0 deletions sdk/python/kfp/dsl/_utils.py
@@ -0,0 +1,20 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re

def _sanitize_k8s_name(name):
    """Moved here from _pipeline._make_kubernetes_name.

    Cleans and converts a name in the workflow to a Kubernetes-friendly form:
    lowercase letters, digits, and single dashes, with no leading or trailing dash.
    """
    return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')
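
For reference, a quick illustration of what this sanitizer produces; the inputs here are hypothetical:

```python
from kfp.dsl._utils import _sanitize_k8s_name  # module path added in this PR

# Hypothetical inputs illustrating the lowercase/dash conversion.
print(_sanitize_k8s_name('My Pipeline Step #1'))  # my-pipeline-step-1
print(_sanitize_k8s_name('GCS_Download'))         # gcs-download
```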