Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add insert-by-period materialization #69

Merged
merged 5 commits into from
Jun 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ This macro implements an "outer union." The list of tables provided to this macr
Usage:
```
{{ dbt_utils.union_tables(
tables=[ref('table_1'), ref('table_2')],
column_override={"some_field": "varchar(100)"},
tables=[ref('table_1'), ref('table_2')],
column_override={"some_field": "varchar(100)"},
exclude=["some_other_field"]
) }}
```
Expand Down Expand Up @@ -266,6 +266,59 @@ Usage:
```
{{ dbt_utils.get_url_parameter(field='page_url', url_parameter='utm_source') }}
```
---
### Materializations
#### insert_by_period ([source](macros/materializations/insert_by_period_materialization.sql))
`insert_by_period` allows dbt to insert records into a table one period (i.e. day, week) at a time.

This materialization is appropriate for event data that can be processed in discrete periods. It is similar in concept to the built-in incremental materialization, but has the added benefit of building the model in chunks even during a full-refresh so is particularly useful for models where the initial run can be problematic.

Should a run of a model using this materialization be interrupted, a subsequent run will continue building the target table from where it was interrupted (granted the `--full-refresh` flag is omitted).

Progress is logged in the command line for easy monitoring.

Usage:
```sql
{{
config(
materialized = "insert_by_period",
period = "day",
timestamp_field = "created_at",
start_date = "2018-01-01",
stop_date = "2018-06-01"
}}

with events as (

select *
from {{ ref('events') }}
where __PERIOD_FILTER__ -- This will be replaced with a filter in the materialization code

)

....complex aggregates here....

```
Configuration values:
* `period`: period to break the model into, must be a valid [datepart](https://docs.aws.amazon.com/redshift/latest/dg/r_Dateparts_for_datetime_functions.html) (default='Week')
* `timestamp_field`: the column name of the timestamp field that will be used to break the model into smaller queries
* `start_date`: literal date or timestamp - generally choose a date that is earlier than the start of your data
* `stop_date`: literal date or timestamp (default=current_timestamp)

Caveats:
* This materialization is compatible with dbt 0.10.1.
* This materialization has been written for Redshift.
* This materialization can only be used for a model where records are not expected to change after they are created.
* Any model post-hooks that use `{{ this }}` will fail using this materialization. For example:
```yaml
models:
project-name:
post-hook: "grant select on {{ this }} to db_reader"
```
A useful workaround is to change the above post-hook to:
```yaml
post-hook: "grant select on {{ this.schema }}.{{ this.name }} to db_reader"
```

----

Expand Down
175 changes: 175 additions & 0 deletions macros/materializations/insert_by_period_materialization.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

{% call statement('period_boundaries', fetch_result=True) -%}
with data as (
select
coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp,
coalesce(
{{dbt_utils.dateadd('millisecond',
-1,
"nullif('" ~ stop_date ~ "','')::timestamp")}},
{{dbt_utils.current_timestamp()}}
) as stop_timestamp
from "{{target_schema}}"."{{target_table}}"
)

select
start_timestamp,
stop_timestamp,
{{dbt_utils.datediff('start_timestamp',
'stop_timestamp',
period)}} + 1 as num_periods
from data
{%- endcall %}

{%- endmacro %}

{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

{%- set period_filter -%}
("{{timestamp_field}}" > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
"{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and
"{{timestamp_field}}" < '{{stop_timestamp}}'::timestamp)
{%- endset -%}

{%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}

select
{{target_cols_csv}}
from (
{{filtered_sql}}
)

{%- endmacro %}

{% materialization insert_by_period, default -%}
{%- set timestamp_field = config.require('timestamp_field') -%}
{%- set start_date = config.require('start_date') -%}
{%- set stop_date = config.get('stop_date') or '' -%}}
{%- set period = config.get('period') or 'week' -%}

{%- if not '__PERIOD_FILTER__' is in sql -%}
{%- set error_message -%}
Model '{{ model.unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql
{%- endset -%}
{{ exceptions.raise_compiler_error(error_message) }}
{%- endif -%}

{%- set identifier = model['name'] -%}

{%- set existing_relations = adapter.list_relations(schema=schema) -%}
{%- set old_relation = adapter.get_relation(relations_list=existing_relations,
schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, type='table') -%}

{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}

{%- set should_truncate = (non_destructive_mode and full_refresh_mode and exists_as_table) -%}
{%- set should_drop = (not should_truncate and (full_refresh_mode or exists_not_as_table)) -%}
{%- set force_create = (flags.FULL_REFRESH and not flags.NON_DESTRUCTIVE) -%}

-- setup
{% if old_relation is none -%}
-- noop
{%- elif should_truncate -%}
{{adapter.truncate_relation(old_relation)}}
{%- elif should_drop -%}
{{adapter.drop_relation(old_relation)}}
{%- set old_relation = none -%}
{%- endif %}

{{run_hooks(pre_hooks, inside_transaction=False)}}

-- `begin` happens here, so `commit` after it to finish the transaction
{{run_hooks(pre_hooks, inside_transaction=True)}}
{% call statement() -%}
begin; -- make extra sure we've closed out the transaction
commit;
{%- endcall %}

-- build model
{% if force_create or old_relation is none -%}
{# Create an empty target table -#}
{% call statement('main') -%}
{%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%}
{{create_table_as(False, target_relation, empty_sql)}};
{%- endcall %}
{%- endif %}

{% set _ = dbt_utils.get_period_boundaries(schema,
identifier,
timestamp_field,
start_date,
stop_date,
period) %}
{%- set start_timestamp = load_result('period_boundaries')['data'][0][0] | string -%}
{%- set stop_timestamp = load_result('period_boundaries')['data'][0][1] | string -%}
{%- set num_periods = load_result('period_boundaries')['data'][0][2] | int -%}

{% set target_columns = adapter.get_columns_in_table(schema, identifier) %}
{%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}
{%- set loop_vars = {'sum_rows_inserted': 0} -%}

-- commit each period as a separate transaction
{% for i in range(num_periods) -%}
{%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) -%}
{{log(" + " ~ modules.datetime.datetime.now().strftime('%H:%M:%S') ~ " " ~ msg, info=True)}}

{%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period' ~ i ~ '_tmp' -%}
{%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema, type='table') -%}
{% call statement() -%}
{% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv,
sql,
timestamp_field,
period,
start_timestamp,
stop_timestamp,
i) %}
{{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}}
{%- endcall %}

{{adapter.expand_target_column_types(temp_table=tmp_identifier,
to_schema=schema,
to_table=identifier)}}
{%- set name = 'main-' ~ i -%}
{% call statement(name, fetch_result=True) -%}
insert into {{target_relation}} ({{target_cols_csv}})
(
select
{{target_cols_csv}}
from {{tmp_relation.include(schema=False)}}
);
{%- endcall %}
{%- set rows_inserted = (load_result('main-' ~ i)['status'].split(" "))[2] | int -%}
{%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
{%- if loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} {% endif -%}

{%- set msg = "Ran for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) ~ "; " ~ rows_inserted ~ " records inserted" -%}
{{log(" + " ~ modules.datetime.datetime.now().strftime('%H:%M:%S') ~ " " ~ msg, info=True)}}

{%- endfor %}

{% call statement() -%}
begin;
{%- endcall %}

{{run_hooks(post_hooks, inside_transaction=True)}}

{% call statement() -%}
commit;
{%- endcall %}

{{run_hooks(post_hooks, inside_transaction=False)}}

{%- set status_string = "INSERT " ~ loop_vars['sum_rows_inserted'] -%}

{% call noop_statement(name='main', status=status_string) -%}
-- no-op
{%- endcall %}

{%- endmaterialization %}