make full refresh a config item
Added integration tests, fixed existing tests
Jacob Beck committed May 12, 2020
1 parent 9d0eab6 commit 13a66f5
Showing 14 changed files with 122 additions and 39 deletions.
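The `full_refresh` config introduced by this commit lets an individual model or seed override the `--full-refresh` CLI flag: an explicit config value wins, and when the config is left unset the flag decides. A minimal sketch of how the config might be applied at the model level (the file name, model body, and `ref` target are hypothetical, not part of this commit):

```sql
-- models/my_incremental_model.sql  (hypothetical example)
{{ config(
    materialized='incremental',
    full_refresh=false
) }}

-- with full_refresh=false, `dbt run --full-refresh` would leave this model's
-- incremental behavior in place instead of rebuilding the table
select * from {{ ref('upstream_model') }}
```

The same key can also be set for models or seeds in `dbt_project.yml`, which is how the integration tests further down exercise it.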
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,8 @@
## dbt 0.18.0 (Release TBD)

### Features
- Added a `full_refresh` config item that overrides the behavior of the `--full-refresh` flag ([#1009](https://github.com/fishtown-analytics/dbt/issues/1009), [#2348](https://github.com/fishtown-analytics/dbt/pull/2348))

## dbt 0.17.0 (Release TBD)

## dbt 0.17.0rc1 (May 12, 2020)
@@ -28,7 +31,7 @@
- Track distinct project hashes in anonymous usage metrics for package downloads ([#2351](https://github.com/fishtown-analytics/dbt/issues/2351), [#2429](https://github.com/fishtown-analytics/dbt/pull/2429))

Contributors:
- [@azhard](https://github.com/azhard) ([#2413](https://github.com/fishtown-analytics/dbt/pull/2413), [#2422](https://github.com/fishtown-analytics/dbt/pull/2422))
- [@azhard](https://github.com/azhard) ([#2413](https://github.com/fishtown-analytics/dbt/pull/2413), [#2422](https://github.com/fishtown-analytics/dbt/pull/2422))
- [@mikaelene](https://github.com/mikaelene) [#2414](https://github.com/fishtown-analytics/dbt/pull/2414)
- [@raalsky](https://github.com/Raalsky) ([#2343](https://github.com/fishtown-analytics/dbt/pull/2343))
- [@alf-mindshift](https://github.com/alf-mindshift) ([docs#90](https://github.com/fishtown-analytics/dbt-docs/pull/90))
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/model_config.py
@@ -321,6 +321,7 @@ class NodeConfig(BaseConfig):
# TODO: hide this one?
metadata=MergeBehavior.Append.meta(),
)
full_refresh: Optional[bool] = None

@classmethod
def from_dict(cls, data, validate=True):
@@ -8,6 +8,6 @@
{{ return(relation is not none
and relation.type == 'table'
and model.config.materialized == 'incremental'
and not flags.FULL_REFRESH) }}
and not should_full_refresh()) }}
{% endif %}
{% endmacro %}
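The condition above looks like the body of dbt's `is_incremental()` helper; switching it from `flags.FULL_REFRESH` to `should_full_refresh()` means a model's own `full_refresh` config now also controls whether incremental filters apply. A hedged sketch of a model that relies on this check (table and column names are made up):

```sql
-- hypothetical incremental model
{{ config(materialized='incremental') }}

select * from {{ ref('raw_events') }}
{% if is_incremental() %}
  -- incremental runs only pick up new rows; a full refresh rebuilds the table
  where event_time > (select max(event_time) from {{ this }})
{% endif %}
```

With this change, a model configured with `full_refresh: false` keeps its incremental filter even when `--full-refresh` is passed on the command line.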
@@ -63,3 +63,12 @@
identifier=relation.identifier
)) -%}
{% endmacro %}


{% macro should_full_refresh() %}
{% set config_full_refresh = config.get('full_refresh', false) %}
{% if config_full_refresh is none %}
{% set config_full_refresh = flags.FULL_REFRESH %}
{% endif %}
{% do return(config_full_refresh) %}
{% endmacro %}
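The new `should_full_refresh()` macro encodes the precedence in one place: an explicit `full_refresh` config wins, and only when the config resolves to `none` does the `--full-refresh` flag apply. A hedged sketch of how a custom macro or materialization might consume it (the helper name and relation handling are illustrative, not part of this commit):

```sql
{% macro maybe_drop_for_full_refresh(existing_relation) %}
  {# hypothetical helper: drop the existing relation only when a full refresh
     was requested, either via the full_refresh config or --full-refresh #}
  {% if existing_relation is not none and should_full_refresh() %}
    {% do adapter.drop_relation(existing_relation) %}
  {% endif %}
{% endmacro %}
```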
@@ -2,7 +2,6 @@
{% materialization incremental, default -%}

{% set unique_key = config.get('unique_key') %}
{% set full_refresh_mode = flags.FULL_REFRESH %}

{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
@@ -16,7 +15,7 @@
{% set to_drop = [] %}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{% elif existing_relation.is_view or should_full_refresh() %}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
@@ -104,7 +104,7 @@
{% materialization seed, default %}

{%- set identifier = model['alias'] -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}

@@ -42,7 +42,7 @@
-- that's an error. If we were told to full refresh, drop it. This behavior differs
-- for Snowflake and BigQuery, so multiple dispatch is used.
{%- if old_relation is not none and old_relation.is_table -%}
{{ handle_existing_table(flags.FULL_REFRESH, old_relation) }}
{{ handle_existing_table(should_full_refresh(), old_relation) }}
{%- endif -%}

-- build model
@@ -17,11 +17,11 @@

{% macro bq_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns) %}
{%- set partition_type =
'date' if partition_by.data_type in ('timestamp, datetime')
'date' if partition_by.data_type in ('timestamp, datetime')
else partition_by.data_type -%}

{% if partitions is not none and partitions != [] %} {# static #}

{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
{{ partitions | join (', ') }}
@@ -33,11 +33,11 @@
{{sql}}
)
{%- endset -%}

{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}

{% else %} {# dynamic #}

{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}
@@ -65,7 +65,7 @@
array_agg(distinct {{ partition_by.render() }})
from {{ tmp_relation }}
);

{#
TODO: include_sql_header is a hack; consider a better approach that includes
the sql_header at the materialization-level instead
@@ -75,7 +75,7 @@

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}

{% endif %}

{% endmacro %}
@@ -84,12 +84,12 @@
{% materialization incremental, adapter='bigquery' -%}

{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) %}
{%- set tmp_relation = make_temp_relation(this) %}

{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%}

@@ -118,14 +118,14 @@

{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if strategy == 'insert_overwrite' %}

{% set missing_partition_msg -%}
The 'insert_overwrite' strategy requires the `partition_by` config.
{%- endset %}
{% if partition_by is none %}
{% do exceptions.raise_compiler_error(missing_partition_msg) %}
{% endif %}

{% set build_sql = bq_insert_overwrite(
tmp_relation,
target_relation,
@@ -27,7 +27,7 @@
{% materialization incremental, adapter='snowflake' -%}

{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
@@ -0,0 +1 @@
select * from {{ ref('seed_actual') }}
66 changes: 49 additions & 17 deletions test/integration/005_simple_seed_test/test_simple_seed.py
@@ -7,7 +7,6 @@ class TestSimpleSeed(DBTIntegrationTest):

def setUp(self):
DBTIntegrationTest.setUp(self)

self.run_sql_file("seed.sql")

@property
@@ -16,7 +15,7 @@ def schema(self):

@property
def models(self):
return "models"
return "models-downstream-seed"

@property
def project_config(self):
@@ -28,27 +27,61 @@ def project_config(self):
}
}

def use_full_refresh_project(self, full_refresh: bool):
overrides = {
'seeds': {
'quote_columns': False,
'full_refresh': full_refresh,
}
}
self.use_default_project(overrides)

def _seed_and_run(self):
assert len(self.run_dbt(['seed'])) == 1
self.assertTablesEqual('seed_actual', 'seed_expected')

assert len(self.run_dbt(['run'])) == 1
self.assertTablesEqual('model', 'seed_expected')

def _after_seed_model_state(self, cmd, exists: bool):
assert len(self.run_dbt(cmd)) == 1
self.assertTablesEqual('seed_actual', 'seed_expected')
if exists:
self.assertTableDoesExist('model')
else:
self.assertTableDoesNotExist('model')

@use_profile('postgres')
def test_postgres_simple_seed(self):
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
self.assertTablesEqual("seed_actual","seed_expected")
self._seed_and_run()

# this should truncate the seed_actual table, then re-insert.
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
self.assertTablesEqual("seed_actual","seed_expected")
self._after_seed_model_state(['seed'], exists=True)

@use_profile('postgres')
def test_postgres_simple_seed_with_drop(self):
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
self.assertTablesEqual("seed_actual","seed_expected")
def test_postgres_simple_seed_full_refresh_flag(self):
self._seed_and_run()

# this should drop the seed table, then re-create
results = self.run_dbt(["seed", "--full-refresh"])
self.assertEqual(len(results), 1)
self.assertTablesEqual("seed_actual","seed_expected")
# this should drop the seed_actual table, then re-create it, so the
# model won't exist.
self._after_seed_model_state(['seed', '--full-refresh'], exists=False)

@use_profile('postgres')
def test_postgres_simple_seed_full_refresh_config(self):
self._seed_and_run()

# set the full_refresh config to False
self.use_full_refresh_project(False)

self._after_seed_model_state(['seed'], exists=True)
# make sure we ignore the full-refresh flag (the config is higher
# priority than the flag)
self._after_seed_model_state(['seed', '--full-refresh'], exists=True)

# this should drop the seed_actual table, then re-create it, so the
# model won't exist.
self.use_full_refresh_project(True)
self._after_seed_model_state(['seed'], exists=False)


class TestSimpleSeedCustomSchema(DBTIntegrationTest):
@@ -89,7 +122,6 @@ def test_postgres_simple_seed_with_schema(self):
self.assertEqual(len(results), 1)
self.assertTablesEqual("seed_actual","seed_expected", table_a_schema=schema_name)


@use_profile('postgres')
def test_postgres_simple_seed_with_drop_and_schema(self):
schema_name = "{}_{}".format(self.unique_schema(), 'custom_schema')
@@ -1,5 +1,5 @@
select 1 as id

{% if adapter.already_exists(this.schema, this.identifier) and not flags.FULL_REFRESH %}
{% if adapter.already_exists(this.schema, this.identifier) and not should_full_refresh() %}
where id > (select max(id) from {{this}})
{% endif %}
@@ -25,10 +25,13 @@ def schema(self):
def models(self):
return "models"

def run_dbt_full_refresh(self):
return self.run_dbt(['run', '--full-refresh'])

@use_profile('postgres')
def test_postgres_full_refresh(self):
# initial full-refresh should have no effect
results = self.run_dbt(['run', '--full-refresh'])
results = self.run_dbt_full_refresh()
self.assertEqual(len(results), 3)

self.assertTablesEqual("seed", "view")
@@ -37,13 +40,13 @@ def test_postgres_full_refresh(self):

# adds one record to the incremental model. full-refresh should truncate then re-run
self.run_sql_file("invalidate_incremental.sql")
results = self.run_dbt(['run', '--full-refresh'])
results = self.run_dbt_full_refresh()
self.assertEqual(len(results), 3)
self.assertTablesEqual("seed", "incremental")

self.run_sql_file("update.sql")

results = self.run_dbt(['run', '--full-refresh'])
results = self.run_dbt_full_refresh()
self.assertEqual(len(results), 3)

self.assertTablesEqual("seed", "view")
@@ -59,3 +62,14 @@ def test_postgres_delete__dbt_tmp_relation(self):

self.assertTableDoesNotExist('view__dbt_tmp')
self.assertTablesEqual("seed", "view")


class TestRuntimeMaterializationWithConfig(TestRuntimeMaterialization):
@property
def project_config(self):
result = super().project_config
result.update({'models': {'full_refresh': True}})
return result

def run_dbt_full_refresh(self):
return self.run_dbt(['run'])