Bug fixes related to model settings. #214

Merged: 1 commit, Nov 29, 2023
21 changes: 19 additions & 2 deletions CHANGELOG.md
@@ -1,9 +1,26 @@
### Release [1.5.2], 2023-11-28
#### Bug Fix
#### Bug Fixes
- The `ON CLUSTER` clause was in the incorrect place for legacy incremental materializations. This has been fixed. Thanks to
[Steven Reitsma](https://github.com/StevenReitsma) for the fix!
- The `ON CLUSTER` DDL for drop tables did not include a SYNC modifier, which might be the cause of some "table already exists"
errors
errors. The `SYNC` modifier has been added to the `on_cluster` macro when dropping relations.
- Fixed a bug where using table settings such as `allow_nullable_key` would break "legacy" incremental materializations. Closes
https://github.com/ClickHouse/dbt-clickhouse/issues/209. Also see the new model `config` property `insert_settings` described
below.
- Fixed an issue where incremental materializations would incorrectly exclude duplicated inserted elements due to "automatic"
ClickHouse deduplication on replicated tables. Closes https://github.com/ClickHouse/dbt-clickhouse/issues/213. The fix consists
of always sending a `replicated_deduplication_window=0` table setting when creating the incremental relations. This
behavior can be overridden by setting the new profile parameter `allow_automatic_deduplication` to `True`, although for
general dbt operations this is probably not necessary and not recommended. Finally, thanks to [Andy](https://github.com/andy-miracl)
for the report and debugging help!

#### Improvements
- Added a new profile property `allow_automatic_deduplication`, which defaults to `False`. ClickHouse replicated deduplication is
now disabled for incremental inserts, but this property can be set to `True` if for some reason the default ClickHouse behavior
for inserted blocks is desired.
- Added a new model `config` property `query_settings` for any ClickHouse settings that should be sent with the `INSERT INTO`
or `DELETE FROM` queries used with materializations. Note this is distinct from the existing property `settings`, which is
used for ClickHouse "table" settings in DDL statements like `CREATE TABLE ... AS`.

### Release [1.5.1], 2023-11-27
#### Bug Fix
33 changes: 22 additions & 11 deletions README.md
@@ -77,6 +77,7 @@ your_profile_name:
use_lw_deletes: [False] # Use the strategy `delete+insert` as the default incremental strategy.
check_exchange: [True] # Validate that ClickHouse supports the atomic EXCHANGE TABLES command. (Not needed for most ClickHouse versions)
local_suffix: [_local] # Table suffix of local tables on shards for distributed materializations.
allow_automatic_deduplication: [False] # Enable ClickHouse automatic deduplication for Replicated tables
custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty.

# Native (clickhouse-driver) connection settings
@@ -87,17 +88,27 @@ your_profile_name:

## Model Configuration

| Option | Description | Required? |
|------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------|
| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) |
| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) |
| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional |
| sharding_key | Sharding key determines the destination server when inserting into distributed engine table. The sharding key can be random or as an output of a hash function | Optional (default: `rand()`) |
| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key |
| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional |
| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional |
| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | Optional (default: `default`) |
| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to `delete+insert` strategy |
| Option | Description | Default if any |
|------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------|
| engine | The table engine (type of table) to use when creating tables | `MergeTree()` |
| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | `tuple()` |
| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | |
| sharding_key | The sharding key determines the destination server when inserting into a distributed engine table. The sharding key can be random or the output of a hash function | `rand()` |
| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | |
| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | |
| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating an intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | |
| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | `default` |
| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to the `delete+insert` strategy) | |
| settings | A map/dictionary of "TABLE" settings to be used in DDL statements like `CREATE TABLE` with this model | |
| query_settings | A map/dictionary of ClickHouse user level settings to be used with `INSERT` or `DELETE` statements in conjunction with this model | |

## A Note on Model Settings
ClickHouse has several types/levels of "settings". In the model configuration above, two types of these are configurable. `settings` means the `SETTINGS`
clause used in `CREATE TABLE/VIEW` types of DDL statements, so these are generally settings specific to the ClickHouse table engine. The new
`query_settings` is used to add a `SETTINGS` clause to the `INSERT` and `DELETE` queries used for model materialization (including incremental materializations).
There are hundreds of ClickHouse settings, and it's not always clear which is a "table" setting and which is a "user" setting (although the latter are generally
available in the `system.settings` table). In general the defaults are recommended, and any use of these properties should be carefully researched and tested.
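
As an illustrative sketch (the model itself is hypothetical, though both setting names appear in this PR's tests), the two properties can be combined in a single model config:

```sql
-- models/example_model.sql (hypothetical). `settings` lands in the CREATE TABLE DDL,
-- while `query_settings` is appended to the INSERT that populates the model.
{{ config(
    materialized='table',
    engine='MergeTree()',
    order_by='(id)',
    settings={'allow_nullable_key': 1},
    query_settings={'allow_nondeterministic_mutations': 1}
) }}
select 1 as id
```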

## ClickHouse Cluster

`cluster` setting in profile enables dbt-clickhouse to run against a ClickHouse cluster.
2 changes: 2 additions & 0 deletions dbt/adapters/clickhouse/credentials.py
@@ -33,6 +33,7 @@ class ClickHouseCredentials(Credentials):
custom_settings: Optional[Dict[str, Any]] = None
use_lw_deletes: bool = False
local_suffix: str = 'local'
allow_automatic_deduplication: bool = False

@property
def type(self):
@@ -73,4 +74,5 @@ def _connection_keys(self):
'check_exchange',
'custom_settings',
'use_lw_deletes',
'allow_automatic_deduplication',
)
10 changes: 10 additions & 0 deletions dbt/adapters/clickhouse/dbclient.py
@@ -1,5 +1,6 @@
import uuid
from abc import ABC, abstractmethod
from typing import Dict

from dbt.exceptions import DbtDatabaseError, FailedToConnectError

@@ -8,6 +9,7 @@

LW_DELETE_SETTING = 'allow_experimental_lightweight_delete'
ND_MUTATION_SETTING = 'allow_nondeterministic_mutations'
DEDUP_WINDOW_SETTING = 'replicated_deduplication_window'


def get_db_client(credentials: ClickHouseCredentials):
@@ -79,6 +81,9 @@ def __init__(self, credentials: ClickHouseCredentials):
except Exception as ex:
self.close()
raise ex
self._model_settings = {}
if not credentials.allow_automatic_deduplication:
self._model_settings[DEDUP_WINDOW_SETTING] = '0'

@abstractmethod
def query(self, sql: str, **kwargs):
@@ -115,6 +120,11 @@ def _set_client_database(self):
def _server_version(self):
pass

def update_model_settings(self, model_settings: Dict[str, str]):
for key, value in self._model_settings.items():
if key not in model_settings:
model_settings[key] = value

def _check_lightweight_deletes(self, requested: bool):
lw_deletes = self.get_ch_setting(LW_DELETE_SETTING)
nd_mutations = self.get_ch_setting(ND_MUTATION_SETTING)
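
The merge in `update_model_settings` is deliberately non-destructive: the adapter-level default (`replicated_deduplication_window=0` whenever `allow_automatic_deduplication` is left `False`) is injected only for keys the model has not set itself. A standalone sketch of that behavior:

```python
# Standalone sketch of the non-destructive merge performed by update_model_settings.
DEDUP_WINDOW_SETTING = 'replicated_deduplication_window'

def update_model_settings(defaults, model_settings):
    # Inject each adapter-level default unless the model already sets that key.
    for key, value in defaults.items():
        if key not in model_settings:
            model_settings[key] = value

adapter_defaults = {DEDUP_WINDOW_SETTING: '0'}  # allow_automatic_deduplication=False

settings = {}
update_model_settings(adapter_defaults, settings)
assert settings == {'replicated_deduplication_window': '0'}

# A model-level value wins over the injected default:
settings = {DEDUP_WINDOW_SETTING: '100'}
update_model_settings(adapter_defaults, settings)
assert settings == {'replicated_deduplication_window': '100'}
```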
12 changes: 11 additions & 1 deletion dbt/adapters/clickhouse/impl.py
@@ -367,7 +367,17 @@ def run_sql_for_tests(self, sql, fetch, conn):

@available
def get_model_settings(self, model):
settings = model['config'].get('settings', dict())
settings = model['config'].get('settings', {})
conn = self.connections.get_if_exists()
conn.handle.update_model_settings(settings)
res = []
for key in settings:
res.append(f' {key}={settings[key]}')
return '' if len(res) == 0 else 'SETTINGS ' + ', '.join(res) + '\n'

@available
def get_model_query_settings(self, model):
settings = model['config'].get('query_settings', {})
res = []
for key in settings:
res.append(f' {key}={settings[key]}')
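
Both `get_model_settings` and the new `get_model_query_settings` render the same trailing clause shape (the truncated diff presumably ends with a return mirroring the one in `get_model_settings`). A standalone sketch of that rendering:

```python
# Standalone sketch of the SETTINGS-clause rendering shared by both helpers.
def render_settings_clause(settings: dict) -> str:
    res = [f' {key}={value}' for key, value in settings.items()]
    # Each entry keeps its leading space, so the join yields double spaces.
    return '' if not res else 'SETTINGS ' + ', '.join(res) + '\n'

print(render_settings_clause({'allow_nondeterministic_mutations': 1}), end='')
# -> SETTINGS  allow_nondeterministic_mutations=1
assert render_settings_clause({}) == ''  # no settings, no clause emitted
```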
@@ -178,15 +178,15 @@
select {{ unique_key }}
from {{ inserting_relation }}
)
{{ adapter.get_model_settings(model) }}
{{ adapter.get_model_query_settings(model) }}
{% endcall %}

-- Insert all of the new data into the temporary table
{% call statement('insert_new_data') %}
insert into {{ inserted_relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
from {{ inserting_relation }}
{{ adapter.get_model_settings(model) }}
{{ adapter.get_model_query_settings(model) }}
{% endcall %}

{% do adapter.drop_relation(new_data_relation) %}
@@ -228,13 +228,14 @@
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%};
{%- endif -%}
{{ adapter.get_model_query_settings(model) }}
{% endcall %}

{%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{% call statement('insert_new_data') %}
insert into {{ existing_relation }} select {{ dest_cols_csv }} from {{ inserting_relation }}
insert into {{ existing_relation }} {{ adapter.get_model_query_settings(model) }} select {{ dest_cols_csv }} from {{ inserting_relation }}
{% endcall %}
{% do adapter.drop_relation(new_data_relation) %}
{{ drop_relation_if_exists(distributed_new_data_relation) }}
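
With a hypothetical `query_settings={'allow_nondeterministic_mutations': 1}`, the delete+insert pair above would render roughly as follows (relation and column names are placeholders):

```sql
-- Rough rendering of the delete+insert strategy with a query-level SETTINGS clause.
delete from existing_table
where (id) in (select id from new_data_table)
SETTINGS allow_nondeterministic_mutations=1

insert into existing_table SETTINGS allow_nondeterministic_mutations=1
select id, name from new_data_table
```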
2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/materializations/seed.sql
@@ -4,7 +4,7 @@

{% set sql -%}
insert into {{ this.render() }} ({{ cols_sql }})
{{ adapter.get_model_settings(model) }}
{{ adapter.get_model_query_settings(model) }}
format CSV
{{ data_sql }}
{%- endset %}
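
The seed macro places the clause between the column list and the inline payload; with a hypothetical `query_settings={'insert_quorum': 2}`, the rendered statement would look roughly like:

```sql
-- Rough rendering of a two-row seed insert (names and rows are placeholders).
insert into "analytics"."my_seed" (id, name)
SETTINGS insert_quorum=2
format CSV
1,alpha
2,beta
```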
4 changes: 3 additions & 1 deletion dbt/include/clickhouse/macros/materializations/table.sql
@@ -188,11 +188,13 @@
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

insert into {{ target_relation }} ({{ dest_cols_csv }})
insert into {{ target_relation }}
({{ dest_cols_csv }})
{%- if has_contract -%}
-- Use a subquery to get columns in the right order
SELECT {{ dest_cols_csv }} FROM ( {{ sql }} )
{%- else -%}
{{ sql }}
{{ adapter.get_model_query_settings(model) }}
{%- endif -%}
{%- endmacro %}
2 changes: 2 additions & 0 deletions tests/integration/adapter/basic/test_basic.py
@@ -33,6 +33,8 @@
column_types:
val2: Nullable(UInt32)
str1: Nullable(String)
settings:
allow_nullable_key: 1
"""

replicated_seeds_schema_yml = """
@@ -18,8 +18,13 @@ class TestMergeTreeTableMaterialization(BaseSimpleMaterializations):
@pytest.fixture(scope="class")
def models(self):
config_materialized_table = """
{{ config(order_by='(some_date, id, name)', engine='MergeTree()', materialized='table',
settings={'allow_nullable_key': 1}) }}
{{ config(
order_by='(some_date, id, name)',
engine='MergeTree()',
materialized='table',
settings={'allow_nullable_key': 1},
query_settings={'allow_nondeterministic_mutations': 1})
}}
"""
base_table_sql = config_materialized_table + model_base
return {
@@ -204,7 +209,7 @@ def assert_total_count_correct(self, project):
os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
)
def test_base(self, project):
# cluster setting must exists
# cluster setting must exist
cluster = project.test_config['cluster']
assert cluster

4 changes: 2 additions & 2 deletions tests/integration/adapter/constraints/test_constraints.py
@@ -60,7 +60,7 @@ def test__contract_wrong_column_names(self, project):
assert all([(exp in log_output or exp.upper() in log_output) for exp in expected])

def test__contract_wrong_column_data_types(self, project, data_types):
for (sql_column_value, schema_data_type, error_data_type) in data_types:
for sql_column_value, schema_data_type, error_data_type in data_types:
# Write parametrized data_type to sql file
write_file(
my_model_data_type_sql.format(sql_value=sql_column_value),
@@ -91,7 +91,7 @@ def test__contract_wrong_column_data_types(self, project, data_types):
assert all([(exp in log_output or exp.upper() in log_output) for exp in expected])

def test__contract_correct_column_data_types(self, project, data_types):
for (sql_column_value, schema_data_type, _) in data_types:
for sql_column_value, schema_data_type, _ in data_types:
# Write parametrized data_type to sql file
write_file(
my_model_data_type_sql.format(sql_value=sql_column_value),