implement 1s1t DATs for destination-bigquery #27852

Merged: 54 commits, Jul 13, 2023

Commits (54)
448dd28
initial implementation
edgao Jun 29, 2023
639f77a
Automated Commit - Formatting Changes
edgao Jun 29, 2023
3653da4
add second sync to test
edgao Jun 30, 2023
a4ba44a
do concurrent things
edgao Jun 30, 2023
8298bff
Automated Commit - Formatting Changes
edgao Jun 30, 2023
24d0ca4
clarify comment
edgao Jun 30, 2023
d23ba3f
minor tweaks
edgao Jun 30, 2023
665fb3a
more stuff
edgao Jun 30, 2023
bd61d13
Automated Commit - Formatting Changes
edgao Jun 30, 2023
9911b5b
minor cleanup
edgao Jun 30, 2023
f06815e
lots of fixes
edgao Jun 30, 2023
d05365a
Automated Commit - Formatting Changes
edgao Jun 30, 2023
5588768
add tests for the remaining sync modes
edgao Jun 30, 2023
80f8d90
Automated Commit - Formatting Changes
edgao Jun 30, 2023
73b9e90
readability stuff
edgao Jun 30, 2023
a40935b
Automated Commit - Formatting Changes
edgao Jun 30, 2023
7bb4b2c
add test for gcs mode
edgao Jun 30, 2023
4da21d1
remove static fields
edgao Jun 30, 2023
a8fa7d4
Automated Commit - Formatting Changes
octavia-approvington Jun 30, 2023
067ee0d
add more test cases, tweak test scaffold
edgao Jun 30, 2023
c008915
cleanup
edgao Jun 30, 2023
1b376a2
Automated Commit - Formatting Changes
edgao Jun 30, 2023
d86dd30
extract recorddiffer
edgao Jul 3, 2023
9c136f7
and use it in the sql generator test
edgao Jul 3, 2023
844bba6
fix
edgao Jul 3, 2023
0f1e7ab
comment
edgao Jul 3, 2023
dc5ab24
naming+comment
edgao Jul 3, 2023
a607793
one more comment
edgao Jul 3, 2023
84c387d
better assert
edgao Jul 3, 2023
ffa9df0
remove unnecessary thing
edgao Jul 3, 2023
ffd3e3f
one last thing
edgao Jul 3, 2023
3dbaa16
Automated Commit - Formatting Changes
edgao Jul 3, 2023
97f8c19
enable concurrent execution on all java integration tests
edgao Jul 5, 2023
0796dae
Merge branch 'edgao/1s1t_redeploy' into edgao/1s1t/dats
edgao Jul 5, 2023
8bd8fd1
Merge branch 'edgao/1s1t_redeploy' into edgao/1s1t/dats
edgao Jul 5, 2023
9934901
add test for default namespace
edgao Jul 6, 2023
760f829
Automated Commit - Formatting Changes
edgao Jul 6, 2023
c82cadc
implement a 2-stream test
edgao Jul 7, 2023
0fdea84
Automated Commit - Formatting Changes
edgao Jul 7, 2023
984fb1f
Merge branch 'edgao/1s1t_redeploy' into edgao/1s1t/dats
edgao Jul 7, 2023
4442b06
extract methods
edgao Jul 10, 2023
6d437da
invert jsonNodesNotEquivalent
edgao Jul 10, 2023
de3c2e9
Automated Commit - Formatting Changes
edgao Jul 10, 2023
58e5d10
fix conditional
edgao Jul 10, 2023
6ef65b1
pull out diffSingleRecord
edgao Jul 10, 2023
797b60f
Automated Commit - Formatting Changes
edgao Jul 10, 2023
060d30f
handle nulls correctly
edgao Jul 10, 2023
7539094
remove raw-specific handling; break up methods
edgao Jul 11, 2023
144970b
Automated Commit - Formatting Changes
edgao Jul 11, 2023
c751f7e
Merge branch 'edgao/1s1t_redeploy' into edgao/1s1t/dats
edgao Jul 11, 2023
d9fd22a
Merge branch 'edgao/1s1t_redeploy' into edgao/1s1t/dats
edgao Jul 11, 2023
4efa242
Merge branch 'edgao/1s1t_redeploy' into edgao/1s1t/dats
edgao Jul 12, 2023
9d3275a
Merge branch 'edgao/1s1t_redeploy' into edgao/1s1t/dats
edgao Jul 12, 2023
b32f17a
Merge branch 'edgao/1s1t_redeploy' into edgao/1s1t/dats
edgao Jul 13, 2023

Files changed
15 changes: 15 additions & 0 deletions airbyte-integrations/bases/base-typing-deduping-test/build.gradle
@@ -0,0 +1,15 @@
plugins {
id 'java-library'
}

dependencies {
implementation project(':airbyte-config-oss:config-models-oss')
implementation project(':airbyte-connector-test-harnesses:acceptance-test-harness')
implementation project(':airbyte-integrations:bases:base-typing-deduping')
implementation libs.airbyte.protocol

implementation(enforcedPlatform('org.junit:junit-bom:5.8.2'))
implementation 'org.junit.jupiter:junit-jupiter-api'
implementation 'org.junit.jupiter:junit-jupiter-params'
implementation 'org.mockito:mockito-core:4.6.1'
}

Two large diffs are not rendered by default.

@@ -0,0 +1,29 @@
{
"type": "object",
"properties": {
"id1": { "type": "integer" },
"id2": { "type": "integer" },
"updated_at": {
"type": "string",
"airbyte_type": "timestamp_with_timezone"
},
"_ab_cdc_deleted_at": {
"type": "string",
"airbyte_type": "timestamp_with_timezone"
},
"name": { "type": "string" },
"address": {
"type": "object",
"properties": {
"city": { "type": "string" },
"state": { "type": "string" }
}
},
"age": { "type": "integer" },
"registration_date": {
"type": "string",
"format": "date",
"airbyte_type": "date"
}
}
}
@@ -0,0 +1,4 @@
// Keep the Alice record with more recent updated_at
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie"}
@@ -0,0 +1,4 @@
// Keep the Alice record with more recent updated_at
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
@@ -0,0 +1,5 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
// Invalid columns are nulled out (i.e. SQL null, not JSON null)
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie"}
@@ -0,0 +1,6 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}
// Note the duplicate record. In this sync mode, we don't dedup anything.
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
// Invalid data is still allowed in the raw table.
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
@@ -0,0 +1,12 @@
// emitted_at:1000 is equal to 1970-01-01 00:00:01Z, which is what you'll see in the expected records.
// This obviously makes no sense in relation to updated_at being in the year 2000, but that's OK
// because (from destinations POV) updated_at has no relation to emitted_at.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}}
// Emit a second record for id=(1,200) with a different updated_at. This generally doesn't happen
// in full refresh syncs - but if T+D is implemented correctly, it shouldn't matter
// (i.e. both records should be written to the final table).
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}}
// Emit a record with no _ab_cdc_deleted_at field. CDC sources typically emit an explicit null, but we should handle both cases.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}}
// Emit a record with an invalid age.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}}
@@ -0,0 +1,8 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie"}

{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}
@@ -0,0 +1,9 @@
// We keep the records from the first sync
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
// And append the records from the second sync
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}}
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}}
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
// Delete Bob, keep Charlie
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie"}
@@ -0,0 +1,5 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
// Keep the record that deleted Bob, but delete the other records associated with id=(1, 201)

Review comment (Contributor):
You can have comments in JSONl?!

Reply (Contributor Author):
you can if you're the one doing the parsing :P https://github.com/airbytehq/airbyte/pull/27852/files#diff-411f8457a8ec3a20dd71664fb4d47faed575ca681a68d041382d6768cea32aadR427

afaik jsonl doesn't have a "real" standard syntax, but vs code at least has syntax highlighting for this comment style. So I figure it's good enough 🚛
[screenshot: VS Code syntax-highlighting the // comment lines in the JSONL fixture]

{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}}
// And keep Charlie's record, even though it wasn't reemitted in sync2.
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
@@ -0,0 +1,5 @@
{"type": "RECORD", "record": {"emitted_at": 2000, "data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}}
{"type": "RECORD", "record": {"emitted_at": 2000, "data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}}}
// Set deleted_at to something non-null. Again, T+D doesn't check the actual _value_ of deleted_at (i.e. the fact that it's in the past is irrelevant).
// It only cares whether deleted_at is non-null. So this should delete Bob from the final table (in dedup mode).
{"type": "RECORD", "record": {"emitted_at": 2000, "data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}}}
@@ -28,6 +28,7 @@ dependencies {
implementation ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}

testImplementation project(':airbyte-integrations:bases:standard-destination-test')
testImplementation project(':airbyte-integrations:bases:base-typing-deduping-test')

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery')
@@ -45,12 +46,3 @@
force 'com.google.api-client:google-api-client:1.31.5'
}
}

integrationTestJava {
systemProperties = [
'junit.jupiter.execution.parallel.enabled': 'true'
// TODO what's preventing us from turning this on? (probably a lot of things)
// 'junit.jupiter.execution.parallel.mode.default': 'concurrent'
]
}

@@ -168,7 +168,7 @@ private AirbyteConnectionStatus checkGcsPermission(final JsonNode config) {
}
}

protected BigQuery getBigQuery(final JsonNode config) {
public static BigQuery getBigQuery(final JsonNode config) {
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();

try {
@@ -222,7 +222,7 @@ public String updateTable(final String finalSuffix, final StreamConfig stream) {
if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
validatePrimaryKeys = validatePrimaryKeys(stream.id(), stream.primaryKey(), stream.columns());
}
final String insertNewRecords = insertNewRecords(stream.id(), finalSuffix, stream.columns());
final String insertNewRecords = insertNewRecords(stream.id(), finalSuffix, stream.columns(), stream.destinationSyncMode());
String dedupFinalTable = "";
String cdcDeletes = "";
String dedupRawTable = "";
@@ -245,15 +245,15 @@
DECLARE missing_pk_count INT64;

BEGIN TRANSACTION;

${validate_primary_keys}

${insert_new_records}

${dedup_final_table}

${dedupe_raw_table}

${cdc_deletes}

${commit_raw_table}
@@ -291,7 +291,7 @@ SELECT COUNT(1)
}

@VisibleForTesting
String insertNewRecords(final StreamId id, final String finalSuffix, final LinkedHashMap<ColumnId, AirbyteType> streamColumns) {
String insertNewRecords(final StreamId id, final String finalSuffix, final LinkedHashMap<ColumnId, AirbyteType> streamColumns, DestinationSyncMode destinationSyncMode) {
final String columnCasts = streamColumns.entrySet().stream().map(
col -> extractAndCast(col.getKey(), col.getValue()) + " as " + col.getKey().name(QUOTE) + ",")
.collect(joining("\n"));
@@ -310,6 +310,17 @@ String insertNewRecords(final StreamId id, final String finalSuffix, final Linke
END"""))
.collect(joining(",\n"));
final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n"));
final String deletionClause;
if (destinationSyncMode == DestinationSyncMode.APPEND_DEDUP && streamColumns.keySet().stream().anyMatch(col -> "_ab_cdc_deleted_at".equals(col.originalName()))) {
deletionClause = """
AND (
JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
)
""";
} else {
deletionClause = "";
}

String cdcConditionalOrIncludeStatement = "";
if (streamColumns.containsKey(CDC_DELETED_AT_COLUMN)){
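The new deletionClause only skips a raw record when _ab_cdc_deleted_at is genuinely unset: JSON_QUERY(...) IS NULL catches a field that is absent from _airbyte_data, while JSON_TYPE(...) = 'null' catches an explicit JSON null. The same missing-vs-explicit-null distinction is visible on the Java side with Jackson, which this connector already uses; the snippet below is an illustration of that distinction, not code from this PR:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CdcDeletedAtExample {

  public static void main(String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();

    // Record where the source never emitted _ab_cdc_deleted_at at all.
    final JsonNode missing = mapper.readTree("{\"id1\": 1}");
    // Record where the CDC source emitted an explicit null.
    final JsonNode explicitNull = mapper.readTree("{\"id1\": 1, \"_ab_cdc_deleted_at\": null}");

    // Absent field: get() returns Java null (the JSON_QUERY(...) IS NULL case).
    System.out.println(missing.get("_ab_cdc_deleted_at")); // null
    // Explicit JSON null: get() returns a NullNode (the JSON_TYPE(...) = 'null' case).
    System.out.println(explicitNull.get("_ab_cdc_deleted_at").isNull()); // true
  }
}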
@@ -21,8 +21,10 @@
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDestinationTestUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationTestUtils.class);

/**
* Parse the config file and replace dataset with datasetId randomly generated by the test
@@ -33,6 +35,7 @@ public class BigQueryDestinationTestUtils {
* @throws IOException
*/
public static JsonNode createConfig(Path configFile, String datasetId) throws IOException {
LOGGER.info("Setting default dataset to {}", datasetId);
final String tmpConfigAsString = Files.readString(configFile);
final JsonNode tmpConfigJson = Jsons.deserialize(tmpConfigAsString);
return Jsons.jsonNode(((ObjectNode) tmpConfigJson).put(BigQueryConsts.CONFIG_DATASET_ID, datasetId));
@@ -0,0 +1,68 @@
package io.airbyte.integrations.destination.bigquery.typing_deduping;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils;
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;

public abstract class AbstractBigQueryTypingDedupingTest extends BaseTypingDedupingTest {

private BigQuery bq;

protected abstract String getConfigPath();

@Override
public JsonNode generateConfig() throws IOException {
final String datasetId = "typing_deduping_default_dataset" + getUniqueSuffix();
JsonNode config = BigQueryDestinationTestUtils.createConfig(Path.of(getConfigPath()), datasetId);
bq = BigQueryDestination.getBigQuery(config);
return config;
}

@Override
protected String getImageName() {
return "airbyte/destination-bigquery:dev";
}

@Override
protected List<JsonNode> dumpRawTableRecords(String streamNamespace, String streamName) throws InterruptedException {
if (streamNamespace == null) {
streamNamespace = BigQueryUtils.getDatasetId(getConfig());
}
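// Raw tables all land in the shared "airbyte" dataset, one table per stream named <namespace>_<streamName>.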
TableResult result = bq.query(QueryJobConfiguration.of("SELECT * FROM airbyte." + streamNamespace + "_" + streamName));
return BigQuerySqlGeneratorIntegrationTest.toJsonRecords(result);
}

@Override
protected List<JsonNode> dumpFinalTableRecords(String streamNamespace, String streamName) throws InterruptedException {
if (streamNamespace == null) {
streamNamespace = BigQueryUtils.getDatasetId(getConfig());
}
TableResult result = bq.query(QueryJobConfiguration.of("SELECT * FROM " + streamNamespace + "." + streamName));
return BigQuerySqlGeneratorIntegrationTest.toJsonRecords(result);
}

@Override
protected void teardownStreamAndNamespace(String streamNamespace, String streamName) {
if (streamNamespace == null) {
streamNamespace = BigQueryUtils.getDatasetId(getConfig());
}
// bq.delete simply returns false if the table/schema doesn't exist (e.g. if the connector failed to create it)
// so we don't need to do any existence checks here.
bq.delete(TableId.of("airbyte", streamNamespace + "_" + streamName));
bq.delete(DatasetId.of(streamNamespace), BigQuery.DatasetDeleteOption.deleteContents());
}
}
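The concrete subclasses are not shown in this diff; presumably each one only needs to supply a config path. A hypothetical example (the class name and secrets path below are assumptions, not taken from the PR):

package io.airbyte.integrations.destination.bigquery.typing_deduping;

public class BigQueryStandardInsertsTypingDedupingTest extends AbstractBigQueryTypingDedupingTest {

  @Override
  protected String getConfigPath() {
    // Hypothetical path: the real tests read service-account credentials from the connector's secrets directory.
    return "secrets/credentials-1s1t-standard-inserts.json";
  }
}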