[KED-1570] Calculate layers order and return it to the frontend #148

Merged 23 commits on Apr 23, 2020

171 changes: 142 additions & 29 deletions package/kedro_viz/server.py
@@ -38,7 +38,7 @@
from collections import defaultdict
from contextlib import closing
from pathlib import Path
from typing import Dict
from typing import Dict, List, Set

import click
import kedro
@@ -48,6 +48,7 @@
from kedro.cli import get_project_context # pylint: disable=ungrouped-imports
from kedro.cli.utils import KedroCliError # pylint: disable=ungrouped-imports
from semver import match
from toposort import toposort_flatten

from kedro_viz.utils import wait_for

@@ -181,6 +182,99 @@ def _get_pipeline_catalog_from_kedro14(env):
raise KedroCliError(ERROR_PROJECT_ROOT)


def _sort_layers(nodes: Dict[str, Dict], dependencies: Dict[str, Set[str]]) -> List[str]:
"""Given a DAG represented by a dictionary of nodes, some of which have a `layer` attribute,
along with their dependencies, return the list of all layers sorted according to
the nodes' topological order, i.e. a layer should appear before another layer in the list
if any of its nodes is a direct or indirect dependency of any of the other layer's nodes.

For example, given the following graph:
node1(layer=a) -> node2 -> node4 -> node6(layer=d)
                    |                 ^
                    v                 |
                  node3(layer=b) -> node5(layer=c)
The layers ordering should be: [a, b, c, d]

In theory, this is a problem of finding the
[transitive closure](https://en.wikipedia.org/wiki/Transitive_closure) of a graph of layers
and then toposorting it. The algorithm below follows a repeated depth-first search approach:
* For every node, find all layers that depend on it in a depth-first search.
* While traversing, build up a dictionary of {node_id -> layers} for the nodes
that have already been visited.
* Turn the final {node_id -> layers} mapping into a {layer -> layers} mapping to represent
the layers' dependencies. Note: the key is a layer and the values are the parents of that
layer, because that is the format toposort requires.
* Feed this layers dictionary to ``toposort`` and return the sorted values.
* Raise CircularDependencyError if the layers cannot be sorted topologically,
i.e. there are cycles among the layers.

Args:
nodes: A dictionary of {node_id -> node} representing the nodes in the graph.
A node's schema is:
{
"type": str,
"id": str,
"name": str,
"layer": Optional[str]
...
}
dependencies: A dictionary of {node_id -> set(child_ids)}
representing the direct dependencies between the nodes in the graph.

Returns:
The list of layers sorted based on topological order.

Raises:
CircularDependencyError: When the layers have cyclic dependencies.
"""
node_layers = {} # map node_id to the layers that depend on it

def find_child_layers(node_id: str) -> Set[str]:
"""For the given node_id, find all layers that depend on it in a depth-first manner.
Build up the node_layers dependency dictionary while traversing so each node is visited
only once.
Note: Python's default recursion depth limit is 1000, which means this algorithm won't
work for pipelines with dependency chains more than 1000 nodes deep. However, we can
rewrite this with an explicit stack if we run into this limit in practice.
"""
if node_id in node_layers:
return node_layers[node_id]

node_layers[node_id] = set()

# for each child node of the given node_id,
# mark its layer and all layers that depend on it as child layers of the given node_id.
for child_node_id in dependencies[node_id]:
child_node = nodes[child_node_id]
child_layer = child_node.get("layer")
if child_layer is not None:
node_layers[node_id].add(child_layer)
node_layers[node_id].update(find_child_layers(child_node_id))

return node_layers[node_id]

# populate node_layers dependencies
for node_id in nodes:
find_child_layers(node_id)

# compute the layer dependencies dictionary based on the node_layers dependencies,
# represented as {layer -> set(parent_layers)}
layer_dependencies = defaultdict(set)
for node_id, child_layers in node_layers.items():
node_layer = nodes[node_id].get("layer")

# add the node's layer as a parent layer for all child layers.
# Even if a child layer is the same as the node's layer, i.e. a layer is marked
# as its own parent, toposort still works so we don't need to check for that explicitly.
if node_layer is not None:
for layer in child_layers:
layer_dependencies[layer].add(node_layer)

# toposort the layer_dependencies to find the layer order.
# Note that for strings, toposort_flatten defaults to alphabetical order as a tie-break.
return toposort_flatten(layer_dependencies)
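
For reference, here is a minimal sketch (not part of the diff) that replays the docstring's example
graph through _sort_layers; the import path matches the one used in the tests below, and the node
dictionaries follow the schema described in the docstring.

from kedro_viz.server import _sort_layers

nodes = {
    "node1": {"id": "node1", "layer": "a"},
    "node2": {"id": "node2"},
    "node3": {"id": "node3", "layer": "b"},
    "node4": {"id": "node4"},
    "node5": {"id": "node5", "layer": "c"},
    "node6": {"id": "node6", "layer": "d"},
}
# node_id -> set of direct children, mirroring the docstring's diagram
dependencies = {
    "node1": {"node2"},
    "node2": {"node3", "node4"},
    "node3": {"node5"},
    "node4": {"node6"},
    "node5": {"node6"},
    "node6": set(),
}
assert _sort_layers(nodes, dependencies) == ["a", "b", "c", "d"]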

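On the recursion-limit note inside find_child_layers: a possible stack-based rewrite is sketched
below. This is only an illustration of the idea mentioned in that note, not code from this PR;
the helper name and signature are invented, and it assumes the same DAG inputs that _sort_layers
already expects.

def find_child_layers_iteratively(start_id, nodes, dependencies, node_layers):
    """Hypothetical stack-based variant of find_child_layers (post-order DFS).
    node_layers is shared across calls so each node is still computed only once.
    """
    stack = [(start_id, False)]
    while stack:
        node_id, children_processed = stack.pop()
        if not children_processed:
            if node_id in node_layers:
                continue  # already computed through another path
            # revisit this node once all of its children have been processed
            stack.append((node_id, True))
            for child_id in dependencies[node_id]:
                if child_id not in node_layers:
                    stack.append((child_id, False))
        else:
            # all children are resolved at this point, so collect their layers
            layers = set()
            for child_id in dependencies[node_id]:
                child_layer = nodes[child_id].get("layer")
                if child_layer is not None:
                    layers.add(child_layer)
                layers.update(node_layers[child_id])
            node_layers[node_id] = layers
    return node_layers[start_id]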

# pylint: disable=too-many-locals
def format_pipeline_data(pipeline, catalog):
"""
@@ -196,9 +290,18 @@ def pretty_name(name):
parts = [n[0].upper() + n[1:] for n in name.split()]
return " ".join(parts)

nodes = []
# keep track of node_id -> node data in the graph
nodes = {}
# keep track of a sorted list of nodes to return to the client
sorted_nodes = []
# keep track of node_id -> set(child_node_ids) for layers sorting
node_dependencies = defaultdict(set)
# keep track of edges in the graph: [{source_node_id -> target_node_id}]
edges = []
# keep track of {data_set_namespace -> set(tags)}
namespace_tags = defaultdict(set)
# keep track of {data_set_namespace -> layer it belongs to}
namespace_to_layer = {}
all_tags = set()
namespace_to_layer = {}

@@ -210,46 +313,56 @@ def pretty_name(name):
for node in sorted(pipeline.nodes, key=lambda n: n.name):
Contributor:
I haven't investigated the algorithm deeply, but I understand that we use toposort. With that in mind, I have two questions:

  • pipeline.nodes already returns a toposorted list of nodes. Why do we need to sort them by name?
  • Could the toposorted list of nodes somehow simplify the layer-ordering algorithm?

Collaborator (Author):
  • Not sure, it was there before. I thought about removing the sort by name, but that breaks a bunch of tests, so I left it to keep the scope of this PR small.
  • That's a good question. Unfortunately, we can't rely on the toposorted list of nodes alone :( There is an example here if you are interested: [KED-1570] Experiment with layer ordering and assignment client-side #147 (comment) -- the example suggests adding transition nodes and toposorting again, but adding transition nodes still requires knowing the layers' dependencies, so there is no avoiding toposorting the layers.

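To illustrate the second point: a toposorted list of nodes alone does not pin down the layer
order, because a node from a "later" layer can legitimately appear early in the node order if
nothing depends on it. A small hedged example (not part of the diff, node ids invented), reusing
_sort_layers as added in this PR:

# "a" has no dependencies, so a node-level toposort may list it (layer "int")
# before "b" (layer "raw"), even though "raw" feeds into "int" via "b" -> "c".
# Sorting by the layer-level dependencies still yields the right order.
nodes = {
    "a": {"id": "a", "layer": "int"},
    "b": {"id": "b", "layer": "raw"},
    "c": {"id": "c", "layer": "int"},
}
node_dependencies = {"a": set(), "b": {"c"}, "c": set()}
assert _sort_layers(nodes, node_dependencies) == ["raw", "int"]
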
task_id = _hash(str(node))
all_tags.update(node.tags)
nodes.append(
{
"type": "task",
"id": task_id,
"name": getattr(node, "short_name", node.name),
"full_name": getattr(node, "_func_name", str(node)),
"tags": sorted(node.tags),
}
)
nodes[task_id] = {
"type": "task",
"id": task_id,
"name": getattr(node, "short_name", node.name),
"full_name": getattr(node, "_func_name", str(node)),
"tags": sorted(node.tags),
}
sorted_nodes.append(nodes[task_id])

for data_set in node.inputs:
namespace = data_set.split("@")[0]
namespace_to_layer[namespace] = dataset_to_layer.get(data_set)
edges.append({"source": _hash(namespace), "target": task_id})
namespace_id = _hash(namespace)
edges.append({"source": namespace_id, "target": task_id})
namespace_tags[namespace].update(node.tags)
node_dependencies[namespace_id].add(task_id)

for data_set in node.outputs:
namespace = data_set.split("@")[0]
namespace_to_layer[namespace] = dataset_to_layer.get(data_set)
edges.append({"source": task_id, "target": _hash(namespace)})
namespace_id = _hash(namespace)
edges.append({"source": task_id, "target": namespace_id})
namespace_tags[namespace].update(node.tags)
node_dependencies[task_id].add(namespace_id)

for namespace, tags in sorted(namespace_tags.items()):
is_param = bool("param" in namespace.lower())
nodes.append(
{
"type": "parameters" if is_param else "data",
"id": _hash(namespace),
"name": pretty_name(namespace),
"full_name": namespace,
"tags": sorted(tags),
"layer": namespace_to_layer[namespace],
}
)

tags = []
for tag in sorted(all_tags):
tags.append({"id": tag, "name": pretty_name(tag)})

return {"nodes": nodes, "edges": edges, "tags": tags}
node_id = _hash(namespace)
nodes[node_id] = {
"type": "parameters" if is_param else "data",
"id": node_id,
"name": pretty_name(namespace),
"full_name": namespace,
"tags": sorted(tags),
"layer": namespace_to_layer[namespace],
}
sorted_nodes.append(nodes[node_id])

# sort tags
sorted_tags = [{"id": tag, "name": pretty_name(tag)} for tag in sorted(all_tags)]

# sort layers
sorted_layers = _sort_layers(nodes, node_dependencies)

return {
"nodes": sorted_nodes,
"edges": edges,
"tags": sorted_tags,
"layers": sorted_layers
}


@app.route("/api/nodes.json")
1 change: 1 addition & 0 deletions package/tests/result.json
@@ -1,4 +1,5 @@
{
"layers": ["raw", "final"],
"nodes": [
{
"type": "task",
107 changes: 105 additions & 2 deletions package/tests/test_server.py
@@ -38,9 +38,10 @@
from kedro.extras.datasets.pickle import PickleDataSet
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline, node
from toposort import CircularDependencyError

from kedro_viz import server
from kedro_viz.server import _allocate_port, format_pipeline_data
from kedro_viz.server import _allocate_port, _sort_layers, format_pipeline_data
from kedro_viz.utils import WaitForException

EXPECTED_PIPELINE_DATA = {
@@ -109,6 +110,7 @@
},
],
"tags": [{"name": "Bob", "id": "bob"}],
"layers": [],
}


@@ -289,7 +291,7 @@ def test_pipeline_flag(cli_runner, client):
response = client.get("/api/nodes.json")
assert response.status_code == 200
data = json.loads(response.data.decode())
assert data == {"edges": [], "nodes": [], "tags": []}
assert data == {"edges": [], "layers": [], "nodes": [], "tags": []}


@pytest.mark.usefixtures("patched_get_project_context")
@@ -575,3 +577,104 @@ def test_format_pipeline_data(pipeline, catalog):
result_file_path = Path(__file__).parent / "result.json"
json_data = json.loads(result_file_path.read_text())
assert json_data == result


@pytest.mark.parametrize("graph_schema,nodes,node_dependencies,expected", [
(
# direct dependency
"node_1(layer=raw) -> node_2(layer=int)",
{"node_1": {"id": "node_1", "layer": "raw"}, "node_2": {"id": "node_2", "layer": "int"}},
{"node_1": {"node_2"}, "node_2": set()},
["raw", "int"]
),
(
# more than 1 node in a layer
"node_1 -> node_2(layer=raw) -> node_3(layer=raw) -> node_4(layer=int)",
{"node_1": {"id": "node_1"}, "node_2": {"id": "node_2", "layer": "raw"},
"node_3": {"id": "node_3", "layer": "raw"}, "node_4": {"id": "node_4", "layer": "int"}},
{"node_1": {"node_2"}, "node_2": {"node_3"}, "node_3": {"node_4"}, "node_4": set()},
["raw", "int"]
),
(
# indirect dependency
"node_1(layer=raw) -> node_2 -> node_3(layer=int)",
{"node_1": {"id": "node_1", "layer": "raw"}, "node_2": {"id": "node_2"},
"node_3": {"id": "node_3", "layer": "int"}},
{"node_1": {"node_2"}, "node_2": {"node_3"}, "node_3": set()},
["raw", "int"]
),
(
# fan-in dependency
"""
node_1(layer=raw) -> node_2 -> node_3(layer=int) -> node_6(layer=feature)
node_4(layer=int) -> node_5 -----------------------------^
""",
{"node_1": {"id": "node_1", "layer": "raw"}, "node_2": {"id": "node_2"},
"node_3": {"id": "node_3", "layer": "int"}, "node_4": {"id": "node_4", "layer": "int"},
"node_5": {"id": "node_5"}, "node_6": {"id": "node_6", "layer": "feature"}},
{"node_1": {"node_2"}, "node_2": {"node_3"}, "node_3": {"node_6"}, "node_4": {"node_5"},
"node_5": {"node_6"}, "node_6": set()},
["raw", "int", "feature"]
),
(
# fan-out dependency: note that model_input comes after feature here based on
# alphabetical order since they have no dependency relationship.
"""
node_1(layer=raw) -> node_2 -> node_3(layer=int) -> node_6 -> node_7(layer=feature)
|----------> node_4(layer=int) -> node_5(layer=model_input)
""",
{"node_1": {"id": "node_1", "layer": "raw"}, "node_2": {"id": "node_2"},
"node_3": {"id": "node_3", "layer": "int"}, "node_4": {"id": "node_4", "layer": "int"},
"node_5": {"id": "node_5", "layer": "model_input"},
"node_6": {"id": "node_6"}, "node_7": {"id": "node_7", "layer": "feature"}},
{"node_1": {"node_2"}, "node_2": {"node_3"}, "node_3": {"node_6"},
"node_4": {"node_5"}, "node_5": set(), "node_6": {"node_7"}, "node_7": set()},
["raw", "int", "feature", "model_input"]
),
(
# fan-out-fan-in dependency
"""
node_1(layer=raw) -> node_2 -> node_3(layer=int) -> node_6 -> node_7(layer=feature)
|----------> node_4(layer=int) -> node_5(layer=model_input) --^
""",
{"node_1": {"id": "node_1", "layer": "raw"}, "node_2": {"id": "node_2"},
"node_3": {"id": "node_3", "layer": "int"}, "node_4": {"id": "node_4", "layer": "int"},
"node_5": {"id": "node_5", "layer": "model_input"},
"node_6": {"id": "node_6"},
"node_7": {"id": "node_7", "layer": "feature"}},
{"node_1": {"node_2"}, "node_2": {"node_3"}, "node_3": {"node_6"},
"node_4": {"node_5"}, "node_5": {"node_7"}, "node_6": {"node_7"}, "node_7": set()},
["raw", "int", "model_input", "feature"]
),
(
# disjoint dependency: when two groups of layers have no direct dependencies,
# their order is determined by topological order first and alphabetical order second,
# which is the default behaviour of the toposort library. In the example below, toposorting
# the layers gives [{c, d}, {a, b}], so it becomes [c, d, a, b] when flattened.
"""
node_1(layer=c) -> node_2(layer=a)
node_3(layer=d) -> node_4(layer=b)
""",
{"node_1": {"id": "node_1", "layer": "c"}, "node_2": {"id": "node_2", "layer": "a"},
"node_3": {"id": "node_3", "layer": "d"}, "node_4": {"id": "node_4", "layer": "b"}},
{"node_1": {"node_2"}, "node_2": {}, "node_3": {"node_4"}, "node_4": {}},
["c", "d", "a", "b"]
)
])
def test_sort_layers(graph_schema, nodes, node_dependencies, expected):
assert _sort_layers(nodes, node_dependencies) == expected, graph_schema


def test_sort_layers_should_raise_on_cyclic_layers():
# node_1(layer=raw) -> node_2(layer=int) -> node_3(layer=raw)
nodes = {
"node_1": {"id": "node_1", "layer": "raw"},
"node_2": {"id": "node_2", "layer": "int"},
"node_3": {"id": "node_3", "layer": "raw"},
}
node_dependencies = {"node_1": {"node_2"}, "node_2": {"node_3"}, "node_3": set()}
with pytest.raises(
CircularDependencyError,
match="Circular dependencies exist among these items: {'int':{'raw'}, 'raw':{'int'}}"
):
_sort_layers(nodes, node_dependencies)
4 changes: 4 additions & 0 deletions src/selectors/layers.test.js
@@ -36,6 +36,10 @@ describe('Selectors', () => {

expect(
nodes.every(node => {
// we don't need to check y/height positions if the layer isn't there.
if (node.layer === null) {
return true;
}
const i = layerIDs.indexOf(node.layer);
const prevLayer = layersObj[layerIDs[i - 1]];
const thisLayer = layersObj[node.layer];
7 changes: 4 additions & 3 deletions src/store/normalize-data.js
@@ -68,9 +68,10 @@ const addTag = state => tag => {
* @param {Object} layer - Layer object
*/
const addLayer = state => layer => {
const { id, name } = layer;
state.layer.ids.push(id);
state.layer.name[id] = name;
// Use the layer name as both layerId and name.
// This future-proofs the schema in case we need a separate layer ID later.
state.layer.ids.push(layer);
state.layer.name[layer] = layer;
};

/**