Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KED-1570] Calculate layers order and return it to the frontend #148

Merged
merged 23 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 147 additions & 28 deletions package/kedro_viz/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from collections import defaultdict
from contextlib import closing
from pathlib import Path
from typing import Dict
from typing import Dict, List, Set

import click
import kedro
Expand All @@ -47,6 +47,7 @@
from kedro.cli import get_project_context # pylint: disable=ungrouped-imports
from kedro.cli.utils import KedroCliError # pylint: disable=ungrouped-imports
from semver import match
from toposort import toposort_flatten

from kedro_viz.utils import wait_for

Expand Down Expand Up @@ -180,6 +181,95 @@ def _get_pipeline_catalog_from_kedro14(env):
raise KedroCliError(ERROR_PROJECT_ROOT)


def _sort_layers(nodes: Dict[str, Dict], node_dependencies: Dict[str, Set[str]]) -> List[str]:
    """Given a DAG represented by a dictionary of nodes, some of which have a `layer` attribute,
    along with their dependencies, return the list of all layers sorted according to
    the nodes' topological order, i.e. a layer should appear before another layer in the list
    if its node is a dependency of the other layer's node, directly or indirectly.

    For example, given the following graph:
        node1(layer=a) -> node2 -> node4 -> node6(layer=d)
                            |                   ^
                            v                   |
                          node3(layer=b) -> node5(layer=c)
    The layers ordering should be: [a, b, c, d]

    The algorithm is as follows:
        * For every node, find all layers that depend on it in a depth-first search (dfs).
        * While traversing, build up a dictionary of {node_id -> layers} for the nodes
          that have already been visited.
        * Turn the final {node_id -> layers} into a {layer -> layers} to represent the layers'
          dependencies. Note: the key is a layer and the values are the parents of that layer,
          just because that's the format toposort requires.
        * Register every layer as a key of that dictionary, even when it has no parent,
          so that a layer unrelated to any other layer still appears in the result.
        * Feed this layers dictionary to ``toposort`` and return the sorted values.
        * Raise CircularDependencyError if the layers cannot be sorted topologically,
          i.e. there are cycles among the layers.

    Args:
        nodes: A dictionary of {node_id -> node} represents the nodes in the graph.
            A node's schema is:
                {
                    "type": str,
                    "id": str,
                    "name": str,
                    "layer": Optional[str]
                    ...
                }
        node_dependencies: A dictionary of {node_id -> set(child_node_ids)}
            represents the direct dependencies between nodes in the graph.
            Nodes without children may be absent from this dictionary.

    Returns:
        The list of layers sorted based on topological order.

    Raises:
        CircularDependencyError: When the layers have cyclic dependencies
            (raised by ``toposort_flatten``).
    """
    node_layers = {}  # type: Dict[str, Set[str]]  # node_id -> layers that depend on it

    def find_dependent_layers(node_id: str) -> Set[str]:
        """For the given node_id, find all layers that depend on it in a depth-first manner.
        Build up the node_layers dependency dictionary while traversing so each node is visited
        only once.
        """
        if node_id in node_layers:
            return node_layers[node_id]

        # register the node before recursing so it is never expanded twice
        node_layers[node_id] = set()

        # for each dependent node of the given node_id,
        # mark its layer and all layers that depend on it as dependent layers of the given node_id.
        # ``.get`` is used instead of indexing so that a leaf node missing from a plain dict
        # does not raise KeyError, and so that a defaultdict passed by the caller is not
        # silently populated with empty entries.
        for dependent_node_id in node_dependencies.get(node_id, ()):
            dependent_layer = nodes[dependent_node_id].get("layer")
            if dependent_layer is not None:
                node_layers[node_id].add(dependent_layer)
            node_layers[node_id].update(find_dependent_layers(dependent_node_id))

        return node_layers[node_id]

    # populate node_layers dependencies
    for node_id in nodes:
        find_dependent_layers(node_id)

    # compute the layer dependencies dictionary based on the node_layers dependencies,
    # represented as {layer -> set(parent_layers)}
    layer_dependencies = defaultdict(set)
    for node_id, dependent_layers in node_layers.items():
        node_layer = nodes[node_id].get("layer")
        if node_layer is None:
            continue

        # make sure the layer itself is known to toposort even when nothing depends on it
        # and it depends on nothing, otherwise it would be dropped from the result.
        layer_dependencies.setdefault(node_layer, set())

        # add the node's layer as a parent layer for all dependent layers.
        # Even if a dependent layer is the same as the node's layer, i.e. a layer is marked
        # as its own parent, toposort still works so we don't need to check for that explicitly.
        for layer in dependent_layers:
            layer_dependencies[layer].add(node_layer)

    # toposort the layer_dependencies to find the layer order.
    # Note that for string, toposort_flatten will default to alphabetical order for tie-break.
    return toposort_flatten(layer_dependencies)


