diff --git a/core/dbt/linker.py b/core/dbt/linker.py index ad1a3bc9e6f..f27116ad83b 100644 --- a/core/dbt/linker.py +++ b/core/dbt/linker.py @@ -174,6 +174,28 @@ def join(self): self.inner.join() +def _subset_graph(graph, include_nodes): + """Create and return a new graph that is a shallow copy of graph but with + only the nodes in include_nodes. Transitive edges across removed nodes are + preserved as explicit new edges. + """ + new_graph = nx.algorithms.transitive_closure(graph) + + include_nodes = set(include_nodes) + + for node in graph.nodes(): + if node not in include_nodes: + new_graph.remove_node(node) + + for node in include_nodes: + if node not in new_graph: + raise RuntimeError( + "Couldn't find model '{}' -- does it exist or is " + "it disabled?".format(node) + ) + return new_graph + + class Linker(object): def __init__(self, data=None): if data is None: @@ -209,23 +231,7 @@ def as_graph_queue(self, manifest, limit_to=None): else: graph_nodes = limit_to - new_graph = nx.DiGraph(self.graph) - - to_remove = [] - graph_nodes_lookup = set(graph_nodes) - for node in new_graph.nodes(): - if node not in graph_nodes_lookup: - to_remove.append(node) - - for node in to_remove: - new_graph.remove_node(node) - - for node in graph_nodes: - if node not in new_graph: - raise RuntimeError( - "Couldn't find model '{}' -- does it exist or is " - "it disabled?".format(node) - ) + new_graph = _subset_graph(self.graph, graph_nodes) return GraphQueue(new_graph, manifest) def get_dependent_nodes(self, node): diff --git a/test/integration/007_graph_selection_tests/models/users_rollup_dependency.sql b/test/integration/007_graph_selection_tests/models/users_rollup_dependency.sql new file mode 100644 index 00000000000..f539772cbb2 --- /dev/null +++ b/test/integration/007_graph_selection_tests/models/users_rollup_dependency.sql @@ -0,0 +1,5 @@ +{{ + config(materialized='table') +}} + +select * from {{ ref('users_rollup') }} diff --git a/test/integration/007_graph_selection_tests/test_graph_selection.py b/test/integration/007_graph_selection_tests/test_graph_selection.py index 7f6bfa87d73..7fcb1d1b936 100644 --- a/test/integration/007_graph_selection_tests/test_graph_selection.py +++ b/test/integration/007_graph_selection_tests/test_graph_selection.py @@ -60,7 +60,7 @@ def test__postgres__tags_and_children(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', 'tag:base+']) - self.assertEqual(len(results), 3) + self.assertEqual(len(results), 4) created_models = self.get_models_in_schema() self.assertFalse('base_users' in created_models) @@ -89,7 +89,7 @@ def test__postgres__specific_model_and_children(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', 'users+']) - self.assertEqual(len(results), 3) + self.assertEqual(len(results), 4) self.assertTablesEqual("seed", "users") self.assertTablesEqual("summary_expected", "users_rollup") @@ -104,7 +104,7 @@ def test__snowflake__specific_model_and_children(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', 'users+']) - self.assertEqual(len(results), 3) + self.assertEqual(len(results), 4) self.assertManyTablesEqual( ["SEED", "USERS"], @@ -194,7 +194,7 @@ def test__postgres__locally_qualified_name(self): def test__postgres__childrens_parents(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', '@base_users']) - self.assertEqual(len(results), 3) + self.assertEqual(len(results), 4) created_models = self.get_models_in_schema() self.assertIn('users_rollup', created_models) @@ -207,8 +207,8 @@ def test__postgres__childrens_parents(self): def test__postgres__more_childrens_parents(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', '@users']) - # base_users, emails, users_rollup, but not users (ephemeral) - self.assertEqual(len(results), 3) + # base_users, emails, users_rollup, users_rollup_dependency, but not users (ephemeral) + self.assertEqual(len(results), 4) created_models = self.get_models_in_schema() self.assertIn('users_rollup', created_models) @@ -216,3 +216,24 @@ def test__postgres__more_childrens_parents(self): self.assertIn('emails_alt', created_models) self.assertNotIn('subdir', created_models) self.assertNotIn('nested_users', created_models) + + @attr(type='snowflake') + def test__snowflake__skip_intermediate(self): + self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") + results = self.run_dbt(['run', '--models', '@users']) + # base_users, emails, users_rollup, users_rollup_dependency + self.assertEqual(len(results), 4) + + # now re-run, skipping users_rollup + results = self.run_dbt(['run', '--models', '@users', '--exclude', 'users_rollup']) + self.assertEqual(len(results), 3) + + # make sure that users_rollup_dependency and users don't interleave + users = [r for r in results if r.node.name == 'users'][0] + dep = [r for r in results if r.node.name == 'users_rollup_dependency'][0] + user_last_end = users.timing[1]['completed_at'] + dep_first_start = dep.timing[0]['started_at'] + self.assertTrue( + user_last_end < dep_first_start, + 'dependency started before its transitive parent ({} > {})'.format(user_last_end, dep_first_start) + ) diff --git a/test/integration/007_graph_selection_tests/test_schema_test_graph_selection.py b/test/integration/007_graph_selection_tests/test_schema_test_graph_selection.py index 3068f855f2f..6aea428cb8f 100644 --- a/test/integration/007_graph_selection_tests/test_schema_test_graph_selection.py +++ b/test/integration/007_graph_selection_tests/test_schema_test_graph_selection.py @@ -26,7 +26,7 @@ def run_schema_and_assert(self, include, exclude, expected_tests): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") self.run_dbt(["deps"]) results = self.run_dbt(['run', '--exclude', 'never_selected']) - self.assertEqual(len(results), 8) + self.assertEqual(len(results), 9) args = FakeArgs() args.models = include diff --git a/test/integration/007_graph_selection_tests/test_tag_selection.py b/test/integration/007_graph_selection_tests/test_tag_selection.py index 1eccf7252ed..bd15dff6340 100644 --- a/test/integration/007_graph_selection_tests/test_tag_selection.py +++ b/test/integration/007_graph_selection_tests/test_tag_selection.py @@ -42,7 +42,7 @@ def test__postgres__select_tag_and_children(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', '+tag:specified_in_project+']) - self.assertEqual(len(results), 2) + self.assertEqual(len(results), 3) models_run = [r.node['name'] for r in results] self.assertTrue('users' in models_run) @@ -69,8 +69,10 @@ def test__postgres__select_tag_in_model_with_project_Config(self): self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql") results = self.run_dbt(['run', '--models', '@tag:users']) - self.assertEqual(len(results), 3) + self.assertEqual(len(results), 4) models_run = set(r.node['name'] for r in results) - self.assertEqual({'users', 'users_rollup', 'emails_alt'}, models_run) - + self.assertEqual( + {'users', 'users_rollup', 'emails_alt', 'users_rollup_dependency'}, + models_run + ) diff --git a/test/integration/base.py b/test/integration/base.py index 137e04afbae..a4c52326772 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -439,7 +439,7 @@ def run_dbt(self, args=None, expect_pass=True, strict=True): args = ["run"] if strict: - args = ["--single-threaded", "--strict"] + args + args = ["--strict"] + args args.append('--log-cache-events') logger.info("Invoking dbt with {}".format(args))