From 633858a2183e41f9e0652a7ce74374d882177593 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Mon, 18 Mar 2019 14:41:41 -0600 Subject: [PATCH 1/4] When building the graph underlying the graph queue, preserve transitive dependencies while removing the skipped nodes --- core/dbt/linker.py | 55 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/core/dbt/linker.py b/core/dbt/linker.py index ad1a3bc9e6f..dfd6fec8bd0 100644 --- a/core/dbt/linker.py +++ b/core/dbt/linker.py @@ -174,6 +174,43 @@ def join(self): self.inner.join() +def _remove_node_from_graph(graph, node): + # find all our direct parents in the graph, and all our direct + # children. note: do not use the _iter forms here, we need actual lists + # as we'll be mutating the graph again + parents = graph.predecessors(node) + children = graph.successors(node) + # remove the actual node + graph.remove_node(node) + # now redraw the edges, so that if A -> B -> C and B is to be + # removed, we will now have A -> C + for parent in parents: + for child in children: + graph.add_edge(parent, child) + + +def _subset_graph(graph, include_nodes): + """Create and return a new graph that is a shallow copy of graph but with + only the nodes in include_nodes. Transitive edges across removed nodes are + preserved as explicit new edges. + """ + new_graph = nx.DiGraph(graph) + + include_nodes = set(include_nodes) + + for node in graph.nodes(): + if node not in include_nodes: + _remove_node_from_graph(new_graph, node) + + for node in include_nodes: + if node not in new_graph: + raise RuntimeError( + "Couldn't find model '{}' -- does it exist or is " + "it disabled?".format(node) + ) + return new_graph + + class Linker(object): def __init__(self, data=None): if data is None: @@ -209,23 +246,7 @@ def as_graph_queue(self, manifest, limit_to=None): else: graph_nodes = limit_to - new_graph = nx.DiGraph(self.graph) - - to_remove = [] - graph_nodes_lookup = set(graph_nodes) - for node in new_graph.nodes(): - if node not in graph_nodes_lookup: - to_remove.append(node) - - for node in to_remove: - new_graph.remove_node(node) - - for node in graph_nodes: - if node not in new_graph: - raise RuntimeError( - "Couldn't find model '{}' -- does it exist or is " - "it disabled?".format(node) - ) + new_graph = _subset_graph(self.graph, graph_nodes) return GraphQueue(new_graph, manifest) def get_dependent_nodes(self, node): From 38921fad1765c3f66ae44b74f71f6352c9da3228 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Mon, 18 Mar 2019 15:48:22 -0600 Subject: [PATCH 2/4] tests --- .../models/users_rollup_dependency.sql | 5 +++ .../test_graph_selection.py | 33 +++++++++++++++---- .../test_schema_test_graph_selection.py | 2 +- .../test_tag_selection.py | 10 +++--- 4 files changed, 39 insertions(+), 11 deletions(-) create mode 100644 test/integration/007_graph_selection_tests/models/users_rollup_dependency.sql diff --git a/test/integration/007_graph_selection_tests/models/users_rollup_dependency.sql b/test/integration/007_graph_selection_tests/models/users_rollup_dependency.sql new file mode 100644 index 00000000000..f539772cbb2 --- /dev/null +++ b/test/integration/007_graph_selection_tests/models/users_rollup_dependency.sql @@ -0,0 +1,5 @@ +{{ + config(materialized='table') +}} + +select * from {{ ref('users_rollup') }} diff --git a/test/integration/007_graph_selection_tests/test_graph_selection.py b/test/integration/007_graph_selection_tests/test_graph_selection.py index 7f6bfa87d73..7fcb1d1b936 100644 --- a/test/integration/007_graph_selection_tests/test_graph_selection.py +++ b/test/integration/007_graph_selection_tests/test_graph_selection.py @@ -60,7 +60,7 @@ def test__postgres__tags_and_children(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', 'tag:base+']) - self.assertEqual(len(results), 3) + self.assertEqual(len(results), 4) created_models = self.get_models_in_schema() self.assertFalse('base_users' in created_models) @@ -89,7 +89,7 @@ def test__postgres__specific_model_and_children(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', 'users+']) - self.assertEqual(len(results), 3) + self.assertEqual(len(results), 4) self.assertTablesEqual("seed", "users") self.assertTablesEqual("summary_expected", "users_rollup") @@ -104,7 +104,7 @@ def test__snowflake__specific_model_and_children(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', 'users+']) - self.assertEqual(len(results), 3) + self.assertEqual(len(results), 4) self.assertManyTablesEqual( ["SEED", "USERS"], @@ -194,7 +194,7 @@ def test__postgres__locally_qualified_name(self): def test__postgres__childrens_parents(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', '@base_users']) - self.assertEqual(len(results), 3) + self.assertEqual(len(results), 4) created_models = self.get_models_in_schema() self.assertIn('users_rollup', created_models) @@ -207,8 +207,8 @@ def test__postgres__childrens_parents(self): def test__postgres__more_childrens_parents(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', '@users']) - # base_users, emails, users_rollup, but not users (ephemeral) - self.assertEqual(len(results), 3) + # base_users, emails, users_rollup, users_rollup_dependency, but not users (ephemeral) + self.assertEqual(len(results), 4) created_models = self.get_models_in_schema() self.assertIn('users_rollup', created_models) @@ -216,3 +216,24 @@ def test__postgres__more_childrens_parents(self): self.assertIn('emails_alt', created_models) self.assertNotIn('subdir', created_models) self.assertNotIn('nested_users', created_models) + + @attr(type='snowflake') + def test__snowflake__skip_intermediate(self): + self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") + results = self.run_dbt(['run', '--models', '@users']) + # base_users, emails, users_rollup, users_rollup_dependency + self.assertEqual(len(results), 4) + + # now re-run, skipping users_rollup + results = self.run_dbt(['run', '--models', '@users', '--exclude', 'users_rollup']) + self.assertEqual(len(results), 3) + + # make sure that users_rollup_dependency and users don't interleave + users = [r for r in results if r.node.name == 'users'][0] + dep = [r for r in results if r.node.name == 'users_rollup_dependency'][0] + user_last_end = users.timing[1]['completed_at'] + dep_first_start = dep.timing[0]['started_at'] + self.assertTrue( + user_last_end < dep_first_start, + 'dependency started before its transitive parent ({} > {})'.format(user_last_end, dep_first_start) + ) diff --git a/test/integration/007_graph_selection_tests/test_schema_test_graph_selection.py b/test/integration/007_graph_selection_tests/test_schema_test_graph_selection.py index 3068f855f2f..6aea428cb8f 100644 --- a/test/integration/007_graph_selection_tests/test_schema_test_graph_selection.py +++ b/test/integration/007_graph_selection_tests/test_schema_test_graph_selection.py @@ -26,7 +26,7 @@ def run_schema_and_assert(self, include, exclude, expected_tests): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") self.run_dbt(["deps"]) results = self.run_dbt(['run', '--exclude', 'never_selected']) - self.assertEqual(len(results), 8) + self.assertEqual(len(results), 9) args = FakeArgs() args.models = include diff --git a/test/integration/007_graph_selection_tests/test_tag_selection.py b/test/integration/007_graph_selection_tests/test_tag_selection.py index 1eccf7252ed..bd15dff6340 100644 --- a/test/integration/007_graph_selection_tests/test_tag_selection.py +++ b/test/integration/007_graph_selection_tests/test_tag_selection.py @@ -42,7 +42,7 @@ def test__postgres__select_tag_and_children(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', '+tag:specified_in_project+']) - self.assertEqual(len(results), 2) + self.assertEqual(len(results), 3) models_run = [r.node['name'] for r in results] self.assertTrue('users' in models_run) @@ -69,8 +69,10 @@ def test__postgres__select_tag_in_model_with_project_Config(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', '@tag:users']) - self.assertEqual(len(results), 3) + self.assertEqual(len(results), 4) models_run = set(r.node['name'] for r in results) - self.assertEqual({'users', 'users_rollup', 'emails_alt'}, models_run) - + self.assertEqual( + {'users', 'users_rollup', 'emails_alt', 'users_rollup_dependency'}, + models_run + ) From 9ae229a0d599eeee5eb2e49892416fc844d518e9 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Mon, 18 Mar 2019 15:51:17 -0600 Subject: [PATCH 3/4] ugh, forgot to remove this I guess --- test/integration/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/base.py b/test/integration/base.py index 137e04afbae..a4c52326772 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -439,7 +439,7 @@ def run_dbt(self, args=None, expect_pass=True, strict=True): args = ["run"] if strict: - args = ["--single-threaded", "--strict"] + args + args = ["--strict"] + args args.append('--log-cache-events') logger.info("Invoking dbt with {}".format(args)) From 69c8a09d435a89780ed742c953f0cf2f0ca841bf Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Mon, 18 Mar 2019 16:49:46 -0600 Subject: [PATCH 4/4] Use the transitive closure to calculate the graph Don't maintain the links manually while removing nodes Just take the transitive closure and remove all nodes --- core/dbt/linker.py | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/core/dbt/linker.py b/core/dbt/linker.py index dfd6fec8bd0..f27116ad83b 100644 --- a/core/dbt/linker.py +++ b/core/dbt/linker.py @@ -174,33 +174,18 @@ def join(self): self.inner.join() -def _remove_node_from_graph(graph, node): - # find all our direct parents in the graph, and all our direct - # children. note: do not use the _iter forms here, we need actual lists - # as we'll be mutating the graph again - parents = graph.predecessors(node) - children = graph.successors(node) - # remove the actual node - graph.remove_node(node) - # now redraw the edges, so that if A -> B -> C and B is to be - # removed, we will now have A -> C - for parent in parents: - for child in children: - graph.add_edge(parent, child) - - def _subset_graph(graph, include_nodes): """Create and return a new graph that is a shallow copy of graph but with only the nodes in include_nodes. Transitive edges across removed nodes are preserved as explicit new edges. """ - new_graph = nx.DiGraph(graph) + new_graph = nx.algorithms.transitive_closure(graph) include_nodes = set(include_nodes) for node in graph.nodes(): if node not in include_nodes: - _remove_node_from_graph(new_graph, node) + new_graph.remove_node(node) for node in include_nodes: if node not in new_graph: