Skip to content

Commit

Permalink
Add materialization macro for materialized view (ClickHouse#207)
Browse files Browse the repository at this point in the history
* Add materialization macro for materialized view

* fix isort issues in materialized view test
  • Loading branch information
SoryRawyer authored and Savid committed Jan 17, 2024
1 parent 7f5a27a commit 3282123
Show file tree
Hide file tree
Showing 2 changed files with 284 additions and 0 deletions.
120 changes: 120 additions & 0 deletions dbt/include/clickhouse/macros/materializations/materialized_view.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
{#-
Create or update a materialized view in ClickHouse.
This involves creating both the materialized view itself and a
target table that the materialized view writes to.
-#}
{%- materialization materialized_view, adapter='clickhouse' -%}

{%- set target_relation = this.incorporate(type='table') -%}
{%- set mv_name = target_relation.name + '_mv' -%}
{%- set target_mv = api.Relation.create(identifier=mv_name, schema=schema, database=database, type='materializedview') -%}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}

{# look for an existing relation for the target table and create backup relations if necessary #}
{%- set existing_relation = load_cached_relation(this) -%}
{%- set backup_relation = none -%}
{%- set preexisting_backup_relation = none -%}
{%- set preexisting_intermediate_relation = none -%}
{% if existing_relation is not none %}
{%- set backup_relation_type = existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
{% if not existing_relation.can_exchange %}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
{% endif %}
{% endif %}

{% set grant_config = config.get('grants') %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if backup_relation is none %}
{{ log('Creating new materialized view ' + target_relation.name )}}
{% call statement('main') -%}
{{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql) }}
{%- endcall %}
{% elif existing_relation.can_exchange %}
{{ log('Replacing existing materialized view' + target_relation.name) }}
{% call statement('drop existing materialized view') %}
drop view if exists {{ mv_name }} {{ cluster_clause }}
{% endcall %}
{% call statement('main') -%}
{{ get_create_table_as_sql(False, backup_relation, sql) }}
{%- endcall %}
{% do exchange_tables_atomic(backup_relation, existing_relation) %}
{% call statement('create new materialized view') %}
{{ clickhouse__create_mv_sql(mv_name, existing_relation.name, cluster_clause, sql) }}
{% endcall %}
{% else %}
{{ log('Replacing existing materialized view' + target_relation.name) }}
{{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) }}
{% endif %}

-- cleanup
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation, target_mv]}) }}

{%- endmaterialization -%}


{#
There are two steps to creating a materialized view:
1. Create a new table based on the SQL in the model
2. Create a materialized view using the SQL in the model that inserts
data into the table creating during step 1
#}
{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql) -%}
{% call statement('create_target_table') %}
{{ get_create_table_as_sql(False, relation, sql) }}
{% endcall %}
{%- set cluster_clause = on_cluster_clause(relation) -%}
{%- set mv_name = relation.name + '_mv' -%}
{{ clickhouse__create_mv_sql(mv_name, relation.name, cluster_clause, sql) }}
{%- endmacro %}


{% macro clickhouse__create_mv_sql(relation_name, target_table, cluster_clause, sql) -%}
create materialized view if not exists {{ relation_name }} {{ cluster_clause }}
to {{ target_table }}
as {{ sql }}
{%- endmacro %}


