# Copyright 2021-2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import inspect
import itertools
import pathlib
import re
import textwrap
from typing import Callable, List, Mapping, Optional, Tuple, Type, Union
import warnings
import docstring_parser
import kfp
from kfp.dsl import container_component_artifact_channel
from kfp.dsl import container_component_class
from kfp.dsl import graph_component
from kfp.dsl import placeholders
from kfp.dsl import python_component
from kfp.dsl import structures
from kfp.dsl import task_final_status
from kfp.dsl.types import artifact_types
from kfp.dsl.types import custom_artifact_types
from kfp.dsl.types import type_annotations
from kfp.dsl.types import type_utils
_DEFAULT_BASE_IMAGE = 'python:3.7'
SINGLE_OUTPUT_NAME = 'Output'


@dataclasses.dataclass
class ComponentInfo():
    """A dataclass capturing registered components.

    This will likely be subsumed/augmented with BaseComponent.
    """
    name: str
    function_name: str
    func: Callable
    target_image: str
    module_path: pathlib.Path
    component_spec: structures.ComponentSpec
    output_component_file: Optional[str] = None
    base_image: str = _DEFAULT_BASE_IMAGE
    packages_to_install: Optional[List[str]] = None
    pip_index_urls: Optional[List[str]] = None
# A map from function_name to components. This is always populated when a
# module containing KFP components is loaded. Primarily used by KFP CLI
# component builder to package components in a file into containers.
REGISTERED_MODULES = None


def _python_function_name_to_component_name(name):
    name_with_spaces = re.sub(' +', ' ', name.replace('_', ' ')).strip(' ')
    return name_with_spaces[0].upper() + name_with_spaces[1:]
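# Illustrative example (not part of the upstream source): this helper turns a
# Python function name into a human-readable component name, e.g.
#     _python_function_name_to_component_name('train_model')  ->  'Train model'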


def make_index_url_options(pip_index_urls: Optional[List[str]]) -> str:
    """Generates index url options for the pip install command based on the
    provided pip_index_urls.

    Args:
        pip_index_urls: Optional list of pip index urls.

    Returns:
        - An empty string if pip_index_urls is empty/None.
        - '--index-url url --trusted-host url ' if pip_index_urls contains
          one url.
        - The above followed by '--extra-index-url url --trusted-host url '
          for each additional url if pip_index_urls contains more than one
          url.

    Note: If pip_index_urls is not empty, the returned string ends with a
    space.
    """
    if not pip_index_urls:
        return ''

    index_url = pip_index_urls[0]
    extra_index_urls = pip_index_urls[1:]

    options = [f'--index-url {index_url} --trusted-host {index_url}']
    options.extend(
        f'--extra-index-url {extra_index_url} --trusted-host {extra_index_url}'
        for extra_index_url in extra_index_urls)

    return ' '.join(options) + ' '
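# Illustrative example (not part of the upstream source): for
# pip_index_urls=['https://pypi.org/simple'] the function returns
# '--index-url https://pypi.org/simple --trusted-host https://pypi.org/simple '
# (note the trailing space); each additional url adds a corresponding
# '--extra-index-url ... --trusted-host ...' pair.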


def make_pip_install_command(
    install_parts: List[str],
    index_url_options: str,
) -> str:
    concat_package_list = ' '.join(
        [repr(str(package)) for package in install_parts])
    return f'python3 -m pip install --quiet --no-warn-script-location {index_url_options}{concat_package_list}'


_install_python_packages_script_template = '''
if ! [ -x "$(command -v pip)" ]; then
    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi

PIP_DISABLE_PIP_VERSION_CHECK=1 {pip_install_commands} && "$0" "$@"
'''


def _get_packages_to_install_command(
    kfp_package_path: Optional[str] = None,
    pip_index_urls: Optional[List[str]] = None,
    packages_to_install: Optional[List[str]] = None,
    install_kfp_package: bool = True,
    target_image: Optional[str] = None,
) -> List[str]:
    packages_to_install = packages_to_install or []
    kfp_in_user_pkgs = any(
        pkg.startswith('kfp') for pkg in packages_to_install)
    # if the user doesn't say "don't install", they aren't building a
    # container component, and they haven't already specified a KFP dep
    # themselves, we install KFP for them
    inject_kfp_install = install_kfp_package and target_image is None and not kfp_in_user_pkgs

    if not inject_kfp_install and not packages_to_install:
        return []

    pip_install_strings = []
    index_url_options = make_index_url_options(pip_index_urls)

    if inject_kfp_install:
        if kfp_package_path:
            kfp_pip_install_command = make_pip_install_command(
                install_parts=[kfp_package_path],
                index_url_options=index_url_options,
            )
        else:
            kfp_pip_install_command = make_pip_install_command(
                install_parts=[
                    f'kfp=={kfp.__version__}',
                    '--no-deps',
                    'typing-extensions>=3.7.4,<5; python_version<"3.9"',
                ],
                index_url_options=index_url_options,
            )
        pip_install_strings.append(kfp_pip_install_command)

        if packages_to_install:
            pip_install_strings.append(' && ')

    if packages_to_install:
        user_packages_pip_install_command = make_pip_install_command(
            install_parts=packages_to_install,
            index_url_options=index_url_options,
        )
        pip_install_strings.append(user_packages_pip_install_command)

    return [
        'sh', '-c',
        _install_python_packages_script_template.format(
            pip_install_commands=' '.join(pip_install_strings))
    ]
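# Illustrative sketch (not from the upstream source): with the defaults
# (install_kfp_package=True, no target_image, no user packages) this returns
# ['sh', '-c', <script>], where <script> bootstraps pip if needed and runs
# "python3 -m pip install --quiet --no-warn-script-location 'kfp==<version>'
# '--no-deps' ..." before executing the component code via "$0" "$@".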


def _get_function_source_definition(func: Callable) -> str:
    func_code = inspect.getsource(func)

    # Function might be defined in some indented scope (e.g. in another
    # function). We need to handle this and properly dedent the function
    # source code.
    func_code = textwrap.dedent(func_code)
    func_code_lines = func_code.split('\n')

    # Removing possible decorators (can be multiline) until the function
    # definition is found
    func_code_lines = itertools.dropwhile(lambda x: not x.startswith('def'),
                                          func_code_lines)

    if not func_code_lines:
        raise ValueError(
            f'Failed to dedent and clean up the source of function "{func.__name__}". It is probably not properly indented.'
        )

    return '\n'.join(func_code_lines)


def _maybe_make_unique(name: str, names: List[str]):
    if name not in names:
        return name

    for i in range(2, 100):
        unique_name = f'{name}_{i}'
        if unique_name not in names:
            return unique_name

    raise RuntimeError(f'Too many arguments with the name {name}')


def extract_component_interface(
    func: Callable,
    containerized: bool = False,
    description: Optional[str] = None,
    name: Optional[str] = None,
) -> structures.ComponentSpec:
    signature = inspect.signature(func)
    parameters = list(signature.parameters.values())

    original_docstring = inspect.getdoc(func)
    parsed_docstring = docstring_parser.parse(original_docstring)

    inputs = {}
    outputs = {}

    input_names = set()
    output_names = set()
    for parameter in parameters:
        parameter_type = type_annotations.maybe_strip_optional_from_annotation(
            parameter.annotation)
        passing_style = None
        io_name = parameter.name
        is_artifact_list = False

        if type_annotations.is_Input_Output_artifact_annotation(
                parameter_type):
            # passing_style is either type_annotations.InputAnnotation or
            # type_annotations.OutputAnnotation.
            passing_style = type_annotations.get_io_artifact_annotation(
                parameter_type)

            # parameter_type is a type like
            # typing_extensions.Annotated[kfp.dsl.types.artifact_types.Artifact, <class 'kfp.dsl.types.type_annotations.OutputAnnotation'>]
            # OR
            # typing_extensions.Annotated[typing.List[kfp.dsl.types.artifact_types.Artifact], <class 'kfp.dsl.types.type_annotations.OutputAnnotation'>]
            is_artifact_list = type_annotations.is_list_of_artifacts(
                parameter_type.__origin__)

            parameter_type = type_annotations.get_io_artifact_class(
                parameter_type)
            if not type_annotations.is_artifact_class(parameter_type):
                raise ValueError(
                    f'Input[T] and Output[T] are only supported when T is an artifact or list of artifacts. Found `{io_name} with type {parameter_type}`'
                )

            if parameter.default is not inspect.Parameter.empty:
                if passing_style in [
                        type_annotations.OutputAnnotation,
                        type_annotations.OutputPath,
                ]:
                    raise ValueError(
                        'Default values for Output artifacts are not supported.'
                    )
                elif parameter.default is not None:
                    raise ValueError(
                        f'Optional Input artifacts may only have default value None. Got: {parameter.default}.'
                    )

        elif isinstance(
                parameter_type,
                (type_annotations.InputPath, type_annotations.OutputPath)):
            passing_style = type(parameter_type)
            parameter_type = parameter_type.type
            if parameter.default is not inspect.Parameter.empty and not (
                    passing_style == type_annotations.InputPath and
                    parameter.default is None):
                raise ValueError(
                    'Path inputs only support default values of None. Default'
                    ' values for outputs are not supported.')

        type_struct = type_utils._annotation_to_type_struct(parameter_type)
        if type_struct is None:
            raise TypeError(
                f'Missing type annotation for argument: {parameter.name}')

        if passing_style in [
                type_annotations.OutputAnnotation, type_annotations.OutputPath
        ]:
            if io_name == SINGLE_OUTPUT_NAME:
                raise ValueError(
                    f'"{SINGLE_OUTPUT_NAME}" is an invalid parameter name.')
            io_name = _maybe_make_unique(io_name, output_names)
            output_names.add(io_name)
            if type_annotations.is_artifact_class(parameter_type):
                schema_version = parameter_type.schema_version
                output_spec = structures.OutputSpec(
                    type=type_utils.create_bundled_artifact_type(
                        type_struct, schema_version),
                    is_artifact_list=is_artifact_list)
            else:
                output_spec = structures.OutputSpec(type=type_struct)
            outputs[io_name] = output_spec
        else:
            io_name = _maybe_make_unique(io_name, input_names)
            input_names.add(io_name)
            type_ = type_utils.create_bundled_artifact_type(
                type_struct, parameter_type.schema_version
            ) if type_annotations.is_artifact_class(
                parameter_type) else type_struct
            default = None if parameter.default == inspect.Parameter.empty or type_annotations.is_artifact_class(
                parameter_type) else parameter.default
            optional = parameter.default is not inspect.Parameter.empty or type_utils.is_task_final_status_type(
                type_struct)
            input_spec = structures.InputSpec(
                type=type_,
                default=default,
                optional=optional,
                is_artifact_list=is_artifact_list,
            )
            inputs[io_name] = input_spec

    # Analyzing the return type annotations.
    return_ann = signature.return_annotation
    if not containerized:
        if hasattr(return_ann, '_fields'):  # NamedTuple
            # Getting field type annotations.
            # __annotations__ does not exist in python 3.5 and earlier
            # _field_types does not exist in python 3.9 and later
            field_annotations = getattr(return_ann, '__annotations__',
                                        None) or getattr(
                                            return_ann, '_field_types', None)
            for field_name in return_ann._fields:
                output_name = _maybe_make_unique(field_name, output_names)
                output_names.add(output_name)
                type_var = field_annotations.get(field_name)
                if type_annotations.is_list_of_artifacts(type_var):
                    artifact_cls = type_var.__args__[0]
                    output_spec = structures.OutputSpec(
                        type=type_utils.create_bundled_artifact_type(
                            artifact_cls.schema_title,
                            artifact_cls.schema_version),
                        is_artifact_list=True)
                elif type_annotations.is_artifact_class(type_var):
                    output_spec = structures.OutputSpec(
                        type=type_utils.create_bundled_artifact_type(
                            type_var.schema_title, type_var.schema_version))
                else:
                    type_struct = type_utils._annotation_to_type_struct(
                        type_var)
                    output_spec = structures.OutputSpec(type=type_struct)
                outputs[output_name] = output_spec
        # Deprecated dict-based way of declaring multiple outputs. Was only
        # used by the @component decorator.
        elif isinstance(return_ann, dict):
            warnings.warn(
                'The ability to specify multiple outputs using the dict syntax'
                ' has been deprecated. It will be removed soon after release'
                ' 0.1.32. Please use typing.NamedTuple to declare multiple'
                ' outputs.')
            for output_name, output_type_annotation in return_ann.items():
                output_type_struct = type_utils._annotation_to_type_struct(
                    output_type_annotation)
                output_spec = structures.OutputSpec(type=output_type_struct)
                outputs[output_name] = output_spec
        elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty:
            output_name = _maybe_make_unique(SINGLE_OUTPUT_NAME, output_names)
            # Fixes exotic, but possible collision:
            # `def func(output_path: OutputPath()) -> str: ...`
            output_names.add(output_name)
            return_ann = signature.return_annotation
            if type_annotations.is_list_of_artifacts(return_ann):
                artifact_cls = return_ann.__args__[0]
                output_spec = structures.OutputSpec(
                    type=type_utils.create_bundled_artifact_type(
                        artifact_cls.schema_title,
                        artifact_cls.schema_version),
                    is_artifact_list=True)
            elif type_annotations.is_artifact_class(return_ann):
                output_spec = structures.OutputSpec(
                    type=type_utils.create_bundled_artifact_type(
                        return_ann.schema_title, return_ann.schema_version),
                    is_artifact_list=False)
            else:
                type_struct = type_utils._annotation_to_type_struct(return_ann)
                output_spec = structures.OutputSpec(type=type_struct)
            outputs[output_name] = output_spec
    elif return_ann != inspect.Parameter.empty and return_ann != structures.ContainerSpec:
        raise TypeError(
            'Return annotation should be either ContainerSpec or omitted for container components.'
        )

    component_name = name or _python_function_name_to_component_name(
        func.__name__)

    def assign_descriptions(
        inputs_or_outputs: Mapping[str, Union[structures.InputSpec,
                                              structures.OutputSpec]],
        docstring_params: List[docstring_parser.DocstringParam],
    ) -> None:
        """Assigns descriptions to InputSpec or OutputSpec for each component
        input/output found in the parsed docstring parameters."""
        docstring_inputs = {
            param.arg_name: param for param in docstring_params
        }
        for name, spec in inputs_or_outputs.items():
            if name in docstring_inputs:
                spec.description = docstring_inputs[name].description

    def parse_docstring_with_return_as_args(
            docstring: Union[str,
                             None]) -> Optional[docstring_parser.Docstring]:
        """Modifies docstring so that a return section can be treated as an
        args section, then parses the docstring."""
        if docstring is None:
            return None

        # Returns and Return are the only two keywords docstring_parser uses
        # for returns; use newline to avoid replacements that aren't in the
        # return section header.
        return_keywords = ['Returns:\n', 'Returns\n', 'Return:\n', 'Return\n']
        for keyword in return_keywords:
            if keyword in docstring:
                modified_docstring = docstring.replace(keyword.strip(), 'Args:')
                return docstring_parser.parse(modified_docstring)

        return None

    assign_descriptions(inputs, parsed_docstring.params)

    modified_parsed_docstring = parse_docstring_with_return_as_args(
        original_docstring)
    if modified_parsed_docstring is not None:
        assign_descriptions(outputs, modified_parsed_docstring.params)

    description = get_pipeline_description(
        decorator_description=description,
        docstring=parsed_docstring,
    )

    return structures.ComponentSpec(
        name=component_name,
        description=description,
        inputs=inputs or None,
        outputs=outputs or None,
        implementation=structures.Implementation(),
    )


def _get_command_and_args_for_lightweight_component(
        func: Callable) -> Tuple[List[str], List[str]]:
    imports_source = [
        'import kfp',
        'from kfp import dsl',
        'from kfp.dsl import *',
        'from typing import *',
    ] + custom_artifact_types.get_custom_artifact_type_import_statements(func)

    func_source = _get_function_source_definition(func)
    source = textwrap.dedent('''
        {imports_source}
        {func_source}\n''').format(
            imports_source='\n'.join(imports_source), func_source=func_source)
    command = [
        'sh',
        '-ec',
        textwrap.dedent('''\
                    program_path=$(mktemp -d)

                    printf "%s" "$0" > "$program_path/ephemeral_component.py"

                    _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main \
                        --component_module_path \
                        "$program_path/ephemeral_component.py" \
                        "$@"
                '''),
        source,
    ]
    args = [
        '--executor_input',
        placeholders.ExecutorInputPlaceholder(),
        '--function_to_execute',
        func.__name__,
    ]

    return command, args


def _get_command_and_args_for_containerized_component(
        function_name: str) -> Tuple[List[str], List[str]]:
    command = [
        'python3',
        '-m',
        'kfp.dsl.executor_main',
    ]

    args = [
        '--executor_input',
        placeholders.ExecutorInputPlaceholder()._to_string(),
        '--function_to_execute',
        function_name,
    ]
    return command, args


def create_component_from_func(
    func: Callable,
    base_image: Optional[str] = None,
    target_image: Optional[str] = None,
    packages_to_install: List[str] = None,
    pip_index_urls: Optional[List[str]] = None,
    output_component_file: Optional[str] = None,
    install_kfp_package: bool = True,
    kfp_package_path: Optional[str] = None,
) -> python_component.PythonComponent:
    """Implementation for the @component decorator.

    The decorator is defined under component_decorator.py. See the
    decorator for the canonical documentation for this function.
    """
    packages_to_install_command = _get_packages_to_install_command(
        install_kfp_package=install_kfp_package,
        target_image=target_image,
        kfp_package_path=kfp_package_path,
        packages_to_install=packages_to_install,
        pip_index_urls=pip_index_urls,
    )

    command = []
    args = []
    if base_image is None:
        base_image = _DEFAULT_BASE_IMAGE

    component_image = base_image

    if target_image:
        component_image = target_image
        command, args = _get_command_and_args_for_containerized_component(
            function_name=func.__name__,)
    else:
        command, args = _get_command_and_args_for_lightweight_component(
            func=func)

    component_spec = extract_component_interface(func)
    component_spec.implementation = structures.Implementation(
        container=structures.ContainerSpecImplementation(
            image=component_image,
            command=packages_to_install_command + command,
            args=args,
        ))

    module_path = pathlib.Path(inspect.getsourcefile(func))
    module_path.resolve()

    component_name = _python_function_name_to_component_name(func.__name__)
    component_info = ComponentInfo(
        name=component_name,
        function_name=func.__name__,
        func=func,
        target_image=target_image,
        module_path=module_path,
        component_spec=component_spec,
        output_component_file=output_component_file,
        base_image=base_image,
        packages_to_install=packages_to_install,
        pip_index_urls=pip_index_urls)

    if REGISTERED_MODULES is not None:
        REGISTERED_MODULES[component_name] = component_info

    if output_component_file:
        component_spec.save_to_component_yaml(output_component_file)

    return python_component.PythonComponent(
        component_spec=component_spec, python_func=func)
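# Illustrative usage sketch (not part of this module): the @dsl.component
# decorator defined in component_decorator.py ultimately delegates to
# create_component_from_func, so the typical entry point looks like:
#
#     from kfp import dsl
#
#     @dsl.component(base_image='python:3.9', packages_to_install=['pandas'])
#     def add(a: int, b: int) -> int:
#         return a + b
#
# The decorated function becomes a python_component.PythonComponent whose
# ComponentSpec carries the image, the pip install bootstrap command, and the
# executor args built above.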


def make_input_for_parameterized_container_component_function(
    name: str, annotation: Union[Type[List[artifact_types.Artifact]],
                                 Type[artifact_types.Artifact]]
) -> Union[placeholders.Placeholder,
           container_component_artifact_channel.ContainerComponentArtifactChannel]:
    if type_annotations.is_input_artifact(annotation):
        if type_annotations.is_list_of_artifacts(annotation.__origin__):
            return placeholders.InputListOfArtifactsPlaceholder(name)
        else:
            return container_component_artifact_channel.ContainerComponentArtifactChannel(
                io_type='input', var_name=name)

    elif type_annotations.is_output_artifact(annotation):
        if type_annotations.is_list_of_artifacts(annotation.__origin__):
            return placeholders.OutputListOfArtifactsPlaceholder(name)
        else:
            return container_component_artifact_channel.ContainerComponentArtifactChannel(
                io_type='output', var_name=name)

    elif isinstance(
            annotation,
            (type_annotations.OutputAnnotation, type_annotations.OutputPath)):
        return placeholders.OutputParameterPlaceholder(name)

    else:
        placeholder = placeholders.InputValuePlaceholder(name)
        # small hack to encode the runtime value's type for a custom
        # json.dumps function
        if (annotation == task_final_status.PipelineTaskFinalStatus or
                type_utils.is_task_final_status_type(annotation)):
            placeholder._ir_type = 'STRUCT'
        else:
            placeholder._ir_type = type_utils.get_parameter_type_name(
                annotation)
        return placeholder


def create_container_component_from_func(
        func: Callable) -> container_component_class.ContainerComponent:
    """Implementation for the @container_component decorator.

    The decorator is defined under container_component_decorator.py. See
    the decorator for the canonical documentation for this function.
    """
    component_spec = extract_component_interface(func, containerized=True)
    signature = inspect.signature(func)
    parameters = list(signature.parameters.values())
    arg_list = []
    for parameter in parameters:
        parameter_type = type_annotations.maybe_strip_optional_from_annotation(
            parameter.annotation)
        arg_list.append(
            make_input_for_parameterized_container_component_function(
                parameter.name, parameter_type))

    container_spec = func(*arg_list)
    container_spec_implementation = structures.ContainerSpecImplementation.from_container_spec(
        container_spec)
    component_spec.implementation = structures.Implementation(
        container_spec_implementation)
    component_spec._validate_placeholders()
    return container_component_class.ContainerComponent(component_spec, func)
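# Illustrative usage sketch (not part of this module): the
# @dsl.container_component decorator calls create_container_component_from_func
# on a function that returns a dsl.ContainerSpec, e.g.
#
#     from kfp import dsl
#
#     @dsl.container_component
#     def say_hello(name: str):
#         return dsl.ContainerSpec(
#             image='alpine',
#             command=['echo'],
#             args=[f'Hello, {name}!'])
#
# At build time the function body is invoked with placeholder arguments (see
# make_input_for_parameterized_container_component_function above) so the
# returned ContainerSpec can be converted into the component implementation.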


def create_graph_component_from_func(
    func: Callable,
    name: Optional[str] = None,
    description: Optional[str] = None,
    display_name: Optional[str] = None,
) -> graph_component.GraphComponent:
    """Implementation for the @pipeline decorator.

    The decorator is defined under pipeline_context.py. See the
    decorator for the canonical documentation for this function.
    """
    component_spec = extract_component_interface(
        func,
        description=description,
        name=name,
    )
    return graph_component.GraphComponent(
        component_spec=component_spec,
        pipeline_func=func,
        display_name=display_name,
    )


def get_pipeline_description(
    decorator_description: Union[str, None],
    docstring: docstring_parser.Docstring,
) -> Optional[str]:
    """Obtains the correct pipeline description from the pipeline decorator's
    description argument and the parsed docstring.

    Gives precedence to the decorator argument.
    """
    if decorator_description:
        return decorator_description

    short_description = docstring.short_description
    long_description = docstring.long_description
    docstring_description = short_description + '\n' + long_description if (
        short_description and long_description) else short_description
    return docstring_description.strip() if docstring_description else None
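# Illustrative example (not part of the upstream module): a description passed
# to the decorator (e.g. @dsl.pipeline(description='My pipeline')) always wins;
# otherwise, a docstring with summary 'Trains a model.' and body
# 'Uses dataset X.' yields 'Trains a model.\nUses dataset X.'.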