From 41aa696c8a969a03a8e55428ef6382f2c2912228 Mon Sep 17 00:00:00 2001 From: Ondrej Zacha Date: Sun, 17 Mar 2024 22:58:48 +0100 Subject: [PATCH 1/9] Improve suggestions to resume a failed pipeline - if dataset (or param) is persistent & shared, don't keep looking for ancestors - only look for ancestors producing impersistent inputs - minimize number of suggested nodes (= shorter message for the same pipeline) - testable logic, tests cases outside of scenarios for sequential runner Signed-off-by: Ondrej Zacha --- kedro/runner/runner.py | 170 +++++++++++++++----- tests/runner/conftest.py | 111 ++++++++++++- tests/runner/test_sequential_runner.py | 212 +++++++++++++++++++++++-- 3 files changed, 430 insertions(+), 63 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index ae653f37ea..423c3938d7 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -15,7 +15,7 @@ as_completed, wait, ) -from typing import Any, Iterable, Iterator +from typing import Any, Collection, Iterable, Iterator from more_itertools import interleave from pluggy import PluginManager @@ -198,17 +198,13 @@ def _suggest_resume_scenario( postfix = "" if done_nodes: - node_names = (n.name for n in remaining_nodes) - resume_p = pipeline.only_nodes(*node_names) - start_p = resume_p.only_nodes_with_inputs(*resume_p.inputs()) - - # find the nearest persistent ancestors of the nodes in start_p - start_p_persistent_ancestors = _find_persistent_ancestors( - pipeline, start_p.nodes, catalog + start_node_names = find_nodes_to_resume_from( + pipeline=pipeline, + unfinished_nodes=remaining_nodes, + catalog=catalog, ) - - start_node_names = (n.name for n in start_p_persistent_ancestors) - postfix += f" --from-nodes \"{','.join(start_node_names)}\"" + start_nodes_str = ",".join(sorted(start_node_names)) + postfix += f' --from-nodes "{start_nodes_str}"' if not postfix: self._logger.warning( @@ -216,21 +212,47 @@ def _suggest_resume_scenario( ) else: self._logger.warning( - "There are %d nodes that have not run.\n" + f"There are {len(remaining_nodes)} nodes that have not run.\n" "You can resume the pipeline run from the nearest nodes with " "persisted inputs by adding the following " - "argument to your previous command:\n%s", - len(remaining_nodes), - postfix, + f"argument to your previous command: {postfix}" ) -def _find_persistent_ancestors( - pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog +def find_nodes_to_resume_from( + pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: DataCatalog +) -> set[str]: + """Given a collection of unfinished nodes in a pipeline using + a certain catalog, find the node names to pass to pipeline.from_nodes() + to cover all unfinished nodes, including any additional nodes + that should be re-run if their outputs are not persisted. + + Args: + pipeline: the ``Pipeline`` to find starting nodes for. + unfinished_nodes: collection of ``Node``s that have not finished yet + catalog: the ``DataCatalog`` of the run. + + Returns: + Set of node names to pass to pipeline.from_nodes() to continue + the run. + + """ + all_nodes_that_need_to_run = find_all_required_nodes( + pipeline, unfinished_nodes, catalog + ) + + # Find which of the remaining nodes would need to run first (in topo sort) + persistent_ancestors = find_initial_node_group(pipeline, all_nodes_that_need_to_run) + + return {n.name for n in persistent_ancestors} + + +def find_all_required_nodes( + pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: DataCatalog ) -> set[Node]: """Breadth-first search approach to finding the complete set of persistent ancestors of an iterable of ``Node``s. Persistent - ancestors exclusively have persisted ``Dataset``s as inputs. + ancestors exclusively have persisted ``Dataset``s or parameters as inputs. Args: pipeline: the ``Pipeline`` to find ancestors in. @@ -242,54 +264,114 @@ def _find_persistent_ancestors( ``Node``s. """ - ancestor_nodes_to_run = set() - queue, visited = deque(children), set(children) + nodes_to_run = set(unfinished_nodes) + initial_nodes = _nodes_with_external_inputs(pipeline, unfinished_nodes) + + queue, visited = deque(initial_nodes), set(initial_nodes) while queue: current_node = queue.popleft() - if _has_persistent_inputs(current_node, catalog): - ancestor_nodes_to_run.add(current_node) - continue - for parent in _enumerate_parents(pipeline, current_node): - if parent in visited: + nodes_to_run.add(current_node) + non_persistent_inputs = _enumerate_non_persistent_inputs(current_node, catalog) + # Look for the nodes that produce non-persistent inputs (if they exist) + for node in _enumerate_nodes_with_outputs(pipeline, non_persistent_inputs): + if node in visited: continue - visited.add(parent) - queue.append(parent) - return ancestor_nodes_to_run + visited.add(node) + queue.append(node) + # Make sure no downstream tasks are skipped + nodes_to_run = pipeline.from_nodes(*(n.name for n in nodes_to_run)).nodes -def _enumerate_parents(pipeline: Pipeline, child: Node) -> list[Node]: - """For a given ``Node``, returns a list containing the direct parents - of that ``Node`` in the given ``Pipeline``. + return set(nodes_to_run) + + +def _nodes_with_external_inputs( + pipeline: Pipeline, nodes_of_interest: Iterable[Node] +) -> set[Node]: + """For given ``Node``s in a ``Pipeline``, find their + subset which depends on external inputs of the ``Pipeline``. Args: - pipeline: the ``Pipeline`` to search for direct parents in. - child: the ``Node`` to find parents of. + pipeline: the ``Pipeline`` to search for nodes in. + nodes_of_interest: the ``Node``s to analyze. Returns: - A list of all ``Node``s that are direct parents of ``child``. + A set of ``Node``s that depend on external inputs + of nodes of interest. """ - parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs) - return parent_pipeline.nodes + p_nodes_of_interest = pipeline.only_nodes(*(n.name for n in nodes_of_interest)) + p_nodes_with_external_inputs = p_nodes_of_interest.only_nodes_with_inputs( + *p_nodes_of_interest.inputs() + ) + return set(p_nodes_with_external_inputs.nodes) -def _has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool: - """Check if a ``Node`` exclusively has persisted Datasets as inputs. - If at least one input is a ``MemoryDataset``, return False. +def _enumerate_non_persistent_inputs(node: Node, catalog: DataCatalog) -> set[str]: + """Enumerate non-persistent input datasets of a ``Node``. Args: node: the ``Node`` to check the inputs of. catalog: the ``DataCatalog`` of the run. Returns: - True if the ``Node`` being checked exclusively has inputs that - are not ``MemoryDataset``, else False. + Set of names of non-persistent inputs of given ``Node``. """ + # We use _datasets because they pertain parameter name format + catalog_datasets = catalog._datasets + non_persistent_inputs: set[str] = set() for node_input in node.inputs: - if isinstance(catalog._datasets[node_input], MemoryDataset): - return False - return True + if node_input.startswith("params:"): + continue + if node_input not in catalog_datasets or isinstance( + catalog_datasets[node_input], MemoryDataset + ): + non_persistent_inputs.add(node_input) + + return non_persistent_inputs + + +def _enumerate_nodes_with_outputs( + pipeline: Pipeline, outputs: Collection[str] +) -> list[Node]: + """For given outputs, returns a list containing nodes that + generate them in the given ``Pipeline``. + + Args: + pipeline: the ``Pipeline`` to search for nodes in. + outputs: the dataset names to find source nodes for. + + Returns: + A list of all ``Node``s that are producing ``outputs``. + + """ + parent_pipeline = pipeline.only_nodes_with_outputs(*outputs) + return parent_pipeline.nodes + + +def find_initial_node_group(pipeline: Pipeline, nodes: Iterable[Node]) -> list[Node]: + """Given a collection of ``Node``s in a ``Pipeline``, + find the initial group of ``Node``s to be run (in topological order). + + This can be used to define a sub-pipeline with the smallest possible + set of nodes to pass to --from-nodes. + + Args: + pipeline: the ``Pipeline`` to search for initial ``Node``s in. + nodes: the ``Node``s to find initial group for. + + Returns: + A list of initial ``Node``s to run given inputs (in topological order). + + """ + node_names = set(n.name for n in nodes) + if len(node_names) == 0: + # TODO: or raise? + return [] + sub_pipeline = pipeline.only_nodes(*node_names) + initial_nodes = sub_pipeline.grouped_nodes[0] + return initial_nodes def run_node( diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index 25ca233e97..15d278d717 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -15,6 +15,10 @@ def identity(arg): return arg +def first_arg(*args): + return args[0] + + def sink(arg): pass @@ -36,7 +40,7 @@ def return_not_serialisable(arg): return lambda x: x -def multi_input_list_output(arg1, arg2): +def multi_input_list_output(arg1, arg2, arg3=None): return [arg1, arg2] @@ -80,6 +84,9 @@ def _save(arg): "ds0_B": persistent_dataset, "ds2_A": persistent_dataset, "ds2_B": persistent_dataset, + "dsX": persistent_dataset, + "dsY": persistent_dataset, # TODO: names? + "params:p": MemoryDataset(1), } ) @@ -148,21 +155,31 @@ def unfinished_outputs_pipeline(): @pytest.fixture def two_branches_crossed_pipeline(): - """A ``Pipeline`` with an X-shape (two branches with one common node)""" + r"""A ``Pipeline`` with an X-shape (two branches with one common node): + + (node1_A) (node1_B) + \ / + (node2) + / \ + (node3_A) (node3_B) + / \ + (node4_A) (node4_B) + + """ return pipeline( [ - node(identity, "ds0_A", "ds1_A", name="node1_A"), - node(identity, "ds0_B", "ds1_B", name="node1_B"), + node(first_arg, "ds0_A", "ds1_A", name="node1_A"), + node(first_arg, "ds0_B", "ds1_B", name="node1_B"), node( multi_input_list_output, ["ds1_A", "ds1_B"], ["ds2_A", "ds2_B"], name="node2", ), - node(identity, "ds2_A", "ds3_A", name="node3_A"), - node(identity, "ds2_B", "ds3_B", name="node3_B"), - node(identity, "ds3_A", "ds4_A", name="node4_A"), - node(identity, "ds3_B", "ds4_B", name="node4_B"), + node(first_arg, "ds2_A", "ds3_A", name="node3_A"), + node(first_arg, "ds2_B", "ds3_B", name="node3_B"), + node(first_arg, "ds3_A", "ds4_A", name="node4_A"), + node(first_arg, "ds3_B", "ds4_B", name="node4_B"), ] ) @@ -175,3 +192,81 @@ def pipeline_with_memory_datasets(): node(func=identity, inputs="Input2", outputs="MemOutput2", name="node2"), ] ) + + +@pytest.fixture +def pipeline_asymmetric(): + r""" + + (node1) + \ + (node3) (node2) + \ / + (node4) + + """ + return pipeline( + [ + node(first_arg, ["ds0_A"], ["_ds1"], name="node1"), + node(first_arg, ["ds0_B"], ["_ds2"], name="node2"), + node(first_arg, ["_ds1"], ["_ds3"], name="node3"), + node(first_arg, ["_ds2", "_ds3"], ["_ds4"], name="node4"), + ] + ) + + +@pytest.fixture +def pipeline_triangular(): + r""" + + (node1) + | \ + | (node2) + | / + (node3) + + """ + return pipeline( + [ + node(first_arg, ["ds0_A"], ["_ds1_A"], name="node1"), + node(first_arg, ["_ds1_A"], ["ds2_A"], name="node2"), + node(first_arg, ["ds2_A", "_ds1_A"], ["_ds3_A"], name="node3"), + ] + ) + + +@pytest.fixture +def empty_pipeline(): + return pipeline([]) + + +@pytest.fixture( + params=[(), ("dsX",), ("params:p",)], + ids=[ + "no_extras", + "extra_persistent_ds", + "extra_param", + ], +) +def two_branches_crossed_pipeline_variable_inputs(request): + """A ``Pipeline`` with an X-shape (two branches with one common node). + Non-persistent datasets (other than parameters) are prefixed with an underscore. + """ + extra_inputs = list(request.param) + + return pipeline( + [ + node(first_arg, ["ds0_A"] + extra_inputs, "_ds1_A", name="node1_A"), + node(first_arg, ["ds0_B"] + extra_inputs, "_ds1_B", name="node1_B"), + node( + multi_input_list_output, + ["_ds1_A", "_ds1_B"] + extra_inputs, + ["ds2_A", "ds2_B"], + name="node2", + ), + node(first_arg, ["ds2_A"] + extra_inputs, "_ds3_A", name="node3_A"), + node(first_arg, ["ds2_B"] + extra_inputs, "_ds3_B", name="node3_B"), + node(first_arg, ["_ds3_A"] + extra_inputs, "_ds4_A", name="node4_A"), + node(first_arg, ["_ds3_B"] + extra_inputs, "_ds4_B", name="node4_B"), + ] + ) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 0e28feed6d..546e4cface 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -17,6 +17,10 @@ from kedro.pipeline import node from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline from kedro.runner import SequentialRunner +from kedro.runner.runner import ( + find_all_required_nodes, + find_nodes_to_resume_from, +) from tests.runner.conftest import exception_fn, identity, sink, source @@ -252,18 +256,18 @@ def test_confirms(self, mocker, test_pipeline, is_async): fake_dataset_instance.confirm.assert_called_once_with() -@pytest.mark.parametrize( - "failing_node_names,expected_pattern", - [ - (["node1_A"], r"No nodes ran."), - (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), - (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), - ], -) class TestSuggestResumeScenario: + @pytest.mark.parametrize( + "failing_node_names,expected_pattern", + [ + (["node1_A"], r"No nodes ran."), + (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), + (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), + ], + ) def test_suggest_resume_scenario( self, caplog, @@ -286,6 +290,44 @@ def test_suggest_resume_scenario( ) assert re.search(expected_pattern, caplog.text) + @pytest.mark.parametrize( + "failing_node_names,expected_pattern", + [ + (["node1_A"], r"No nodes ran."), + (["node2"], r'"node1_A,node1_B"'), + (["node3_A"], r'"node3_A,node3_B"'), + (["node4_A"], r'"node3_A,node3_B"'), + (["node3_A", "node4_A"], r'"node3_A,node3_B"'), + (["node2", "node4_A"], r'"node1_A,node1_B"'), + ], + ) + def test_stricter_suggest_resume_scenario( + self, + caplog, + two_branches_crossed_pipeline_variable_inputs, + persistent_dataset_catalog, + failing_node_names, + expected_pattern, + ): + """ + Stricter version of previous test. + Covers pipelines where inputs are shared across nodes. + """ + test_pipeline = two_branches_crossed_pipeline_variable_inputs + + nodes = {n.name: n for n in test_pipeline.nodes} + for name in failing_node_names: + test_pipeline -= modular_pipeline([nodes[name]]) + test_pipeline += modular_pipeline([nodes[name]._copy(func=exception_fn)]) + + with pytest.raises(Exception, match="test exception"): + SequentialRunner().run( + test_pipeline, + persistent_dataset_catalog, + hook_manager=_create_hook_manager(), + ) + assert re.search(expected_pattern, caplog.text) + class TestMemoryDatasetBehaviour: def test_run_includes_memory_datasets(self, pipeline_with_memory_datasets): @@ -311,3 +353,151 @@ def test_run_includes_memory_datasets(self, pipeline_with_memory_datasets): assert ( "RegularOutput" not in output ) # This output is registered in DataCatalog and so should not be in free outputs + + +# TODO: move to separate test module? +@pytest.mark.parametrize( + "pipeline_name,remaining_node_names,expected_result", + [ + ("pipeline_asymmetric", {"node2", "node3", "node4"}, {"node1", "node2"}), + ("pipeline_asymmetric", {"node3", "node4"}, {"node1", "node2"}), + ("pipeline_asymmetric", {"node4"}, {"node1", "node2"}), + ("pipeline_asymmetric", set(), set()), + ("empty_pipeline", set(), set()), + ("pipeline_triangular", {"node3"}, {"node1"}), + ( + "two_branches_crossed_pipeline", + { + "node1_A", + "node1_B", + "node2", + "node3_A", + "node3_B", + "node4_A", + "node4_B", + }, + {"node1_A", "node1_B"}, + ), + ( + "two_branches_crossed_pipeline", + {"node2", "node3_A", "node3_B", "node4_A", "node4_B"}, + {"node1_A", "node1_B"}, + ), + ( + "two_branches_crossed_pipeline", + ["node3_A", "node3_B", "node4_A", "node4_B"], + {"node3_A", "node3_B"}, + ), + ( + "two_branches_crossed_pipeline", + ["node4_A", "node4_B"], + {"node3_A", "node3_B"}, + ), + ("two_branches_crossed_pipeline", ["node4_A"], {"node3_A"}), + ("two_branches_crossed_pipeline", ["node3_A", "node4_A"], {"node3_A"}), + ], +) +class TestResumeLogicBehaviour: + def test_simple_pipeline( + self, + pipeline_name, + persistent_dataset_catalog, + remaining_node_names, + expected_result, + request, + ): + """ + Test suggestion for simple pipelines with a mix of persistent + and memory datasets. + """ + test_pipeline = request.getfixturevalue(pipeline_name) + + remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes + result_node_names = find_nodes_to_resume_from( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + assert expected_result == result_node_names + + def test_all_datasets_persistent( + self, + pipeline_name, + persistent_dataset_catalog, + remaining_node_names, + expected_result, + request, + ): + """ + Test suggestion for pipelines where all datasets are persisted: + In that case, exactly the set of remaining nodes should be re-run. + """ + test_pipeline = request.getfixturevalue(pipeline_name) + + catalog = DataCatalog( + dict.fromkeys( + test_pipeline.datasets(), + LambdaDataset(load=lambda: 42, save=lambda data: None), + ) + ) + + remaining_nodes = set(test_pipeline.only_nodes(*remaining_node_names).nodes) + result_node_names = find_nodes_to_resume_from( + test_pipeline, + remaining_nodes, + catalog, + ) + final_pipeline_nodes = set(test_pipeline.from_nodes(*result_node_names).nodes) + assert final_pipeline_nodes == remaining_nodes + + @pytest.mark.parametrize("extra_input", ["params:p", "dsY"]) + def test_added_shared_input( + self, + pipeline_name, + persistent_dataset_catalog, + remaining_node_names, + expected_result, + extra_input, + request, + ): + """ + Test suggestion for pipelines where a single persistent dataset or + parameter is shared across all nodes. These do not change and + therefore should not affect resume suggestion. + """ + test_pipeline = request.getfixturevalue(pipeline_name) + + # Add parameter shared across all nodes + test_pipeline = modular_pipeline( + [n._copy(inputs=[*n.inputs, extra_input]) for n in test_pipeline.nodes] + ) + + remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes + result_node_names = find_nodes_to_resume_from( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + assert expected_result == result_node_names + + def test_suggestion_consistency( + self, + pipeline_name, + persistent_dataset_catalog, + remaining_node_names, + expected_result, + request, + ): + """ + Test that suggestions are internally consistent; pipeline generated + from resume nodes should exactly contain set of all required nodes. + """ + test_pipeline = request.getfixturevalue(pipeline_name) + + remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes + required_nodes = find_all_required_nodes( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + resume_node_names = find_nodes_to_resume_from( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + + assert set(required_nodes) == set( + test_pipeline.from_nodes(*resume_node_names).nodes + ) From 42c41a3824a6ffb03ae39011f8402f4b6008b69a Mon Sep 17 00:00:00 2001 From: Ondrej Zacha Date: Mon, 18 Mar 2024 08:26:09 +0100 Subject: [PATCH 2/9] Improve docstring Signed-off-by: Ondrej Zacha --- kedro/runner/runner.py | 10 +++++----- tests/runner/conftest.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 423c3938d7..a307d1c47f 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -215,7 +215,7 @@ def _suggest_resume_scenario( f"There are {len(remaining_nodes)} nodes that have not run.\n" "You can resume the pipeline run from the nearest nodes with " "persisted inputs by adding the following " - f"argument to your previous command: {postfix}" + f"argument to your previous command:\n{postfix}" ) @@ -251,8 +251,9 @@ def find_all_required_nodes( pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: DataCatalog ) -> set[Node]: """Breadth-first search approach to finding the complete set of - persistent ancestors of an iterable of ``Node``s. Persistent - ancestors exclusively have persisted ``Dataset``s or parameters as inputs. + ``Node``s which need to run to cover all unfinished nodes, + including any additional nodes that should be re-run if their outputs + are not persisted. Args: pipeline: the ``Pipeline`` to find ancestors in. @@ -272,7 +273,7 @@ def find_all_required_nodes( current_node = queue.popleft() nodes_to_run.add(current_node) non_persistent_inputs = _enumerate_non_persistent_inputs(current_node, catalog) - # Look for the nodes that produce non-persistent inputs (if they exist) + # Look for the nodes that produce non-persistent inputs (if those exist) for node in _enumerate_nodes_with_outputs(pipeline, non_persistent_inputs): if node in visited: continue @@ -367,7 +368,6 @@ def find_initial_node_group(pipeline: Pipeline, nodes: Iterable[Node]) -> list[N """ node_names = set(n.name for n in nodes) if len(node_names) == 0: - # TODO: or raise? return [] sub_pipeline = pipeline.only_nodes(*node_names) initial_nodes = sub_pipeline.grouped_nodes[0] diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index 15d278d717..505c2c3f93 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -85,7 +85,7 @@ def _save(arg): "ds2_A": persistent_dataset, "ds2_B": persistent_dataset, "dsX": persistent_dataset, - "dsY": persistent_dataset, # TODO: names? + "dsY": persistent_dataset, "params:p": MemoryDataset(1), } ) From dd070310a825cbf37e11cc49278ea3d6d39a2a97 Mon Sep 17 00:00:00 2001 From: Ondrej Zacha Date: Mon, 18 Mar 2024 10:49:29 +0100 Subject: [PATCH 3/9] Make some methods private Signed-off-by: Ondrej Zacha --- kedro/runner/runner.py | 10 ++++++---- tests/runner/test_sequential_runner.py | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index a307d1c47f..64d138ced6 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -237,17 +237,19 @@ def find_nodes_to_resume_from( the run. """ - all_nodes_that_need_to_run = find_all_required_nodes( + all_nodes_that_need_to_run = _find_all_required_nodes( pipeline, unfinished_nodes, catalog ) # Find which of the remaining nodes would need to run first (in topo sort) - persistent_ancestors = find_initial_node_group(pipeline, all_nodes_that_need_to_run) + persistent_ancestors = _find_initial_node_group( + pipeline, all_nodes_that_need_to_run + ) return {n.name for n in persistent_ancestors} -def find_all_required_nodes( +def _find_all_required_nodes( pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: DataCatalog ) -> set[Node]: """Breadth-first search approach to finding the complete set of @@ -351,7 +353,7 @@ def _enumerate_nodes_with_outputs( return parent_pipeline.nodes -def find_initial_node_group(pipeline: Pipeline, nodes: Iterable[Node]) -> list[Node]: +def _find_initial_node_group(pipeline: Pipeline, nodes: Iterable[Node]) -> list[Node]: """Given a collection of ``Node``s in a ``Pipeline``, find the initial group of ``Node``s to be run (in topological order). diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 546e4cface..50f4b51b35 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -18,7 +18,7 @@ from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline from kedro.runner import SequentialRunner from kedro.runner.runner import ( - find_all_required_nodes, + _find_all_required_nodes, find_nodes_to_resume_from, ) from tests.runner.conftest import exception_fn, identity, sink, source @@ -491,7 +491,7 @@ def test_suggestion_consistency( test_pipeline = request.getfixturevalue(pipeline_name) remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes - required_nodes = find_all_required_nodes( + required_nodes = _find_all_required_nodes( test_pipeline, remaining_nodes, persistent_dataset_catalog ) resume_node_names = find_nodes_to_resume_from( From 0855d9efd8678da39c09ebafe43556789286f013 Mon Sep 17 00:00:00 2001 From: Ondrej Zacha Date: Mon, 18 Mar 2024 11:29:37 +0100 Subject: [PATCH 4/9] Simplify _nodes_with_external_inputs Signed-off-by: Ondrej Zacha --- kedro/runner/runner.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 64d138ced6..03e34dfe52 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -268,7 +268,7 @@ def _find_all_required_nodes( """ nodes_to_run = set(unfinished_nodes) - initial_nodes = _nodes_with_external_inputs(pipeline, unfinished_nodes) + initial_nodes = _nodes_with_external_inputs(unfinished_nodes) queue, visited = deque(initial_nodes), set(initial_nodes) while queue: @@ -288,14 +288,11 @@ def _find_all_required_nodes( return set(nodes_to_run) -def _nodes_with_external_inputs( - pipeline: Pipeline, nodes_of_interest: Iterable[Node] -) -> set[Node]: - """For given ``Node``s in a ``Pipeline``, find their - subset which depends on external inputs of the ``Pipeline``. +def _nodes_with_external_inputs(nodes_of_interest: Iterable[Node]) -> set[Node]: + """For given ``Node``s , find their subset which depends on + external inputs of the ``Pipeline`` they constitute. Args: - pipeline: the ``Pipeline`` to search for nodes in. nodes_of_interest: the ``Node``s to analyze. Returns: @@ -303,7 +300,7 @@ def _nodes_with_external_inputs( of nodes of interest. """ - p_nodes_of_interest = pipeline.only_nodes(*(n.name for n in nodes_of_interest)) + p_nodes_of_interest = Pipeline(nodes_of_interest) p_nodes_with_external_inputs = p_nodes_of_interest.only_nodes_with_inputs( *p_nodes_of_interest.inputs() ) From 82ff176e214f2370995ec4a8d44b81ad015841ea Mon Sep 17 00:00:00 2001 From: Ondrej Zacha Date: Wed, 20 Mar 2024 20:11:22 +0100 Subject: [PATCH 5/9] PR comments - Use _EPHEMERAL attribute - Move tests to separate file - Docstring updates Signed-off-by: Ondrej Zacha --- kedro/runner/runner.py | 33 +++--- tests/runner/test_resume_logic.py | 158 +++++++++++++++++++++++++ tests/runner/test_sequential_runner.py | 152 ------------------------ 3 files changed, 174 insertions(+), 169 deletions(-) create mode 100644 tests/runner/test_resume_logic.py diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 03e34dfe52..b7c4d3d026 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -198,7 +198,7 @@ def _suggest_resume_scenario( postfix = "" if done_nodes: - start_node_names = find_nodes_to_resume_from( + start_node_names = _find_nodes_to_resume_from( pipeline=pipeline, unfinished_nodes=remaining_nodes, catalog=catalog, @@ -219,7 +219,7 @@ def _suggest_resume_scenario( ) -def find_nodes_to_resume_from( +def _find_nodes_to_resume_from( pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: DataCatalog ) -> set[str]: """Given a collection of unfinished nodes in a pipeline using @@ -237,14 +237,10 @@ def find_nodes_to_resume_from( the run. """ - all_nodes_that_need_to_run = _find_all_required_nodes( - pipeline, unfinished_nodes, catalog - ) + nodes_to_be_run = _find_all_required_nodes(pipeline, unfinished_nodes, catalog) # Find which of the remaining nodes would need to run first (in topo sort) - persistent_ancestors = _find_initial_node_group( - pipeline, all_nodes_that_need_to_run - ) + persistent_ancestors = _find_initial_node_group(pipeline, nodes_to_be_run) return {n.name for n in persistent_ancestors} @@ -258,13 +254,13 @@ def _find_all_required_nodes( are not persisted. Args: - pipeline: the ``Pipeline`` to find ancestors in. - children: the iterable containing ``Node``s to find ancestors of. + pipeline: the ``Pipeline`` to analyze. + unfinished_nodes: the iterable of ``Node``s which have not finished yet. catalog: the ``DataCatalog`` of the run. Returns: - A set containing first persistent ancestors of the given - ``Node``s. + A set containing all input unfinished ``Node``s and all remaining + ``Node``s that need to run in case their outputs are not persisted. """ nodes_to_run = set(unfinished_nodes) @@ -283,14 +279,15 @@ def _find_all_required_nodes( queue.append(node) # Make sure no downstream tasks are skipped - nodes_to_run = pipeline.from_nodes(*(n.name for n in nodes_to_run)).nodes + nodes_to_run = set(pipeline.from_nodes(*(n.name for n in nodes_to_run)).nodes) - return set(nodes_to_run) + return nodes_to_run def _nodes_with_external_inputs(nodes_of_interest: Iterable[Node]) -> set[Node]: """For given ``Node``s , find their subset which depends on - external inputs of the ``Pipeline`` they constitute. + external inputs of the ``Pipeline`` they constitute. External inputs + are pipeline inputs not produced by other ``Node``s in the ``Pipeline``. Args: nodes_of_interest: the ``Node``s to analyze. @@ -324,8 +321,10 @@ def _enumerate_non_persistent_inputs(node: Node, catalog: DataCatalog) -> set[st for node_input in node.inputs: if node_input.startswith("params:"): continue - if node_input not in catalog_datasets or isinstance( - catalog_datasets[node_input], MemoryDataset + + if ( + node_input not in catalog_datasets + or catalog_datasets[node_input]._EPHEMERAL ): non_persistent_inputs.add(node_input) diff --git a/tests/runner/test_resume_logic.py b/tests/runner/test_resume_logic.py new file mode 100644 index 0000000000..8d17a07eb0 --- /dev/null +++ b/tests/runner/test_resume_logic.py @@ -0,0 +1,158 @@ +import pytest + +from kedro.io import ( + DataCatalog, + LambdaDataset, +) +from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline +from kedro.runner.runner import ( + _find_all_required_nodes, + _find_nodes_to_resume_from, +) + + +@pytest.mark.parametrize( + "pipeline_name,remaining_node_names,expected_result", + [ + ("pipeline_asymmetric", {"node2", "node3", "node4"}, {"node1", "node2"}), + ("pipeline_asymmetric", {"node3", "node4"}, {"node1", "node2"}), + ("pipeline_asymmetric", {"node4"}, {"node1", "node2"}), + ("pipeline_asymmetric", set(), set()), + ("empty_pipeline", set(), set()), + ("pipeline_triangular", {"node3"}, {"node1"}), + ( + "two_branches_crossed_pipeline", + { + "node1_A", + "node1_B", + "node2", + "node3_A", + "node3_B", + "node4_A", + "node4_B", + }, + {"node1_A", "node1_B"}, + ), + ( + "two_branches_crossed_pipeline", + {"node2", "node3_A", "node3_B", "node4_A", "node4_B"}, + {"node1_A", "node1_B"}, + ), + ( + "two_branches_crossed_pipeline", + ["node3_A", "node3_B", "node4_A", "node4_B"], + {"node3_A", "node3_B"}, + ), + ( + "two_branches_crossed_pipeline", + ["node4_A", "node4_B"], + {"node3_A", "node3_B"}, + ), + ("two_branches_crossed_pipeline", ["node4_A"], {"node3_A"}), + ("two_branches_crossed_pipeline", ["node3_A", "node4_A"], {"node3_A"}), + ], +) +class TestResumeLogicBehaviour: + def test_simple_pipeline( + self, + pipeline_name, + persistent_dataset_catalog, + remaining_node_names, + expected_result, + request, + ): + """ + Test suggestion for simple pipelines with a mix of persistent + and memory datasets. + """ + test_pipeline = request.getfixturevalue(pipeline_name) + + remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes + result_node_names = _find_nodes_to_resume_from( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + assert expected_result == result_node_names + + def test_all_datasets_persistent( + self, + pipeline_name, + persistent_dataset_catalog, + remaining_node_names, + expected_result, + request, + ): + """ + Test suggestion for pipelines where all datasets are persisted: + In that case, exactly the set of remaining nodes should be re-run. + """ + test_pipeline = request.getfixturevalue(pipeline_name) + + catalog = DataCatalog( + dict.fromkeys( + test_pipeline.datasets(), + LambdaDataset(load=lambda: 42, save=lambda data: None), + ) + ) + + remaining_nodes = set(test_pipeline.only_nodes(*remaining_node_names).nodes) + result_node_names = _find_nodes_to_resume_from( + test_pipeline, + remaining_nodes, + catalog, + ) + final_pipeline_nodes = set(test_pipeline.from_nodes(*result_node_names).nodes) + assert final_pipeline_nodes == remaining_nodes + + @pytest.mark.parametrize("extra_input", ["params:p", "dsY"]) + def test_added_shared_input( + self, + pipeline_name, + persistent_dataset_catalog, + remaining_node_names, + expected_result, + extra_input, + request, + ): + """ + Test suggestion for pipelines where a single persistent dataset or + parameter is shared across all nodes. These do not change and + therefore should not affect resume suggestion. + """ + test_pipeline = request.getfixturevalue(pipeline_name) + + # Add parameter shared across all nodes + test_pipeline = modular_pipeline( + [n._copy(inputs=[*n.inputs, extra_input]) for n in test_pipeline.nodes] + ) + + remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes + result_node_names = _find_nodes_to_resume_from( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + assert expected_result == result_node_names + + def test_suggestion_consistency( + self, + pipeline_name, + persistent_dataset_catalog, + remaining_node_names, + expected_result, + request, + ): + """ + Test that suggestions are internally consistent; pipeline generated + from resume nodes should exactly contain set of all required nodes. + """ + test_pipeline = request.getfixturevalue(pipeline_name) + + remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes + required_nodes = _find_all_required_nodes( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + resume_node_names = _find_nodes_to_resume_from( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + + assert set(required_nodes) == set( + test_pipeline.from_nodes(*resume_node_names).nodes + ) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 50f4b51b35..a49c6e397a 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -17,10 +17,6 @@ from kedro.pipeline import node from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline from kedro.runner import SequentialRunner -from kedro.runner.runner import ( - _find_all_required_nodes, - find_nodes_to_resume_from, -) from tests.runner.conftest import exception_fn, identity, sink, source @@ -353,151 +349,3 @@ def test_run_includes_memory_datasets(self, pipeline_with_memory_datasets): assert ( "RegularOutput" not in output ) # This output is registered in DataCatalog and so should not be in free outputs - - -# TODO: move to separate test module? -@pytest.mark.parametrize( - "pipeline_name,remaining_node_names,expected_result", - [ - ("pipeline_asymmetric", {"node2", "node3", "node4"}, {"node1", "node2"}), - ("pipeline_asymmetric", {"node3", "node4"}, {"node1", "node2"}), - ("pipeline_asymmetric", {"node4"}, {"node1", "node2"}), - ("pipeline_asymmetric", set(), set()), - ("empty_pipeline", set(), set()), - ("pipeline_triangular", {"node3"}, {"node1"}), - ( - "two_branches_crossed_pipeline", - { - "node1_A", - "node1_B", - "node2", - "node3_A", - "node3_B", - "node4_A", - "node4_B", - }, - {"node1_A", "node1_B"}, - ), - ( - "two_branches_crossed_pipeline", - {"node2", "node3_A", "node3_B", "node4_A", "node4_B"}, - {"node1_A", "node1_B"}, - ), - ( - "two_branches_crossed_pipeline", - ["node3_A", "node3_B", "node4_A", "node4_B"], - {"node3_A", "node3_B"}, - ), - ( - "two_branches_crossed_pipeline", - ["node4_A", "node4_B"], - {"node3_A", "node3_B"}, - ), - ("two_branches_crossed_pipeline", ["node4_A"], {"node3_A"}), - ("two_branches_crossed_pipeline", ["node3_A", "node4_A"], {"node3_A"}), - ], -) -class TestResumeLogicBehaviour: - def test_simple_pipeline( - self, - pipeline_name, - persistent_dataset_catalog, - remaining_node_names, - expected_result, - request, - ): - """ - Test suggestion for simple pipelines with a mix of persistent - and memory datasets. - """ - test_pipeline = request.getfixturevalue(pipeline_name) - - remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes - result_node_names = find_nodes_to_resume_from( - test_pipeline, remaining_nodes, persistent_dataset_catalog - ) - assert expected_result == result_node_names - - def test_all_datasets_persistent( - self, - pipeline_name, - persistent_dataset_catalog, - remaining_node_names, - expected_result, - request, - ): - """ - Test suggestion for pipelines where all datasets are persisted: - In that case, exactly the set of remaining nodes should be re-run. - """ - test_pipeline = request.getfixturevalue(pipeline_name) - - catalog = DataCatalog( - dict.fromkeys( - test_pipeline.datasets(), - LambdaDataset(load=lambda: 42, save=lambda data: None), - ) - ) - - remaining_nodes = set(test_pipeline.only_nodes(*remaining_node_names).nodes) - result_node_names = find_nodes_to_resume_from( - test_pipeline, - remaining_nodes, - catalog, - ) - final_pipeline_nodes = set(test_pipeline.from_nodes(*result_node_names).nodes) - assert final_pipeline_nodes == remaining_nodes - - @pytest.mark.parametrize("extra_input", ["params:p", "dsY"]) - def test_added_shared_input( - self, - pipeline_name, - persistent_dataset_catalog, - remaining_node_names, - expected_result, - extra_input, - request, - ): - """ - Test suggestion for pipelines where a single persistent dataset or - parameter is shared across all nodes. These do not change and - therefore should not affect resume suggestion. - """ - test_pipeline = request.getfixturevalue(pipeline_name) - - # Add parameter shared across all nodes - test_pipeline = modular_pipeline( - [n._copy(inputs=[*n.inputs, extra_input]) for n in test_pipeline.nodes] - ) - - remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes - result_node_names = find_nodes_to_resume_from( - test_pipeline, remaining_nodes, persistent_dataset_catalog - ) - assert expected_result == result_node_names - - def test_suggestion_consistency( - self, - pipeline_name, - persistent_dataset_catalog, - remaining_node_names, - expected_result, - request, - ): - """ - Test that suggestions are internally consistent; pipeline generated - from resume nodes should exactly contain set of all required nodes. - """ - test_pipeline = request.getfixturevalue(pipeline_name) - - remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes - required_nodes = _find_all_required_nodes( - test_pipeline, remaining_nodes, persistent_dataset_catalog - ) - resume_node_names = find_nodes_to_resume_from( - test_pipeline, remaining_nodes, persistent_dataset_catalog - ) - - assert set(required_nodes) == set( - test_pipeline.from_nodes(*resume_node_names).nodes - ) From dd6472272397d1f42d1db322238a5faa24ffb5a9 Mon Sep 17 00:00:00 2001 From: Ondrej Zacha Date: Thu, 28 Mar 2024 21:19:43 +0100 Subject: [PATCH 6/9] Improve comment Signed-off-by: Ondrej Zacha --- kedro/runner/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index b7c4d3d026..a78cd764c8 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -270,8 +270,8 @@ def _find_all_required_nodes( while queue: current_node = queue.popleft() nodes_to_run.add(current_node) + # Look for parent nodes which produce non-persistent inputs (if those exist) non_persistent_inputs = _enumerate_non_persistent_inputs(current_node, catalog) - # Look for the nodes that produce non-persistent inputs (if those exist) for node in _enumerate_nodes_with_outputs(pipeline, non_persistent_inputs): if node in visited: continue From 24123d9217ab13890d694bc931ad23495002dbf6 Mon Sep 17 00:00:00 2001 From: Ondrej Zacha Date: Thu, 28 Mar 2024 21:20:10 +0100 Subject: [PATCH 7/9] Update RELEASE.md Signed-off-by: Ondrej Zacha --- RELEASE.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/RELEASE.md b/RELEASE.md index d50e1b936d..d3b6e7f061 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -7,6 +7,7 @@ * Kedro CLI now provides a better error message when project commands are run outside of a project i.e. `kedro run`. * Dropped the dependency on `toposort` in favour of the built-in `graphlib` module. * Improve the performance of `Pipeline` object creation and summing. +* Improve suggestions to resume failed pipeline runs. ## Bug fixes and other changes * Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings. @@ -18,6 +19,9 @@ ## Documentation changes ## Community contributions +Many thanks to the following Kedroids for contributing PRs to this release: + +* [ondrejzacha](https://github.com/ondrejzacha) # Release 0.19.3 From 53a248ae278ff9e7017f86688075a015ea8e8db5 Mon Sep 17 00:00:00 2001 From: Ondrej Zacha Date: Fri, 29 Mar 2024 11:50:12 +0100 Subject: [PATCH 8/9] Rename _fina_all_required_nodes Signed-off-by: Ondrej Zacha --- kedro/runner/runner.py | 6 ++++-- tests/runner/conftest.py | 26 ++++++++++++++++++++++++-- tests/runner/test_resume_logic.py | 8 ++++---- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index a78cd764c8..f9cdd08798 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -237,7 +237,9 @@ def _find_nodes_to_resume_from( the run. """ - nodes_to_be_run = _find_all_required_nodes(pipeline, unfinished_nodes, catalog) + nodes_to_be_run = _find_all_nodes_for_resumed_pipeline( + pipeline, unfinished_nodes, catalog + ) # Find which of the remaining nodes would need to run first (in topo sort) persistent_ancestors = _find_initial_node_group(pipeline, nodes_to_be_run) @@ -245,7 +247,7 @@ def _find_nodes_to_resume_from( return {n.name for n in persistent_ancestors} -def _find_all_required_nodes( +def _find_all_nodes_for_resumed_pipeline( pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: DataCatalog ) -> set[Node]: """Breadth-first search approach to finding the complete set of diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index 505c2c3f93..629000686f 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -221,9 +221,31 @@ def pipeline_triangular(): (node1) | \ - | (node2) + | [node2] | / - (node3) + [node3*] + + """ + return pipeline( + [ + node(first_arg, ["ds0_A"], ["_ds1_A"], name="node1"), + node(first_arg, ["_ds1_A"], ["ds2_A"], name="node2"), + node(first_arg, ["ds2_A", "_ds1_A"], ["_ds3_A"], name="node3"), + ] + ) + + +@pytest.fixture +def pipeline_triangular2(): + r""" + + (node1) + | \ + | [node1b] + | | + | [node2] + | / + [node3 +] """ return pipeline( diff --git a/tests/runner/test_resume_logic.py b/tests/runner/test_resume_logic.py index 8d17a07eb0..bd1f8e8acb 100644 --- a/tests/runner/test_resume_logic.py +++ b/tests/runner/test_resume_logic.py @@ -6,7 +6,7 @@ ) from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline from kedro.runner.runner import ( - _find_all_required_nodes, + _find_all_nodes_for_resumed_pipeline, _find_nodes_to_resume_from, ) @@ -146,13 +146,13 @@ def test_suggestion_consistency( test_pipeline = request.getfixturevalue(pipeline_name) remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes - required_nodes = _find_all_required_nodes( + required_nodes = _find_all_nodes_for_resumed_pipeline( test_pipeline, remaining_nodes, persistent_dataset_catalog ) resume_node_names = _find_nodes_to_resume_from( test_pipeline, remaining_nodes, persistent_dataset_catalog ) - assert set(required_nodes) == set( - test_pipeline.from_nodes(*resume_node_names).nodes + assert set(n.name for n in required_nodes) == set( + n.name for n in test_pipeline.from_nodes(*resume_node_names).nodes ) From 8163d9f61bf19d9d1a14d071d277712449f57dfa Mon Sep 17 00:00:00 2001 From: Nok Date: Tue, 2 Apr 2024 12:48:02 +0000 Subject: [PATCH 9/9] remove unnecessary settings Signed-off-by: Nok --- docs/source/conf.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index 69516ebbc8..21f56f4c04 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -243,9 +243,9 @@ "https://github.com/kedro-org/kedro/blob/main/kedro/templates/project/%7B%7B%20cookiecutter.repo_name%20%7D%7D/.flake8", ] -# retry before render a link broken (fix for "too many requests") -linkcheck_retries = 5 -linkcheck_rate_limit_timeout = 2.0 +# Remove settings to fix Client Error 429 +# linkcheck_retries = 5 +# linkcheck_rate_limit_timeout = 2.0 html_context = { "display_github": True,