Destination bigquery: rerelease 1s1t behind gate (#27936)
* Revert "Revert "Destination Bigquery: Scaffolding for destinations v2 (#27268)""

This reverts commit 348c577.

* version bumps+changelog

* Speed up BQ by having 2 queries, and not an OR (#27981)

* 🐛 Destination Bigquery: fix bug in standard inserts for syncs >10K records (#27856)

* only run t+d code if it's enabled

* dockerfile+changelog

* remove changelog entry

* Destinations V2: handle optional fields for `object` and `array` types (#27898)

* catch null schema

* fix null properties

* clean up

* consolidate + add more tests

* try catch

* empty json test

* Automated Commit - Formatting Changes

* remove todo

* destination bigquery: misc updates to 1s1t code (#28057)

* switch to checkedconsumer

* add unit test for buildColumnId

* use flag

* restructure prefix check

* fix build

* more type-parsing fixes (#28100)

* more type-parsing fixes

* handle duplicates

* Automated Commit - Format and Process Resources Changes

* add tests for asColumns

* Automated Commit - Format and Process Resources Changes

* log warnings instead of throwing exception

* better log message

* error level

---------

Co-authored-by: edgao <edgao@users.noreply.github.com>

* Automated Commit - Formatting Changes

* Improve protocol type parsing (#28126)

* Automated Commit - Formatting Changes

* Change from T&D every 10k records to an increasing time based interval (#28130)

* fifteen minute t&d

* add typing and deduping operation valve for increased intervals of typing and deduping

* Automated Commit - Format and Process Resources Changes

* resolve bizarre merge conflict

* Automated Commit - Format and Process Resources Changes

---------

Co-authored-by: jbfbell <jbfbell@users.noreply.github.com>
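
The "operation valve" mentioned above replaces a fixed every-10k-records trigger with a growing time interval between typing-and-deduping runs. A minimal sketch of that idea (the class name, starting interval, and doubling policy are illustrative assumptions, not the connector's actual implementation):

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of an increasing-interval "valve": each time typing
// and deduping (T+D) runs, the next run is scheduled further out, up to a cap.
public class TypeAndDedupeValve {

  private static final Duration INITIAL_INTERVAL = Duration.ofMinutes(1);
  private static final Duration MAX_INTERVAL = Duration.ofMinutes(15);

  private Duration currentInterval = INITIAL_INTERVAL;
  private Instant nextRun;

  public TypeAndDedupeValve(final Instant now) {
    this.nextRun = now.plus(currentInterval);
  }

  // True once enough time has elapsed to run T+D again.
  public boolean readyToTypeAndDedupe(final Instant now) {
    return !now.isBefore(nextRun);
  }

  // Called after a T+D run: double the interval (capped) and reschedule.
  public void updateTimeAndIncreaseInterval(final Instant now) {
    final Duration doubled = currentInterval.multipliedBy(2);
    currentInterval = doubled.compareTo(MAX_INTERVAL) > 0 ? MAX_INTERVAL : doubled;
    nextRun = now.plus(currentInterval);
  }

}
```

The point of the backoff is that early in a sync, frequent T+D keeps final tables fresh, while later the per-run warehouse cost dominates and longer gaps are cheaper.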

* Simplify and speed up CDC delete support [DestinationsV2] (#28029)

* Simplify and speed up CDC delete support [DestinationsV2]

* better QUOTE

* spotbugs?

* recompile dbt image for local arch and use that when building images

* things compile, but tests fail

* tests working-ish

* comment

* fix logic to re-insert deleted records for cursor comparison.

tests pass!

* remove comment

* Skip CDC re-include logic if there are no CDC columns

* stop hardcoding pk (#28092)

* wip

* remove TODOs

---------

Co-authored-by: Edward Gao <edward.gao@airbyte.io>

* update method name

* Automated Commit - Formatting Changes

* depend on pinned normalization version

* implement 1s1t DATs for destination-bigquery (#27852)

* initial implementation

* Automated Commit - Formatting Changes

* add second sync to test

* do concurrent things

* Automated Commit - Formatting Changes

* clarify comment

* minor tweaks

* more stuff

* Automated Commit - Formatting Changes

* minor cleanup

* lots of fixes

* handle sql vs json null better
* verify extra columns
* only check deleted_at if in DEDUP mode and the column exists
* add full refresh append test case

* Automated Commit - Formatting Changes

* add tests for the remaining sync modes

* Automated Commit - Formatting Changes

* readability stuff

* Automated Commit - Formatting Changes

* add test for gcs mode

* remove static fields

* Automated Commit - Formatting Changes

* add more test cases, tweak test scaffold

* cleanup

* Automated Commit - Formatting Changes

* extract recorddiffer

* and use it in the sql generator test

* fix

* comment

* naming+comment

* one more comment

* better assert

* remove unnecessary thing

* one last thing

* Automated Commit - Formatting Changes

* enable concurrent execution on all java integration tests

* add test for default namespace

* Automated Commit - Formatting Changes

* implement a 2-stream test

* Automated Commit - Formatting Changes

* extract methods

* invert jsonNodesNotEquivalent

* Automated Commit - Formatting Changes

* fix conditional

* pull out diffSingleRecord

* Automated Commit - Formatting Changes

* handle nulls correctly

* remove raw-specific handling; break up methods

* Automated Commit - Formatting Changes

---------

Co-authored-by: edgao <edgao@users.noreply.github.com>
Co-authored-by: octavia-approvington <octavia-approvington@users.noreply.github.com>

* Destinations V2: move create raw tables earlier (#28255)

* move create raw tables

* better log message

* stop building normalization (#28256)

* fix ability to run tests

* disable incremental t+d for now

* Automated Commit - Formatting Changes

---------

Co-authored-by: Evan Tahler <evan@airbyte.io>
Co-authored-by: Cynthia Yin <cynthia@airbyte.io>
Co-authored-by: cynthiaxyin <cynthiaxyin@users.noreply.github.com>
Co-authored-by: edgao <edgao@users.noreply.github.com>
Co-authored-by: Joe Bell <joseph.bell@airbyte.io>
Co-authored-by: jbfbell <jbfbell@users.noreply.github.com>
Co-authored-by: octavia-approvington <octavia-approvington@users.noreply.github.com>
8 people authored Jul 14, 2023
1 parent 0d185a2 commit 934acaa
Showing 83 changed files with 5,031 additions and 130 deletions.
@@ -11,6 +11,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.util.UUID;
import org.apache.avro.Schema;
@@ -32,8 +33,14 @@ public AvroRecordFactory(final Schema schema, final JsonAvroConverter converter)

public GenericData.Record getAvroRecord(final UUID id, final AirbyteRecordMessage recordMessage) throws JsonProcessingException {
final ObjectNode jsonRecord = MAPPER.createObjectNode();
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
if (TypingAndDedupingFlag.isDestinationV2()) {
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, recordMessage.getEmittedAt());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, (Long) null);
} else {
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
}
jsonRecord.setAll((ObjectNode) recordMessage.getData());

return converter.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema);
@@ -10,6 +10,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.DestinationConfig;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
@@ -27,6 +28,7 @@
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class AvroSerializedBufferTest {
@@ -49,6 +51,11 @@ public class AvroSerializedBufferTest {
Field.of("nested_column", JsonSchemaType.OBJECT));
private static final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog(STREAM, null, FIELDS);

@BeforeAll
public static void setup() {
DestinationConfig.initialize(Jsons.deserialize("{}"));
}

@Test
public void testSnappyAvroWriter() throws Exception {
final S3AvroFormatConfig config = new S3AvroFormatConfig(Jsons.jsonNode(Map.of("compression_codec", Map.of(
@@ -11,6 +11,7 @@
import com.amazonaws.util.IOUtils;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.DestinationConfig;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.Field;
@@ -33,6 +34,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class ParquetSerializedBufferTest {
@@ -60,6 +62,11 @@ public class ParquetSerializedBufferTest {
Field.of("datetime_with_timezone", JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE));
private static final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog(STREAM, null, FIELDS);

@BeforeAll
public static void setup() {
DestinationConfig.initialize(Jsons.deserialize("{}"));
}

@Test
public void testUncompressedParquetWriter() throws Exception {
final S3DestinationConfig config = S3DestinationConfig.getS3DestinationConfig(Jsons.jsonNode(Map.of(
26 changes: 25 additions & 1 deletion airbyte-integrations/bases/base-java/run_with_normalization.sh
@@ -6,10 +6,34 @@ set -o pipefail
destination_exit_code=$?
echo '{"type": "LOG","log":{"level":"INFO","message":"Destination process done (exit code '"$destination_exit_code"')"}}'

# store original args
args=$@

while [ $# -ne 0 ]; do
case "$1" in
--config)
CONFIG_FILE="$2"
shift 2
;;
*)
# move on
shift
;;
esac
done

# restore original args after shifts
set -- $args

USE_1S1T_FORMAT="false"
if [[ -s "$CONFIG_FILE" ]]; then
USE_1S1T_FORMAT=$(jq -r '.use_1s1t_format' "$CONFIG_FILE")
fi

if test "$1" != 'write'
then
normalization_exit_code=0
elif test "$NORMALIZATION_TECHNIQUE" = 'LEGACY'
elif test "$NORMALIZATION_TECHNIQUE" = 'LEGACY' && test "$USE_1S1T_FORMAT" != "true"
then
echo '{"type": "LOG","log":{"level":"INFO","message":"Starting in-connector normalization"}}'
# the args in a write command are `write --catalog foo.json --config bar.json`
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Singleton of destination config for easy lookup of values.
*/
@Singleton
public class DestinationConfig {

private static final Logger LOGGER = LoggerFactory.getLogger(DestinationConfig.class);

private static DestinationConfig config;

@VisibleForTesting
protected JsonNode root;

private DestinationConfig() {}

public static void initialize(final JsonNode root) {
if (config == null) {
if (root == null) {
throw new IllegalArgumentException("Cannot create DestinationConfig from null.");
}
config = new DestinationConfig();
config.root = root;
} else {
LOGGER.warn("Singleton was already initialized.");
}
}

public static DestinationConfig getInstance() {
if (config == null) {
throw new IllegalStateException("Singleton not initialized.");
}
return config;
}

public JsonNode getNodeValue(final String key) {
final JsonNode node = config.root.get(key);
if (node == null) {
LOGGER.debug("Cannot find node with key {} ", key);
}
return node;
}

// string value, otherwise empty string
public String getTextValue(final String key) {
final JsonNode node = getNodeValue(key);
if (node == null || !node.isTextual()) {
LOGGER.debug("Cannot retrieve text value for node with key {}", key);
return "";
}
return node.asText();
}

// boolean value, otherwise false
public Boolean getBooleanValue(final String key) {
final JsonNode node = getNodeValue(key);
if (node == null || !node.isBoolean()) {
LOGGER.debug("Cannot retrieve boolean value for node with key {}", key);
return false;
}
return node.asBoolean();
}

}
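
The singleton above is initialized exactly once (from the WRITE command's parsed config, as the `IntegrationRunner` change below shows) and then consulted anywhere in the connector. A simplified, self-contained sketch of the lookup pattern, using a plain `Map` in place of Jackson's `JsonNode` and a hypothetical class name:

```java
import java.util.Map;

// Simplified stand-in for DestinationConfig: initialize once, read anywhere.
// The real class wraps a Jackson JsonNode; a Map keeps this sketch dependency-free.
public class ConfigSingletonSketch {

  private static ConfigSingletonSketch config;

  private final Map<String, Object> root;

  private ConfigSingletonSketch(final Map<String, Object> root) {
    this.root = root;
  }

  public static void initialize(final Map<String, Object> root) {
    if (config == null) {
      if (root == null) {
        throw new IllegalArgumentException("Cannot create config from null.");
      }
      config = new ConfigSingletonSketch(root);
    }
    // A second initialize is ignored, matching the warn-and-keep behavior above.
  }

  public static ConfigSingletonSketch getInstance() {
    if (config == null) {
      throw new IllegalStateException("Singleton not initialized.");
    }
    return config;
  }

  // Missing or non-string keys default to "", so absent options read as empty.
  public String getTextValue(final String key) {
    final Object value = root.get(key);
    return value instanceof String ? (String) value : "";
  }

  // Missing or non-boolean keys default to false, so absent flags are "off".
  public boolean getBooleanValue(final String key) {
    return Boolean.TRUE.equals(root.get(key));
  }

}
```

With this shape, a gate like `TypingAndDedupingFlag.isDestinationV2()` reduces to `getInstance().getBooleanValue("use_1s1t_format")`: connectors without the flag in their config silently stay on the v1 path.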
@@ -144,6 +144,8 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
// save config to singleton
DestinationConfig.initialize(config);
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);

final Procedure consumeWriteStreamCallable = () -> {
@@ -20,4 +20,11 @@ private JavaBaseConstants() {}
public static final String COLUMN_NAME_EMITTED_AT = "_airbyte_emitted_at";
public static final String COLUMN_NAME_DATA = "_airbyte_data";

// destination v2
public static final String COLUMN_NAME_AB_RAW_ID = "_airbyte_raw_id";
public static final String COLUMN_NAME_AB_LOADED_AT = "_airbyte_loaded_at";
public static final String COLUMN_NAME_AB_EXTRACTED_AT = "_airbyte_extracted_at";

public static final String AIRBYTE_NAMESPACE_SCHEMA = "airbyte";

}
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base;

public class TypingAndDedupingFlag {

public static final boolean isDestinationV2() {
return DestinationConfig.getInstance().getBooleanValue("use_1s1t_format");
}

}
@@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
@@ -92,7 +93,13 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsumer

private Instant nextFlushDeadline;
private final Duration bufferFlushFrequency;
private final String defaultNamespace;

/**
* Feel free to continue using this in non-1s1t destinations - it may be easier to use. However,
* 1s1t destinations should prefer the version which accepts a {@code defaultNamespace}.
*/
@Deprecated
public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final BufferingStrategy bufferingStrategy,
@@ -105,7 +112,27 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
onClose,
catalog,
isValidRecord,
Duration.ofMinutes(15));
Duration.ofMinutes(15),
// This is purely for backwards compatibility. Many older destinations handle this internally.
// Starting with Destinations V2, we recommend passing in an explicit namespace.
null);
}

public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final BufferingStrategy bufferingStrategy,
final OnCloseFunction onClose,
final ConfiguredAirbyteCatalog catalog,
final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord,
final String defaultNamespace) {
this(outputRecordCollector,
onStart,
bufferingStrategy,
onClose,
catalog,
isValidRecord,
Duration.ofMinutes(15),
defaultNamespace);
}

/*
@@ -119,7 +146,8 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnCloseFunction onClose,
final ConfiguredAirbyteCatalog catalog,
final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord,
final Duration flushFrequency) {
final Duration flushFrequency,
final String defaultNamespace) {
this.outputRecordCollector = outputRecordCollector;
this.hasStarted = false;
this.hasClosed = false;
@@ -132,6 +160,7 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
this.bufferingStrategy = bufferingStrategy;
this.stateManager = new DefaultDestStateLifecycleManager();
this.bufferFlushFrequency = flushFrequency;
this.defaultNamespace = defaultNamespace;
}

@Override
@@ -157,7 +186,11 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
Preconditions.checkState(hasStarted, "Cannot accept records until consumer has started");
if (message.getType() == Type.RECORD) {
final AirbyteRecordMessage record = message.getRecord();
final AirbyteStreamNameNamespacePair stream = AirbyteStreamNameNamespacePair.fromRecordMessage(record);
if (Strings.isNullOrEmpty(record.getNamespace())) {
record.setNamespace(defaultNamespace);
}
final AirbyteStreamNameNamespacePair stream = AirbyteStreamNameNamespacePair.fromRecordMessage(record);

// if stream is not part of list of streams to sync to then throw invalid stream exception
if (!streamNames.contains(stream)) {
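
The `acceptTracked` change above backfills a record's namespace before resolving which stream it belongs to, so records that omit a namespace land in the connector's configured default. A minimal sketch of that fallback (the class and method names here are illustrative stand-ins, not the Airbyte protocol classes):

```java
// Hypothetical stand-in for the namespace backfill in acceptTracked:
// records with a null or empty namespace inherit the connector-level default.
public class NamespaceFallback {

  public static String resolveNamespace(final String recordNamespace, final String defaultNamespace) {
    if (recordNamespace == null || recordNamespace.isEmpty()) {
      return defaultNamespace;
    }
    return recordNamespace;
  }

}
```

Doing this before the `streamNames.contains(stream)` check matters: otherwise a namespace-less record would fail the catalog lookup even though the destination knows where it should go.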
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import org.junit.jupiter.api.Test;

public class DestinationConfigTest {

private static final String JSON = """
{
"foo": "bar",
"baz": true
}
""";

private static final JsonNode NODE = Jsons.deserialize(JSON);

@Test
public void testInitialization() {
// bad initialization
assertThrows(IllegalArgumentException.class, () -> DestinationConfig.initialize(null));
assertThrows(IllegalStateException.class, DestinationConfig::getInstance);

// good initialization
DestinationConfig.initialize(NODE);
assertNotNull(DestinationConfig.getInstance());
assertEquals(NODE, DestinationConfig.getInstance().root);

// initializing again doesn't change the config
final JsonNode nodeUnused = Jsons.deserialize("{}");
DestinationConfig.initialize(nodeUnused);
assertEquals(NODE, DestinationConfig.getInstance().root);
}

@Test
public void testValues() {
DestinationConfig.initialize(NODE);

assertEquals("bar", DestinationConfig.getInstance().getTextValue("foo"));
assertEquals("", DestinationConfig.getInstance().getTextValue("baz"));

assertFalse(DestinationConfig.getInstance().getBooleanValue("foo"));
assertTrue(DestinationConfig.getInstance().getBooleanValue("baz"));

// non-existent key
assertEquals("", DestinationConfig.getInstance().getTextValue("blah"));
assertFalse(DestinationConfig.getInstance().getBooleanValue("blah"));

assertEquals(Jsons.deserialize("\"bar\""), DestinationConfig.getInstance().getNodeValue("foo"));
assertEquals(Jsons.deserialize("true"), DestinationConfig.getInstance().getNodeValue("baz"));
assertNull(DestinationConfig.getInstance().getNodeValue("blah"));
}

}
@@ -358,7 +358,8 @@ private BufferedStreamConsumer getConsumerWithFlushFrequency() {
onClose,
CATALOG,
isValidRecord,
Duration.ofSeconds(PERIODIC_BUFFER_FREQUENCY));
Duration.ofSeconds(PERIODIC_BUFFER_FREQUENCY),
null);
return flushFrequencyConsumer;
}