# pylint: disable=too-many-locals
def format_pipeline_data(pipeline, catalog):
"""
Format pipeline and catalog data from Kedro for kedro-viz
Expand All @@ -194,49 +284,78 @@ def pretty_name(name):
parts = [n[0].upper() + n[1:] for n in name.split()]
return " ".join(parts)

nodes = []
# keep track of node_id -> node data in the graph
nodes = {}
# keep track of a sorted list of nodes to be returned to the client
sorted_nodes = []
# keep track of node_id -> set(child_node_ids) for layers sorting
node_dependencies = defaultdict(set)
# keep track of edges in the graph: [{source_node_id -> target_node_id}]
edges = []
# keep track of {data_set_namespace -> set(tags)}
namespace_tags = defaultdict(set)
# keep track of {data_set_namespace -> layer it belongs to}
namespace_to_layer = {}
all_tags = set()

data_set_to_layer_map = {
ds_name: getattr(ds_obj, "_layer", None)
for ds_name, ds_obj in catalog._data_sets.items() # pylint: disable=protected-access
}

for node in sorted(pipeline.nodes, key=lambda n: n.name):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't investigated the algorithm in depth, but I understand that we use toposort. With that in mind, I have two questions:

  • pipeline.nodes returns toposorted list of nodes. Why do we need to sort them by name?
  • Can the toposorted list of nodes somehow simplify the layers ordering algorithm?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Not sure. It was there before. I thought of removing this sort by name, but that breaks a bunch of tests, so I left it unchanged to keep the scope of this PR small.
  • That's a good question. Unfortunately, we can't rely on the toposorted list of nodes alone :( There is an example here if you are interested: [KED-1570] Experiment with layer ordering and assignment client-side #147 (comment) -- the example suggests adding transition nodes and toposorting again, but adding transition nodes still requires knowing the layers' dependencies, so there is no avoiding toposorting the layers.

task_id = _hash(str(node))
nodes.append(
{
"type": "task",
"id": task_id,
"name": getattr(node, "short_name", node.name),
"full_name": getattr(node, "_func_name", str(node)),
"tags": sorted(node.tags),
}
)
all_tags.update(node.tags)
nodes[task_id] = {
"type": "task",
"id": task_id,
"name": getattr(node, "short_name", node.name),
"full_name": getattr(node, "_func_name", str(node)),
"tags": sorted(node.tags),
}
sorted_nodes.append(nodes[task_id])

for data_set in node.inputs:
namespace = data_set.split("@")[0]
edges.append({"source": _hash(namespace), "target": task_id})
namespace_id = _hash(namespace)
namespace_to_layer[namespace] = data_set_to_layer_map.get(data_set)
edges.append({"source": namespace_id, "target": task_id})
namespace_tags[namespace].update(node.tags)
node_dependencies[namespace_id].add(task_id)

for data_set in node.outputs:
namespace = data_set.split("@")[0]
edges.append({"source": task_id, "target": _hash(namespace)})
namespace_id = _hash(namespace)
namespace_to_layer[namespace] = data_set_to_layer_map.get(data_set)
edges.append({"source": task_id, "target": namespace_id})
namespace_tags[namespace].update(node.tags)
node_dependencies[task_id].add(namespace_id)

for namespace, tags in sorted(namespace_tags.items()):
is_param = bool("param" in namespace.lower())
nodes.append(
{
"type": "parameters" if is_param else "data",
"id": _hash(namespace),
"name": pretty_name(namespace),
"full_name": namespace,
"tags": sorted(tags),
}
)

tags = []
for tag in sorted(all_tags):
tags.append({"id": tag, "name": pretty_name(tag)})

return {"nodes": nodes, "edges": edges, "tags": tags}
node_id = _hash(namespace)
nodes[node_id] = {
"type": "parameters" if is_param else "data",
"id": node_id,
"name": pretty_name(namespace),
"full_name": namespace,
"tags": sorted(tags),
"layer": namespace_to_layer[namespace],
}
sorted_nodes.append(nodes[node_id])

# sort tags
sorted_tags = [{"id": tag, "name": pretty_name(tag)} for tag in sorted(all_tags)]

# sort layers
sorted_layers = _sort_layers(nodes, node_dependencies)

return {
"nodes": sorted_nodes,
"edges": edges,
"tags": sorted_tags,
"layers": sorted_layers
}


@app.route("/api/nodes.json")
Expand Down
74 changes: 74 additions & 0 deletions package/tests/result.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
{
"layers": ["raw", "final"],
"nodes": [
{
"type": "task",
"id": "71527409",
"name": "Func1",
"full_name": "func1",
"tags": []
},
{
"type": "task",
"id": "3751475c",
"name": "Func2",
"full_name": "func2",
"tags": []
},
{
"type": "data",
"id": "7366ec9f",
"name": "Bob In",
"full_name": "bob_in",
"tags": [],
"layer": "raw"
},
{
"type": "data",
"id": "60e68b8e",
"name": "Bob Out",
"full_name": "bob_out",
"tags": [],
"layer": null
},
{
"type": "parameters",
"id": "755dc08f",
"name": "Params:value",
"full_name": "params:value",
"tags": [],
"layer": null
},
{
"type": "data",
"id": "37a5301a",
"name": "Result",
"full_name": "result",
"tags": [],
"layer": "final"
}
],
"edges": [
{
"source": "7366ec9f",
"target": "71527409"
},
{
"source": "755dc08f",
"target": "71527409"
},
{
"source": "71527409",
"target": "60e68b8e"
},
{
"source": "60e68b8e",
"target": "3751475c"
},
{
"source": "3751475c",
"target": "37a5301a"
}
],
"tags": []
}
Loading