diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 7540ab490fb8..777463467454 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -395,20 +395,24 @@ String cdcDeletes(final StreamConfig stream, return ""; } + final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); + String pkCasts = stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk))).collect(joining(",\n")); + // we want to grab IDs for deletion from the raw table (not the final table itself) to hand out-of-order record insertions after the delete has been registered return new StringSubstitutor(Map.of( "final_table_id", stream.id().finalTableId(finalSuffix, QUOTE), "raw_table_id", stream.id().rawTableId(QUOTE), + "pk_list", pkList, + "pk_extracts", pkCasts, "quoted_cdc_delete_column", QUOTE + "_ab_cdc_deleted_at" + QUOTE) ).replace( - // TODO replace `id`, `$.id` with PK - // TODO replace `INT64` with PK's type """ - DELETE FROM ${final_table_id} - WHERE - `id` IN ( - SELECT - SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id') as INT64) as id + DELETE FROM ${final_table_id} + WHERE + (${pk_list}) IN ( + SELECT ( + ${pk_extracts} + ) FROM ${raw_table_id} WHERE JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index 24f78cd56942..947c7da32f65 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -10,8 +10,19 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.auth.oauth2.GoogleCredentials; -import com.google.cloud.bigquery.*; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Field.Mode; +import com.google.cloud.bigquery.FieldValue; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableResult; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; @@ -30,8 +41,18 @@ import java.nio.file.Path; import java.time.Duration; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.text.StringSubstitutor; @@ -656,7 +677,7 @@ public void testCdcUpdate() throws InterruptedException { INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`, `_airbyte_loaded_at`) VALUES (JSON'{"id": 1, "_ab_cdc_lsn": 900, "string": "spooky ghost", "_ab_cdc_deleted_at": null}', '64f4390f-3da1-4b65-b64a-a6c67497f18d', '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z'), (JSON'{"id": 0, "_ab_cdc_lsn": 901, "string": "zombie", "_ab_cdc_deleted_at": "2022-12-31T00:O0:00Z"}', generate_uuid(), '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z'), - (JSON'{"id": 5, "_ab_cdc_lsn": 902, "string": "will be deleted", "_ab_cdc_deleted_at": null}', 'b6139181-a42c-45c3-89f2-c4b4bb3a8c9d', '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z'); + (JSON'{"id": 5, "_ab_cdc_lsn": 902, "string": "will not be deleted", "_ab_cdc_deleted_at": null}', 'b6139181-a42c-45c3-89f2-c4b4bb3a8c9d', '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z'); INSERT INTO ${dataset}.users_final (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta, `id`, `_ab_cdc_lsn`, `string`, `struct`, `integer`) values ('64f4390f-3da1-4b65-b64a-a6c67497f18d', '2022-12-31T00:00:00Z', JSON'{}', 1, 900, 'spooky ghost', NULL, NULL), ('b6139181-a42c-45c3-89f2-c4b4bb3a8c9d', '2022-12-31T00:00:00Z', JSON'{}', 5, 901, 'will be deleted', NULL, NULL);