Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination bigquery: rerelease 1s1t behind gate #27936

Merged
merged 30 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0ed9c0b
Revert "Revert "Destination Bigquery: Scaffolding for destinations v2…
edgao Jul 3, 2023
308422e
version bumps+changelog
edgao Jul 3, 2023
151e675
Speed up BQ by having 2 queries, and not an OR (#27981)
evantahler Jul 5, 2023
6196133
🐛 Destination Bigquery: fix bug in standard inserts for syncs >10K re…
edgao Jul 5, 2023
452df77
Destinations V2: handle optional fields for `object` and `array` type…
cynthiaxyin Jul 5, 2023
c915af0
Automated Commit - Formatting Changes
cynthiaxyin Jul 5, 2023
8969f73
remove todo
edgao Jul 6, 2023
3ca51b6
destination bigquery: misc updates to 1s1t code (#28057)
edgao Jul 7, 2023
57422c1
more type-parsing fixes (#28100)
edgao Jul 10, 2023
2a63e8b
Automated Commit - Formatting Changes
edgao Jul 10, 2023
353db7b
Merge branch 'master' into edgao/1s1t_redeploy
edgao Jul 10, 2023
a65eff3
Improve protocol type parsing (#28126)
cynthiaxyin Jul 10, 2023
9660632
Automated Commit - Formatting Changes
cynthiaxyin Jul 10, 2023
2dc7d77
Change from T&D every 10k records to an increasing time based interva…
jbfbell Jul 11, 2023
7efc294
Simplify and speed up CDC delete support [DestinationsV2] (#28029)
evantahler Jul 11, 2023
57b54ec
update method name
jbfbell Jul 11, 2023
aa9b318
Automated Commit - Formatting Changes
jbfbell Jul 11, 2023
a16881e
Merge branch 'master' into edgao/1s1t_redeploy
edgao Jul 11, 2023
7d23572
Merge branch 'master' into edgao/1s1t_redeploy
edgao Jul 12, 2023
610ea35
Merge branch 'master' into edgao/1s1t_redeploy
edgao Jul 12, 2023
8792461
depend on pinned normalization version
edgao Jul 12, 2023
344908f
Merge branch 'master' into edgao/1s1t_redeploy
edgao Jul 12, 2023
f2da68c
Merge branch 'master' into edgao/1s1t_redeploy
edgao Jul 13, 2023
bf65992
implement 1s1t DATs for destination-bigquery (#27852)
edgao Jul 13, 2023
1b10cdb
Destinations V2: move create raw tables earlier (#28255)
cynthiaxyin Jul 13, 2023
f8cd863
stop building normalization (#28256)
edgao Jul 13, 2023
5a93265
Merge branch 'master' into edgao/1s1t_redeploy
edgao Jul 13, 2023
aa7b029
fix ability to run tests
edgao Jul 13, 2023
ae9a5b5
disable incremental t+d for now
edgao Jul 14, 2023
105517f
Automated Commit - Formatting Changes
edgao Jul 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.util.UUID;
import org.apache.avro.Schema;
Expand All @@ -32,8 +33,14 @@ public AvroRecordFactory(final Schema schema, final JsonAvroConverter converter)

public GenericData.Record getAvroRecord(final UUID id, final AirbyteRecordMessage recordMessage) throws JsonProcessingException {
final ObjectNode jsonRecord = MAPPER.createObjectNode();
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
if (TypingAndDedupingFlag.isDestinationV2()) {
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, recordMessage.getEmittedAt());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, (Long) null);
} else {
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
}
jsonRecord.setAll((ObjectNode) recordMessage.getData());

return converter.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.DestinationConfig;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
Expand All @@ -27,6 +28,7 @@
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class AvroSerializedBufferTest {
Expand All @@ -49,6 +51,11 @@ public class AvroSerializedBufferTest {
Field.of("nested_column", JsonSchemaType.OBJECT));
private static final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog(STREAM, null, FIELDS);

@BeforeAll
public static void setup() {
DestinationConfig.initialize(Jsons.deserialize("{}"));
}

@Test
public void testSnappyAvroWriter() throws Exception {
final S3AvroFormatConfig config = new S3AvroFormatConfig(Jsons.jsonNode(Map.of("compression_codec", Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.amazonaws.util.IOUtils;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.DestinationConfig;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.Field;
Expand All @@ -33,6 +34,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class ParquetSerializedBufferTest {
Expand Down Expand Up @@ -60,6 +62,11 @@ public class ParquetSerializedBufferTest {
Field.of("datetime_with_timezone", JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE));
private static final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog(STREAM, null, FIELDS);

@BeforeAll
public static void setup() {
DestinationConfig.initialize(Jsons.deserialize("{}"));
}

@Test
public void testUncompressedParquetWriter() throws Exception {
final S3DestinationConfig config = S3DestinationConfig.getS3DestinationConfig(Jsons.jsonNode(Map.of(
Expand Down
26 changes: 25 additions & 1 deletion airbyte-integrations/bases/base-java/run_with_normalization.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,34 @@ set -o pipefail
destination_exit_code=$?
echo '{"type": "LOG","log":{"level":"INFO","message":"Destination process done (exit code '"$destination_exit_code"')"}}'

# store original args
args=$@

while [ $# -ne 0 ]; do
case "$1" in
--config)
CONFIG_FILE="$2"
shift 2
;;
*)
# move on
shift
;;
esac
done

# restore original args after shifts
set -- $args

USE_1S1T_FORMAT="false"
if [[ -s "$CONFIG_FILE" ]]; then
USE_1S1T_FORMAT=$(jq -r '.use_1s1t_format' "$CONFIG_FILE")
fi

if test "$1" != 'write'
then
normalization_exit_code=0
elif test "$NORMALIZATION_TECHNIQUE" = 'LEGACY'
elif test "$NORMALIZATION_TECHNIQUE" = 'LEGACY' && test "$USE_1S1T_FORMAT" != "true"
then
echo '{"type": "LOG","log":{"level":"INFO","message":"Starting in-connector normalization"}}'
# the args in a write command are `write --catalog foo.json --config bar.json`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Singleton of destination config for easy lookup of values.
*/
@Singleton
public class DestinationConfig {

private static final Logger LOGGER = LoggerFactory.getLogger(DestinationConfig.class);

private static DestinationConfig config;

@VisibleForTesting
protected JsonNode root;

private DestinationConfig() {}

public static void initialize(final JsonNode root) {
if (config == null) {
if (root == null) {
throw new IllegalArgumentException("Cannot create DestinationConfig from null.");
}
config = new DestinationConfig();
config.root = root;
} else {
LOGGER.warn("Singleton was already initialized.");
}
}

public static DestinationConfig getInstance() {
if (config == null) {
throw new IllegalStateException("Singleton not initialized.");
}
return config;
}

public JsonNode getNodeValue(final String key) {
final JsonNode node = config.root.get(key);
if (node == null) {
LOGGER.debug("Cannot find node with key {} ", key);
}
return node;
}

// string value, otherwise empty string
public String getTextValue(final String key) {
final JsonNode node = getNodeValue(key);
if (node == null || !node.isTextual()) {
LOGGER.debug("Cannot retrieve text value for node with key {}", key);
return "";
}
return node.asText();
}

// boolean value, otherwise false
public Boolean getBooleanValue(final String key) {
final JsonNode node = getNodeValue(key);
if (node == null || !node.isBoolean()) {
LOGGER.debug("Cannot retrieve boolean value for node with key {}", key);
return false;
}
return node.asBoolean();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
// save config to singleton
DestinationConfig.initialize(config);
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);

final Procedure consumeWriteStreamCallable = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,11 @@ private JavaBaseConstants() {}
public static final String COLUMN_NAME_EMITTED_AT = "_airbyte_emitted_at";
public static final String COLUMN_NAME_DATA = "_airbyte_data";

// destination v2
public static final String COLUMN_NAME_AB_RAW_ID = "_airbyte_raw_id";
public static final String COLUMN_NAME_AB_LOADED_AT = "_airbyte_loaded_at";
public static final String COLUMN_NAME_AB_EXTRACTED_AT = "_airbyte_extracted_at";

public static final String AIRBYTE_NAMESPACE_SCHEMA = "airbyte";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base;

public class TypingAndDedupingFlag {

public static final boolean isDestinationV2() {
return DestinationConfig.getInstance().getBooleanValue("use_1s1t_format");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
Expand Down Expand Up @@ -92,7 +93,13 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume

private Instant nextFlushDeadline;
private final Duration bufferFlushFrequency;
private final String defaultNamespace;

/**
* Feel free to continue using this in non-1s1t destinations - it may be easier to use. However,
* 1s1t destinations should prefer the version which accepts a {@code defaultNamespace}.
*/
@Deprecated
public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final BufferingStrategy bufferingStrategy,
Expand All @@ -105,7 +112,27 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
onClose,
catalog,
isValidRecord,
Duration.ofMinutes(15));
Duration.ofMinutes(15),
// This is purely for backwards compatibility. Many older destinations handle this internally.
// Starting with Destinations V2, we recommend passing in an explicit namespace.
null);
}

public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final BufferingStrategy bufferingStrategy,
final OnCloseFunction onClose,
final ConfiguredAirbyteCatalog catalog,
final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord,
final String defaultNamespace) {
this(outputRecordCollector,
onStart,
bufferingStrategy,
onClose,
catalog,
isValidRecord,
Duration.ofMinutes(15),
defaultNamespace);
}

/*
Expand All @@ -119,7 +146,8 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
final OnCloseFunction onClose,
final ConfiguredAirbyteCatalog catalog,
final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord,
final Duration flushFrequency) {
final Duration flushFrequency,
final String defaultNamespace) {
this.outputRecordCollector = outputRecordCollector;
this.hasStarted = false;
this.hasClosed = false;
Expand All @@ -132,6 +160,7 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
this.bufferingStrategy = bufferingStrategy;
this.stateManager = new DefaultDestStateLifecycleManager();
this.bufferFlushFrequency = flushFrequency;
this.defaultNamespace = defaultNamespace;
}

@Override
Expand All @@ -157,7 +186,11 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
Preconditions.checkState(hasStarted, "Cannot accept records until consumer has started");
if (message.getType() == Type.RECORD) {
final AirbyteRecordMessage record = message.getRecord();
final AirbyteStreamNameNamespacePair stream = AirbyteStreamNameNamespacePair.fromRecordMessage(record);
if (Strings.isNullOrEmpty(record.getNamespace())) {
record.setNamespace(defaultNamespace);
}
final AirbyteStreamNameNamespacePair stream;
stream = AirbyteStreamNameNamespacePair.fromRecordMessage(record);

// if stream is not part of list of streams to sync to then throw invalid stream exception
if (!streamNames.contains(stream)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import org.junit.jupiter.api.Test;

public class DestinationConfigTest {

private static final String JSON = """
{
"foo": "bar",
"baz": true
}
""";

private static final JsonNode NODE = Jsons.deserialize(JSON);

@Test
public void testInitialization() {
// bad initialization
assertThrows(IllegalArgumentException.class, () -> DestinationConfig.initialize(null));
assertThrows(IllegalStateException.class, DestinationConfig::getInstance);

// good initialization
DestinationConfig.initialize(NODE);
assertNotNull(DestinationConfig.getInstance());
assertEquals(NODE, DestinationConfig.getInstance().root);

// initializing again doesn't change the config
final JsonNode nodeUnused = Jsons.deserialize("{}");
DestinationConfig.initialize(nodeUnused);
assertEquals(NODE, DestinationConfig.getInstance().root);
}

@Test
public void testValues() {
DestinationConfig.initialize(NODE);

assertEquals("bar", DestinationConfig.getInstance().getTextValue("foo"));
assertEquals("", DestinationConfig.getInstance().getTextValue("baz"));

assertFalse(DestinationConfig.getInstance().getBooleanValue("foo"));
assertTrue(DestinationConfig.getInstance().getBooleanValue("baz"));

// non-existent key
assertEquals("", DestinationConfig.getInstance().getTextValue("blah"));
assertFalse(DestinationConfig.getInstance().getBooleanValue("blah"));

assertEquals(Jsons.deserialize("\"bar\""), DestinationConfig.getInstance().getNodeValue("foo"));
assertEquals(Jsons.deserialize("true"), DestinationConfig.getInstance().getNodeValue("baz"));
assertNull(DestinationConfig.getInstance().getNodeValue("blah"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ private BufferedStreamConsumer getConsumerWithFlushFrequency() {
onClose,
CATALOG,
isValidRecord,
Duration.ofSeconds(PERIODIC_BUFFER_FREQUENCY));
Duration.ofSeconds(PERIODIC_BUFFER_FREQUENCY),
null);
return flushFrequencyConsumer;
}

Expand Down
Loading