🎉 Destination Snowflake and Redshift: Implement the Byte-buffered logic (#8869)

* airbyte-8336: Byte based approach.

* test-commit

* airbyte-8336: Split file by chunks.

* airbyte-8336: Renamed variable.

* airbyte-8336: make snowflake DEFAULT_MAX_BATCH_SIZE_BYTES_SNOWFLAKE constant.

* airbyte-8336: make snowflake DEFAULT_MAX_BATCH_SIZE_BYTES_SNOWFLAKE constant.

* airbyte-8336: make snowflake DEFAULT_MAX_BATCH_SIZE_BYTES_SNOWFLAKE constant.

* airbyte-8336: fix of unit tests

* airbyte-8336: Changed to default buffer size in Snowflake.

* airbyte-8336: Changed 15 GB to 1 GB for max size.

* airbyte-8336: Changed to default buffer size in Snowflake.

* airbyte-8336: Bumped connector version.

* airbyte-8336: Bumped connector version.

* airbyte-8336: Bumped connector version.
alexandertsukanov authored Dec 24, 2021
1 parent 0bce8c8 commit eea41b4
Showing 22 changed files with 97 additions and 50 deletions.
@@ -4,11 +4,20 @@

package io.airbyte.commons.bytes;

import java.nio.charset.StandardCharsets;

public class ByteUtils {

public static long getSizeInBytes(String s) {
// TODO use a smarter way of estimating byte size rather than always multiply by two
return s.length() * 2; // by default UTF-8 encoding takes 2bytes per char
/**
* Computes the exact number of bytes needed to encode the given string in UTF-8. UTF-8 uses 8-bit
* code units: each character is encoded as 1 to 4 bytes, and the first 128 Unicode code points
* take a single byte.
*
* @param s the string whose UTF-8 byte length is computed
* @return the number of bytes the string occupies when encoded as UTF-8
*/
public static long getSizeInBytesForUTF8CharSet(String s) {
return s.getBytes(StandardCharsets.UTF_8).length;
}

}
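As a side note (not part of this commit), here is a minimal sketch of why the switch matters: the old code multiplied the char count by two, while the new method asks the JDK for the actual UTF-8 byte count. The class name and sample values below are illustrative only.

import java.nio.charset.StandardCharsets;

public class ByteUtilsSizeExample {

  public static void main(String[] args) {
    // ASCII characters take 1 byte each in UTF-8, so the old `length * 2`
    // estimate overcounted plain-ASCII records by a factor of two.
    String ascii = "id,name";
    System.out.println(ascii.length() * 2);                            // 14 (old estimate)
    System.out.println(ascii.getBytes(StandardCharsets.UTF_8).length); // 7  (actual UTF-8 size)

    // A character outside the Basic Multilingual Plane is 2 Java chars
    // but 4 bytes in UTF-8, so the old estimate happened to match here.
    String emoji = "\uD83C\uDF89"; // the party popper emoji
    System.out.println(emoji.getBytes(StandardCharsets.UTF_8).length); // 4
  }

}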
@@ -6,6 +6,7 @@

import static org.junit.jupiter.api.Assertions.*;

import java.nio.charset.StandardCharsets;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;

@@ -16,7 +17,7 @@ public void testIt() {
for (int i = 1; i < 1000; i++) {
String s = RandomStringUtils.random(i);
// for now the formula is just hardcoded to str length * 2
assertEquals(i * 2, ByteUtils.getSizeInBytes(s));
assertEquals(s.getBytes(StandardCharsets.UTF_8).length, ByteUtils.getSizeInBytesForUTF8CharSet(s), "The byte length should be equal.");
}
}

@@ -179,7 +179,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.3.23
dockerImageTag: 0.3.24
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
- name: MariaDB ColumnStore
@@ -3746,7 +3746,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.3.23"
- dockerImage: "airbyte/destination-snowflake:0.3.24"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
connectionSpecification:
@@ -146,8 +146,9 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
// TODO use a more efficient way to compute bytes that doesn't require double serialization (records
// are serialized again when writing to
// the destination
final long messageSizeInBytes = ByteUtils.getSizeInBytes(Jsons.serialize(recordMessage.getData()));
long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData()));
if (bufferSizeInBytes + messageSizeInBytes >= maxQueueSizeInBytes) {
LOGGER.info("Flushing buffer...");
flushQueueToDestination();
bufferSizeInBytes = 0;
}
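A self-contained sketch of the buffering rule this hunk implements (the class and field names below are made up for illustration; the real logic lives in the buffered consumer): record sizes are accumulated in bytes and the buffer is flushed before the configured byte limit is crossed.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ByteBufferedFlushSketch {

  // Mirrors the 256 MiB default batch size used by the connectors.
  private static final long MAX_QUEUE_SIZE_IN_BYTES = 256L * 1024 * 1024;

  private final List<String> buffer = new ArrayList<>();
  private long bufferSizeInBytes = 0;

  public void accept(final String serializedRecord) {
    final long messageSizeInBytes = serializedRecord.getBytes(StandardCharsets.UTF_8).length;
    // Flush first if adding this record would reach the byte limit.
    if (bufferSizeInBytes + messageSizeInBytes >= MAX_QUEUE_SIZE_IN_BYTES) {
      flush();
    }
    buffer.add(serializedRecord);
    bufferSizeInBytes += messageSizeInBytes;
  }

  private void flush() {
    // In the real consumer this writes the buffered records to the destination.
    buffer.clear();
    bufferSizeInBytes = 0;
  }

}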
@@ -316,7 +316,7 @@ private static List<AirbyteMessage> generateRecords(final long targetSizeInBytes
long bytesCounter = 0;
for (int i = 0;; i++) {
JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAscii(7), "name", "human " + String.format("%5d", i)));
long sizeInBytes = ByteUtils.getSizeInBytes(Jsons.serialize(payload));
long sizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(payload));
bytesCounter += sizeInBytes;
AirbyteMessage airbyteMessage = new AirbyteMessage()
.withType(Type.RECORD)
4 changes: 4 additions & 0 deletions airbyte-integrations/connectors/destination-jdbc/build.gradle
@@ -18,6 +18,10 @@ dependencies {
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'
implementation 'com.fasterxml.jackson.core:jackson-databind'

// A small utility library for working with units of digital information
// https://github.com/aesy/datasize
implementation "io.aesy:datasize:1.0.0"

testImplementation "org.testcontainers:postgresql:1.15.3"
testImplementation "org.mockito:mockito-inline:4.1.0"

@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.jdbc;

import static io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
@@ -43,8 +45,6 @@ public class JdbcBufferedConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class);

private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mib

public static AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SqlOperations sqlOperations,
@@ -60,7 +60,7 @@ public static AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outpu
onCloseFunction(database, sqlOperations, writeConfigs),
catalog,
sqlOperations::isValidData,
MAX_BATCH_SIZE_BYTES);
DEFAULT_MAX_BATCH_SIZE_BYTES);
}

private static List<WriteConfig> createWriteConfigs(final NamingConventionTransformer namingResolver,
@@ -4,14 +4,15 @@

package io.airbyte.integrations.destination.jdbc;

import static io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants.MAX_FILE_SIZE;

/**
* The staging file is uploaded to cloud storage in multiple parts. This class keeps track of the
* filename, and returns a new one when the old file has had enough parts.
*/
public class StagingFilenameGenerator {

private final String streamName;
private final int maxPartsPerFile;

// the file suffix will change after the max number of file
// parts have been generated for the current suffix;
@@ -21,9 +22,18 @@ public class StagingFilenameGenerator {
// file suffix; its value range will be [1, maxPartsPerFile]
private int currentFileSuffixPartCount = 0;

public StagingFilenameGenerator(final String streamName, final int maxPartsPerFile) {
// Determines how many parts make up a single staging chunk file. After chunks are created in
// S3 or GCS they are loaded into Snowflake or Redshift, and those services limit the size of a
// single uploaded file, so we calculate how many buffer-sized parts fit into one chunk file.
private final long iterations;

/**
* @param streamName the name of the stream (table) to be processed
* @param chunkSize the optimal chunk size, in bytes, for the target service
*/
public StagingFilenameGenerator(final String streamName, final long chunkSize) {
this.streamName = streamName;
this.maxPartsPerFile = maxPartsPerFile;
this.iterations = MAX_FILE_SIZE / chunkSize;
}

/**
@@ -32,7 +42,7 @@ public StagingFilenameGenerator(final String streamName, final int maxPartsPerFi
* maxPartsPerFile.
*/
public String getStagingFilename() {
if (currentFileSuffixPartCount < maxPartsPerFile) {
if (currentFileSuffixPartCount < iterations) {
// when the number of parts for the file has not reached the max,
// keep using the same file (i.e. keep the suffix)
currentFileSuffixPartCount += 1;
@@ -0,0 +1,13 @@
package io.airbyte.integrations.destination.jdbc.constants;

import io.aesy.datasize.ByteUnit.IEC;
import io.aesy.datasize.DataSize;

public interface GlobalDataSizeConstants {
/** 256 MiB expressed in bytes, since all size comparisons are done in bytes. */
int DEFAULT_MAX_BATCH_SIZE_BYTES = DataSize.of(256L, IEC.MEBIBYTE).toUnit(IEC.BYTE).getValue().intValue();
/**
* Maximum possible size of a single staging file (e.g. 1 GiB / 256 MiB ≈ 4 parts per file);
* see StagingFilenameGenerator.java:28.
*/
long MAX_FILE_SIZE = DataSize.of(1L, IEC.GIBIBYTE).toUnit(IEC.BYTE).getValue().longValue();
}
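For illustration (not part of the commit), the same datasize calls can be used to show the arithmetic the comments above describe; the class name below is made up:

import io.aesy.datasize.ByteUnit.IEC;
import io.aesy.datasize.DataSize;

public class ChunkMathExample {

  public static void main(String[] args) {
    final long batchSizeBytes = DataSize.of(256L, IEC.MEBIBYTE).toUnit(IEC.BYTE).getValue().longValue();
    final long maxFileSizeBytes = DataSize.of(1L, IEC.GIBIBYTE).toUnit(IEC.BYTE).getValue().longValue();

    // StagingFilenameGenerator derives its rollover point the same way:
    // iterations = MAX_FILE_SIZE / chunkSize, i.e. 1 GiB / 256 MiB = 4 parts per staging file.
    System.out.println(maxFileSizeBytes / batchSizeBytes); // 4
  }

}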
@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.jdbc.copy;

import static io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES;

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
@@ -29,8 +31,6 @@ public class CopyConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(CopyConsumerFactory.class);

private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256 MiB

public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SqlOperations sqlOperations,
@@ -56,7 +56,7 @@ public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> o
onCloseFunction(pairToCopier, database, sqlOperations, pairToIgnoredRecordCount),
catalog,
sqlOperations::isValidData,
MAX_BATCH_SIZE_BYTES);
DEFAULT_MAX_BATCH_SIZE_BYTES);
}

private static <T> Map<AirbyteStreamNameNamespacePair, StreamCopier> createWriteConfigs(final ExtendedNameTransformer namingResolver,
@@ -15,6 +15,7 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
import io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.DestinationSyncMode;
@@ -63,7 +64,7 @@ public abstract class GcsStreamCopier implements StreamCopier {
private final HashMap<String, WriteChannel> channels = new HashMap<>();
private final HashMap<String, CSVPrinter> csvPrinters = new HashMap<>();
private final String stagingFolder;
private final StagingFilenameGenerator filenameGenerator;
protected StagingFilenameGenerator filenameGenerator;

public GcsStreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
@@ -84,7 +85,7 @@ public GcsStreamCopier(final String stagingFolder,
this.tmpTableName = nameTransformer.getTmpTableName(streamName);
this.storageClient = storageClient;
this.gcsConfig = gcsConfig;
this.filenameGenerator = new StagingFilenameGenerator(streamName, MAX_PARTS_PER_FILE);
this.filenameGenerator = new StagingFilenameGenerator(streamName, GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES);
}

private String prepareGcsStagingFile() {
@@ -13,6 +13,7 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
import io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -43,14 +44,6 @@ public abstract class LegacyS3StreamCopier implements StreamCopier {

private static final int DEFAULT_UPLOAD_THREADS = 10; // The S3 cli uses 10 threads by default.
private static final int DEFAULT_QUEUE_CAPACITY = DEFAULT_UPLOAD_THREADS;
// It is optimal to write every 10,000,000 records (BATCH_SIZE * DEFAULT_PART) to a new file.
// The BATCH_SIZE is defined in CopyConsumerFactory.
// The average size of such a file will be about 1 GB.
// This will make it easier to work with files and speed up the recording of large amounts of data.
// In addition, for a large number of records, we will not get a drop in the copy request to
// QUERY_TIMEOUT when
// the records from the file are copied to the staging table.
public static final int MAX_PARTS_PER_FILE = 1000;

protected final AmazonS3 s3Client;
protected final S3DestinationConfig s3Config;
@@ -87,7 +80,7 @@ public LegacyS3StreamCopier(final String stagingFolder,
this.tmpTableName = nameTransformer.getTmpTableName(streamName);
this.s3Client = client;
this.s3Config = s3Config;
this.filenameGenerator = new StagingFilenameGenerator(streamName, MAX_PARTS_PER_FILE);
this.filenameGenerator = new StagingFilenameGenerator(streamName, GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES);
}

private String prepareS3StagingFile() {
@@ -60,6 +60,7 @@ public abstract class S3StreamCopier implements StreamCopier {
* be up to 256 MiB (see CopyConsumerFactory#MAX_BATCH_SIZE_BYTES). For example, Redshift
* recommends at most 1 GiB per file, so you would want maxPartsPerFile = 4 (because 4 *
* 256MiB = 1 GiB).
* See {@link io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants#DEFAULT_MAX_BATCH_SIZE_BYTES} for the batch size now in use.
*/
public S3StreamCopier(final String stagingFolder,
final String schema,
@@ -6,26 +6,31 @@

import static org.junit.jupiter.api.Assertions.*;

import io.aesy.datasize.ByteUnit.IEC;
import io.aesy.datasize.DataSize;
import io.airbyte.integrations.destination.jdbc.constants.GlobalDataSizeConstants;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;

class StagingFilenameGeneratorTest {

private static final String STREAM_NAME = RandomStringUtils.randomAlphabetic(5).toLowerCase();
private static final int MAX_PARTS_PER_FILE = 3;
// Equal to GlobalDataSizeConstants.MAX_FILE_SIZE / GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES,
// i.e. 1 GiB / 256 MiB = 4 parts per staging file.
private static final int EXPECTED_ITERATIONS_WITH_STANDARD_BYTE_BUFFER = 4;
private static final StagingFilenameGenerator FILENAME_GENERATOR =
new StagingFilenameGenerator(STREAM_NAME, MAX_PARTS_PER_FILE);
new StagingFilenameGenerator(STREAM_NAME, GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES);

@Test
public void testGetStagingFilename() {
// the file suffix increments after every MAX_PARTS_PER_FILE method calls
for (int suffix = 0; suffix < 10; ++suffix) {
for (int part = 0; part < MAX_PARTS_PER_FILE; ++part) {
for (long part = 0; part < EXPECTED_ITERATIONS_WITH_STANDARD_BYTE_BUFFER; ++part) {
assertEquals(STREAM_NAME + "_0000" + suffix, FILENAME_GENERATOR.getStagingFilename());
}
}
for (int suffix = 10; suffix < 20; ++suffix) {
for (int part = 0; part < MAX_PARTS_PER_FILE; ++part) {
for (int part = 0; part < EXPECTED_ITERATIONS_WITH_STANDARD_BYTE_BUFFER; ++part) {
assertEquals(STREAM_NAME + "_000" + suffix, FILENAME_GENERATOR.getStagingFilename());
}
}
@@ -59,6 +59,7 @@ public class LegacyS3StreamCopierTest {

private static final int PART_SIZE = 5;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int EXPECTED_ITERATIONS_WITH_STANDARD_BYTE_BUFFER = 4;

private AmazonS3Client s3Client;
private JdbcDatabase db;
@@ -158,7 +159,7 @@ public void teardown() {
public void createSequentialStagingFiles_when_multipleFilesRequested() {
// When we call prepareStagingFile() the first time, it should create exactly one upload manager.
// The next (MAX_PARTS_PER_FILE - 1) invocations should reuse that same upload manager.
for (var i = 0; i < LegacyS3StreamCopier.MAX_PARTS_PER_FILE; i++) {
for (var i = 0; i < EXPECTED_ITERATIONS_WITH_STANDARD_BYTE_BUFFER; i++) {
final String file = copier.prepareStagingFile();
assertEquals("fake-staging-folder/fake-schema/fake-stream_00000", file, "preparing file number " + i);
final List<StreamTransferManager> firstManagers = streamTransferManagerMockedConstruction.constructed();
Expand Down Expand Up @@ -211,7 +212,7 @@ public void deletesStagingFiles() throws Exception {
@Test
public void writesContentsCorrectly() throws Exception {
final String file1 = copier.prepareStagingFile();
for (int i = 0; i < LegacyS3StreamCopier.MAX_PARTS_PER_FILE - 1; i++) {
for (int i = 0; i < EXPECTED_ITERATIONS_WITH_STANDARD_BYTE_BUFFER; i++) {
copier.prepareStagingFile();
}
copier.write(
Expand Down Expand Up @@ -257,7 +258,7 @@ public void writesContentsCorrectly() throws Exception {
@Test
public void copiesCorrectFilesToTable() throws Exception {
// Generate two files
for (int i = 0; i < LegacyS3StreamCopier.MAX_PARTS_PER_FILE + 1; i++) {
for (int i = 0; i < 5; i++) {
copier.prepareStagingFile();
}

@@ -18,5 +18,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.23
LABEL io.airbyte.version=0.3.24
LABEL io.airbyte.name=airbyte/destination-snowflake
@@ -29,6 +29,7 @@ dependencies {
implementation 'net.snowflake:snowflake-jdbc:3.13.9'
implementation 'org.apache.commons:commons-csv:1.4'
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'
implementation "io.aesy:datasize:1.0.0"

implementation project(':airbyte-config:models')
implementation project(':airbyte-db:lib')
@@ -37,6 +38,7 @@ dependencies {
implementation project(':airbyte-integrations:connectors:destination-s3')
implementation project(':airbyte-protocol:models')


integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-snowflake')
integrationTestJavaImplementation 'org.apache.commons:commons-lang3:3.11'
@@ -8,6 +8,7 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsConfig;
import io.airbyte.integrations.destination.jdbc.copy.gcs.GcsStreamCopier;
import io.airbyte.protocol.models.DestinationSyncMode;
@@ -23,8 +24,10 @@ public SnowflakeGcsStreamCopier(final String stagingFolder,
final JdbcDatabase db,
final GcsConfig gcsConfig,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
final SqlOperations sqlOperations,
final StagingFilenameGenerator stagingFilenameGenerator) {
super(stagingFolder, destSyncMode, schema, streamName, storageClient, db, gcsConfig, nameTransformer, sqlOperations);
this.filenameGenerator = stagingFilenameGenerator;
}

@Override
