Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make full refresh a config item #2438

Merged
merged 2 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## dbt 0.18.0 (Release TBD)

### Features
- Added a `full_refresh` config item that overrides the behavior of the `--full-refresh` flag ([#1009](https://github.com/fishtown-analytics/dbt/issues/1009), [#2348](https://github.com/fishtown-analytics/dbt/pull/2348))

## dbt 0.17.0 (Release TBD)

## dbt 0.17.0rc1 (May 12, 2020)
Expand Down Expand Up @@ -28,7 +31,7 @@
- Track distinct project hashes in anonymous usage metrics for package downloads ([#2351](https://github.com/fishtown-analytics/dbt/issues/2351), [#2429](https://github.com/fishtown-analytics/dbt/pull/2429))

Contributors:
- [@azhard](https://github.com/azhard) ([#2413](https://github.com/fishtown-analytics/dbt/pull/2413), [#2422](https://github.com/fishtown-analytics/dbt/pull/2422))
- [@azhard](https://github.com/azhard) ([#2413](https://github.com/fishtown-analytics/dbt/pull/2413), [#2422](https://github.com/fishtown-analytics/dbt/pull/2422))
- [@mikaelene](https://github.com/mikaelene) [#2414](https://github.com/fishtown-analytics/dbt/pull/2414)
- [@raalsky](https://github.com/Raalsky) ([#2343](https://github.com/fishtown-analytics/dbt/pull/2343))
- [@alf-mindshift](https://github.com/alf-mindshift) ([docs#90](https://github.com/fishtown-analytics/dbt-docs/pull/90))
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ class NodeConfig(BaseConfig):
# TODO: hide this one?
metadata=MergeBehavior.Append.meta(),
)
full_refresh: Optional[bool] = None

@classmethod
def from_dict(cls, data, validate=True):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
{{ return(relation is not none
and relation.type == 'table'
and model.config.materialized == 'incremental'
and not flags.FULL_REFRESH) }}
and not should_full_refresh()) }}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⭐ ⭐ ⭐

{% endif %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,12 @@
identifier=relation.identifier
)) -%}
{% endmacro %}


{% macro should_full_refresh() %}
{% set config_full_refresh = config.get('full_refresh', false) %}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the second arg to config.get is the validator, right? Do we even want to supply that here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I always forget it's not just a dict!

{% if config_full_refresh is none %}
{% set config_full_refresh = flags.FULL_REFRESH %}
{% endif %}
{% do return(config_full_refresh) %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{% materialization incremental, default -%}

{% set unique_key = config.get('unique_key') %}
{% set full_refresh_mode = flags.FULL_REFRESH %}

{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
Expand All @@ -16,7 +15,7 @@
{% set to_drop = [] %}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{% elif existing_relation.is_view or should_full_refresh() %}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
{% materialization seed, default %}

{%- set identifier = model['alias'] -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
-- that's an error. If we were told to full refresh, drop it. This behavior differs
-- for Snowflake and BigQuery, so multiple dispatch is used.
{%- if old_relation is not none and old_relation.is_table -%}
{{ handle_existing_table(flags.FULL_REFRESH, old_relation) }}
{{ handle_existing_table(should_full_refresh(), old_relation) }}
{%- endif -%}

-- build model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

{% macro bq_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns) %}
{%- set partition_type =
'date' if partition_by.data_type in ('timestamp, datetime')
'date' if partition_by.data_type in ('timestamp, datetime')
else partition_by.data_type -%}

{% if partitions is not none and partitions != [] %} {# static #}

{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
{{ partitions | join (', ') }}
Expand All @@ -33,11 +33,11 @@
{{sql}}
)
{%- endset -%}

{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}

{% else %} {# dynamic #}

{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}
Expand Down Expand Up @@ -65,7 +65,7 @@
array_agg(distinct {{ partition_by.render() }})
from {{ tmp_relation }}
);

{#
TODO: include_sql_header is a hack; consider a better approach that includes
the sql_header at the materialization-level instead
Expand All @@ -75,7 +75,7 @@

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}

{% endif %}

{% endmacro %}
Expand All @@ -84,12 +84,12 @@
{% materialization incremental, adapter='bigquery' -%}

{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) %}
{%- set tmp_relation = make_temp_relation(this) %}

{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%}

Expand Down Expand Up @@ -118,14 +118,14 @@

{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if strategy == 'insert_overwrite' %}

{% set missing_partition_msg -%}
The 'insert_overwrite' strategy requires the `partition_by` config.
{%- endset %}
{% if partition_by is none %}
{% do exceptions.raise_compiler_error(missing_partition_msg) %}
{% endif %}

{% set build_sql = bq_insert_overwrite(
tmp_relation,
target_relation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
{% materialization incremental, adapter='snowflake' -%}

{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select * from {{ ref('seed_actual') }}
66 changes: 49 additions & 17 deletions test/integration/005_simple_seed_test/test_simple_seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ class TestSimpleSeed(DBTIntegrationTest):

def setUp(self):
DBTIntegrationTest.setUp(self)

self.run_sql_file("seed.sql")

@property
Expand All @@ -16,7 +15,7 @@ def schema(self):

@property
def models(self):
return "models"
return "models-downstream-seed"

@property
def project_config(self):
Expand All @@ -28,27 +27,61 @@ def project_config(self):
}
}

def use_full_refresh_project(self, full_refresh: bool):
overrides = {
'seeds': {
'quote_columns': False,
'full_refresh': full_refresh,
}
}
self.use_default_project(overrides)

def _seed_and_run(self):
assert len(self.run_dbt(['seed'])) == 1
self.assertTablesEqual('seed_actual', 'seed_expected')

assert len(self.run_dbt(['run'])) == 1
self.assertTablesEqual('model', 'seed_expected')

def _after_seed_model_state(self, cmd, exists: bool):
assert len(self.run_dbt(cmd)) == 1
self.assertTablesEqual('seed_actual', 'seed_expected')
if exists:
self.assertTableDoesExist('model')
else:
self.assertTableDoesNotExist('model')

@use_profile('postgres')
def test_postgres_simple_seed(self):
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
self.assertTablesEqual("seed_actual","seed_expected")
self._seed_and_run()

# this should truncate the seed_actual table, then re-insert.
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
self.assertTablesEqual("seed_actual","seed_expected")
self._after_seed_model_state(['seed'], exists=True)

@use_profile('postgres')
def test_postgres_simple_seed_with_drop(self):
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
self.assertTablesEqual("seed_actual","seed_expected")
def test_postgres_simple_seed_full_refresh_flag(self):
self._seed_and_run()

# this should drop the seed table, then re-create
results = self.run_dbt(["seed", "--full-refresh"])
self.assertEqual(len(results), 1)
self.assertTablesEqual("seed_actual","seed_expected")
# this should drop the seed_actual table, then re-create it, so the
# model won't exist.
self._after_seed_model_state(['seed', '--full-refresh'], exists=False)

@use_profile('postgres')
def test_postgres_simple_seed_full_refresh_config(self):
self._seed_and_run()

# set the full_refresh config to False
self.use_full_refresh_project(False)

self._after_seed_model_state(['seed'], exists=True)
# make sure we ignore the full-refresh flag (the config is higher
# priority than the flag)
self._after_seed_model_state(['seed', '--full-refresh'], exists=True)

# this should drop the seed_actual table, then re-create it, so the
# model won't exist.
self.use_full_refresh_project(True)
self._after_seed_model_state(['seed'], exists=False)


class TestSimpleSeedCustomSchema(DBTIntegrationTest):
Expand Down Expand Up @@ -89,7 +122,6 @@ def test_postgres_simple_seed_with_schema(self):
self.assertEqual(len(results), 1)
self.assertTablesEqual("seed_actual","seed_expected", table_a_schema=schema_name)


@use_profile('postgres')
def test_postgres_simple_seed_with_drop_and_schema(self):
schema_name = "{}_{}".format(self.unique_schema(), 'custom_schema')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
select 1 as id

{% if adapter.already_exists(this.schema, this.identifier) and not flags.FULL_REFRESH %}
{% if adapter.already_exists(this.schema, this.identifier) and not should_full_refresh() %}
where id > (select max(id) from {{this}})
{% endif %}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ def schema(self):
def models(self):
return "models"

def run_dbt_full_refresh(self):
return self.run_dbt(['run', '--full-refresh'])

@use_profile('postgres')
def test_postgres_full_refresh(self):
# initial full-refresh should have no effect
results = self.run_dbt(['run', '--full-refresh'])
results = self.run_dbt_full_refresh()
self.assertEqual(len(results), 3)

self.assertTablesEqual("seed", "view")
Expand All @@ -37,13 +40,13 @@ def test_postgres_full_refresh(self):

# adds one record to the incremental model. full-refresh should truncate then re-run
self.run_sql_file("invalidate_incremental.sql")
results = self.run_dbt(['run', '--full-refresh'])
results = self.run_dbt_full_refresh()
self.assertEqual(len(results), 3)
self.assertTablesEqual("seed", "incremental")

self.run_sql_file("update.sql")

results = self.run_dbt(['run', '--full-refresh'])
results = self.run_dbt_full_refresh()
self.assertEqual(len(results), 3)

self.assertTablesEqual("seed", "view")
Expand All @@ -59,3 +62,14 @@ def test_postgres_delete__dbt_tmp_relation(self):

self.assertTableDoesNotExist('view__dbt_tmp')
self.assertTablesEqual("seed", "view")


class TestRuntimeMaterializationWithConfig(TestRuntimeMaterialization):
@property
def project_config(self):
result = super().project_config
result.update({'models': {'full_refresh': True}})
return result

def run_dbt_full_refresh(self):
return self.run_dbt(['run'])
Loading