local pipeline implementation

connor-mccarthy committed Jan 24, 2024
1 parent a990446 commit 18dca03
Showing 15 changed files with 1,436 additions and 48 deletions.
1 change: 1 addition & 0 deletions sdk/RELEASE.md
@@ -1,6 +1,7 @@
# Current Version (in development)

## Features
+* Support local execution of sequential pipelines [\#10423](https://github.com/kubeflow/pipelines/pull/10423)

## Breaking changes

22 changes: 15 additions & 7 deletions sdk/python/kfp/dsl/pipeline_task.py
@@ -28,6 +28,7 @@
from kfp.dsl import structures
from kfp.dsl import utils
from kfp.dsl.types import type_utils
+from kfp.local import pipeline_orchestrator
from kfp.pipeline_spec import pipeline_spec_pb2

_register_task_handler = lambda task: utils.maybe_rename_for_k8s(
@@ -190,13 +191,20 @@ def _execute_locally(self, args: Dict[str, Any]) -> None:
        from kfp.local import task_dispatcher

        if self.pipeline_spec is not None:
-            raise NotImplementedError(
-                'Local pipeline execution is not currently supported.')
-
-        self._outputs = task_dispatcher.run_single_task(
-            pipeline_spec=self.component_spec.to_pipeline_spec(),
-            arguments=args,
-        )
+            self._outputs = pipeline_orchestrator.run_local_pipeline(
+                pipeline_spec=self.pipeline_spec,
+                arguments=args,
+            )
+        elif self.component_spec is not None:
+            self._outputs = task_dispatcher.run_single_task(
+                pipeline_spec=self.component_spec.to_pipeline_spec(),
+                arguments=args,
+            )
+        else:
+            # user should never hit this
+            raise ValueError(
+                'One of pipeline_spec or component_spec must not be None for local execution.'
+            )
        self.state = TaskState.FINAL

@property
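For orientation, a minimal sketch of how user code reaches this dispatch, assuming the kfp.local API that ships with this feature (local.init, SubprocessRunner); the components and values are illustrative, not part of this diff. Calling a component routes to task_dispatcher.run_single_task; calling a pipeline routes to pipeline_orchestrator.run_local_pipeline.

from kfp import dsl, local

# Assumption: SubprocessRunner runs each task in a subprocess on the host.
local.init(runner=local.SubprocessRunner())

@dsl.component
def add(a: int, b: int) -> int:
    return a + b

@dsl.pipeline
def add_two(a: int, b: int) -> int:
    first = add(a=a, b=b)
    return add(a=first.output, b=1).output

# Component call -> task_dispatcher.run_single_task
task = add(a=1, b=2)
print(task.output)  # 3

# Pipeline call -> pipeline_orchestrator.run_local_pipeline
run = add_two(a=1, b=2)
print(run.output)  # 4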
31 changes: 29 additions & 2 deletions sdk/python/kfp/local/executor_input_utils.py
@@ -18,6 +18,7 @@

from google.protobuf import json_format
from google.protobuf import struct_pb2
+from kfp import dsl
from kfp.compiler import pipeline_spec_builder
from kfp.dsl import utils
from kfp.pipeline_spec import pipeline_spec_pb2
@@ -29,19 +30,21 @@ def construct_executor_input(
    component_spec: pipeline_spec_pb2.ComponentSpec,
    arguments: Dict[str, Any],
    task_root: str,
+    block_input_artifact: bool,
) -> pipeline_spec_pb2.ExecutorInput:
    """Constructs the executor input message for a task execution."""
    input_parameter_keys = list(
        component_spec.input_definitions.parameters.keys())
    input_artifact_keys = list(
        component_spec.input_definitions.artifacts.keys())
-    if input_artifact_keys:
+    if input_artifact_keys and block_input_artifact:
        raise ValueError(
            'Input artifacts are not yet supported for local execution.')

    output_parameter_keys = list(
        component_spec.output_definitions.parameters.keys())
    output_artifact_specs_dict = component_spec.output_definitions.artifacts
+    input_artifact_specs_dict = component_spec.input_definitions.artifacts

    inputs = pipeline_spec_pb2.ExecutorInput.Inputs(
        parameter_values={
@@ -52,7 +55,11 @@
            for param_name in input_parameter_keys
        },
-        # input artifact constants are not supported yet
-        artifacts={},
+        artifacts={
+            artifact_name:
+                dsl_artifact_to_artifact_list(arguments[artifact_name])
+            for artifact_name, _ in input_artifact_specs_dict.items()
+        },
    )
    outputs = pipeline_spec_pb2.ExecutorInput.Outputs(
        parameters={
@@ -134,6 +141,26 @@ def artifact_type_schema_to_artifact_list(
    ])


+def dict_to_protobuf_struct(d: Dict[str, Any]) -> struct_pb2.Struct:
+    protobuf_struct = struct_pb2.Struct()
+    protobuf_struct.update(d)
+    return protobuf_struct

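A quick illustration of the helper above (the metadata values are made up); the resulting Struct round-trips back to the original dict:

from google.protobuf import json_format
from kfp.local import executor_input_utils

# Illustrative metadata, as might appear on a dsl.Artifact:
metadata = {'accuracy': 0.95, 'tags': ['eval', 'v1']}
proto_struct = executor_input_utils.dict_to_protobuf_struct(metadata)
assert json_format.MessageToDict(proto_struct) == metadata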

+def dsl_artifact_to_artifact_list(
+        artifact: dsl.Artifact,) -> pipeline_spec_pb2.ArtifactList:
+    return pipeline_spec_pb2.ArtifactList(artifacts=[
+        pipeline_spec_pb2.RuntimeArtifact(
+            name=artifact.name,
+            type=pipeline_spec_pb2.ArtifactTypeSchema(
+                schema_title=artifact.schema_title,
+                schema_version=artifact.schema_version),
+            uri=artifact.uri,
+            metadata=dict_to_protobuf_struct(artifact.metadata),
+        )
+    ])

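And a usage sketch for dsl_artifact_to_artifact_list; the artifact values are illustrative, e.g. something an upstream local task might have produced:

from kfp import dsl
from kfp.local import executor_input_utils

artifact = dsl.Artifact(
    name='model',
    uri='/local/root/my-pipeline/train/model',
    metadata={'framework': 'tf'})
artifact_list = executor_input_utils.dsl_artifact_to_artifact_list(artifact)
assert artifact_list.artifacts[0].uri == artifact.uri
assert artifact_list.artifacts[0].type.schema_title == 'system.Artifact'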

def executor_input_to_dict(
        executor_input: pipeline_spec_pb2.ExecutorInput,
        component_spec: pipeline_spec_pb2.ComponentSpec,
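How block_input_artifact is presumably exercised by the two entry points (hedged: only this file's diff is shown here, but the tests below cover both settings, and the placeholder values are illustrative):

from kfp.local import executor_input_utils
from kfp.pipeline_spec import pipeline_spec_pb2

component_spec = pipeline_spec_pb2.ComponentSpec()  # placeholder spec
arguments = {}                                      # no inputs
task_root = '/tmp/local-root/comp'                  # illustrative task root

# Single-task execution (task_dispatcher): no upstream task exists to
# produce an artifact, so input artifacts are blocked.
blocked = executor_input_utils.construct_executor_input(
    component_spec=component_spec,
    arguments=arguments,
    task_root=task_root,
    block_input_artifact=True,
)

# Pipeline orchestration (pipeline_orchestrator): upstream tasks supply
# input artifacts, so they are allowed through.
allowed = executor_input_utils.construct_executor_input(
    component_spec=component_spec,
    arguments=arguments,
    task_root=task_root,
    block_input_artifact=False,
)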
65 changes: 64 additions & 1 deletion sdk/python/kfp/local/executor_input_utils_test.py
@@ -16,6 +16,7 @@
import unittest

from google.protobuf import json_format
+from kfp import dsl
from kfp.local import executor_input_utils
from kfp.local import testing_utilities
from kfp.pipeline_spec import pipeline_spec_pb2
@@ -76,6 +77,7 @@ def test_no_inputs(self):
            component_spec=component_spec,
            arguments=arguments,
            task_root=task_root,
+            block_input_artifact=True,
        )
        expected = pipeline_spec_pb2.ExecutorInput()
        json_format.ParseDict(
@@ -129,6 +131,7 @@ def test_various_io_types(self):
            component_spec=component_spec,
            arguments=arguments,
            task_root=task_root,
+            block_input_artifact=True,
        )
        expected = pipeline_spec_pb2.ExecutorInput()
        json_format.ParseDict(
@@ -166,7 +169,7 @@ def test_various_io_types(self):
        }, expected)
        self.assertEqual(actual, expected)

-    def test_input_artifacts_not_yet_supported(self):
+    def test_block_input_artifact(self):
        component_spec = pipeline_spec_pb2.ComponentSpec()
        json_format.ParseDict(
            {
@@ -191,8 +194,68 @@ def test_input_artifacts_not_yet_supported(self):
                component_spec=component_spec,
                arguments=arguments,
                task_root=task_root,
+                block_input_artifact=True,
            )

+    def test_allow_input_artifact(self):
+        component_spec = pipeline_spec_pb2.ComponentSpec()
+        json_format.ParseDict(
+            {
+                'inputDefinitions': {
+                    'artifacts': {
+                        'in_artifact': {
+                            'artifactType': {
+                                'schemaTitle': 'system.Artifact',
+                                'schemaVersion': '0.0.1'
+                            }
+                        }
+                    }
+                },
+                'executorLabel': 'exec-comp'
+            }, component_spec)
+        task_root = '/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp'
+        arguments = {
+            'in_artifact':
+                dsl.Artifact(
+                    name='artifact',
+                    uri='/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/prev-comp/artifact',
+                    metadata={'foo': 'bar'})
+        }
+        actual = executor_input_utils.construct_executor_input(
+            component_spec=component_spec,
+            arguments=arguments,
+            task_root=task_root,
+            block_input_artifact=False,
+        )
+        expected = pipeline_spec_pb2.ExecutorInput()
+        json_format.ParseDict(
+            {
+                'inputs': {
+                    'artifacts': {
+                        'in_artifact': {
+                            'artifacts': [{
+                                'name':
+                                    'artifact',
+                                'type': {
+                                    'schemaTitle': 'system.Artifact',
+                                    'schemaVersion': '0.0.1'
+                                },
+                                'uri':
+                                    '/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/prev-comp/artifact',
+                                'metadata': {
+                                    'foo': 'bar'
+                                }
+                            }]
+                        }
+                    }
+                },
+                'outputs': {
+                    'outputFile':
+                        '/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/executor_output.json'
+                }
+            }, expected)
+        self.assertEqual(actual, expected)


class TestExecutorInputToDict(unittest.TestCase):

79 changes: 79 additions & 0 deletions sdk/python/kfp/local/graph_utils.py
@@ -0,0 +1,79 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Graph algorithms which are useful for working with PipelineSpec."""

from typing import Dict, List, Set

from kfp.pipeline_spec import pipeline_spec_pb2


def topological_sort_tasks(
        tasks: Dict[str, pipeline_spec_pb2.PipelineTaskSpec]) -> List[str]:
    """Obtains a topologically sorted stack of tasks from a dictionary
    mapping task name to PipelineTaskSpec.

    Args:
        tasks: The tasks in the pipeline.

    Returns:
        A totally ordered stack of tasks. Tasks should be executed in the order they are popped off the right side of the stack.
    """
    adj_list = build_adjacency_list(tasks)
    return topological_sort(adj_list)


def build_adjacency_list(
        tasks: Dict[str,
                    pipeline_spec_pb2.PipelineTaskSpec]) -> Dict[str, List[str]]:
    """Builds an adjacency list from a dictionary mapping task name to
    PipelineTaskSpec. This is a data pruning step, which allows for
    simplified logic in topological_sort.

    Args:
        tasks: The tasks in the pipeline.

    Returns:
        An adjacency list mapping each task name to the list of its upstream tasks. The key task depends on all value tasks being executed first.
    """
    return {
        task_name: task_details.dependent_tasks
        for task_name, task_details in tasks.items()
    }


def topological_sort(adj_list: Dict[str, List[str]]) -> List[str]:
    """Topologically sorts an adjacency list.

    Args:
        adj_list: An adjacency list mapping each task name to the list of its upstream tasks. The key task depends on all value tasks being executed first.

    Returns:
        A totally ordered stack of tasks. Tasks should be executed in the order they are popped off the right side of the stack.
    """

    def dfs(node: str) -> None:
        visited.add(node)
        for neighbor in adj_list[node]:
            if neighbor not in visited:
                dfs(neighbor)
        result.append(node)

    # sort upstream-task lists to force a deterministic result
    adj_list = {k: sorted(v) for k, v in adj_list.items()}
    visited: Set[str] = set()
    result = []
    for node in adj_list:
        if node not in visited:
            dfs(node)
    return result[::-1]
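A small worked example of the stack contract described above (task names are illustrative; the expected order follows from the deterministic sorting and dict iteration order):

from kfp.local import graph_utils

# evaluate depends on train, which depends on preprocess:
adj_list = {
    'train': ['preprocess'],
    'evaluate': ['train'],
    'preprocess': [],
}
stack = graph_utils.topological_sort(adj_list)
assert stack == ['evaluate', 'train', 'preprocess']
while stack:
    task_name = stack.pop()  # 'preprocess', then 'train', then 'evaluate'
    # ...execute task_name here...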