Implement Snowflake clustering #1591

Closed
wants to merge 17 commits into from
23 changes: 13 additions & 10 deletions plugins/snowflake/dbt/adapters/snowflake/impl.py
@@ -10,11 +10,13 @@ class SnowflakeAdapter(SQLAdapter):
     Relation = SnowflakeRelation
     ConnectionManager = SnowflakeConnectionManager

-    AdapterSpecificConfigs = frozenset({"transient"})
+    AdapterSpecificConfigs = frozenset(
+        {"transient", "cluster_by", "automatic_clustering"}
+    )

     @classmethod
     def date_function(cls):
-        return 'CURRENT_TIMESTAMP()'
+        return "CURRENT_TIMESTAMP()"

     @classmethod
     def _catalog_filter_table(cls, table, manifest):
@@ -23,20 +25,21 @@ def _catalog_filter_table(cls, table, manifest):
         lowered = table.rename(
             column_names=[c.lower() for c in table.column_names]
         )
-        return super(SnowflakeAdapter, cls)._catalog_filter_table(lowered,
-                                                                  manifest)
+        return super(SnowflakeAdapter, cls)._catalog_filter_table(
+            lowered, manifest
+        )

     def _make_match_kwargs(self, database, schema, identifier):
         quoting = self.config.quoting
-        if identifier is not None and quoting['identifier'] is False:
+        if identifier is not None and quoting["identifier"] is False:
             identifier = identifier.upper()

-        if schema is not None and quoting['schema'] is False:
+        if schema is not None and quoting["schema"] is False:
             schema = schema.upper()

-        if database is not None and quoting['database'] is False:
+        if database is not None and quoting["database"] is False:
             database = database.upper()

-        return filter_null_values({'identifier': identifier,
-                                   'schema': schema,
-                                   'database': database})
+        return filter_null_values(
+            {"identifier": identifier, "schema": schema, "database": database}
+        )
35 changes: 27 additions & 8 deletions plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
@@ -1,14 +1,33 @@
 {% macro snowflake__create_table_as(temporary, relation, sql) -%}
   {%- set transient = config.get('transient', default=true) -%}
+  {%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
+  {%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
+  {%- if cluster_by_keys is not none and cluster_by_keys is string -%}
+    {%- set cluster_by_keys = [cluster_by_keys] -%}
+    {%- set cluster_by_string = cluster_by_keys|join(", ")-%}
Contributor

Just did one last scan here - I think this won't work properly if cluster_by_keys is provided as a list. If that happens, then the code in this branch won't be executed. I think we should move this line outside of the if block here.
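
One way to read the suggestion above, sketched in Python rather than Jinja so the control flow is easy to test: wrap a lone string in a list inside the conditional, but build the joined string afterwards so that list inputs reach it too. The function name `cluster_by_clause` is hypothetical and is not part of dbt; the real fix would be made in the `snowflake__create_table_as` macro itself.

```python
def cluster_by_clause(cluster_by_keys):
    """Return the comma-separated cluster key string, or None if unset.

    Hypothetical helper mirroring the macro's logic; not a dbt API.
    """
    if cluster_by_keys is None:
        return None
    # Normalise a single key such as "order_date" into a one-element list.
    if isinstance(cluster_by_keys, str):
        cluster_by_keys = [cluster_by_keys]
    # Join outside the string-only branch so list inputs reach this line too.
    return ", ".join(cluster_by_keys)


print(cluster_by_clause("order_date"))
print(cluster_by_clause(["order_date", "customer_id"]))
print(cluster_by_clause(None))
```

With the join left inside the string-only branch, the list case would never define the joined string, which is the failure described in the comment.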

Contributor Author

@bastienboutonnet bastienboutonnet Aug 16, 2019

ohh dear... good catch! I can certainly fix that! BUT I just realised I cleaned up my fork, so I don't actually know how I can edit this on my side. I had to fork dbt again as we were experimenting, and I forgot this was still pending. Do you know what would be the best way to go? I could probably fork again, or open a new PR, but then we'd lose the trace of our discussion (although we could still refer to it even if this PR is closed). Let me know.

Contributor

oh dear indeed! I've never seen this before!

There's a cool thing you can do on GitHub - if you add .patch to the end of a PR url, you'll get a "patch" for the change. I think you should be able to make a clean branch off of dev/0.14.1, then do something like:

curl https://patch-diff.githubusercontent.com/raw/fishtown-analytics/dbt/pull/1591.patch > 1591.patch
git apply 1591.patch

If git says that it can't apply the patch, you can try applying the patch from the commit 9e36ebd.

Let me know how it goes!
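
The recovery steps above could be scripted roughly as follows. This is a sketch under assumptions: the helper names `pr_patch_url` and `apply_pr_patch` are made up for illustration, `git` is assumed to be on the PATH, the current directory is assumed to be a clean checkout of the target branch, and the URL uses the "PR URL plus `.patch`" form described in the comment rather than the redirected address passed to curl.

```python
import subprocess
import urllib.request


def pr_patch_url(owner, repo, number):
    """Build the PR URL with '.patch' appended (hypothetical helper)."""
    return f"https://github.com/{owner}/{repo}/pull/{number}.patch"


def apply_pr_patch(owner, repo, number, patch_path=None):
    """Download a PR's patch and apply it to the current working tree."""
    patch_path = patch_path or f"{number}.patch"
    # Download the patch; urlopen follows HTTP redirects by default.
    with urllib.request.urlopen(pr_patch_url(owner, repo, number)) as resp:
        data = resp.read()
    with open(patch_path, "wb") as fh:
        fh.write(data)
    # Dry run first: --check reports conflicts without touching any files.
    subprocess.run(["git", "apply", "--check", patch_path], check=True)
    subprocess.run(["git", "apply", patch_path], check=True)
    return patch_path


# Only the URL is built here; nothing is downloaded or applied.
print(pr_patch_url("fishtown-analytics", "dbt", 1591))
```

Calling `apply_pr_patch("fishtown-analytics", "dbt", 1591)` from the new branch would then perform the download and apply steps. `git apply` leaves the changes uncommitted; `git am` may be a better fit if the individual commits should be preserved.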

+  {%- endif -%}
+
+  create or replace {% if temporary -%}
+    temporary
+  {%- elif transient -%}
+    transient
+  {%- endif %} table {{ relation }}
+  as (
+    {%- if cluster_by_keys is not none -%}
+      select * from(
+        {{ sql }}
+      ) order by ({{ cluster_by_string }})
+    {%- else -%}
+      {{ sql }}
+    {%- endif %}
+  );
+  {% if cluster_by_keys is not none -%}
+    alter table {{relation}} cluster by ({{cluster_by_string}});
+  {%- endif -%}
+  {% if enable_automatic_clustering -%}
+    alter table {{relation}} resume recluster;
+  {%- endif -%}

-  create or replace {% if temporary -%}
-    temporary
-  {%- elif transient -%}
-    transient
-  {%- endif %} table {{ relation }}
-  as (
-    {{ sql }}
-  );
 {% endmacro %}

 {% macro snowflake__create_view_as(relation, sql) -%}
@@ -42,6 +42,7 @@

   {%- call statement('main') -%}
     {{ create_table_as(false, target_relation, sql) }}
+
   {%- endcall -%}

   {%- else -%}