Skip to content

Commit

Permalink
Merge pull request #1355 from fishtown-analytics/fix/out-of-order-exe…
Browse files Browse the repository at this point in the history
…cution-on-model-select

Fix out of order execution on model select (#1354)
  • Loading branch information
beckjake authored Mar 19, 2019
2 parents 70262b3 + 69c8a09 commit 02c9bca
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 29 deletions.
40 changes: 23 additions & 17 deletions core/dbt/linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,28 @@ def join(self):
self.inner.join()


def _subset_graph(graph, include_nodes):
"""Create and return a new graph that is a shallow copy of graph but with
only the nodes in include_nodes. Transitive edges across removed nodes are
preserved as explicit new edges.
"""
new_graph = nx.algorithms.transitive_closure(graph)

include_nodes = set(include_nodes)

for node in graph.nodes():
if node not in include_nodes:
new_graph.remove_node(node)

for node in include_nodes:
if node not in new_graph:
raise RuntimeError(
"Couldn't find model '{}' -- does it exist or is "
"it disabled?".format(node)
)
return new_graph


class Linker(object):
def __init__(self, data=None):
if data is None:
Expand Down Expand Up @@ -209,23 +231,7 @@ def as_graph_queue(self, manifest, limit_to=None):
else:
graph_nodes = limit_to

new_graph = nx.DiGraph(self.graph)

to_remove = []
graph_nodes_lookup = set(graph_nodes)
for node in new_graph.nodes():
if node not in graph_nodes_lookup:
to_remove.append(node)

for node in to_remove:
new_graph.remove_node(node)

for node in graph_nodes:
if node not in new_graph:
raise RuntimeError(
"Couldn't find model '{}' -- does it exist or is "
"it disabled?".format(node)
)
new_graph = _subset_graph(self.graph, graph_nodes)
return GraphQueue(new_graph, manifest)

def get_dependent_nodes(self, node):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{{
config(materialized='table')
}}

select * from {{ ref('users_rollup') }}
33 changes: 27 additions & 6 deletions test/integration/007_graph_selection_tests/test_graph_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test__postgres__tags_and_children(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

results = self.run_dbt(['run', '--models', 'tag:base+'])
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 4)

created_models = self.get_models_in_schema()
self.assertFalse('base_users' in created_models)
Expand Down Expand Up @@ -89,7 +89,7 @@ def test__postgres__specific_model_and_children(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

results = self.run_dbt(['run', '--models', 'users+'])
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 4)

self.assertTablesEqual("seed", "users")
self.assertTablesEqual("summary_expected", "users_rollup")
Expand All @@ -104,7 +104,7 @@ def test__snowflake__specific_model_and_children(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

results = self.run_dbt(['run', '--models', 'users+'])
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 4)

self.assertManyTablesEqual(
["SEED", "USERS"],
Expand Down Expand Up @@ -194,7 +194,7 @@ def test__postgres__locally_qualified_name(self):
def test__postgres__childrens_parents(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")
results = self.run_dbt(['run', '--models', '@base_users'])
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 4)

created_models = self.get_models_in_schema()
self.assertIn('users_rollup', created_models)
Expand All @@ -207,12 +207,33 @@ def test__postgres__childrens_parents(self):
def test__postgres__more_childrens_parents(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")
results = self.run_dbt(['run', '--models', '@users'])
# base_users, emails, users_rollup, but not users (ephemeral)
self.assertEqual(len(results), 3)
# base_users, emails, users_rollup, users_rollup_dependency, but not users (ephemeral)
self.assertEqual(len(results), 4)

created_models = self.get_models_in_schema()
self.assertIn('users_rollup', created_models)
self.assertIn('users', created_models)
self.assertIn('emails_alt', created_models)
self.assertNotIn('subdir', created_models)
self.assertNotIn('nested_users', created_models)

@attr(type='snowflake')
def test__snowflake__skip_intermediate(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")
results = self.run_dbt(['run', '--models', '@users'])
# base_users, emails, users_rollup, users_rollup_dependency
self.assertEqual(len(results), 4)

# now re-run, skipping users_rollup
results = self.run_dbt(['run', '--models', '@users', '--exclude', 'users_rollup'])
self.assertEqual(len(results), 3)

# make sure that users_rollup_dependency and users don't interleave
users = [r for r in results if r.node.name == 'users'][0]
dep = [r for r in results if r.node.name == 'users_rollup_dependency'][0]
user_last_end = users.timing[1]['completed_at']
dep_first_start = dep.timing[0]['started_at']
self.assertTrue(
user_last_end < dep_first_start,
'dependency started before its transitive parent ({} > {})'.format(user_last_end, dep_first_start)
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def run_schema_and_assert(self, include, exclude, expected_tests):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")
self.run_dbt(["deps"])
results = self.run_dbt(['run', '--exclude', 'never_selected'])
self.assertEqual(len(results), 8)
self.assertEqual(len(results), 9)

args = FakeArgs()
args.models = include
Expand Down
10 changes: 6 additions & 4 deletions test/integration/007_graph_selection_tests/test_tag_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test__postgres__select_tag_and_children(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

results = self.run_dbt(['run', '--models', '+tag:specified_in_project+'])
self.assertEqual(len(results), 2)
self.assertEqual(len(results), 3)

models_run = [r.node['name'] for r in results]
self.assertTrue('users' in models_run)
Expand All @@ -69,8 +69,10 @@ def test__postgres__select_tag_in_model_with_project_Config(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

results = self.run_dbt(['run', '--models', '@tag:users'])
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 4)

models_run = set(r.node['name'] for r in results)
self.assertEqual({'users', 'users_rollup', 'emails_alt'}, models_run)

self.assertEqual(
{'users', 'users_rollup', 'emails_alt', 'users_rollup_dependency'},
models_run
)
2 changes: 1 addition & 1 deletion test/integration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def run_dbt(self, args=None, expect_pass=True, strict=True):
args = ["run"]

if strict:
args = ["--single-threaded", "--strict"] + args
args = ["--strict"] + args
args.append('--log-cache-events')
logger.info("Invoking dbt with {}".format(args))

Expand Down

0 comments on commit 02c9bca

Please sign in to comment.