implement 1s1t DATs for destination-bigquery (#27852)
* initial implementation

* Automated Commit - Formatting Changes

* add second sync to test

* do concurrent things

* Automated Commit - Formatting Changes

* clarify comment

* minor tweaks

* more stuff

* Automated Commit - Formatting Changes

* minor cleanup

* lots of fixes

* handle sql vs json null better
* verify extra columns
* only check deleted_at if in DEDUP mode and the column exists
* add full refresh append test case

* Automated Commit - Formatting Changes

* add tests for the remaining sync modes

* Automated Commit - Formatting Changes

* readability stuff

* Automated Commit - Formatting Changes

* add test for gcs mode

* remove static fields

* Automated Commit - Formatting Changes

* add more test cases, tweak test scaffold

* cleanup

* Automated Commit - Formatting Changes

* extract recorddiffer

* and use it in the sql generator test

* fix

* comment

* naming+comment

* one more comment

* better assert

* remove unnecessary thing

* one last thing

* Automated Commit - Formatting Changes

* enable concurrent execution on all java integration tests

* add test for default namespace

* Automated Commit - Formatting Changes

* implement a 2-stream test

* Automated Commit - Formatting Changes

* extract methods

* invert jsonNodesNotEquivalent

* Automated Commit - Formatting Changes

* fix conditional

* pull out diffSingleRecord

* Automated Commit - Formatting Changes

* handle nulls correctly

* remove raw-specific handling; break up methods

* Automated Commit - Formatting Changes

---------

Co-authored-by: edgao <edgao@users.noreply.github.com>
Co-authored-by: octavia-approvington <octavia-approvington@users.noreply.github.com>
3 people authored Jul 13, 2023
1 parent f2da68c commit bf65992
Showing 26 changed files with 1,519 additions and 572 deletions.
15 changes: 15 additions & 0 deletions airbyte-integrations/bases/base-typing-deduping-test/build.gradle
@@ -0,0 +1,15 @@
plugins {
id 'java-library'
}

dependencies {
implementation project(':airbyte-config-oss:config-models-oss')
implementation project(':airbyte-connector-test-harnesses:acceptance-test-harness')
implementation project(':airbyte-integrations:bases:base-typing-deduping')
implementation libs.airbyte.protocol

implementation(enforcedPlatform('org.junit:junit-bom:5.8.2'))
implementation 'org.junit.jupiter:junit-jupiter-api'
implementation 'org.junit.jupiter:junit-jupiter-params'
implementation 'org.mockito:mockito-core:4.6.1'
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

@@ -0,0 +1,29 @@
{
"type": "object",
"properties": {
"id1": { "type": "integer" },
"id2": { "type": "integer" },
"updated_at": {
"type": "string",
"airbyte_type": "timestamp_with_timezone"
},
"_ab_cdc_deleted_at": {
"type": "string",
"airbyte_type": "timestamp_with_timezone"
},
"name": { "type": "string" },
"address": {
"type": "object",
"properties": {
"city": { "type": "string" },
"state": { "type": "string" }
}
},
"age": { "type": "integer" },
"registration_date": {
"type": "string",
"format": "date",
"airbyte_type": "date"
}
}
}
@@ -0,0 +1,4 @@
// Keep the Alice record with more recent updated_at
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie"}
@@ -0,0 +1,4 @@
// Keep the Alice record with more recent updated_at
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
@@ -0,0 +1,5 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
// Invalid columns are nulled out (i.e. SQL null, not JSON null)
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie"}
@@ -0,0 +1,6 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}
// Note the duplicate record. In this sync mode, we don't dedup anything.
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
// Invalid data is still allowed in the raw table.
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
@@ -0,0 +1,12 @@
// emitted_at:1000 is equal to 1970-01-01T00:00:01Z, which is what you'll see in the expected records.
// This obviously makes no sense in relation to updated_at being in the year 2000, but that's OK
// because (from the destination's POV) updated_at has no relation to emitted_at.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}}
// Emit a second record for id=(1,200) with a different updated_at. This generally doesn't happen
// in full refresh syncs - but if T+D is implemented correctly, it shouldn't matter
// (i.e. both records should be written to the final table).
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}}
// Emit a record with no _ab_cdc_deleted_at field. CDC sources typically emit an explicit null, but we should handle both cases.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}}
// Emit a record with an invalid age.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}}
@@ -0,0 +1,8 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie"}

{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}
@@ -0,0 +1,9 @@
// We keep the records from the first sync
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
// And append the records from the second sync
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}}
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}}
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
// Delete Bob, keep Charlie
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie"}
@@ -0,0 +1,5 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
// Keep the record that deleted Bob, but delete the other records associated with id=(1, 201)
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}}
// And keep Charlie's record, even though it wasn't reemitted in sync2.
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
@@ -0,0 +1,5 @@
{"type": "RECORD", "record": {"emitted_at": 2000, "data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}}
{"type": "RECORD", "record": {"emitted_at": 2000, "data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}}}
// Set deleted_at to something non-null. Again, T+D doesn't check the actual _value_ of deleted_at (i.e. the fact that it's in the past is irrelevant).
// It only cares whether deleted_at is non-null. So this should delete Bob from the final table (in dedup mode).
{"type": "RECORD", "record": {"emitted_at": 2000, "data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01:00:00:00Z"}}}
@@ -28,6 +28,7 @@ dependencies {
implementation ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}

testImplementation project(':airbyte-integrations:bases:standard-destination-test')
testImplementation project(':airbyte-integrations:bases:base-typing-deduping-test')

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery')
@@ -45,12 +46,3 @@ configurations.all {
force 'com.google.api-client:google-api-client:1.31.5'
}
}

integrationTestJava {
systemProperties = [
'junit.jupiter.execution.parallel.enabled': 'true'
// TODO what's preventing us from turning this on? (probably a lot of things)
// 'junit.jupiter.execution.parallel.mode.default': 'concurrent'
]
}
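This per-connector block is removed because, per the commit history above, parallel JUnit execution is now switched on for all Java integration tests. A sketch of the equivalent shared configuration, assuming it lives in a common Gradle build script applied to every connector:

tasks.withType(Test).configureEach {
  // Enable JUnit 5 parallel execution for every test task, including integrationTestJava.
  systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
}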

@@ -168,7 +168,7 @@ private AirbyteConnectionStatus checkGcsPermission(final JsonNode config) {
}
}

protected BigQuery getBigQuery(final JsonNode config) {
public static BigQuery getBigQuery(final JsonNode config) {
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();

try {
@@ -222,7 +222,7 @@ public String updateTable(final String finalSuffix, final StreamConfig stream) {
if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
validatePrimaryKeys = validatePrimaryKeys(stream.id(), stream.primaryKey(), stream.columns());
}
final String insertNewRecords = insertNewRecords(stream.id(), finalSuffix, stream.columns());
final String insertNewRecords = insertNewRecords(stream.id(), finalSuffix, stream.columns(), stream.destinationSyncMode());
String dedupFinalTable = "";
String cdcDeletes = "";
String dedupRawTable = "";
@@ -245,15 +245,15 @@ public String updateTable(final String finalSuffix, final StreamConfig stream) {
DECLARE missing_pk_count INT64;
BEGIN TRANSACTION;
${validate_primary_keys}
${insert_new_records}
${dedup_final_table}
${dedupe_raw_table}
${cdc_deletes}
${commit_raw_table}
@@ -291,7 +291,7 @@ SELECT COUNT(1)
}

@VisibleForTesting
String insertNewRecords(final StreamId id, final String finalSuffix, final LinkedHashMap<ColumnId, AirbyteType> streamColumns) {
String insertNewRecords(final StreamId id, final String finalSuffix, final LinkedHashMap<ColumnId, AirbyteType> streamColumns, DestinationSyncMode destinationSyncMode) {
final String columnCasts = streamColumns.entrySet().stream().map(
col -> extractAndCast(col.getKey(), col.getValue()) + " as " + col.getKey().name(QUOTE) + ",")
.collect(joining("\n"));
@@ -310,6 +310,17 @@ String insertNewRecords(final StreamId id, final String finalSuffix, final Linke
END"""))
.collect(joining(",\n"));
final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n"));
final String deletionClause;
if (destinationSyncMode == DestinationSyncMode.APPEND_DEDUP && streamColumns.keySet().stream().anyMatch(col -> "_ab_cdc_deleted_at".equals(col.originalName()))) {
deletionClause = """
AND (
JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null'
)
""";
} else {
deletionClause = "";
}

String cdcConditionalOrIncludeStatement = "";
if (streamColumns.containsKey(CDC_DELETED_AT_COLUMN)){
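Tying the new deletionClause together: when inserting into the final table, a raw record is skipped if _ab_cdc_deleted_at is present and non-null, and JSON_QUERY/JSON_TYPE are what distinguish an absent key (SQL NULL) from an explicit JSON null. A simplified sketch of the generated INSERT with the clause in place (column list trimmed; the real generated SQL will differ):

INSERT INTO my_dataset.users_final (id1, id2, name)
SELECT
  SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id1') AS INT64),
  SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id2') AS INT64),
  JSON_VALUE(`_airbyte_data`, '$.name')
FROM my_dataset.users_raw
WHERE
  -- skip records that represent a CDC deletion...
  JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NULL
  -- ...but an explicit JSON null still means "not deleted"
  OR JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$._ab_cdc_deleted_at')) = 'null';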
@@ -21,8 +21,10 @@
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDestinationTestUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationTestUtils.class);

/**
* Parse the config file and replace dataset with datasetId randomly generated by the test
@@ -33,6 +35,7 @@ public class BigQueryDestinationTestUtils {
* @throws IOException
*/
public static JsonNode createConfig(Path configFile, String datasetId) throws IOException {
LOGGER.info("Setting default dataset to {}", datasetId);
final String tmpConfigAsString = Files.readString(configFile);
final JsonNode tmpConfigJson = Jsons.deserialize(tmpConfigAsString);
return Jsons.jsonNode(((ObjectNode) tmpConfigJson).put(BigQueryConsts.CONFIG_DATASET_ID, datasetId));
@@ -0,0 +1,68 @@
package io.airbyte.integrations.destination.bigquery.typing_deduping;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils;
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;

public abstract class AbstractBigQueryTypingDedupingTest extends BaseTypingDedupingTest {

private BigQuery bq;

protected abstract String getConfigPath();

@Override
public JsonNode generateConfig() throws IOException {
final String datasetId = "typing_deduping_default_dataset" + getUniqueSuffix();
JsonNode config = BigQueryDestinationTestUtils.createConfig(Path.of(getConfigPath()), datasetId);
bq = BigQueryDestination.getBigQuery(config);
return config;
}

@Override
protected String getImageName() {
return "airbyte/destination-bigquery:dev";
}

@Override
protected List<JsonNode> dumpRawTableRecords(String streamNamespace, String streamName) throws InterruptedException {
if (streamNamespace == null) {
streamNamespace = BigQueryUtils.getDatasetId(getConfig());
}
TableResult result = bq.query(QueryJobConfiguration.of("SELECT * FROM airbyte." + streamNamespace + "_" + streamName));
return BigQuerySqlGeneratorIntegrationTest.toJsonRecords(result);
}

@Override
protected List<JsonNode> dumpFinalTableRecords(String streamNamespace, String streamName) throws InterruptedException {
if (streamNamespace == null) {
streamNamespace = BigQueryUtils.getDatasetId(getConfig());
}
TableResult result = bq.query(QueryJobConfiguration.of("SELECT * FROM " + streamNamespace + "." + streamName));
return BigQuerySqlGeneratorIntegrationTest.toJsonRecords(result);
}

@Override
protected void teardownStreamAndNamespace(String streamNamespace, String streamName) {
if (streamNamespace == null) {
streamNamespace = BigQueryUtils.getDatasetId(getConfig());
}
// bq.delete simply returns false if the table/schema doesn't exist (e.g. if the connector failed to create it)
// so we don't need to do any existence checks here.
bq.delete(TableId.of("airbyte", streamNamespace + "_" + streamName));
bq.delete(DatasetId.of(streamNamespace), BigQuery.DatasetDeleteOption.deleteContents());
}
}
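Concrete subclasses then only have to pick a config file per loading mode (standard inserts vs. GCS staging, per the "add test for gcs mode" commit). A hypothetical example — the class name and secrets path are illustrative, not taken from this diff:

public class BigQueryStandardInsertsTypingDedupingTest extends AbstractBigQueryTypingDedupingTest {

  @Override
  protected String getConfigPath() {
    // Assumed location; each loading mode would point at its own credentials file.
    return "secrets/credentials-1s1t-standard.json";
  }

}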