diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile
index e8ee2ddd0354..fa8cb782c98b 100644
--- a/airbyte-integrations/bases/base-normalization/Dockerfile
+++ b/airbyte-integrations/bases/base-normalization/Dockerfile
@@ -1,4 +1,4 @@
-FROM fishtownanalytics/dbt:1.0.0
+FROM fishtownanalytics/dbt:1.0.0-dev
 COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte
 
 # Install SSH Tunneling dependencies
diff --git a/airbyte-integrations/bases/base-normalization/build.gradle b/airbyte-integrations/bases/base-normalization/build.gradle
index d1d2d4c7c7b0..eb0c90b98263 100644
--- a/airbyte-integrations/bases/base-normalization/build.gradle
+++ b/airbyte-integrations/bases/base-normalization/build.gradle
@@ -58,14 +58,30 @@ def buildAirbyteDocker(String customConnector) {
         arch = 'linux/arm64'
     }
 
-    def baseCommand = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', getDockerfile(customConnector), '-t', getImageNameWithTag(customConnector), '.']
-    // println("Building normalization container: " + baseCommand.join(" "))
+    def cmdArray = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', getDockerfile(customConnector), '-t', getImageNameWithTag(customConnector), '.']
+    // println("Building normalization container: " + cmdArray.join(" "))
 
     return {
-        commandLine baseCommand
+        commandLine cmdArray
     }
 }
 
+task rebuildDbtForLocalArch(type: Exec) {
+    // we need to rebuild the dbt base image for this system's architecture
+
+    def arch = 'linux/amd64'
+    if (Os.isArch("aarch_64") || Os.isArch("aarch64")) {
+        arch = 'linux/arm64'
+    }
+
+    def cmdArray = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', 'dbt.Dockerfile', '-t', 'fishtownanalytics/dbt:1.0.0-dev', '.']
+    println("Rebuilding base normalization container: " + cmdArray.join(" "))
+
+    commandLine cmdArray
+}
+
+airbyteDocker.dependsOn rebuildDbtForLocalArch
+
 task airbyteDockerMSSql(type: Exec, dependsOn: checkSshScriptCopy) {
     configure buildAirbyteDocker('mssql')
     dependsOn assemble
diff --git a/airbyte-integrations/bases/base-normalization/dbt.Dockerfile b/airbyte-integrations/bases/base-normalization/dbt.Dockerfile
new file mode 100644
index 000000000000..09b0e3c94064
--- /dev/null
+++ b/airbyte-integrations/bases/base-normalization/dbt.Dockerfile
@@ -0,0 +1,3 @@
+# This Dockerfile only exists to pull and re-export this image converted to the local arch of this machine
+# It is then consumed by the Dockerfile in this directory as "fishtownanalytics/dbt:1.0.0-dev"
+FROM fishtownanalytics/dbt:1.0.0
\ No newline at end of file
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java
index 71cf07ac49fa..ec4d3b73436f 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java
@@ -224,11 +224,13 @@ public String updateTable(final String finalSuffix, final StreamConfig stream) {
     }
     final String insertNewRecords = insertNewRecords(stream.id(), finalSuffix, stream.columns());
     String dedupFinalTable = "";
+    String cdcDeletes = "";
     String dedupRawTable = "";
     if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
-      dedupRawTable = dedupRawTable(stream.id(), finalSuffix, stream.columns());
+      dedupRawTable = dedupRawTable(stream.id(), finalSuffix);
       // If we're in dedup mode, then we must have a cursor
-      dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor().get(), stream.columns());
+      dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor().get());
+      cdcDeletes = cdcDeletes(stream, finalSuffix, stream.columns());
     }
     final String commitRawTable = commitRawTable(stream.id());
 
@@ -236,12 +238,14 @@ public String updateTable(final String finalSuffix, final StreamConfig stream) {
         "validate_primary_keys", validatePrimaryKeys,
         "insert_new_records", insertNewRecords,
         "dedup_final_table", dedupFinalTable,
+        "cdc_deletes", cdcDeletes,
         "dedupe_raw_table", dedupRawTable,
         "commit_raw_table", commitRawTable)).replace(
             """
            DECLARE missing_pk_count INT64;
 
            BEGIN TRANSACTION;
+
            ${validate_primary_keys}
 
            ${insert_new_records}
@@ -249,6 +253,8 @@ public String updateTable(final String finalSuffix, final StreamConfig stream) {
            ${dedup_final_table}
 
            ${dedupe_raw_table}
+
+           ${cdc_deletes}
 
            ${commit_raw_table}
 
@@ -280,7 +286,8 @@ SELECT COUNT(1)
            IF missing_pk_count > 0 THEN
              RAISE USING message = FORMAT("Raw table has %s rows missing a primary key", CAST(missing_pk_count AS STRING));
-            END IF;""");
+            END IF
+            ;""");
   }
 
   @VisibleForTesting
@@ -304,68 +311,64 @@ String insertNewRecords(final StreamId id, final String finalSuffix, final Linke
         .collect(joining(",\n"));
     final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n"));
 
-    // Note that we intentionally excluded deleted records from this insert. See dedupRawRecords for an
-    // explanation of how CDC deletes work.
+    String cdcConditionalOrIncludeStatement = "";
+    if (streamColumns.containsKey(CDC_DELETED_AT_COLUMN)){
+      cdcConditionalOrIncludeStatement = """
+      OR (
+        _airbyte_loaded_at IS NOT NULL
+        AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
+      )
+      """;
+    }
+
     return new StringSubstitutor(Map.of(
         "raw_table_id", id.rawTableId(QUOTE),
         "final_table_id", id.finalTableId(finalSuffix, QUOTE),
         "column_casts", columnCasts,
         "column_errors", columnErrors,
+        "cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement,
         "column_list", columnList)).replace(
             """
-            INSERT INTO ${final_table_id}
-            (
-              ${column_list}
-              _airbyte_meta,
-              _airbyte_raw_id,
-              _airbyte_extracted_at
-            )
-            WITH intermediate_data AS (
-              SELECT
-                ${column_casts}
-                array_concat(
-                  ${column_errors}
-                ) as _airbyte_cast_errors,
-                _airbyte_raw_id,
-                _airbyte_extracted_at
-              FROM ${raw_table_id}
-              WHERE
-                _airbyte_loaded_at IS NULL
-                AND (
-                  JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
-                  OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
-                )
-            )
-            SELECT
-              ${column_list}
-              to_json(struct(_airbyte_cast_errors AS errors)) AS _airbyte_meta,
-              _airbyte_raw_id,
-              _airbyte_extracted_at
-            FROM intermediate_data;""");
+              INSERT INTO ${final_table_id}
+              (
+                ${column_list}
+                _airbyte_meta,
+                _airbyte_raw_id,
+                _airbyte_extracted_at
+              )
+              WITH intermediate_data AS (
+                SELECT
+                  ${column_casts}
+                  array_concat(
+                    ${column_errors}
+                  ) as _airbyte_cast_errors,
+                  _airbyte_raw_id,
+                  _airbyte_extracted_at
+                FROM ${raw_table_id}
+                WHERE
+                  _airbyte_loaded_at IS NULL
+                  ${cdcConditionalOrIncludeStatement}
+              )
+              SELECT
+                ${column_list}
+                to_json(struct(_airbyte_cast_errors AS errors)) AS _airbyte_meta,
+                _airbyte_raw_id,
+                _airbyte_extracted_at
+              FROM intermediate_data;""");
   }
 
   @VisibleForTesting
   String dedupFinalTable(final StreamId id,
                          final String finalSuffix,
                          final List primaryKey,
-                         final ColumnId cursor,
-                         final LinkedHashMap streamColumns) {
+                         final ColumnId cursor) {
     final String pkList = primaryKey.stream().map(columnId -> columnId.name(QUOTE)).collect(joining(","));
-    final String pkCastList = streamColumns.entrySet().stream()
-        .filter(e -> primaryKey.contains(e.getKey()))
-        .map(e -> extractAndCast(e.getKey(), e.getValue()))
-        .collect(joining(",\n "));
-    final String cursorCast = extractAndCast(cursor, streamColumns.get(cursor));
-
-    // See dedupRawTable for an explanation of why we delete records using the raw data rather than the
-    // final table's _ab_cdc_deleted_at column.
+
     return new StringSubstitutor(Map.of(
-        "raw_table_id", id.rawTableId(QUOTE),
         "final_table_id", id.finalTableId(finalSuffix, QUOTE),
         "pk_list", pkList,
-        "pk_cast_list", pkCastList,
-        "cursor_name", cursor.name(QUOTE),
-        "cursor_cast", cursorCast)).replace(
+        "cursor_name", cursor.name(QUOTE))
+    ).replace(
         """
        DELETE FROM ${final_table_id}
        WHERE
@@ -376,52 +379,54 @@ String dedupFinalTable(final StreamId id,
            ) as row_number FROM ${final_table_id}
          )
          WHERE row_number != 1
-        );
-
-        DELETE FROM ${final_table_id}
-        WHERE
-          ${pk_list} IN (
-            SELECT (
-              ${pk_cast_list}
-            )
-            FROM ${raw_table_id}
-            WHERE
-              JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
-              AND JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) != 'null'
-              AND ${cursor_name} < ${cursor_cast}
-          );""");
+        )
+        ;""");
   }
 
   @VisibleForTesting
-  String dedupRawTable(final StreamId id, final String finalSuffix, LinkedHashMap streamColumns) {
-    /*
-     * Note that we need to keep the deletion raw records because of how async syncs work. Consider this
-     * sequence of source events: 1. Insert record id=1 2. Update record id=1 3. Delete record id=1
-     *
-     * It's possible for the destination to receive them out of order, e.g.: 1. Insert 2. Delete 3.
-     * Update
-     *
-     * We can generally resolve this using the cursor column (e.g. multiple updates in the wrong order).
-     * However, deletions are special because we propagate them as hard deletes to the final table. As a
-     * result, we need to keep the deletion in the raw table, so that a late-arriving update doesn't
-     * incorrectly reinsert the final record.
-     */
-    String cdcDeletedAtClause;
-    if (streamColumns.containsKey(CDC_DELETED_AT_COLUMN)) {
-      cdcDeletedAtClause = """
-          AND (
-            JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
-            OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
-          )
-          """;
-    } else {
-      cdcDeletedAtClause = "";
+  String cdcDeletes(final StreamConfig stream,
+                    final String finalSuffix,
+                    final LinkedHashMap streamColumns) {
+
+    if (stream.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP){
+      return "";
+    }
+
+    if (!streamColumns.containsKey(CDC_DELETED_AT_COLUMN)){
+      return "";
     }
+    final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(","));
+    String pkCasts = stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk))).collect(joining(",\n"));
+
+    // we want to grab IDs for deletion from the raw table (not the final table itself) to handle out-of-order record insertions after the delete has been registered
+    return new StringSubstitutor(Map.of(
+        "final_table_id", stream.id().finalTableId(finalSuffix, QUOTE),
+        "raw_table_id", stream.id().rawTableId(QUOTE),
+        "pk_list", pkList,
+        "pk_extracts", pkCasts,
+        "quoted_cdc_delete_column", QUOTE + "_ab_cdc_deleted_at" + QUOTE)
+    ).replace(
+        """
+        DELETE FROM ${final_table_id}
+        WHERE
+          (${pk_list}) IN (
+            SELECT (
+              ${pk_extracts}
+            )
+            FROM ${raw_table_id}
+            WHERE
+              JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
+          )
+        ;"""
+    );
+  }
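[Editor's note] The comment in cdcDeletes above is the core of this change: deletion keys are selected from the raw table, not the final table, so a record that arrives after its own delete cannot resurrect the row. A narrow, unit-style check of that behavior could look roughly like the sketch below. This is only a sketch: it assumes the fixtures that already exist in BigQuerySqlGeneratorIntegrationTest (GENERATOR, cdcStreamConfig(), CDC_COLUMNS, streamId, QUOTE) plus the usual JUnit 5 static imports, and the test name is hypothetical.

  // Sketch only (hypothetical test, reusing existing fixtures): cdcDeletes should emit a DELETE
  // against the final table whose primary keys are selected from the raw table's deletion records.
  @Test
  public void testCdcDeletesReadsPksFromRawTable() {
    final String sql = GENERATOR.cdcDeletes(cdcStreamConfig(), "", CDC_COLUMNS);
    assertTrue(sql.contains("DELETE FROM"));              // deletes from the final table
    assertTrue(sql.contains(streamId.rawTableId(QUOTE))); // ...but selects PKs from the raw table
    assertTrue(sql.contains("$._ab_cdc_deleted_at"));     // ...for records flagged as CDC deletes
  }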
+
+  @VisibleForTesting
+  String dedupRawTable(final StreamId id, final String finalSuffix) {
     return new StringSubstitutor(Map.of(
         "raw_table_id", id.rawTableId(QUOTE),
-        "final_table_id", id.finalTableId(finalSuffix, QUOTE),
-        "cdc_deleted_at_clause", cdcDeletedAtClause)).replace(
+        "final_table_id", id.finalTableId(finalSuffix, QUOTE))).replace(
        // Note that this leaves _all_ deletion records in the raw table. We _could_ clear them out, but it
        // would be painful,
        // and it only matters in a few edge cases.
@@ -432,7 +437,6 @@ OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
          `_airbyte_raw_id` NOT IN (
            SELECT `_airbyte_raw_id` FROM ${final_table_id}
          )
-          ${cdc_deleted_at_clause}
        ;""");
   }
 
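[Editor's note] Before the integration-test changes: the cdcConditionalOrIncludeStatement added to insertNewRecords widens the insert so that already-loaded raw records carrying a CDC deletion marker are re-selected, which is what lets the later cdc_deletes step see them. A minimal smoke check of that clause might look like the following sketch; it assumes the same test fixtures used below (GENERATOR, streamId, CDC_COLUMNS) and a JUnit assertTrue import, and the test name is made up.

  // Sketch only: the generated INSERT for a CDC stream should pick up raw records that were
  // already loaded (_airbyte_loaded_at IS NOT NULL) but carry _ab_cdc_deleted_at.
  @Test
  public void testInsertNewRecordsIncludesLoadedCdcDeletions() {
    final String sql = GENERATOR.insertNewRecords(streamId, "", CDC_COLUMNS);
    assertTrue(sql.contains("_airbyte_loaded_at IS NOT NULL"));
    assertTrue(sql.contains("JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL"));
  }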
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java
index 3be1cecb0875..947c7da32f65 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java
@@ -10,8 +10,19 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.bigquery.*;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.Field;
 import com.google.cloud.bigquery.Field.Mode;
+import com.google.cloud.bigquery.FieldValue;
+import com.google.cloud.bigquery.FieldValueList;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableResult;
 import com.google.common.collect.ImmutableMap;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
@@ -30,8 +41,18 @@
 import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.text.StringSubstitutor;
@@ -291,7 +312,7 @@ public void testDedupFinalTable() throws InterruptedException {
            """))
        .build());
 
-    final String sql = GENERATOR.dedupFinalTable(streamId, "", PRIMARY_KEY, CURSOR, COLUMNS);
+    final String sql = GENERATOR.dedupFinalTable(streamId, "", PRIMARY_KEY, CURSOR);
     logAndExecute(sql);
 
     final TableResult result = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId(QUOTE)).build());
@@ -343,7 +364,7 @@ public void testDedupRawTable() throws InterruptedException {
            """))
        .build());
 
-    final String sql = GENERATOR.dedupRawTable(streamId, "", CDC_COLUMNS);
+    final String sql = GENERATOR.dedupRawTable(streamId, "");
     logAndExecute(sql);
 
     final TableResult result = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build());
@@ -618,6 +639,32 @@ public void testRenameFinalTable() throws InterruptedException {
     assertNotNull(table);
   }
 
+  @Test
+  public void testCdcBasics() throws InterruptedException {
+    createRawTable();
+    createFinalTableCdc();
+
+    bq.query(QueryJobConfiguration.newBuilder(
+        new StringSubstitutor(Map.of(
+            "dataset", testDataset)).replace(
+                """
+                INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`, `_airbyte_loaded_at`) VALUES
+                  (JSON'{"id": 1, "_ab_cdc_lsn": 10001, "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}', generate_uuid(), '2023-01-01T00:00:00Z', NULL);
+                """))
+        .build());
+
+    final String sql = GENERATOR.updateTable("", cdcStreamConfig());
+    logAndExecute(sql);
+
+    // TODO better asserts
+    final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows();
+    assertEquals(0, finalRows);
+    final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows();
+    assertEquals(1, rawRows);
+    final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder(
+        "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows();
+    assertEquals(0, rawUntypedRows);
+  }
+
   @Test
   public void testCdcUpdate() throws InterruptedException {
     createRawTable();
@@ -630,7 +677,7 @@ public void testCdcUpdate() throws InterruptedException {
            INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`, `_airbyte_loaded_at`) VALUES
              (JSON'{"id": 1, "_ab_cdc_lsn": 900, "string": "spooky ghost", "_ab_cdc_deleted_at": null}', '64f4390f-3da1-4b65-b64a-a6c67497f18d', '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z'),
              (JSON'{"id": 0, "_ab_cdc_lsn": 901, "string": "zombie", "_ab_cdc_deleted_at": "2022-12-31T00:O0:00Z"}', generate_uuid(), '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z'),
-              (JSON'{"id": 5, "_ab_cdc_lsn": 902, "string": "will be deleted", "_ab_cdc_deleted_at": null}', 'b6139181-a42c-45c3-89f2-c4b4bb3a8c9d', '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z');
+              (JSON'{"id": 5, "_ab_cdc_lsn": 902, "string": "will not be deleted", "_ab_cdc_deleted_at": null}', 'b6139181-a42c-45c3-89f2-c4b4bb3a8c9d', '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z');
            INSERT INTO ${dataset}.users_final (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta, `id`, `_ab_cdc_lsn`, `string`, `struct`, `integer`) values
              ('64f4390f-3da1-4b65-b64a-a6c67497f18d', '2022-12-31T00:00:00Z', JSON'{}', 1, 900, 'spooky ghost', NULL, NULL),
              ('b6139181-a42c-45c3-89f2-c4b4bb3a8c9d', '2022-12-31T00:00:00Z', JSON'{}', 5, 901, 'will be deleted', NULL, NULL);
@@ -652,18 +699,10 @@ public void testCdcUpdate() throws InterruptedException {
     final String sql = GENERATOR.updateTable("", cdcStreamConfig());
     logAndExecute(sql);
 
-    // TODO
     final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows();
-    assertEquals(4, finalRows);
+    assertEquals(5, finalRows);
     final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows();
-    // Explanation:
-    // id=0 has two raw records (the old deletion record + zombie_returned)
-    // id=1 has one raw record (the new deletion record; the old raw record was deleted)
-    // id=2 has one raw record (the newer alice2 record)
-    // id=3 has one raw record
-    // id=4 has one raw record
-    // id=5 has one raw deletion record
-    assertEquals(7, rawRows);
+    assertEquals(6, rawRows); // we only keep the newest raw record for each PK
     final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder(
         "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows();
     assertEquals(0, rawUntypedRows);
@@ -704,7 +743,6 @@ -- insert raw record from the second record batch - this is an outdated record t
     final String sql = GENERATOR.updateTable("", cdcStreamConfig());
     logAndExecute(sql);
 
-    // TODO better asserts
     final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows();
     assertEquals(0, finalRows);
     final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows();
@@ -755,7 +793,7 @@ -- second record batch
     final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows();
     assertEquals(1, finalRows);
     final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows();
-    assertEquals(2, rawRows);
+    assertEquals(1, rawRows);
     final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder(
         "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows();
     assertEquals(0, rawUntypedRows);
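[Editor's note] On the "TODO better asserts" left in the new testCdcBasics: one low-cost tightening, sketched here under the assumption that the test's bq, streamId, and QUOTE fixtures behave as in the surrounding tests, is to also verify that the single surviving raw row really is the loaded CDC deletion record rather than only counting rows.

    // Sketch only: assert the one remaining raw record is the deletion marker and was typed/loaded.
    final long loadedDeletionRecords = bq.query(QueryJobConfiguration.newBuilder(
        "SELECT * FROM " + streamId.rawTableId(QUOTE)
            + " WHERE JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL"
            + " AND _airbyte_loaded_at IS NOT NULL").build()).getTotalRows();
    assertEquals(1, loadedDeletionRecords);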