diff --git a/CHANGELOG.md b/CHANGELOG.md index bd0431d3..5f4e6162 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,26 @@ ### Release [1.5.2], 2023-11-28
-#### Bug Fix
+#### Bug Fixes
- The `ON CLUSTER` clause was in the incorrect place for legacy incremental materializations. This has been fixed. Thanks to [Steven Reitsma](https://github.com/StevenReitsma) for the fix!
- The `ON CLUSTER` DDL for drop tables did not include a SYNC modifier, which might be the cause of some "table already exists"
-errors
+errors. The `SYNC` modifier has been added to the `on_cluster` macro when dropping relations.
+- Fixed a bug where using table settings such as `allow_nullable_key` would break "legacy" incremental materializations. Closes
+https://github.com/ClickHouse/dbt-clickhouse/issues/209. Also see the new model `config` property `query_settings` described
+below.
+- Fixed an issue where incremental materializations would incorrectly exclude duplicated inserted elements due to "automatic"
+ClickHouse deduplication on replicated tables. Closes https://github.com/ClickHouse/dbt-clickhouse/issues/213. The fix consists
+of always sending a `replicated_deduplication_window=0` table setting when creating the incremental relations. This
+behavior can be overridden by setting the new profile parameter `allow_automatic_deduplication` to `True`, although for
+general dbt operations this is probably not necessary and not recommended. Finally, thanks to [Andy](https://github.com/andy-miracl)
+for the report and debugging help!
+
+#### Improvements
+- Added a new profile property `allow_automatic_deduplication`, which defaults to `False`. ClickHouse Replicated deduplication is
+now disabled for incremental inserts, but this property can be set to `True` if for some reason the default ClickHouse behavior
+for inserted blocks is desired.
+- Added a new model `config` property `query_settings` for any ClickHouse settings that should be sent with the `INSERT INTO`
+or `DELETE FROM` queries used with materializations. Note this is distinct from the existing property `settings`, which is
+used for ClickHouse "table" settings in DDL statements like `CREATE TABLE ... AS`.

### Release [1.5.1], 2023-11-27
#### Bug Fix
diff --git a/README.md b/README.md index 8e008998..b5c8b8b8 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,7 @@ your_profile_name:
    use_lw_deletes: [False] Use the strategy `delete+insert` as the default incremental strategy.
    check_exchange: [True] # Validate that clickhouse support the atomic EXCHANGE TABLES command. (Not needed for most ClickHouse versions)
    local_suffix [_local] # Table suffix of local tables on shards for distributed materializations.
+    allow_automatic_deduplication [False] # Enable ClickHouse automatic deduplication for Replicated tables
    custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty.

    # Native (clickhouse-driver) connection settings
@@ -87,17 +88,27 @@ your_profile_name:

## Model Configuration

-| Option | Description | Required? |
-|------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------|
-| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) |
-| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) |
-| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional |
-| sharding_key | Sharding key determines the destination server when inserting into distributed engine table. The sharding key can be random or as an output of a hash function | Optional (default: `rand()`) |
-| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key |
-| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional |
-| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional |
-| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | Optional (default: `default`) |
-| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to `delete+insert` strategy |
+| Option | Description | Default if any |
+|------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|----------------|
+| engine | The table engine (type of table) to use when creating tables | `MergeTree()` |
+| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | `tuple()` |
+| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | |
+| sharding_key | Sharding key determines the destination server when inserting into a distributed engine table. The sharding key can be random or the output of a hash function | `rand()` |
+| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | |
+| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | |
+| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly into the target table without creating an intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | |
+| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | `default` |
+| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to the `delete+insert` strategy) | |
+| settings | A map/dictionary of "TABLE" settings to be used in DDL statements like `CREATE TABLE` with this model | |
+| query_settings | A map/dictionary of ClickHouse user-level settings to be used with `INSERT` or `DELETE` statements in conjunction with this model | |
+
+## A Note on Model Settings
+ClickHouse has several types/levels of "settings". In the model configuration above, two types of these are configurable. `settings` means the `SETTINGS`
+clause used in `CREATE TABLE/VIEW` types of DDL statements, so these are generally settings specific to the particular ClickHouse table engine. The new
+`query_settings` is used to add a `SETTINGS` clause to the `INSERT` and `DELETE` queries used for model materialization (including incremental materializations).
+There are hundreds of ClickHouse settings, and it's not always clear which is a "table" setting and which is a "user" setting (although the latter are generally
+available in the `system.settings` table). In general, the defaults are recommended, and any use of these properties should be carefully researched and tested.
+
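As an illustration of the difference, the two properties can be combined in a single model, as in the sketch below. The model, column, and source names are placeholders; `allow_nullable_key` and `allow_nondeterministic_mutations` are the same settings exercised by the integration tests in this change.

```sql
-- Hypothetical model file, e.g. models/example_events.sql.
-- `settings` is rendered into the CREATE TABLE ... SETTINGS clause,
-- while `query_settings` is rendered into the INSERT/DELETE statements
-- that dbt issues when materializing the model.
{{ config(
        materialized='incremental',
        engine='MergeTree()',
        order_by=['event_ts'],
        unique_key=['event_id'],
        settings={'allow_nullable_key': 1},
        query_settings={'allow_nondeterministic_mutations': 1}
    )
}}
select event_id, event_ts, value
from {{ source('raw', 'events') }}
```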
## ClickHouse Cluster
`cluster` setting in profile enables dbt-clickhouse to run against a ClickHouse cluster.
diff --git a/dbt/adapters/clickhouse/credentials.py b/dbt/adapters/clickhouse/credentials.py index 427b94c1..cbf65069 100644 --- a/dbt/adapters/clickhouse/credentials.py +++ b/dbt/adapters/clickhouse/credentials.py @@ -33,6 +33,7 @@ class ClickHouseCredentials(Credentials):
    custom_settings: Optional[Dict[str, Any]] = None
    use_lw_deletes: bool = False
    local_suffix: str = 'local'
+    allow_automatic_deduplication: bool = False

    @property
    def type(self):
@@ -73,4 +74,5 @@ def _connection_keys(self):
            'check_exchange',
            'custom_settings',
            'use_lw_deletes',
+            'allow_automatic_deduplication',
        )
diff --git a/dbt/adapters/clickhouse/dbclient.py b/dbt/adapters/clickhouse/dbclient.py index ef069ab3..ab5567e8 100644 --- a/dbt/adapters/clickhouse/dbclient.py +++ b/dbt/adapters/clickhouse/dbclient.py @@ -1,5 +1,6 @@ import uuid
 from abc import ABC, abstractmethod
+from typing import Dict

 from dbt.exceptions import DbtDatabaseError, FailedToConnectError
@@ -8,6 +9,7 @@ LW_DELETE_SETTING = 'allow_experimental_lightweight_delete'
 ND_MUTATION_SETTING = 'allow_nondeterministic_mutations'
+DEDUP_WINDOW_SETTING = 'replicated_deduplication_window'

 def get_db_client(credentials: ClickHouseCredentials):
@@ -79,6 +81,9 @@ def __init__(self, credentials: ClickHouseCredentials):
        except Exception as ex:
            self.close()
            raise ex
+        self._model_settings = {}
+        if not credentials.allow_automatic_deduplication:
+            self._model_settings[DEDUP_WINDOW_SETTING] = '0'

    @abstractmethod
    def query(self, sql: str, **kwargs):
@@ -115,6 +120,11 @@ def _set_client_database(self):
    def _server_version(self):
        pass

+    def update_model_settings(self, model_settings: Dict[str, str]):
+        for key, value in self._model_settings.items():
+            if key not in model_settings:
+                model_settings[key] = value
+
    def _check_lightweight_deletes(self, requested: bool):
        lw_deletes = self.get_ch_setting(LW_DELETE_SETTING)
        nd_mutations = self.get_ch_setting(ND_MUTATION_SETTING)
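Taken together, the two changes above mean that with `allow_automatic_deduplication` left at its default of `False`, the deduplication-window override is merged into any model-level `settings` before the DDL is rendered. The DDL for an incremental relation on a replicated table then looks roughly like the sketch below; the table, engine, and column names are placeholders rather than output captured from this change.

```sql
-- Illustrative only: approximate shape of the DDL for an incremental relation
-- when allow_automatic_deduplication keeps its default of False. The model's
-- own setting comes first; the adapter appends the deduplication override.
CREATE TABLE example_events
ENGINE = ReplicatedMergeTree
ORDER BY (event_ts)
SETTINGS allow_nullable_key=1, replicated_deduplication_window=0
AS SELECT event_id, event_ts, value
FROM raw_events
```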
diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index ec4b3c07..6cc6055f 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -367,7 +367,17 @@ def run_sql_for_tests(self, sql, fetch, conn):
    @available
    def get_model_settings(self, model):
-        settings = model['config'].get('settings', dict())
+        settings = model['config'].get('settings', {})
+        conn = self.connections.get_if_exists()
+        conn.handle.update_model_settings(settings)
+        res = []
+        for key in settings:
+            res.append(f' {key}={settings[key]}')
+        return '' if len(res) == 0 else 'SETTINGS ' + ', '.join(res) + '\n'
+
+    @available
+    def get_model_query_settings(self, model):
+        settings = model['config'].get('query_settings', {})
        res = []
        for key in settings:
            res.append(f' {key}={settings[key]}')
diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index 042a94e8..ca15991b 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -178,7 +178,7 @@
        select {{ unique_key }} from {{ inserting_relation }}
      )
-      {{ adapter.get_model_settings(model) }}
+      {{ adapter.get_model_query_settings(model) }}
    {% endcall %}

    -- Insert all of the new data into the temporary table
@@ -186,7 +186,7 @@
      insert into {{ inserted_relation }} ({{ dest_cols_csv }})
      select {{ dest_cols_csv }}
      from {{ inserting_relation }}
-      {{ adapter.get_model_settings(model) }}
+      {{ adapter.get_model_query_settings(model) }}
    {% endcall %}
    {% do adapter.drop_relation(new_data_relation) %}
@@ -228,13 +228,14 @@
        {% for predicate in incremental_predicates %}
            and {{ predicate }}
        {% endfor %}
-        {%- endif -%};
+        {%- endif -%}
+        {{ adapter.get_model_query_settings(model) }}
    {% endcall %}
    {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
    {% call statement('insert_new_data') %}
-        insert into {{ existing_relation }} select {{ dest_cols_csv }} from {{ inserting_relation }}
+        insert into {{ existing_relation }} {{ adapter.get_model_query_settings(model) }} select {{ dest_cols_csv }} from {{ inserting_relation }}
    {% endcall %}
    {% do adapter.drop_relation(new_data_relation) %}
    {{ drop_relation_if_exists(distributed_new_data_relation) }}
diff --git a/dbt/include/clickhouse/macros/materializations/seed.sql b/dbt/include/clickhouse/macros/materializations/seed.sql index 120e6c48..f05a5ac4 100644 --- a/dbt/include/clickhouse/macros/materializations/seed.sql +++ b/dbt/include/clickhouse/macros/materializations/seed.sql @@ -4,7 +4,7 @@
    {% set sql -%}
        insert into {{ this.render() }} ({{ cols_sql }})
-        {{ adapter.get_model_settings(model) }}
+        {{ adapter.get_model_query_settings(model) }}
        format CSV
        {{ data_sql }}
    {%- endset %}
diff --git a/dbt/include/clickhouse/macros/materializations/table.sql b/dbt/include/clickhouse/macros/materializations/table.sql index ca07cdbe..22537f5c 100644 --- a/dbt/include/clickhouse/macros/materializations/table.sql +++ b/dbt/include/clickhouse/macros/materializations/table.sql @@ -188,11 +188,13 @@
    {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

-    insert into {{ target_relation }} ({{ dest_cols_csv }})
+    insert into {{ target_relation }}
+    ({{ dest_cols_csv }})
    {%- if has_contract -%}
        -- Use a subquery to get columns in the right order
        SELECT {{ dest_cols_csv }} FROM ( {{ sql }} )
    {%- else -%}
        {{ sql }}
+        {{ adapter.get_model_query_settings(model) }}
    {%- endif -%}
{%- endmacro %}
diff --git a/tests/integration/adapter/basic/test_basic.py b/tests/integration/adapter/basic/test_basic.py index e340b5e1..75936f0b 100644 --- a/tests/integration/adapter/basic/test_basic.py +++ b/tests/integration/adapter/basic/test_basic.py @@ -33,6 +33,8 @@
    column_types:
      val2: Nullable(UInt32)
      str1: Nullable(String)
+    settings:
+      allow_nullable_key: 1
"""

replicated_seeds_schema_yml = """
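For the macro changes above, the rendered DML for a `delete+insert` incremental run with `query_settings={'allow_nondeterministic_mutations': 1}` ends up roughly as in the sketch below; relation and column names are placeholders, and the exact statement shape depends on the strategy and unique key in use.

```sql
-- Illustrative only: the query-level SETTINGS clause now rides on the DML
-- statements themselves rather than on the table DDL.
DELETE FROM example_events
WHERE (event_id) IN (SELECT event_id FROM example_events__dbt_new_data)
SETTINGS allow_nondeterministic_mutations=1;

INSERT INTO example_events
SETTINGS allow_nondeterministic_mutations=1
SELECT event_id, event_ts, value FROM example_events__dbt_new_data;
```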
diff --git a/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py b/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py index 0c0f1bbb..c7f20e00 100644 --- a/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py +++ b/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py @@ -18,8 +18,13 @@ class TestMergeTreeTableMaterialization(BaseSimpleMaterializations):
    @pytest.fixture(scope="class")
    def models(self):
        config_materialized_table = """
-        {{ config(order_by='(some_date, id, name)', engine='MergeTree()', materialized='table',
-        settings={'allow_nullable_key': 1}) }}
+        {{ config(
+            order_by='(some_date, id, name)',
+            engine='MergeTree()',
+            materialized='table',
+            settings={'allow_nullable_key': 1},
+            query_settings={'allow_nondeterministic_mutations': 1})
+        }}
        """
        base_table_sql = config_materialized_table + model_base
        return {
@@ -204,7 +209,7 @@ def assert_total_count_correct(self, project):
        os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
    )
    def test_base(self, project):
-        # cluster setting must exists
+        # cluster setting must exist
        cluster = project.test_config['cluster']
        assert cluster
diff --git a/tests/integration/adapter/constraints/test_constraints.py b/tests/integration/adapter/constraints/test_constraints.py index 2fe35537..f18a7ca9 100644 --- a/tests/integration/adapter/constraints/test_constraints.py +++ b/tests/integration/adapter/constraints/test_constraints.py @@ -60,7 +60,7 @@ def test__contract_wrong_column_names(self, project):
        assert all([(exp in log_output or exp.upper() in log_output) for exp in expected])

    def test__contract_wrong_column_data_types(self, project, data_types):
-        for (sql_column_value, schema_data_type, error_data_type) in data_types:
+        for sql_column_value, schema_data_type, error_data_type in data_types:
            # Write parametrized data_type to sql file
            write_file(
                my_model_data_type_sql.format(sql_value=sql_column_value),
@@ -91,7 +91,7 @@ def test__contract_wrong_column_data_types(self, project, data_types):
        assert all([(exp in log_output or exp.upper() in log_output) for exp in expected])

    def test__contract_correct_column_data_types(self, project, data_types):
-        for (sql_column_value, schema_data_type, _) in data_types:
+        for sql_column_value, schema_data_type, _ in data_types:
            # Write parametrized data_type to sql file
            write_file(
                my_model_data_type_sql.format(sql_value=sql_column_value),
diff --git a/tests/integration/adapter/incremental/test_base_incremental.py b/tests/integration/adapter/incremental/test_base_incremental.py index aa9812aa..24635db5 100644 --- a/tests/integration/adapter/incremental/test_base_incremental.py +++ b/tests/integration/adapter/incremental/test_base_incremental.py @@ -33,7 +33,8 @@
        materialized='incremental',
        engine='MergeTree()',
        order_by=['ts'],
-        unique_key=['impid']
+        unique_key=['impid'],
+        settings={'allow_nullable_key':'1'}
    )
}}
select ts, impid from unique_source_one
@@ -57,25 +58,18 @@ def test_simple_incremental(self, project):
        run_dbt(["run", "--select", "unique_incremental_one"])

-lw_delete_schema = """
-version: 2
-
-models:
-  - name: "lw_delete_inc"
-    description: "Incremental table"
-"""
-
lw_delete_inc = """
{{ config(
        materialized='incremental',
        order_by=['key1'],
        unique_key='key1',
-        incremental_strategy='delete+insert'
+        incremental_strategy='delete+insert',
+        settings={'allow_nullable_key':1}
    )
}}
{% if is_incremental() %}
-    WITH (SELECT max(key1) - 20 FROM lw_delete_inc) as old_max
-    SELECT assumeNotNull(toUInt64(number + old_max + 1)) as key1, toInt64(-(number + old_max)) as key2, toString(number + 30) as value FROM numbers(100)
+    select 2
as key1, 500 as key2, 'test' as value UNION ALL + select 102 as key1, 400 as key2, 'test2' as value {% else %} SELECT toUInt64(number) as key1, toInt64(-number) as key2, toString(number) as value FROM numbers(100) {% endif %} @@ -93,7 +87,45 @@ def test_lw_delete(self, project): assert result[0] == 100 run_dbt() result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one") - assert result[0] == 180 + assert result[0] == 101 + run_dbt() + result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one") + assert result[0] == 101 + + +legacy_inc = """ +{{ config( + materialized='incremental', + order_by=['key1'], + unique_key='key1', + incremental_strategy='legacy', + settings={'allow_nullable_key':1} + ) +}} +{% if is_incremental() %} + select 2 as key1, 500 as key2, 'test' as value UNION ALL + select 102 as key1, 400 as key2, 'test2' as value +{% else %} + SELECT toUInt64(number) as key1, toInt64(-number) as key2, toString(number) as value FROM numbers(100) +{% endif %} +""" + + +class TestLegacyIncremental: + @pytest.fixture(scope="class") + def models(self): + return {"legacy_inc.sql": legacy_inc} + + def test_legacy(self, project): + run_dbt() + result = project.run_sql("select count(*) as num_rows from legacy_inc", fetch="one") + assert result[0] == 100 + run_dbt() + result = project.run_sql("select count(*) as num_rows from legacy_inc", fetch="one") + assert result[0] == 101 + run_dbt() + result = project.run_sql("select count(*) as num_rows from legacy_inc", fetch="one") + assert result[0] == 101 compound_key_schema = """ @@ -133,6 +165,9 @@ def test_compound_key(self, project): run_dbt() result = project.run_sql("select count(*) as num_rows from compound_key_inc", fetch="one") assert result[0] == 180 + run_dbt() + result = project.run_sql("select count(*) as num_rows from compound_key_inc", fetch="one") + assert result[0] == 260 class TestInsertsOnlyIncrementalMaterialization(BaseIncremental):