Fix for "table dropped by concurrent query" on Redshift #825
Changes to the default `table` materialization:
```diff
@@ -1,6 +1,7 @@
 {% materialization table, default %}
   {%- set identifier = model['alias'] -%}
   {%- set tmp_identifier = identifier + '__dbt_tmp' -%}
+  {%- set backup_identifier = identifier + '__dbt_backup' -%}
   {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}

   {%- set existing_relations = adapter.list_relations(schema=schema) -%}
@@ -10,15 +11,18 @@
                                                 schema=schema, type='table') -%}
   {%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
                                                       schema=schema, type='table') -%}
+  {%- set backup_relation = api.Relation.create(identifier=backup_identifier,
+                                                schema=schema, type='table') -%}
   {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
   {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}
   {%- set create_as_temporary = (exists_as_table and non_destructive_mode) -%}

-  -- drop the temp relation if it exists for some reason
+  -- drop the temp relations if they exists for some reason
   {{ adapter.drop_relation(intermediate_relation) }}
+  {{ adapter.drop_relation(backup_relation) }}

-  -- setup: if the target relation already exists, truncate or drop it
+  -- setup: if the target relation already exists, truncate or drop it (if it's a view)
   {% if non_destructive_mode -%}
     {% if exists_as_table -%}
       {{ adapter.truncate_relation(old_relation) }}
@@ -60,12 +64,17 @@
   {% if non_destructive_mode -%}
     -- noop
   {%- else -%}
-    {{ drop_relation_if_exists(old_relation) }}
+    {% if exists_as_table %}
+        -- move the existing table out of the way
+        {{ adapter.rename_relation(target_relation, backup_relation) }}
+    {% endif %}
+
     {{ adapter.rename_relation(intermediate_relation, target_relation) }}
   {%- endif %}

   -- `COMMIT` happens here
   {{ adapter.commit() }}
+  {{ drop_relation_if_exists(backup_relation) }}
```
> **Reviewer:** would this logic fail if the backup relation wasn't deleted on the previous run? i.e.: run 1: […] I'd imagine you could fix this by also dropping the backup relation at the start of the run, if it exists.
>
> **Author:** really good point. We do something similar for the `__dbt_tmp` relation -- I will update!
```diff

   {{ run_hooks(post_hooks, inside_transaction=False) }}
 {% endmaterialization %}
```
Documentation for the new integration test (test/integration/032_concurrent_transaction_test):
This test warrants some explanation. In dbt <= 0.10.1, Redshift table and view materializations suffered from issues around concurrent transactions. In order to reliably reproduce this error, a query needs to select from a dbt model as the table is being rebuilt. Critically, this concurrent select needs to query the table during the drop/swap portion of the materialization. This looks like:
```sql
begin;
create table new_table as (...);
drop table old_table cascade;
-- <---- The concurrent query needs to be running here!
alter table new_table rename to old_table;
commit;
```
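
For contrast, the materialization change in this PR reorders that critical section so the old table is renamed out of the way inside the transaction and only dropped after the `COMMIT`. Roughly (a sketch based on the diff above, reusing its `__dbt_tmp` / `__dbt_backup` naming; not the literal SQL dbt emits):

```sql
begin;
create table "model__dbt_tmp" as (...);
alter table "model" rename to "model__dbt_backup";   -- move the existing table out of the way
alter table "model__dbt_tmp" rename to "model";      -- swap in the freshly built table
commit;
drop table if exists "model__dbt_backup" cascade;    -- cleanup happens after the commit
```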

In order to reliably reproduce this failure, the model shown above needs to block for a long time between the `drop` and `alter` statements. We can't just stick a `sleep()` call in there, as this code is defined in the materialization. Instead, the failure can be reproduced by:
1) creating a view that depends on this model
2) issuing a long-running query on the view before `dbt run` is invoked
3) issuing _another_ long-running query against the original model

Since the long-running query from step 2 is selecting from the view, Redshift blocks on the materialization's `drop ... cascade`, which causes the query from step 3 to overlap with the critical section of the materialization between the `drop` and `alter` statements.

In dbt v0.10.1, this integration test results in:
```
======================================================================
FAIL: test__redshift__concurrent_transaction (test_concurrent_transaction.TestConcurrentTransaction)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/src/app/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py", line 84, in test__redshift__concurrent_transaction
    self.assertEqual(self.query_state['model_1'], 'good')
AssertionError: 'error: table 3379442 dropped by concurrent transaction\n' != 'good'
- error: table 3379442 dropped by concurrent transaction
+ good
```
A new macro creates the `f_sleep` Python UDF used to simulate long-running queries; the test installs it via its `on-run-start` hook:
```sql
{% macro create_udfs() %}

CREATE OR REPLACE FUNCTION {{ target.schema }}.f_sleep (x float)
RETURNS bool IMMUTABLE
AS
$$
    from time import sleep
    sleep(x)
    return True
$$ LANGUAGE plpythonu;

{% endmacro %}
```
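
The integration test below uses this UDF to hold a query open against a model while `dbt run` executes; the queries it issues look roughly like this (the schema name here is illustrative):

```sql
-- occupies a connection for ~10 seconds while selecting from view_model
select my_test_schema.f_sleep(10) from my_test_schema.view_model;
```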
model_1 (models-incremental):

```sql
{{ config(materialized='incremental', sql_where=True, unique_key='id') }}

-- incremental model
select 1 as id
```
view_model (models-incremental):

```sql
select * from {{ ref('model_1') }}
```
model_1 (models-table):

```sql
{{ config(materialized='table') }}

-- table model
select 1 as id
```
view_model (models-table):

```sql
select * from {{ ref('model_1') }}
```
model_1 (models-view):

```sql
{{ config(materialized='view') }}

-- view model
select 1 as id
```
view_model (models-view):

```sql
select * from {{ ref('model_1') }}
```
test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py:
```python
from nose.plugins.attrib import attr
from test.integration.base import DBTIntegrationTest
import threading


class BaseTestConcurrentTransaction(DBTIntegrationTest):

    def reset(self):
        self.query_state = {
            'view_model': 'wait',
            'model_1': 'wait',
        }

    @property
    def schema(self):
        return "concurrent_transaction_032"

    @property
    def project_config(self):
        return {
            "macro-paths": ["test/integration/032_concurrent_transaction_test/macros"],
            "on-run-start": [
                "{{ create_udfs() }}",
            ],
        }

    def run_select_and_check(self, rel, sql):
        try:
            res = self.run_sql(sql, fetch='one')

            # The result is the output of f_sleep(), which is True
            if res[0] == True:
                self.query_state[rel] = 'good'
            else:
                self.query_state[rel] = 'bad'

        except Exception as e:
            if 'concurrent transaction' in str(e):
                self.query_state[rel] = 'error: {}'.format(e)
            else:
                self.query_state[rel] = 'error: {}'.format(e)

    def async_select(self, rel, sleep=10):
        # Run the select statement in a thread. When the query returns, the global
        # query_state will be updated with a state of good/bad/error, and the associated
        # error will be reported if one was raised.

        schema = self.unique_schema()
        query = '''
        -- async_select: {rel}
        select {schema}.f_sleep({sleep}) from {schema}.{rel}
        '''.format(
            schema=schema,
            sleep=sleep,
            rel=rel)

        thread = threading.Thread(target=lambda: self.run_select_and_check(rel, query))
        thread.start()
        return thread

    def run_test(self):
        self.use_profile("redshift")

        # First run the project to make sure the models exist
        self.run_dbt(args=['run'])

        # Execute long-running queries in threads
        t1 = self.async_select('view_model', 10)
        t2 = self.async_select('model_1', 5)

        # While the queries are executing, re-run the project
        res = self.run_dbt(args=['run', '--threads', '8'])

        # Finally, wait for these threads to finish
        t1.join()
        t2.join()

        self.assertTrue(len(res) > 0)

        # If the query succeeded, the global query_state should be 'good'
        self.assertEqual(self.query_state['view_model'], 'good')
        self.assertEqual(self.query_state['model_1'], 'good')


class TableTestConcurrentTransaction(BaseTestConcurrentTransaction):
    @property
    def models(self):
        return "test/integration/032_concurrent_transaction_test/models-table"

    @attr(type="redshift")
    def test__redshift__concurrent_transaction_table(self):
        self.reset()
        self.run_test()


class ViewTestConcurrentTransaction(BaseTestConcurrentTransaction):
    @property
    def models(self):
        return "test/integration/032_concurrent_transaction_test/models-view"

    @attr(type="redshift")
    def test__redshift__concurrent_transaction_view(self):
        self.reset()
        self.run_test()


class IncrementalTestConcurrentTransaction(BaseTestConcurrentTransaction):
    @property
    def models(self):
        return "test/integration/032_concurrent_transaction_test/models-incremental"

    @attr(type="redshift")
    def test__redshift__concurrent_transaction_incremental(self):
        self.reset()
        self.run_test()
```
> **Author:** I had to make this change because dbt was previously adding a `begin` statement before the `drop`. Since the `drop` now happens after the transaction, there was no corresponding `commit`, and the transaction containing the `drop` was rolled back. I checked the other places where dbt uses this method, and I believe this change should be safe, but I'm very open to improving how dbt handles transactions here.
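
A rough SQL-level illustration of the behavior described in this comment (the backup table name and exact statements are illustrative, not dbt's actual generated SQL):

```sql
-- Before the change: dbt emitted an implicit `begin` ahead of the post-commit drop.
begin;
drop table if exists "my_model__dbt_backup" cascade;
-- No matching `commit` was ever issued, so this transaction was rolled back
-- and the backup table was never actually dropped.

-- After the change: the drop runs after the materialization's commit, without
-- the extra `begin`, so it is not rolled back.
drop table if exists "my_model__dbt_backup" cascade;
```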