{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) %}
{# drop existing materialized view while we recreate the target table #}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}
{%- set mv_name = target_relation.name + '_mv' -%}
{% call statement('drop existing mv') -%}
drop view if exists {{ mv_name }} {{ cluster_clause }}
{%- endcall %}

{# recreate the target table #}
{% call statement('main') -%}
{{ get_create_table_as_sql(False, intermediate_relation, sql) }}
{%- endcall %}
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{# now that the target table is recreated, we can finally create our new view #}
{{ clickhouse__create_mv_sql(mv_name, target_relation.name, cluster_clause, sql) }}
{% endmacro %}
164 changes: 164 additions & 0 deletions tests/integration/adapter/test_materialized_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""
test materialized view creation
"""

import json

import pytest
from dbt.tests.util import check_relation_types, run_dbt

PEOPLE_SEED_CSV = """
id,name,age,department
1231,Dade,33,engineering
6666,Ksenia,48,engineering
8888,Kate,50,engineering
""".lstrip()

# This model is parameterized, in a way, by the "run_type" dbt project variable
# This is to be able to switch between different model definitions within
# the same test run and allow us to test the evolution of a materialized view
MV_MODEL = """
{{ config(
materialized='materialized_view',
engine='MergeTree()',
order_by='(id)',
) }}
{% if var('run_type', '') == '' %}
select
id,
name,
case
when name like 'Dade' then 'crash_override'
when name like 'Kate' then 'acid burn'
else 'N/A'
end as hacker_alias
from {{ source('raw', 'people') }}
where department = 'engineering'
{% else %}
select
id,
name,
case
-- Dade wasn't always known as 'crash override'!
when name like 'Dade' and age = 11 then 'zero cool'
when name like 'Dade' and age != 11 then 'crash override'
when name like 'Kate' then 'acid burn'
else 'N/A'
end as hacker_alias
from {{ source('raw', 'people') }}
where department = 'engineering'
{% endif %}
"""


SEED_SCHEMA_YML = """
version: 2
sources:
- name: raw
schema: "{{ target.schema }}"
tables:
- name: people
"""


class TestBasicMV:
@pytest.fixture(scope="class")
def seeds(self):
"""
we need a base table to pull from
"""
return {
"people.csv": PEOPLE_SEED_CSV,
"schema.yml": SEED_SCHEMA_YML,
}

@pytest.fixture(scope="class")
def models(self):
return {
"hackers.sql": MV_MODEL,
}

def test_create(self, project):
"""
1. create a base table via dbt seed
2. create a model as a materialized view, selecting from the table created in (1)
3. insert data into the base table and make sure it's there in the target table created in (2)
"""
results = run_dbt(["seed"])
assert len(results) == 1
columns = project.run_sql("DESCRIBE TABLE people", fetch="all")
assert columns[0][1] == "Int32"

# create the model
results = run_dbt()
assert len(results) == 1

columns = project.run_sql("DESCRIBE TABLE hackers", fetch="all")
assert columns[0][1] == "Int32"

columns = project.run_sql("DESCRIBE hackers_mv", fetch="all")
assert columns[0][1] == "Int32"

check_relation_types(
project.adapter,
{
"hackers_mv": "view",
"hackers": "table",
},
)

# insert some data and make sure it reaches the target table
project.run_sql(
f"""
insert into {project.test_schema}.people ("id", "name", "age", "department")
values (1232,'Dade',16,'engineering'), (9999,'eugene',40,'malware');
"""
)

result = project.run_sql("select count(*) from hackers", fetch="all")
assert result[0][0] == 4


class TestUpdateMV:
@pytest.fixture(scope="class")
def seeds(self):
"""
we need a base table to pull from
"""
return {
"people.csv": PEOPLE_SEED_CSV,
"schema.yml": SEED_SCHEMA_YML,
}

@pytest.fixture(scope="class")
def models(self):
return {
"hackers.sql": MV_MODEL,
}

def test_update(self, project):
# create our initial materialized view
run_dbt(["seed"])
run_dbt()

# re-run dbt but this time with the new MV SQL
run_vars = {"run_type": "extended_schema"}
run_dbt(["run", "--vars", json.dumps(run_vars)])

project.run_sql(
f"""
insert into {project.test_schema}.people ("id", "name", "age", "department")
values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware');
"""
)

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
"select distinct hacker_alias from hackers where name = 'Dade'", fetch="all"
)
assert len(result) == 2

0 comments on commit 3282123

Please sign in to comment.