From 236627f307afdbb1500cb118fc64f1c074d58a11 Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Thu, 1 Aug 2024 12:05:10 +0200 Subject: [PATCH 01/10] add support for sync_all_columns --- dbt/adapters/clickhouse/column.py | 34 +++++++- dbt/adapters/clickhouse/errors.py | 18 ++--- dbt/adapters/clickhouse/impl.py | 42 +++++----- dbt/adapters/clickhouse/util.py | 6 -- .../incremental/distributed_incremental.sql | 12 +-- .../incremental/incremental.sql | 8 +- .../incremental/schema_changes.sql | 66 ++++++++++++++++ .../adapter/incremental/test_schema_change.py | 77 +++++++++++++++++++ 8 files changed, 213 insertions(+), 50 deletions(-) create mode 100644 dbt/include/clickhouse/macros/materializations/incremental/schema_changes.sql diff --git a/dbt/adapters/clickhouse/column.py b/dbt/adapters/clickhouse/column.py index 835b0f0d..3e25e7bb 100644 --- a/dbt/adapters/clickhouse/column.py +++ b/dbt/adapters/clickhouse/column.py @@ -1,6 +1,6 @@ import re -from dataclasses import dataclass -from typing import Any, TypeVar +from dataclasses import dataclass, field +from typing import Any, TypeVar, Literal, List from dbt.adapters.base.column import Column from dbt_common.exceptions import DbtRuntimeError @@ -134,3 +134,33 @@ def _inner_dtype(self, dtype) -> str: inner_dtype = null_match.group(1) return inner_dtype + + +@dataclass(frozen=True) +class ClickHouseColumnChanges: + on_schema_change: Literal['ignore', 'fail', 'append_new_columns', 'sync_all_columns'] + columns_to_add: List[Column] = field(default_factory=list) + columns_to_drop: List[Column] = field(default_factory=list) + columns_to_modify: List[Column] = field(default_factory=list) + + def __bool__(self) -> bool: + return bool(self.columns_to_add or self.columns_to_drop or self.columns_to_modify) + + @property + def has_schema_changes(self) -> bool: + return bool(self) + + @property + def has_sync_changes(self) -> bool: + return bool(self.columns_to_drop or self.columns_to_modify) + + @property + def has_conflicting_changes(self) -> bool: + if self.on_schema_change == 'fail' and self.has_schema_changes: + return True + + if self.on_schema_change != 'sync_all_columns' and self.has_sync_changes: + return True + + return False + diff --git a/dbt/adapters/clickhouse/errors.py b/dbt/adapters/clickhouse/errors.py index bfcd5f95..29a572f3 100644 --- a/dbt/adapters/clickhouse/errors.py +++ b/dbt/adapters/clickhouse/errors.py @@ -1,14 +1,14 @@ schema_change_fail_error = """ The source and target schemas on this incremental model are out of sync. - They can be reconciled in several ways: - - set the `on_schema_change` config to `append_new_columns`. (ClickHouse does not support `sync_all_columns`) - - Re-run the incremental model with `full_refresh: True` to update the target schema. - - update the schema manually and re-run the process. - - Additional troubleshooting context: - Source columns not in target: {0} - Target columns not in source: {1} - New column types: {2} +They can be reconciled in several ways: + - set the `on_schema_change` config to `append_new_columns` or `sync_all_columns`. + - Re-run the incremental model with `full_refresh: True` to update the target schema. + - update the schema manually and re-run the process. + +Additional troubleshooting context: + Source columns not in target: {0} + Target columns not in source: {1} + New column types: {2} """ schema_change_datatype_error = """ diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index 067c719a..6ec3d10c 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -29,7 +29,7 @@ from dbt_common.utils import filter_null_values from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache -from dbt.adapters.clickhouse.column import ClickHouseColumn +from dbt.adapters.clickhouse.column import ClickHouseColumn, ClickHouseColumnChanges from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager from dbt.adapters.clickhouse.errors import ( schema_change_datatype_error, @@ -39,7 +39,7 @@ from dbt.adapters.clickhouse.logger import logger from dbt.adapters.clickhouse.query import quote_identifier from dbt.adapters.clickhouse.relation import ClickHouseRelation, ClickHouseRelationType -from dbt.adapters.clickhouse.util import NewColumnDataType, compare_versions +from dbt.adapters.clickhouse.util import compare_versions if TYPE_CHECKING: import agate @@ -193,35 +193,33 @@ def calculate_incremental_strategy(self, strategy: str) -> str: @available.parse_none def check_incremental_schema_changes( self, on_schema_change, existing, target_sql - ) -> List[ClickHouseColumn]: - if on_schema_change not in ('fail', 'ignore', 'append_new_columns'): - raise DbtRuntimeError( - "Only `fail`, `ignore`, and `append_new_columns` supported for `on_schema_change`" - ) + ) -> ClickHouseColumnChanges: source = self.get_columns_in_relation(existing) source_map = {column.name: column for column in source} target = self.get_column_schema_from_query(target_sql) - target_map = {column.name: column for column in source} + target_map = {column.name: column for column in target} + source_not_in_target = [column for column in source if column.name not in target_map.keys()] target_not_in_source = [column for column in target if column.name not in source_map.keys()] - new_column_data_types = [] - for target_column in target: - source_column = source_map.get(target_column.name) - if source_column and source_column.dtype != target_column.dtype: - new_column_data_types.append( - NewColumnDataType(source_column.name, target_column.dtype) - ) - if new_column_data_types: - raise DbtRuntimeError(schema_change_datatype_error.format(new_column_data_types)) - if source_not_in_target: - raise DbtRuntimeError(schema_change_missing_source_error.format(source_not_in_target)) - if target_not_in_source and on_schema_change == 'fail': + target_in_source = [column for column in target if column.name in source_map.keys()] + changed_data_types = [column for column in target_in_source + if column.dtype != source_map.get(column.name).dtype] + + clickhouse_column_changes = ClickHouseColumnChanges( + columns_to_add=target_not_in_source, + columns_to_drop=source_not_in_target, + columns_to_modify=changed_data_types, + on_schema_change=on_schema_change, + ) + + if clickhouse_column_changes.has_conflicting_changes: raise DbtRuntimeError( schema_change_fail_error.format( - source_not_in_target, target_not_in_source, new_column_data_types + source_not_in_target, target_not_in_source, changed_data_types ) ) - return target_not_in_source + + return clickhouse_column_changes @available.parse_none def s3source_clause( diff --git a/dbt/adapters/clickhouse/util.py b/dbt/adapters/clickhouse/util.py index b730b9fd..9410ad7d 100644 --- a/dbt/adapters/clickhouse/util.py +++ b/dbt/adapters/clickhouse/util.py @@ -13,9 +13,3 @@ def compare_versions(v1: str, v2: str) -> int: except ValueError: raise DbtRuntimeError("Version must consist of only numbers separated by '.'") return 0 - - -@dataclass -class NewColumnDataType: - column_name: str - new_type: str diff --git a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql index a8aba321..ef31a76c 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql @@ -76,18 +76,18 @@ {% else %} {% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %} {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} - {% if on_schema_change != 'ignore' %} - {%- set schema_changes = check_for_schema_changes(existing_relation, target_relation) -%} - {% if schema_changes['schema_changed'] and incremental_strategy in ('append', 'delete_insert') %} - {% set incremental_strategy = 'legacy' %} - {% do log('Schema changes detected, switching to legacy incremental strategy') %} + {%- if on_schema_change != 'ignore' %} + {%- set local_column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation_local, sql) -%} + {% if local_column_changes and incremental_strategy != 'legacy' %} + {% do clickhouse__apply_column_changes(local_column_changes, existing_relation, True) %} + {% set existing_relation = load_cached_relation(this) %} {% endif %} {% endif %} {% if incremental_strategy != 'delete_insert' and incremental_predicates %} {% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %} {% endif %} {% if incremental_strategy == 'legacy' %} - {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, schema_changes, unique_key, True) %} + {% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, local_column_changes, unique_key, True) %} {% set need_swap = true %} {% elif incremental_strategy == 'delete_insert' %} {% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index b5ac37ab..2ea51868 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -55,11 +55,9 @@ {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} {%- if on_schema_change != 'ignore' %} {%- set column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation, sql) -%} - {%- if column_changes %} - {%- if incremental_strategy in ('append', 'delete_insert') %} - {% set incremental_strategy = 'legacy' %} - {{ log('Schema changes detected, switching to legacy incremental strategy') }} - {%- endif %} + {% if column_changes and incremental_strategy != 'legacy' %} + {% do clickhouse__apply_column_changes(column_changes, existing_relation) %} + {% set existing_relation = load_cached_relation(this) %} {% endif %} {% endif %} {% if incremental_strategy != 'delete_insert' and incremental_predicates %} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/schema_changes.sql b/dbt/include/clickhouse/macros/materializations/incremental/schema_changes.sql new file mode 100644 index 00000000..c8e982ff --- /dev/null +++ b/dbt/include/clickhouse/macros/materializations/incremental/schema_changes.sql @@ -0,0 +1,66 @@ +{% macro clickhouse__apply_column_changes(column_changes, existing_relation, is_distributed=False) %} + {{ log('Schema changes detected. Trying to apply the following changes: ' ~ column_changes) }} + {%- set existing_local = none -%} + {% if is_distributed %} + {%- set local_suffix = adapter.get_clickhouse_local_suffix() -%} + {%- set local_db_prefix = adapter.get_clickhouse_local_db_prefix() -%} + {%- set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none -%} + {% endif %} + + {% if column_changes.on_schema_change == 'append_new_columns' %} + {% do clickhouse__add_columns(column_changes.columns_to_add, existing_relation, existing_local, is_distributed) %} + + {% elif column_changes.on_schema_change == 'sync_all_columns' %} + {% do clickhouse__drop_columns(column_changes.columns_to_drop, existing_relation, existing_local, is_distributed) %} + {% do clickhouse__add_columns(column_changes.columns_to_add, existing_relation, existing_local, is_distributed) %} + {% do clickhouse__modify_columns(column_changes.columns_to_modify, existing_relation, existing_local, is_distributed) %} + {% endif %} + +{% endmacro %} + +{% macro clickhouse__add_columns(columns, existing_relation, existing_local=none, is_distributed=False) %} + {% for column in columns %} + {% set alter_action -%} + add column if not exists `{{ column.name }}` {{ column.data_type }} + {%- endset %} + {% do clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local, is_distributed) %} + {% endfor %} + +{% endmacro %} + +{% macro clickhouse__drop_columns(columns, existing_relation, existing_local=none, is_distributed=False) %} + {% for column in columns %} + {% set alter_action -%} + drop column if exists `{{ column.name }}` + {%- endset %} + {% do clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local, is_distributed) %} + {% endfor %} + +{% endmacro %} + +{% macro clickhouse__modify_columns(columns, existing_relation, existing_local=none, is_distributed=False) %} + {% for column in columns %} + {% set alter_action -%} + modify column if exists `{{ column.name }}` {{ column.data_type }} + {%- endset %} + {% do clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local, is_distributed) %} + {% endfor %} + +{% endmacro %} + +{% macro clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local=none, is_distributed=False) %} + {% if is_distributed %} + {% call statement('alter_table') %} + alter table {{ existing_local }} {{ on_cluster_clause(existing_relation) }} {{ alter_action }} + {% endcall %} + {% call statement('alter_table') %} + alter table {{ existing_relation }} {{ on_cluster_clause(existing_relation) }} {{ alter_action }} + {% endcall %} + + {% else %} + {% call statement('alter_table') %} + alter table {{ existing_relation }} {{ alter_action }} + {% endcall %} + {% endif %} + +{% endmacro %} \ No newline at end of file diff --git a/tests/integration/adapter/incremental/test_schema_change.py b/tests/integration/adapter/incremental/test_schema_change.py index 9bccaf4e..7089ae5b 100644 --- a/tests/integration/adapter/incremental/test_schema_change.py +++ b/tests/integration/adapter/incremental/test_schema_change.py @@ -69,3 +69,80 @@ def test_append(self, project): result = project.run_sql("select * from schema_change_append order by col_1", fetch="all") assert result[0][2] == 0 assert result[3][2] == 5 + + +# contains dropped, added, and (type) changed columns +complex_schema_change_sql = """ +{{ + config( + materialized='incremental', + unique_key='col_1', + on_schema_change='%schema_change%' + ) +}} + +{% if not is_incremental() %} +select + toUInt8(number) as col_1, + number + 1 as col_2 +from numbers(3) +{% else %} +select + toFloat32(number) as col_1, + number + 2 as col_3 +from numbers(2, 3) +{% endif %} +""" + + +class TestComplexSchemaChange: + @pytest.fixture(scope="class") + def models(self): + return { + "complex_schema_change_fail.sql": complex_schema_change_sql.replace("%schema_change%", "fail"), + "complex_schema_change_append.sql": complex_schema_change_sql.replace("%schema_change%", + "append_new_columns"), + "complex_schema_change_sync.sql": complex_schema_change_sql.replace("%schema_change%", "sync_all_columns"), + } + + def test_fail(self, project): + run_dbt(["run", "--select", "complex_schema_change_fail"]) + result = project.run_sql("select * from complex_schema_change_fail order by col_1", fetch="all") + assert len(result) == 3 + assert result[0][1] == 1 + _, log_output = run_dbt_and_capture( + [ + "run", + "--select", + "complex_schema_change_fail", + ], + expect_pass=False, + ) + assert 'out of sync' in log_output.lower() + + def test_append(self, project): + run_dbt(["run", "--select", "complex_schema_change_append"]) + result = project.run_sql("select * from complex_schema_change_append order by col_1", fetch="all") + assert len(result) == 3 + assert result[0][1] == 1 + _, log_output = run_dbt_and_capture( + [ + "run", + "--select", + "complex_schema_change_append", + ], + expect_pass=False, + ) + assert 'out of sync' in log_output.lower() + + def test_sync(self, project): + run_dbt(["run", "--select", "complex_schema_change_sync"]) + result = project.run_sql("select * from complex_schema_change_sync order by col_1", fetch="all") + assert len(result) == 3 + assert result[0][1] == 1 + run_dbt(["run", "--select", "complex_schema_change_sync"]) + result = project.run_sql("select * from complex_schema_change_sync order by col_1", fetch="all") + result_types = project.run_sql("select toColumnTypeName(col_1) from complex_schema_change_sync", fetch="one") + assert result_types[0] == 'Float32' + assert result[0][1] == 0 + assert result[3][1] == 5 From 613324b83644594003803f5b112b516e97b866d1 Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Fri, 2 Aug 2024 16:49:14 +0200 Subject: [PATCH 02/10] extend schema change test cases --- .../adapter/incremental/test_schema_change.py | 114 +++++++++--------- 1 file changed, 55 insertions(+), 59 deletions(-) diff --git a/tests/integration/adapter/incremental/test_schema_change.py b/tests/integration/adapter/incremental/test_schema_change.py index 7089ae5b..e0e269d8 100644 --- a/tests/integration/adapter/incremental/test_schema_change.py +++ b/tests/integration/adapter/incremental/test_schema_change.py @@ -4,24 +4,24 @@ schema_change_sql = """ {{ config( - materialized='incremental', + materialized='%s', unique_key='col_1', - on_schema_change='%schema_change%' + on_schema_change='%s' ) }} -{% if not is_incremental() %} +{%% if not is_incremental() %%} select number as col_1, number + 1 as col_2 from numbers(3) -{% else %} +{%% else %%} select number as col_1, number + 1 as col_2, number + 2 as col_3 from numbers(2, 3) -{% endif %} +{%% endif %%} """ @@ -29,69 +29,73 @@ class TestOnSchemaChange: @pytest.fixture(scope="class") def models(self): return { - "schema_change_ignore.sql": schema_change_sql.replace("%schema_change%", "ignore"), - "schema_change_fail.sql": schema_change_sql.replace("%schema_change%", "fail"), - "schema_change_append.sql": schema_change_sql.replace( - "%schema_change%", "append_new_columns" - ), + "schema_change_ignore.sql": schema_change_sql % ("incremental", "ignore"), + "schema_change_fail.sql": schema_change_sql % ("incremental", "fail"), + "schema_change_append.sql": schema_change_sql % ("incremental", "append_new_columns"), + "schema_change_distributed_ignore.sql": schema_change_sql % ("distributed_incremental", "ignore"), + "schema_change_distributed_fail.sql": schema_change_sql % ("distributed_incremental", "fail"), + "schema_change_distributed_append.sql": schema_change_sql % ("distributed_incremental", "append_new_columns"), } - def test_ignore(self, project): - run_dbt(["run", "--select", "schema_change_ignore"]) - result = project.run_sql("select * from schema_change_ignore order by col_1", fetch="all") + @pytest.mark.parametrize("model", ("schema_change_ignore", "schema_change_distributed_ignore")) + def test_ignore(self, project, model): + run_dbt(["run", "--select", model]) + result = project.run_sql(f"select * from {model} order by col_1", fetch="all") assert len(result) == 3 assert result[0][1] == 1 - run_dbt(["run", "--select", "schema_change_ignore"]) - result = project.run_sql("select * from schema_change_ignore", fetch="all") + run_dbt(["run", "--select", model]) + result = project.run_sql(f"select * from {model}", fetch="all") assert len(result) == 5 - def test_fail(self, project): - run_dbt(["run", "--select", "schema_change_fail"]) - result = project.run_sql("select * from schema_change_fail order by col_1", fetch="all") + @pytest.mark.parametrize("model", ("schema_change_fail", "schema_change_distributed_fail")) + def test_fail(self, project, model): + run_dbt(["run", "--select", model]) + result = project.run_sql(f"select * from {model} order by col_1", fetch="all") assert len(result) == 3 assert result[0][1] == 1 _, log_output = run_dbt_and_capture( [ "run", "--select", - "schema_change_fail", + model, ], expect_pass=False, ) assert 'out of sync' in log_output.lower() - def test_append(self, project): - run_dbt(["run", "--select", "schema_change_append"]) - result = project.run_sql("select * from schema_change_append order by col_1", fetch="all") + @pytest.mark.parametrize("model", ("schema_change_append", "schema_change_distributed_append")) + def test_append(self, project, model): + run_dbt(["run", "--select", model]) + result = project.run_sql(f"select * from {model} order by col_1", fetch="all") assert len(result) == 3 assert result[0][1] == 1 - run_dbt(["--debug", "run", "--select", "schema_change_append"]) - result = project.run_sql("select * from schema_change_append order by col_1", fetch="all") + run_dbt(["--debug", "run", "--select", model]) + result = project.run_sql(f"select * from {model} order by col_1", fetch="all") assert result[0][2] == 0 assert result[3][2] == 5 -# contains dropped, added, and (type) changed columns +# contains dropped, added, and changed (type) columns complex_schema_change_sql = """ {{ config( - materialized='incremental', + materialized='%s', unique_key='col_1', - on_schema_change='%schema_change%' + on_schema_change='%s' ) }} -{% if not is_incremental() %} +{%% if not is_incremental() %%} select toUInt8(number) as col_1, number + 1 as col_2 from numbers(3) -{% else %} +{%% else %%} select toFloat32(number) as col_1, number + 2 as col_3 from numbers(2, 3) -{% endif %} +{%% endif %%} """ @@ -99,50 +103,42 @@ class TestComplexSchemaChange: @pytest.fixture(scope="class") def models(self): return { - "complex_schema_change_fail.sql": complex_schema_change_sql.replace("%schema_change%", "fail"), - "complex_schema_change_append.sql": complex_schema_change_sql.replace("%schema_change%", - "append_new_columns"), - "complex_schema_change_sync.sql": complex_schema_change_sql.replace("%schema_change%", "sync_all_columns"), + "complex_schema_change_fail.sql": complex_schema_change_sql % ("incremental", "fail"), + "complex_schema_change_append.sql": complex_schema_change_sql % ("incremental", "append_new_columns"), + "complex_schema_change_sync.sql": complex_schema_change_sql % ("incremental", "sync_all_columns"), + "complex_schema_change_distributed_fail.sql": complex_schema_change_sql % ("distributed_incremental", "fail"), + "complex_schema_change_distributed_append.sql": complex_schema_change_sql % ("distributed_incremental", "append_new_columns"), + "complex_schema_change_distributed_sync.sql": complex_schema_change_sql % ("distributed_incremental", "sync_all_columns"), } - def test_fail(self, project): - run_dbt(["run", "--select", "complex_schema_change_fail"]) - result = project.run_sql("select * from complex_schema_change_fail order by col_1", fetch="all") + @pytest.mark.parametrize("model", ("complex_schema_change_fail", + "complex_schema_change_distributed_fail", + "complex_schema_change_append", + "complex_schema_change_distributed_append")) + def test_fail(self, project, model): + run_dbt(["run", "--select", model]) + result = project.run_sql(f"select * from {model} order by col_1", fetch="all") assert len(result) == 3 assert result[0][1] == 1 _, log_output = run_dbt_and_capture( [ "run", "--select", - "complex_schema_change_fail", + model, ], expect_pass=False, ) assert 'out of sync' in log_output.lower() - def test_append(self, project): - run_dbt(["run", "--select", "complex_schema_change_append"]) - result = project.run_sql("select * from complex_schema_change_append order by col_1", fetch="all") + @pytest.mark.parametrize("model", ("complex_schema_change_sync", "complex_schema_change_distributed_sync")) + def test_sync(self, project, model): + run_dbt(["run", "--select", model]) + result = project.run_sql(f"select * from {model} order by col_1", fetch="all") assert len(result) == 3 assert result[0][1] == 1 - _, log_output = run_dbt_and_capture( - [ - "run", - "--select", - "complex_schema_change_append", - ], - expect_pass=False, - ) - assert 'out of sync' in log_output.lower() - - def test_sync(self, project): - run_dbt(["run", "--select", "complex_schema_change_sync"]) - result = project.run_sql("select * from complex_schema_change_sync order by col_1", fetch="all") - assert len(result) == 3 - assert result[0][1] == 1 - run_dbt(["run", "--select", "complex_schema_change_sync"]) - result = project.run_sql("select * from complex_schema_change_sync order by col_1", fetch="all") - result_types = project.run_sql("select toColumnTypeName(col_1) from complex_schema_change_sync", fetch="one") + run_dbt(["run", "--select", model]) + result = project.run_sql(f"select * from {model} order by col_1", fetch="all") + result_types = project.run_sql(f"select toColumnTypeName(col_1) from {model}", fetch="one") assert result_types[0] == 'Float32' assert result[0][1] == 0 assert result[3][1] == 5 From e56b61379b27406ab62afeabd01f9609105d71b6 Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Fri, 2 Aug 2024 17:09:06 +0200 Subject: [PATCH 03/10] add test case for out of order columns --- .../adapter/incremental/test_schema_change.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/integration/adapter/incremental/test_schema_change.py b/tests/integration/adapter/incremental/test_schema_change.py index e0e269d8..6b17fe53 100644 --- a/tests/integration/adapter/incremental/test_schema_change.py +++ b/tests/integration/adapter/incremental/test_schema_change.py @@ -142,3 +142,45 @@ def test_sync(self, project, model): assert result_types[0] == 'Float32' assert result[0][1] == 0 assert result[3][1] == 5 + + +out_of_order_columns_sql = """ +{{ + config( + materialized='%s', + unique_key='col_1', + on_schema_change='fail' + ) +}} + +{%% if not is_incremental() %%} +select + number as col_1, + number + 1 as col_2 +from numbers(3) +{%% else %%} +select + number + 1 as col_2, + number as col_1 +from numbers(2, 3) +{%% endif %%} +""" + + +class TestReordering: + @pytest.fixture(scope="class") + def models(self): + return { + "out_of_order_columns.sql": out_of_order_columns_sql % "incremental", + "out_of_order_columns_distributed.sql": out_of_order_columns_sql % "distributed_incremental", + } + + @pytest.mark.parametrize("model", ("out_of_order_columns", "out_of_order_columns_distributed")) + def test_reordering(self, project, model): + run_dbt(["run", "--select", model]) + result = project.run_sql(f"select * from {model} order by col_1", fetch="all") + assert result[0][1] == 1 + run_dbt(["run", "--select", model]) + result = project.run_sql(f"select * from {model} order by col_1", fetch="all") + assert result[0][1] == 1 + assert result[3][1] == 4 From 40db8aa460ae82d0bcde85e103e2fc7f61b31c2e Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Fri, 2 Aug 2024 17:13:08 +0200 Subject: [PATCH 04/10] fix columns out of order issue in distributed_incremental --- .../macros/materializations/distributed_table.sql | 12 +++++++----- .../materializations/incremental/incremental.sql | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql index 3181d40f..43b2d7ff 100644 --- a/dbt/include/clickhouse/macros/materializations/distributed_table.sql +++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql @@ -96,11 +96,13 @@ ) {% endmacro %} -{% macro create_empty_table_from_relation(relation, source_relation) -%} +{% macro create_empty_table_from_relation(relation, source_relation, sql=none) -%} {%- set sql_header = config.get('sql_header', none) -%} - {%- set columns = adapter.get_columns_in_relation(source_relation) | list -%} - - + {%- if sql -%} + {%- set columns = adapter.get_column_schema_from_query(sql) | list -%} + {%- else -%} + {%- set columns = adapter.get_columns_in_relation(source_relation) | list -%} + {%- endif -%} {%- set col_list = [] -%} {% for col in columns %} {{col_list.append(col.name + ' ' + col.data_type) or '' }} @@ -123,7 +125,7 @@ {{ drop_relation_if_exists(shard_relation) }} {{ drop_relation_if_exists(distributed_relation) }} {{ create_schema(shard_relation) }} - {% do run_query(create_empty_table_from_relation(shard_relation, structure_relation)) or '' %} + {% do run_query(create_empty_table_from_relation(shard_relation, structure_relation, sql_query)) or '' %} {% do run_query(create_distributed_table(distributed_relation, shard_relation)) or '' %} {% if sql_query is not none %} {% do run_query(clickhouse__insert_into(distributed_relation, sql_query)) or '' %} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index 2ea51868..b1fed230 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -191,6 +191,7 @@ + '__dbt_new_data_' + invocation_id.replace('-', '_')}) %} {{ drop_relation_if_exists(new_data_relation) }} {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%} + {{ drop_relation_if_exists(distributed_new_data_relation) }} {%- set inserting_relation = new_data_relation -%} @@ -230,5 +231,4 @@ insert into {{ existing_relation }} select {{ dest_cols_csv }} from {{ inserting_relation }} {{ adapter.get_model_query_settings(model) }} {% endcall %} {% do adapter.drop_relation(new_data_relation) %} - {{ drop_relation_if_exists(distributed_new_data_relation) }} {% endmacro %} From d699b469bb5f6fc157a5482363ec96c22bb83204 Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Mon, 5 Aug 2024 20:51:12 +0200 Subject: [PATCH 05/10] added revision changes --- dbt/adapters/clickhouse/impl.py | 5 +++++ tests/integration/adapter/incremental/test_schema_change.py | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index 6ec3d10c..0859e67a 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -194,6 +194,11 @@ def calculate_incremental_strategy(self, strategy: str) -> str: def check_incremental_schema_changes( self, on_schema_change, existing, target_sql ) -> ClickHouseColumnChanges: + if on_schema_change not in ('fail', 'ignore', 'append_new_columns', 'sync_all_columns'): + raise DbtRuntimeError( + "Only `fail`, `ignore`, `append_new_columns`, and `sync_all_columns` supported for `on_schema_change`." + ) + source = self.get_columns_in_relation(existing) source_map = {column.name: column for column in source} target = self.get_column_schema_from_query(target_sql) diff --git a/tests/integration/adapter/incremental/test_schema_change.py b/tests/integration/adapter/incremental/test_schema_change.py index 6b17fe53..30d92225 100644 --- a/tests/integration/adapter/incremental/test_schema_change.py +++ b/tests/integration/adapter/incremental/test_schema_change.py @@ -1,5 +1,6 @@ import pytest from dbt.tests.util import run_dbt, run_dbt_and_capture +from functools import reduce schema_change_sql = """ {{ @@ -138,10 +139,11 @@ def test_sync(self, project, model): assert result[0][1] == 1 run_dbt(["run", "--select", model]) result = project.run_sql(f"select * from {model} order by col_1", fetch="all") - result_types = project.run_sql(f"select toColumnTypeName(col_1) from {model}", fetch="one") - assert result_types[0] == 'Float32' + assert all(len(row) == 2 for row in result) assert result[0][1] == 0 assert result[3][1] == 5 + result_types = project.run_sql(f"select toColumnTypeName(col_1) from {model}", fetch="one") + assert result_types[0] == 'Float32' out_of_order_columns_sql = """ From 1e544608741d7d58bb24646e08d2c4c69fccfbad Mon Sep 17 00:00:00 2001 From: Can Bekleyici Date: Mon, 5 Aug 2024 21:04:57 +0200 Subject: [PATCH 06/10] revert moving drop_replation macro call --- .../macros/materializations/incremental/incremental.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index b1fed230..2ea51868 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -191,7 +191,6 @@ + '__dbt_new_data_' + invocation_id.replace('-', '_')}) %} {{ drop_relation_if_exists(new_data_relation) }} {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%} - {{ drop_relation_if_exists(distributed_new_data_relation) }} {%- set inserting_relation = new_data_relation -%} @@ -231,4 +230,5 @@ insert into {{ existing_relation }} select {{ dest_cols_csv }} from {{ inserting_relation }} {{ adapter.get_model_query_settings(model) }} {% endcall %} {% do adapter.drop_relation(new_data_relation) %} + {{ drop_relation_if_exists(distributed_new_data_relation) }} {% endmacro %} From f5b93a619ef3393046e79e7101b19dde1d8b5ae3 Mon Sep 17 00:00:00 2001 From: bentsileviav Date: Wed, 21 Aug 2024 11:13:32 +0300 Subject: [PATCH 07/10] fix lint --- dbt/adapters/clickhouse/column.py | 1 - dbt/adapters/clickhouse/impl.py | 7 ++- .../adapter/incremental/test_schema_change.py | 44 +++++++++++++------ 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/dbt/adapters/clickhouse/column.py b/dbt/adapters/clickhouse/column.py index 3e25e7bb..d3087574 100644 --- a/dbt/adapters/clickhouse/column.py +++ b/dbt/adapters/clickhouse/column.py @@ -163,4 +163,3 @@ def has_conflicting_changes(self) -> bool: return True return False - diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index bb829bb6..533a3439 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -207,8 +207,11 @@ def check_incremental_schema_changes( source_not_in_target = [column for column in source if column.name not in target_map.keys()] target_not_in_source = [column for column in target if column.name not in source_map.keys()] target_in_source = [column for column in target if column.name in source_map.keys()] - changed_data_types = [column for column in target_in_source - if column.dtype != source_map.get(column.name).dtype] + changed_data_types = [ + column + for column in target_in_source + if column.dtype != source_map.get(column.name).dtype + ] clickhouse_column_changes = ClickHouseColumnChanges( columns_to_add=target_not_in_source, diff --git a/tests/integration/adapter/incremental/test_schema_change.py b/tests/integration/adapter/incremental/test_schema_change.py index 30d92225..642e3496 100644 --- a/tests/integration/adapter/incremental/test_schema_change.py +++ b/tests/integration/adapter/incremental/test_schema_change.py @@ -33,9 +33,12 @@ def models(self): "schema_change_ignore.sql": schema_change_sql % ("incremental", "ignore"), "schema_change_fail.sql": schema_change_sql % ("incremental", "fail"), "schema_change_append.sql": schema_change_sql % ("incremental", "append_new_columns"), - "schema_change_distributed_ignore.sql": schema_change_sql % ("distributed_incremental", "ignore"), - "schema_change_distributed_fail.sql": schema_change_sql % ("distributed_incremental", "fail"), - "schema_change_distributed_append.sql": schema_change_sql % ("distributed_incremental", "append_new_columns"), + "schema_change_distributed_ignore.sql": schema_change_sql + % ("distributed_incremental", "ignore"), + "schema_change_distributed_fail.sql": schema_change_sql + % ("distributed_incremental", "fail"), + "schema_change_distributed_append.sql": schema_change_sql + % ("distributed_incremental", "append_new_columns"), } @pytest.mark.parametrize("model", ("schema_change_ignore", "schema_change_distributed_ignore")) @@ -105,17 +108,27 @@ class TestComplexSchemaChange: def models(self): return { "complex_schema_change_fail.sql": complex_schema_change_sql % ("incremental", "fail"), - "complex_schema_change_append.sql": complex_schema_change_sql % ("incremental", "append_new_columns"), - "complex_schema_change_sync.sql": complex_schema_change_sql % ("incremental", "sync_all_columns"), - "complex_schema_change_distributed_fail.sql": complex_schema_change_sql % ("distributed_incremental", "fail"), - "complex_schema_change_distributed_append.sql": complex_schema_change_sql % ("distributed_incremental", "append_new_columns"), - "complex_schema_change_distributed_sync.sql": complex_schema_change_sql % ("distributed_incremental", "sync_all_columns"), + "complex_schema_change_append.sql": complex_schema_change_sql + % ("incremental", "append_new_columns"), + "complex_schema_change_sync.sql": complex_schema_change_sql + % ("incremental", "sync_all_columns"), + "complex_schema_change_distributed_fail.sql": complex_schema_change_sql + % ("distributed_incremental", "fail"), + "complex_schema_change_distributed_append.sql": complex_schema_change_sql + % ("distributed_incremental", "append_new_columns"), + "complex_schema_change_distributed_sync.sql": complex_schema_change_sql + % ("distributed_incremental", "sync_all_columns"), } - @pytest.mark.parametrize("model", ("complex_schema_change_fail", - "complex_schema_change_distributed_fail", - "complex_schema_change_append", - "complex_schema_change_distributed_append")) + @pytest.mark.parametrize( + "model", + ( + "complex_schema_change_fail", + "complex_schema_change_distributed_fail", + "complex_schema_change_append", + "complex_schema_change_distributed_append", + ), + ) def test_fail(self, project, model): run_dbt(["run", "--select", model]) result = project.run_sql(f"select * from {model} order by col_1", fetch="all") @@ -131,7 +144,9 @@ def test_fail(self, project, model): ) assert 'out of sync' in log_output.lower() - @pytest.mark.parametrize("model", ("complex_schema_change_sync", "complex_schema_change_distributed_sync")) + @pytest.mark.parametrize( + "model", ("complex_schema_change_sync", "complex_schema_change_distributed_sync") + ) def test_sync(self, project, model): run_dbt(["run", "--select", model]) result = project.run_sql(f"select * from {model} order by col_1", fetch="all") @@ -174,7 +189,8 @@ class TestReordering: def models(self): return { "out_of_order_columns.sql": out_of_order_columns_sql % "incremental", - "out_of_order_columns_distributed.sql": out_of_order_columns_sql % "distributed_incremental", + "out_of_order_columns_distributed.sql": out_of_order_columns_sql + % "distributed_incremental", } @pytest.mark.parametrize("model", ("out_of_order_columns", "out_of_order_columns_distributed")) From 3a705d0673697ddd9a760f53a2b63d163ba4953e Mon Sep 17 00:00:00 2001 From: bentsileviav Date: Wed, 21 Aug 2024 11:14:10 +0300 Subject: [PATCH 08/10] fix imports order --- dbt/adapters/clickhouse/column.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/clickhouse/column.py b/dbt/adapters/clickhouse/column.py index d3087574..c90a22ec 100644 --- a/dbt/adapters/clickhouse/column.py +++ b/dbt/adapters/clickhouse/column.py @@ -1,6 +1,6 @@ import re from dataclasses import dataclass, field -from typing import Any, TypeVar, Literal, List +from typing import Any, List, Literal, TypeVar from dbt.adapters.base.column import Column from dbt_common.exceptions import DbtRuntimeError From dac0d69ecc91fb3d12e4e48ff95226440273dfdf Mon Sep 17 00:00:00 2001 From: bentsileviav Date: Wed, 21 Aug 2024 11:15:58 +0300 Subject: [PATCH 09/10] fix imports order --- tests/integration/adapter/incremental/test_schema_change.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/adapter/incremental/test_schema_change.py b/tests/integration/adapter/incremental/test_schema_change.py index 642e3496..93ca1828 100644 --- a/tests/integration/adapter/incremental/test_schema_change.py +++ b/tests/integration/adapter/incremental/test_schema_change.py @@ -1,6 +1,7 @@ +from functools import reduce + import pytest from dbt.tests.util import run_dbt, run_dbt_and_capture -from functools import reduce schema_change_sql = """ {{ From 05e43f021e9b993dcb8cdf598632d7dfe0d592d4 Mon Sep 17 00:00:00 2001 From: bentsileviav Date: Wed, 21 Aug 2024 11:54:04 +0300 Subject: [PATCH 10/10] remove list comprehension to handle None in source_map.get() to prevent mypy error --- dbt/adapters/clickhouse/impl.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index 533a3439..bd65ad57 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -207,11 +207,11 @@ def check_incremental_schema_changes( source_not_in_target = [column for column in source if column.name not in target_map.keys()] target_not_in_source = [column for column in target if column.name not in source_map.keys()] target_in_source = [column for column in target if column.name in source_map.keys()] - changed_data_types = [ - column - for column in target_in_source - if column.dtype != source_map.get(column.name).dtype - ] + changed_data_types = [] + for column in target_in_source: + source_column = source_map.get(column.name) + if source_column is not None and column.dtype != source_column.dtype: + changed_data_types.append(column) clickhouse_column_changes = ClickHouseColumnChanges( columns_to_add=target_not_in_source,