Simplify and speed up CDC delete support [DestinationsV2] (#28029)
* Simplify and speed up CDC delete support [DestinationsV2]

* better QUOTE

* spotbugs?

* recompile dbt image for local arch and use that when building images

* things compile, but tests fail

* tests working-ish

* comment

* fix logic to re-insert deleted records for cursor comparison.

tests pass!

* remove comment

* Skip CDC re-include logic if there are no CDC columns

* stop hardcoding pk (#28092)

* wip

* remove TODOs

---------

Co-authored-by: Edward Gao <edward.gao@airbyte.io>
evantahler and edgao authored Jul 11, 2023
1 parent 2dc7d77 commit 7efc294
Showing 5 changed files with 169 additions and 108 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -1,4 +1,4 @@
FROM fishtownanalytics/dbt:1.0.0
FROM fishtownanalytics/dbt:1.0.0-dev
COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte

# Install SSH Tunneling dependencies
22 changes: 19 additions & 3 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -58,14 +58,30 @@ def buildAirbyteDocker(String customConnector) {
arch = 'linux/arm64'
}

def baseCommand = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', getDockerfile(customConnector), '-t', getImageNameWithTag(customConnector), '.']
// println("Building normalization container: " + baseCommand.join(" "))
def cmdArray = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', getDockerfile(customConnector), '-t', getImageNameWithTag(customConnector), '.']
// println("Building normalization container: " + cmdArray.join(" "))

return {
commandLine baseCommand
commandLine cmdArray
}
}

task rebuildDbtForLocalArch(type: Exec) {
// we need to rebuild the dbt base image for this system's architecture

def arch = 'linux/amd64'
if (Os.isArch("aarch_64") || Os.isArch("aarch64")) {
arch = 'linux/arm64'
}

def cmdArray = ['docker', 'buildx', 'build', '--load', '--platform', arch, '-f', 'dbt.Dockerfile', '-t', 'fishtownanalytics/dbt:1.0.0-dev', '.']
println("Rebuilding base normalization container: " + cmdArray.join(" "))

commandLine cmdArray
}

airbyteDocker.dependsOn rebuildDbtForLocalArch

task airbyteDockerMSSql(type: Exec, dependsOn: checkSshScriptCopy) {
configure buildAirbyteDocker('mssql')
dependsOn assemble
3 changes: 3 additions & 0 deletions airbyte-integrations/bases/base-normalization/dbt.Dockerfile
@@ -0,0 +1,3 @@
# This dockerfile only exists to pull and re-export this image converted to the local arch of this machine
# It is then consumed by the Dockerfile in this directory as "fishtownanalytics/dbt:1.0.0-dev"
FROM fishtownanalytics/dbt:1.0.0
@@ -224,31 +224,37 @@ public String updateTable(final String finalSuffix, final StreamConfig stream) {
}
final String insertNewRecords = insertNewRecords(stream.id(), finalSuffix, stream.columns());
String dedupFinalTable = "";
String cdcDeletes = "";
String dedupRawTable = "";
if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
dedupRawTable = dedupRawTable(stream.id(), finalSuffix, stream.columns());
dedupRawTable = dedupRawTable(stream.id(), finalSuffix);
// If we're in dedup mode, then we must have a cursor
dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor().get(), stream.columns());
dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor().get());
cdcDeletes = cdcDeletes(stream, finalSuffix, stream.columns());
}
final String commitRawTable = commitRawTable(stream.id());

return new StringSubstitutor(Map.of(
"validate_primary_keys", validatePrimaryKeys,
"insert_new_records", insertNewRecords,
"dedup_final_table", dedupFinalTable,
"cdc_deletes", cdcDeletes,
"dedupe_raw_table", dedupRawTable,
"commit_raw_table", commitRawTable)).replace(
"""
DECLARE missing_pk_count INT64;
BEGIN TRANSACTION;
${validate_primary_keys}
${insert_new_records}
${dedup_final_table}
${dedupe_raw_table}
${cdc_deletes}
${commit_raw_table}
@@ -280,7 +286,8 @@ SELECT COUNT(1)
IF missing_pk_count > 0 THEN
RAISE USING message = FORMAT("Raw table has %s rows missing a primary key", CAST(missing_pk_count AS STRING));
END IF;""");
END IF
;""");
}

@VisibleForTesting
@@ -304,68 +311,64 @@ String insertNewRecords(final StreamId id, final String finalSuffix, final Linke
.collect(joining(",\n"));
final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n"));

// Note that we intentionally excluded deleted records from this insert. See dedupRawRecords for an
// explanation of how CDC deletes work.
String cdcConditionalOrIncludeStatement = "";
if (streamColumns.containsKey(CDC_DELETED_AT_COLUMN)){
cdcConditionalOrIncludeStatement = """
OR (
_airbyte_loaded_at IS NOT NULL
AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
)
""";
}

return new StringSubstitutor(Map.of(
"raw_table_id", id.rawTableId(QUOTE),
"final_table_id", id.finalTableId(finalSuffix, QUOTE),
"column_casts", columnCasts,
"column_errors", columnErrors,
"cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement,
"column_list", columnList)).replace(
"""
INSERT INTO ${final_table_id}
(
${column_list}
_airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
)
WITH intermediate_data AS (
SELECT
${column_casts}
array_concat(
${column_errors}
) as _airbyte_cast_errors,
_airbyte_raw_id,
_airbyte_extracted_at
FROM ${raw_table_id}
WHERE
_airbyte_loaded_at IS NULL
AND (
JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
INSERT INTO ${final_table_id}
(
${column_list}
_airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
)
)
SELECT
${column_list}
to_json(struct(_airbyte_cast_errors AS errors)) AS _airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
FROM intermediate_data;""");
WITH intermediate_data AS (
SELECT
${column_casts}
array_concat(
${column_errors}
) as _airbyte_cast_errors,
_airbyte_raw_id,
_airbyte_extracted_at
FROM ${raw_table_id}
WHERE
_airbyte_loaded_at IS NULL
${cdcConditionalOrIncludeStatement}
)
SELECT
${column_list}
to_json(struct(_airbyte_cast_errors AS errors)) AS _airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
FROM intermediate_data;""");
}
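
For context, the "fix logic to re-insert deleted records for cursor comparison" and "Skip CDC re-include logic if there are no CDC columns" commit messages map to the ${cdcConditionalOrIncludeStatement} substitution above. A rough, illustrative rendering of the resulting WHERE clause for a hypothetical CDC stream (the table name is made up, not taken from this change) might look like:

    -- Sketch only: `dataset.users_raw` is a hypothetical raw table name.
    -- The OR branch re-selects already-loaded deletion records so the later
    -- dedup/delete steps can compare their cursor against late-arriving updates.
    SELECT _airbyte_raw_id, _airbyte_extracted_at
    FROM `dataset`.`users_raw`
    WHERE
      _airbyte_loaded_at IS NULL
      OR (
        _airbyte_loaded_at IS NOT NULL
        AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
      )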

@VisibleForTesting
String dedupFinalTable(final StreamId id,
final String finalSuffix,
final List<ColumnId> primaryKey,
final ColumnId cursor,
final LinkedHashMap<ColumnId, AirbyteType> streamColumns) {
final ColumnId cursor) {
final String pkList = primaryKey.stream().map(columnId -> columnId.name(QUOTE)).collect(joining(","));
final String pkCastList = streamColumns.entrySet().stream()
.filter(e -> primaryKey.contains(e.getKey()))
.map(e -> extractAndCast(e.getKey(), e.getValue()))
.collect(joining(",\n "));
final String cursorCast = extractAndCast(cursor, streamColumns.get(cursor));

// See dedupRawTable for an explanation of why we delete records using the raw data rather than the
// final table's _ab_cdc_deleted_at column.

return new StringSubstitutor(Map.of(
"raw_table_id", id.rawTableId(QUOTE),
"final_table_id", id.finalTableId(finalSuffix, QUOTE),
"pk_list", pkList,
"pk_cast_list", pkCastList,
"cursor_name", cursor.name(QUOTE),
"cursor_cast", cursorCast)).replace(
"cursor_name", cursor.name(QUOTE))
).replace(
"""
DELETE FROM ${final_table_id}
WHERE
@@ -376,52 +379,54 @@ String dedupFinalTable(final StreamId id,
) as row_number FROM ${final_table_id}
)
WHERE row_number != 1
);
DELETE FROM ${final_table_id}
WHERE
${pk_list} IN (
SELECT (
${pk_cast_list}
)
FROM ${raw_table_id}
WHERE
JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
AND JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) != 'null'
AND ${cursor_name} < ${cursor_cast}
);""");
)
;""");
}
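
The first DELETE in this template is partially collapsed by the diff viewer. Assuming the hidden middle is the usual ROW_NUMBER() window over ${pk_list} ordered by ${cursor_name} (inferred from the visible substitutions, not shown in full here), a rendering for a hypothetical stream with primary key `id` and cursor `updated_at` would look roughly like:

    -- Sketch under the stated assumptions; table and column names are illustrative.
    DELETE FROM `dataset`.`users_final`
    WHERE
      `_airbyte_raw_id` IN (
        SELECT `_airbyte_raw_id`
        FROM (
          SELECT *, ROW_NUMBER() OVER (
            PARTITION BY `id` ORDER BY `updated_at` DESC
          ) AS row_number
          FROM `dataset`.`users_final`
        )
        WHERE row_number != 1
      )
    ;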

@VisibleForTesting
String dedupRawTable(final StreamId id, final String finalSuffix, LinkedHashMap<ColumnId, AirbyteType> streamColumns) {
/*
* Note that we need to keep the deletion raw records because of how async syncs work. Consider this
* sequence of source events: 1. Insert record id=1 2. Update record id=1 3. Delete record id=1
*
* It's possible for the destination to receive them out of order, e.g.: 1. Insert 2. Delete 3.
* Update
*
* We can generally resolve this using the cursor column (e.g. multiple updates in the wrong order).
* However, deletions are special because we propagate them as hard deletes to the final table. As a
* result, we need to keep the deletion in the raw table, so that a late-arriving update doesn't
* incorrectly reinsert the final record.
*/
String cdcDeletedAtClause;
if (streamColumns.containsKey(CDC_DELETED_AT_COLUMN)) {
cdcDeletedAtClause = """
AND (
JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
)
""";
} else {
cdcDeletedAtClause = "";
String cdcDeletes(final StreamConfig stream,
final String finalSuffix,
final LinkedHashMap<ColumnId, AirbyteType> streamColumns) {

if (stream.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP){
return "";
}

if (!streamColumns.containsKey(CDC_DELETED_AT_COLUMN)){
return "";
}

final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(","));
String pkCasts = stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk))).collect(joining(",\n"));

// we want to grab IDs for deletion from the raw table (not the final table itself) to handle out-of-order record insertions after the delete has been registered
return new StringSubstitutor(Map.of(
"final_table_id", stream.id().finalTableId(finalSuffix, QUOTE),
"raw_table_id", stream.id().rawTableId(QUOTE),
"pk_list", pkList,
"pk_extracts", pkCasts,
"quoted_cdc_delete_column", QUOTE + "_ab_cdc_deleted_at" + QUOTE)
).replace(
"""
DELETE FROM ${final_table_id}
WHERE
(${pk_list}) IN (
SELECT (
${pk_extracts}
)
FROM ${raw_table_id}
WHERE
JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
)
;"""
);
}
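
Rendered for a hypothetical stream with a single primary key `id`, and assuming extractAndCast emits a JSON_VALUE-plus-CAST expression (that helper is not shown in this diff), the cdcDeletes template above would come out roughly as:

    -- Illustrative rendering of the ${...} substitutions; names and the exact cast
    -- produced by extractAndCast are assumptions, not part of this change.
    DELETE FROM `dataset`.`users_final`
    WHERE
      (`id`) IN (
        SELECT (
          CAST(JSON_VALUE(`_airbyte_data`, '$.id') AS INT64)
        )
        FROM `dataset`.`users_raw`
        WHERE
          JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
      )
    ;

Pulling the IDs from the raw table rather than the final table is what lets a delete still take effect when the matching update row is inserted into the final table after the delete has been registered.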

@VisibleForTesting
String dedupRawTable(final StreamId id, final String finalSuffix) {
return new StringSubstitutor(Map.of(
"raw_table_id", id.rawTableId(QUOTE),
"final_table_id", id.finalTableId(finalSuffix, QUOTE),
"cdc_deleted_at_clause", cdcDeletedAtClause)).replace(
"final_table_id", id.finalTableId(finalSuffix, QUOTE))).replace(
// Note that this leaves _all_ deletion records in the raw table. We _could_ clear them out, but it
// would be painful,
// and it only matters in a few edge cases.
@@ -432,7 +437,6 @@ OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
`_airbyte_raw_id` NOT IN (
SELECT `_airbyte_raw_id` FROM ${final_table_id}
)
${cdc_deleted_at_clause}
;""");
}
