Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't toposort nodes in non-user-facing operations #3146

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Major features and improvements
* Improved error message when passing wrong value to node.
* Improved performance of pipeline operations by using non-toposorted nodes in internal code.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Improved performance of pipeline operations by using non-toposorted nodes in internal code.
* Improved performance of pipeline operations by avoiding unnecessary copies of the nodes.

* Cookiecutter errors are shown in short format without the `--verbose` flag.
* Kedro commands now work from any subdirectory within a Kedro project.
* Kedro CLI now provides a better error message when project commands are run outside of a project, e.g. `kedro run`.
Expand Down
32 changes: 17 additions & 15 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def __init__(

nodes_chain = list(
chain.from_iterable(
[[n] if isinstance(n, Node) else n.nodes for n in nodes_list]
[[n] if isinstance(n, Node) else n._nodes for n in nodes_list]
)
)
_validate_transcoded_inputs_outputs(nodes_chain)
Expand All @@ -163,15 +163,15 @@ def __init__(
for output in node.outputs:
self._nodes_by_output[_strip_transcoding(output)] = node

self._nodes = tagged_nodes
self._nodes: list[Node] = tagged_nodes
self._topo_sorted_nodes = _topologically_sorted(self.node_dependencies)

def __repr__(self) -> str: # pragma: no cover
"""Pipeline ([node1, ..., node10 ...], name='pipeline_name')"""
max_nodes_to_display = 10

nodes_reprs = [repr(node) for node in self.nodes[:max_nodes_to_display]]
if len(self.nodes) > max_nodes_to_display:
if len(self._nodes) > max_nodes_to_display:
nodes_reprs.append("...")
sep = ",\n"
nodes_reprs_str = f"[\n{sep.join(nodes_reprs)}\n]" if nodes_reprs else "[]"
Expand All @@ -181,7 +181,7 @@ def __repr__(self) -> str: # pragma: no cover
def __add__(self, other: Any) -> Pipeline:
    """Return a new ``Pipeline`` holding the union of both pipelines' nodes."""
    if not isinstance(other, Pipeline):
        return NotImplemented
    merged = set(self._nodes) | set(other._nodes)
    return Pipeline(merged)

def __radd__(self, other: Any) -> Pipeline:
if isinstance(other, int) and other == 0:
Expand All @@ -191,17 +191,17 @@ def __radd__(self, other: Any) -> Pipeline:
def __sub__(self, other: Any) -> Pipeline:
    """Return a new ``Pipeline`` with ``other``'s nodes removed from this one."""
    if not isinstance(other, Pipeline):
        return NotImplemented
    remaining = set(self._nodes).difference(other._nodes)
    return Pipeline(remaining)

def __and__(self, other: Any) -> Pipeline:
    """Return a new ``Pipeline`` containing only the nodes common to both pipelines."""
    if not isinstance(other, Pipeline):
        return NotImplemented
    shared = set(self._nodes).intersection(other._nodes)
    return Pipeline(shared)

def __or__(self, other: Any) -> Pipeline:
    """Return a new ``Pipeline`` holding the union of both pipelines' nodes."""
    if not isinstance(other, Pipeline):
        return NotImplemented
    combined = set(self._nodes).union(other._nodes)
    return Pipeline(combined)

def all_inputs(self) -> set[str]:
    """All inputs for all nodes in the pipeline.

    Returns:
        All node input names as a Set.

    """
    collected: set[str] = set()
    for node in self._nodes:
        collected.update(node.inputs)
    return collected

def all_outputs(self) -> set[str]:
    """All outputs of all nodes in the pipeline.

    Returns:
        All node outputs.

    """
    collected: set[str] = set()
    for node in self._nodes:
        collected.update(node.outputs)
    return collected

def _remove_intermediates(self, datasets: set[str]) -> set[str]:
intermediate = {_strip_transcoding(i) for i in self.all_inputs()} & {
Expand Down Expand Up @@ -417,7 +417,7 @@ def only_nodes_with_namespace(self, node_namespace: str) -> Pipeline:
"""
nodes = [
n
for n in self.nodes
for n in self._nodes
if n.namespace and n.namespace.startswith(node_namespace)
]
if not nodes:
Expand Down Expand Up @@ -675,8 +675,10 @@ def only_nodes_with_tags(self, *tags: str) -> Pipeline:
nodes of the current one such that only nodes containing *any*
of the tags provided are being copied.
"""

unique_tags = set(tags)
nodes = [node for node in self.nodes if unique_tags & node.tags]
nodes = [node for node in self._nodes if unique_tags & node.tags]

return Pipeline(nodes)

def filter( # noqa: PLR0913
Expand Down Expand Up @@ -760,11 +762,11 @@ def filter( # noqa: PLR0913
# would give different outcomes depending on the order of filter methods:
# only_nodes and then from_inputs would give node1, while from_inputs and then
# only_nodes would give node1 and node3.
filtered_pipeline = Pipeline(self.nodes)
filtered_pipeline = Pipeline(self._nodes)
for subset_pipeline in subset_pipelines:
filtered_pipeline &= subset_pipeline

if not filtered_pipeline.nodes:
if not filtered_pipeline._nodes:
raise ValueError(
"Pipeline contains no nodes after applying all provided filters"
)
Expand All @@ -779,7 +781,7 @@ def tag(self, tags: str | Iterable[str]) -> Pipeline:
Returns:
New ``Pipeline`` object with nodes tagged.
"""
nodes = [n.tag(tags) for n in self.nodes]
nodes = [n.tag(tags) for n in self._nodes]
return Pipeline(nodes)

def to_json(self) -> str:
Expand Down Expand Up @@ -816,7 +818,7 @@ def _check_node(node_: Node, pipeline_: Pipeline | None = None) -> None:
if isinstance(each, Node):
_check_node(each)
elif isinstance(each, Pipeline):
for node in each.nodes:
for node in each._nodes:
_check_node(node, pipeline_=each)

if duplicates:
Expand Down
Loading