Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't toposort nodes in non-user-facing operations #3146

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Many thanks to the following Kedroids for contributing PRs to this release:
## Bug fixes and other changes
* Updated dataset factories to resolve nested catalog config properly.
* Updated `OmegaConfigLoader` to handle paths containing dots outside of `conf_source`.
* Improved performance of pipeline operations by not topologically sorting nodes in non-user-facing code.
* Made `settings.py` optional.

## Documentation changes
Expand Down
30 changes: 15 additions & 15 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def __init__(

nodes = list(
chain.from_iterable(
[[n] if isinstance(n, Node) else n.nodes for n in nodes]
[[n] if isinstance(n, Node) else n._nodes for n in nodes]
)
)
_validate_transcoded_inputs_outputs(nodes)
Expand All @@ -163,15 +163,15 @@ def __init__(
for output in node.outputs:
self._nodes_by_output[_strip_transcoding(output)] = node

self._nodes = nodes
self._nodes: list[Node] = nodes
self._topo_sorted_nodes = _topologically_sorted(self.node_dependencies)

def __repr__(self): # pragma: no cover
"""Pipeline ([node1, ..., node10 ...], name='pipeline_name')"""
max_nodes_to_display = 10

nodes_reprs = [repr(node) for node in self.nodes[:max_nodes_to_display]]
if len(self.nodes) > max_nodes_to_display:
if len(self._nodes) > max_nodes_to_display:
nodes_reprs.append("...")
sep = ",\n"
nodes_reprs_str = f"[\n{sep.join(nodes_reprs)}\n]" if nodes_reprs else "[]"
Expand All @@ -181,7 +181,7 @@ def __repr__(self): # pragma: no cover
def __add__(self, other):
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes + other.nodes))
return Pipeline(set(self._nodes + other._nodes))

def __radd__(self, other):
if isinstance(other, int) and other == 0:
Expand All @@ -191,17 +191,17 @@ def __radd__(self, other):
def __sub__(self, other):
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes) - set(other.nodes))
return Pipeline(set(self._nodes) - set(other._nodes))

def __and__(self, other):
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes) & set(other.nodes))
return Pipeline(set(self._nodes) & set(other._nodes))

def __or__(self, other):
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes + other.nodes))
return Pipeline(set(self._nodes + other._nodes))

def all_inputs(self) -> set[str]:
"""All inputs for all nodes in the pipeline.
Expand All @@ -210,7 +210,7 @@ def all_inputs(self) -> set[str]:
All node input names as a Set.

"""
return set.union(set(), *(node.inputs for node in self.nodes))
return set.union(set(), *(node.inputs for node in self._nodes))

def all_outputs(self) -> set[str]:
"""All outputs of all nodes in the pipeline.
Expand All @@ -219,7 +219,7 @@ def all_outputs(self) -> set[str]:
All node outputs.

"""
return set.union(set(), *(node.outputs for node in self.nodes))
return set.union(set(), *(node.outputs for node in self._nodes))

def _remove_intermediates(self, datasets: set[str]) -> set[str]:
intermediate = {_strip_transcoding(i) for i in self.all_inputs()} & {
Expand Down Expand Up @@ -417,7 +417,7 @@ def only_nodes_with_namespace(self, node_namespace: str) -> Pipeline:
"""
nodes = [
n
for n in self.nodes
for n in self._nodes
if n.namespace and n.namespace.startswith(node_namespace)
]
if not nodes:
Expand Down Expand Up @@ -676,7 +676,7 @@ def only_nodes_with_tags(self, *tags: str) -> Pipeline:
of the tags provided are being copied.
"""
tags = set(tags)
nodes = [node for node in self.nodes if tags & node.tags]
nodes = (node for node in self._nodes if tags & node.tags)
return Pipeline(nodes)

def filter( # noqa: PLR0913
Expand Down Expand Up @@ -760,11 +760,11 @@ def filter( # noqa: PLR0913
# would give different outcomes depending on the order of filter methods:
# only_nodes and then from_inputs would give node1, while from_inputs and then
# only_nodes would give node1 and node3.
filtered_pipeline = Pipeline(self.nodes)
filtered_pipeline = Pipeline(self._nodes)
for subset_pipeline in subset_pipelines:
filtered_pipeline &= subset_pipeline

if not filtered_pipeline.nodes:
if not filtered_pipeline._nodes:
raise ValueError(
"Pipeline contains no nodes after applying all provided filters"
)
Expand All @@ -779,7 +779,7 @@ def tag(self, tags: str | Iterable[str]) -> Pipeline:
Returns:
New ``Pipeline`` object with nodes tagged.
"""
nodes = [n.tag(tags) for n in self.nodes]
nodes = [n.tag(tags) for n in self._nodes]
return Pipeline(nodes)

def to_json(self):
Expand Down Expand Up @@ -816,7 +816,7 @@ def _check_node(node_: Node, pipeline_: Pipeline = None):
if isinstance(each, Node):
_check_node(each)
elif isinstance(each, Pipeline):
for node in each.nodes:
for node in each._nodes:
_check_node(node, pipeline_=each)

if duplicates:
Expand Down