Skip to content

Commit

Permalink
order dependencies when subsetting pipeline (#4851)
Browse files Browse the repository at this point in the history
resolves #4848

### Test Plan

Added a repro test case based on the issue.
  • Loading branch information
alangenfeld authored and OwenKephart committed Sep 16, 2021
1 parent e9bfe95 commit 7efd236
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,9 @@ def __init__(self, solid_names: List[str], handle_dict: InputToOutputHandleDict)
self._dynamic_fan_out_index[output_handle.solid_name],
)
else:
check.failed("Unexpected dynamic fan in dep created")
check.failed(
f"Unexpected dynamic fan in dep created {output_handle} -> {input_handle}"
)

output_handle_list = [output_handle]
else:
Expand Down
12 changes: 9 additions & 3 deletions python_modules/dagster/dagster/core/definitions/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,8 @@ def _dep_key_of(solid: Node) -> SolidInvocation:


def _get_pipeline_subset_def(
pipeline_def, solids_to_execute: AbstractSet[str]
pipeline_def: PipelineDefinition,
solids_to_execute: AbstractSet[str],
) -> "PipelineSubsetDefinition":
"""
Build a pipeline which is a subset of another pipeline.
Expand All @@ -613,7 +614,11 @@ def _get_pipeline_subset_def(
),
)

solids = list(map(graph.solid_named, solids_to_execute))
# go in topo order to ensure deps dict is ordered
solids = list(
filter(lambda solid: solid.name in solids_to_execute, graph.solids_in_topological_order)
)

deps: Dict[
Union[str, SolidInvocation],
Dict[str, IDependencyDefinition],
Expand All @@ -627,7 +632,7 @@ def _get_pipeline_subset_def(
deps[_dep_key_of(solid)][input_handle.input_def.name] = DependencyDefinition(
solid=output_handle.solid.name, output=output_handle.output_def.name
)
if graph.dependency_structure.has_dynamic_fan_in_dep(input_handle):
elif graph.dependency_structure.has_dynamic_fan_in_dep(input_handle):
output_handle = graph.dependency_structure.get_dynamic_fan_in_dep(input_handle)
if output_handle.solid.name in solids_to_execute:
deps[_dep_key_of(solid)][
Expand All @@ -647,6 +652,7 @@ def _get_pipeline_subset_def(
if output_handle.solid.name in solids_to_execute
]
)
# else input is unconnected

try:
sub_pipeline_def = PipelineSubsetDefinition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,26 @@ def echo(_, x: int) -> int:


@solid(
output_defs=[DynamicOutputDefinition()],
config_schema={
"range": Field(int, is_required=False, default_value=3),
}
)
def num_range(context) -> int:
return context.solid_config["range"]


@solid(
output_defs=[DynamicOutputDefinition()],
config_schema={
"fail": Field(bool, is_required=False, default_value=False),
},
tags={"first": "1"},
)
def emit(context):
def emit(context, num: int = 3):
if context.solid_config["fail"]:
raise Exception("FAILURE")

for i in range(context.solid_config["range"]):
for i in range(num):
yield DynamicOutput(value=i, mapping_key=str(i))


Expand All @@ -72,15 +80,16 @@ def dynamic_echo(_, nums):

@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})])
def dynamic_pipeline():
numbers = emit()

numbers = emit(num_range())
dynamic = numbers.map(lambda num: multiply_by_two(multiply_inputs(num, emit_ten())))
n = multiply_by_two.alias("double_total")(sum_numbers(dynamic.collect()))
echo(n) # test transitive downstream of collect


@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})])
def fan_repeat():
one = emit().map(multiply_by_two)
one = emit(num_range()).map(multiply_by_two)
two = dynamic_echo(one.collect()).map(multiply_by_two).map(echo)
three = dynamic_echo(two.collect()).map(multiply_by_two)
sum_numbers(three.collect())
Expand Down Expand Up @@ -130,12 +139,28 @@ def test_map_empty(run_config):
result = execute_pipeline(
reconstructable(dynamic_pipeline),
instance=instance,
run_config=merge_dicts({"solids": {"emit": {"config": {"range": 0}}}}, run_config),
run_config=merge_dicts({"solids": {"num_range": {"config": {"range": 0}}}}, run_config),
)
assert result.success
assert result.result_for_solid("double_total").output_value() == 0


# Checks that `dynamic_pipeline` executes successfully when only a selected
# subset of its solids is run. The test is parametrized, so it runs once for
# each run config returned by `_run_configs()`.
@pytest.mark.parametrize(
"run_config",
_run_configs(),
)
def test_map_selection(run_config):
with instance_for_test() as instance:
result = execute_pipeline(
reconstructable(dynamic_pipeline),
instance=instance,
# `num_range` is not named in the selection below, so the `num` input of
# `emit` is supplied directly through run config, merged with the
# parametrized run config.
run_config=merge_dicts({"solids": {"emit": {"inputs": {"num": 2}}}}, run_config),
# NOTE(review): "emit*" presumably selects `emit` plus everything downstream
# of it; confirm against Dagster's solid selection syntax.
solid_selection=["emit*", "emit_ten"],
)
assert result.success
# NOTE(review): 40 presumably follows from num=2 flowing through the
# multiply/sum solids, whose bodies are not visible here; confirm.
assert result.result_for_solid("double_total").output_value() == 40


def test_composite_wrapping():
# regression test from user report

Expand Down Expand Up @@ -253,7 +278,7 @@ def test_fan_out_in_out_in(run_config):
empty_result = execute_pipeline(
reconstructable(fan_repeat),
instance=instance,
run_config={"solids": {"emit": {"config": {"range": 0}}}},
run_config={"solids": {"num_range": {"config": {"range": 0}}}},
)
assert empty_result.success
assert empty_result.result_for_solid("sum_numbers").output_value() == 0
Expand Down

0 comments on commit 7efd236

Please sign in to comment.