From f4558c841ef6fdac2acddb6037c4d081b09e4181 Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Thu, 16 Sep 2021 12:13:19 -0500 Subject: [PATCH] order dependencies when subsetting pipeline --- .../dagster/core/definitions/dependency.py | 4 ++- .../dagster/core/definitions/pipeline.py | 12 +++++-- .../dynamic_tests/test_dynamic_execution.py | 35 ++++++++++++++++--- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/python_modules/dagster/dagster/core/definitions/dependency.py b/python_modules/dagster/dagster/core/definitions/dependency.py index f1dfaf75c9ff2..ae2f0914f71df 100644 --- a/python_modules/dagster/dagster/core/definitions/dependency.py +++ b/python_modules/dagster/dagster/core/definitions/dependency.py @@ -798,7 +798,9 @@ def __init__(self, solid_names: List[str], handle_dict: InputToOutputHandleDict) self._dynamic_fan_out_index[output_handle.solid_name], ) else: - check.failed("Unexpected dynamic fan in dep created") + check.failed( + f"Unexpected dynamic fan in dep created {output_handle} -> {input_handle}" + ) output_handle_list = [output_handle] else: diff --git a/python_modules/dagster/dagster/core/definitions/pipeline.py b/python_modules/dagster/dagster/core/definitions/pipeline.py index d9cdf6a760274..aeb61884f3a9f 100644 --- a/python_modules/dagster/dagster/core/definitions/pipeline.py +++ b/python_modules/dagster/dagster/core/definitions/pipeline.py @@ -595,7 +595,8 @@ def _dep_key_of(solid: Node) -> SolidInvocation: def _get_pipeline_subset_def( - pipeline_def, solids_to_execute: AbstractSet[str] + pipeline_def: PipelineDefinition, + solids_to_execute: AbstractSet[str], ) -> "PipelineSubsetDefinition": """ Build a pipeline which is a subset of another pipeline. @@ -613,7 +614,11 @@ def _get_pipeline_subset_def( ), ) - solids = list(map(graph.solid_named, solids_to_execute)) + # go in topo order to ensure deps dict is ordered + solids = list( + filter(lambda solid: solid.name in solids_to_execute, graph.solids_in_topological_order) + ) + deps: Dict[ Union[str, SolidInvocation], Dict[str, IDependencyDefinition], @@ -627,7 +632,7 @@ def _get_pipeline_subset_def( deps[_dep_key_of(solid)][input_handle.input_def.name] = DependencyDefinition( solid=output_handle.solid.name, output=output_handle.output_def.name ) - if graph.dependency_structure.has_dynamic_fan_in_dep(input_handle): + elif graph.dependency_structure.has_dynamic_fan_in_dep(input_handle): output_handle = graph.dependency_structure.get_dynamic_fan_in_dep(input_handle) if output_handle.solid.name in solids_to_execute: deps[_dep_key_of(solid)][ @@ -647,6 +652,7 @@ def _get_pipeline_subset_def( if output_handle.solid.name in solids_to_execute ] ) + # else input is unconnected try: sub_pipeline_def = PipelineSubsetDefinition( diff --git a/python_modules/dagster/dagster_tests/execution_tests/dynamic_tests/test_dynamic_execution.py b/python_modules/dagster/dagster_tests/execution_tests/dynamic_tests/test_dynamic_execution.py index c3b2b1b68f83c..1611b93d1841d 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/dynamic_tests/test_dynamic_execution.py +++ b/python_modules/dagster/dagster_tests/execution_tests/dynamic_tests/test_dynamic_execution.py @@ -44,18 +44,26 @@ def echo(_, x: int) -> int: @solid( - output_defs=[DynamicOutputDefinition()], config_schema={ "range": Field(int, is_required=False, default_value=3), + } +) +def num_range(context) -> int: + return context.solid_config["range"] + + +@solid( + output_defs=[DynamicOutputDefinition()], + config_schema={ "fail": Field(bool, is_required=False, default_value=False), }, tags={"first": "1"}, ) -def emit(context): +def emit(context, num: int): if context.solid_config["fail"]: raise Exception("FAILURE") - for i in range(context.solid_config["range"]): + for i in range(num): yield DynamicOutput(value=i, mapping_key=str(i)) @@ -72,7 +80,8 @@ def dynamic_echo(_, nums): @pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})]) def dynamic_pipeline(): - numbers = emit() + + numbers = emit(num_range()) dynamic = numbers.map(lambda num: multiply_by_two(multiply_inputs(num, emit_ten()))) n = multiply_by_two.alias("double_total")(sum_numbers(dynamic.collect())) echo(n) # test transitive downstream of collect @@ -130,12 +139,28 @@ def test_map_empty(run_config): result = execute_pipeline( reconstructable(dynamic_pipeline), instance=instance, - run_config=merge_dicts({"solids": {"emit": {"config": {"range": 0}}}}, run_config), + run_config=merge_dicts({"solids": {"num_range": {"config": {"range": 0}}}}, run_config), ) assert result.success assert result.result_for_solid("double_total").output_value() == 0 +@pytest.mark.parametrize( + "run_config", + _run_configs(), +) +def test_map_selection(run_config): + with instance_for_test() as instance: + result = execute_pipeline( + reconstructable(dynamic_pipeline), + instance=instance, + run_config=merge_dicts({"solids": {"emit": {"inputs": {"num": 2}}}}, run_config), + solid_selection=["emit*", "emit_ten"], + ) + assert result.success + assert result.result_for_solid("double_total").output_value() == 40 + + def test_composite_wrapping(): # regression test from user report