Destination Redshift: Introduces configurable value for file buffer count (#20879)

* Increased default buffer count and introduces configurable value for destination redshift

* Updates logic to ensure filebuffers are at minimum the default number

* Reverted changes to increase the file buffer default but allow for users to increase with adequate warnings

* Bumps version number and updates changelog

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
ryankfu and octavia-squidington-iii authored Jan 3, 2023
1 parent 2022f7d commit 64254a4
Showing 9 changed files with 126 additions and 18 deletions.

@@ -281,7 +281,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.51
dockerImageTag: 0.3.52
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
icon: redshift.svg
normalizationConfig:

@@ -4941,7 +4941,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.51"
- dockerImage: "airbyte/destination-redshift:0.3.52"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift"
connectionSpecification:
@@ -5155,6 +5155,19 @@
\ or 256 bits. Leave blank to have Airbyte generate an ephemeral\
\ key for each sync."
airbyte_secret: true
file_buffer_size:
title: "File Buffer Count"
type: "integer"
minimum: 15
maximum: 50
default: 15
description: "Number of file buffers allocated for writing data. Increasing\
\ this number is beneficial for connections using Change Data Capture\
\ (CDC) and up to the number of streams within a connection. Increasing\
\ the number of file buffers past the maximum number of streams\
\ has deteriorating effects"
examples:
- "15"
supportsIncremental: true
supportsNormalization: true
supportsDBT: true

@@ -25,14 +25,33 @@ public class FileBuffer implements BufferStorage {
// "To optimize the number of parallel operations for a load,
// we recommend aiming to produce data files roughly 100-250 MB (or larger) in size compressed."
public static final long MAX_PER_STREAM_BUFFER_SIZE_BYTES = 200 * 1024 * 1024; // mb
// Other than the per-file size limit, we also limit the total size (which would limit how many
// concurrent streams we can buffer simultaneously too)
// Since this class is storing data on disk, the buffer size limits below are tied to the
// necessary disk storage space.
/*
* Other than the per-file size limit, we also limit the total size (which would limit how many
* concurrent streams we can buffer simultaneously too) Since this class is storing data on disk,
* the buffer size limits below are tied to the necessary disk storage space.
*/
public static final long MAX_TOTAL_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024; // mb
// we limit number of stream being buffered simultaneously anyway (limit how many files are
// stored/open for writing)
/*
* We limit number of stream being buffered simultaneously anyway (limit how many files are
* stored/open for writing)
*
* Note: This value can be tuned to increase performance with the tradeoff of increased memory usage
* (~31 MB per buffer). See {@link StreamTransferManager}
*
* For connections with interleaved data (e.g. Change Data Capture), having fewer buffers than the
* number of streams being synced causes buffer thrashing: existing buffers must be flushed before
* another stream's buffer can be created. Increasing the default max reduces the likelihood of
* thrashing but does not eliminate it unless the number of buffers equals the number of streams
* to be synced.
*/
public static final int DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER = 10;
public static final String FILE_BUFFER_COUNT_KEY = "file_buffer_count";
// This max is subject to change as no proper load testing has been done to verify the side effects
public static final int MAX_CONCURRENT_STREAM_IN_BUFFER = 50;
/*
* Use this soft cap as a guidance for customers to not exceed the recommended number of buffers
* which is 1 GB (total buffer size) / 31 MB (rough size of each buffer) ~= 32 buffers
*/
public static final int SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER = 20;

private final String fileExtension;
private File tempFile;
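
The thrashing described in the comment above can be illustrated with a small simulation. This is a minimal sketch, not code from the connector: the class name `BufferThrashingSketch`, the method `countForcedFlushes`, and the eviction policy (flush every open buffer when a new stream needs one and the limit is reached) are all assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; not part of the Airbyte codebase. */
final class BufferThrashingSketch {

  /**
   * Counts forced flushes for a sequence of records, each identified by its stream name.
   * Assumed policy: when a record arrives for a stream that has no open buffer and
   * maxBuffers buffers are already open, all open buffers are flushed first.
   */
  static int countForcedFlushes(final List<String> recordStreams, final int maxBuffers) {
    final Set<String> openBuffers = new HashSet<>();
    int flushes = 0;
    for (final String stream : recordStreams) {
      if (!openBuffers.contains(stream) && openBuffers.size() >= maxBuffers) {
        openBuffers.clear(); // flush everything to make room for the new stream
        flushes++;
      }
      openBuffers.add(stream);
    }
    return flushes;
  }

  public static void main(final String[] args) {
    // Three streams whose records arrive interleaved, as they might with CDC.
    final List<String> interleaved = List.of("a", "b", "c", "a", "b", "c");
    System.out.println(countForcedFlushes(interleaved, 2)); // fewer buffers than streams
    System.out.println(countForcedFlushes(interleaved, 3)); // one buffer per stream
  }
}
```

With two buffers for three interleaved streams the sketch reports forced flushes, while with three buffers it reports none, which matches the comment's point that thrashing only disappears once the buffer count reaches the stream count.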

@@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.51
LABEL io.airbyte.version=0.3.52
LABEL io.airbyte.name=airbyte/destination-redshift

@@ -5,12 +5,14 @@
package io.airbyte.integrations.destination.redshift;

import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
import static io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants.UPLOADING_METHOD;
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS;
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcConfig;
import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.findS3Options;
import static io.airbyte.integrations.destination.s3.S3DestinationConfig.getS3DestinationConfig;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.factory.DataSourceFactory;
@@ -60,7 +62,7 @@ private boolean isEphemeralKeysAndPurgingStagingData(final JsonNode config, fina
public AirbyteConnectionStatus check(final JsonNode config) {
final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
final EncryptionConfig encryptionConfig =
config.has("uploading_method") ? EncryptionConfig.fromJson(config.get("uploading_method").get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
@@ -76,7 +78,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
final DataSource dataSource = getDataSource(config);
try {
final JdbcDatabase database = new DefaultJdbcDatabase(dataSource);
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
final String outputSchema = super.getNamingResolver().getIdentifier(config.get(JdbcUtils.SCHEMA_KEY).asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final ConnectionErrorException e) {
@@ -131,20 +133,51 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final EncryptionConfig encryptionConfig =
config.has("uploading_method") ? EncryptionConfig.fromJson(config.get("uploading_method").get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
config.has(UPLOADING_METHOD) ?
EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
final JsonNode s3Options = findS3Options(config);
final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options);

if (numberOfFileBuffers > FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER) {
LOGGER.warn("""
Increasing the number of file buffers past {} can lead to increased performance but
leads to increased memory usage. If the number of file buffers exceeds the number
of streams {} this will create more buffers than necessary, leading to nonexistent gains
""", FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size());
}

return new StagingConsumerFactory().create(
outputRecordCollector,
getDatabase(getDataSource(config)),
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
getNamingResolver(),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX, numberOfFileBuffers)),
config,
catalog,
isPurgeStagingData(s3Options));
}

/**
* Retrieves the user-configured file buffer count, capped at the maximum number of file buffers
* and raised to the default when the configured value is lower.
*
* NOTE: If Out Of Memory Exceptions (OOME) occur, this can be a likely cause as this hard limit
* has not been thoroughly load tested across all instance sizes
*
* @param config user configurations
* @return number of file buffers if configured otherwise default
*/
@VisibleForTesting
public int getNumberOfFileBuffers(final JsonNode config) {
int numOfFileBuffers = 1;
if (config.has(FileBuffer.FILE_BUFFER_COUNT_KEY)) {
numOfFileBuffers = Math.min(config.get(FileBuffer.FILE_BUFFER_COUNT_KEY).asInt(), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
}
// Only allows for values 10 <= numOfFileBuffers <= 50
return Math.max(numOfFileBuffers, FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);
}

private boolean isPurgeStagingData(final JsonNode config) {
return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
}
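
The bounds applied by `getNumberOfFileBuffers` can be reproduced without the Jackson dependency. The sketch below is an illustration only: the class `FileBufferCountSketch` and its `resolve` method are hypothetical names, and `OptionalInt` stands in for "the key may be absent from the config". The constants 10 and 50 are copied from `DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER` and `MAX_CONCURRENT_STREAM_IN_BUFFER` in this diff.

```java
import java.util.OptionalInt;

/** Hypothetical sketch; not part of the Airbyte codebase. */
final class FileBufferCountSketch {

  // Values copied from the FileBuffer constants shown in this commit.
  static final int DEFAULT_COUNT = 10;
  static final int MAX_COUNT = 50;

  /** Returns the configured count clamped to the range [DEFAULT_COUNT, MAX_COUNT]. */
  static int resolve(final OptionalInt configured) {
    final int requested = configured.orElse(DEFAULT_COUNT);
    return Math.max(DEFAULT_COUNT, Math.min(requested, MAX_COUNT));
  }

  public static void main(final String[] args) {
    System.out.println(resolve(OptionalInt.empty())); // key absent: falls back to the default
    System.out.println(resolve(OptionalInt.of(100))); // above the hard maximum: capped
    System.out.println(resolve(OptionalInt.of(1)));   // below the default: raised
    System.out.println(resolve(OptionalInt.of(25)));  // within range: unchanged
  }
}
```

The four calls in `main` correspond to the cases exercised by the tests added in this commit (default, max-limited, minimum), plus an in-range value.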

@@ -15,7 +15,12 @@ public class RedshiftUtil {

private RedshiftUtil() {}

// We check whether config located in root of node. (This check is done for Backward compatibility)
/**
* Checks whether the config is located at the root of the node (done for backward compatibility).
*
* @param config Configuration parameters
* @return JSON representation of the configuration
*/
public static JsonNode findS3Options(final JsonNode config) {
return config.has(UPLOADING_METHOD) ? config.get(UPLOADING_METHOD) : config;
}

@@ -206,6 +206,15 @@
}
}
]
},
"file_buffer_size": {
"title": "File Buffer Count",
"type": "integer",
"minimum": 15,
"maximum": 50,
"default": 15,
"description": "Number of file buffers allocated for writing data. Increasing this number is beneficial for connections using Change Data Capture (CDC) and up to the number of streams within a connection. Increasing the number of file buffers past the maximum number of streams has deteriorating effects",
"examples": ["15"]
}
}
}
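
To get a feel for what raising the buffer count costs, a rough estimate can be derived from the "~31 MB per buffer" figure quoted in the `FileBuffer` comment of this commit. The sketch below is only a back-of-the-envelope aid: the class `BufferMemoryEstimateSketch` and the method `estimateMegabytes` are hypothetical, and the per-buffer figure is an approximation, not a measured value.

```java
/** Hypothetical sketch; not part of the Airbyte codebase. */
final class BufferMemoryEstimateSketch {

  // Approximate per-buffer memory, taken from the FileBuffer comment in this commit.
  static final int APPROX_MB_PER_BUFFER = 31;

  /** Estimates the memory used by the given number of file buffers, in megabytes. */
  static int estimateMegabytes(final int bufferCount) {
    if (bufferCount < 0) {
      throw new IllegalArgumentException("bufferCount must not be negative");
    }
    return bufferCount * APPROX_MB_PER_BUFFER;
  }

  public static void main(final String[] args) {
    // Compare the spec's default and maximum values.
    System.out.println(estimateMegabytes(15) + " MB");
    System.out.println(estimateMegabytes(50) + " MB");
  }
}
```

Under this assumption, the spec's maximum of 50 buffers needs roughly three times the memory of its default of 15, which is why the connector logs a warning once the soft cap is exceeded.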

@@ -18,6 +18,7 @@
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
@@ -111,6 +112,33 @@ public void testCheckIncorrectDataBaseFailure() throws Exception {
assertTrue(status.getMessage().contains("State code: 3D000; Error code: 500310;"));
}

/*
* FileBuffer Default Tests
*/
@Test
public void testGetFileBufferDefault() {
final RedshiftStagingS3Destination destination = new RedshiftStagingS3Destination();
assertEquals(destination.getNumberOfFileBuffers(config),
FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);
}

@Test
public void testGetFileBufferMaxLimited() {
final JsonNode defaultConfig = Jsons.clone(config);
((ObjectNode) defaultConfig).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 100);
final RedshiftStagingS3Destination destination = new RedshiftStagingS3Destination();
assertEquals(destination.getNumberOfFileBuffers(defaultConfig), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
}

@Test
public void testGetMinimumFileBufferCount() {
final JsonNode defaultConfig = Jsons.clone(config);
((ObjectNode) defaultConfig).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 1);
final RedshiftStagingS3Destination destination = new RedshiftStagingS3Destination();
// Users cannot set the number of file buffers below the default file buffer count, which is existing behavior
assertEquals(destination.getNumberOfFileBuffers(defaultConfig), FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);
}

@Override
protected TestDataComparator getTestDataComparator() {
return new RedshiftTestDataComparator();
9 changes: 5 additions & 4 deletions docs/integrations/destinations/redshift.md
@@ -141,10 +141,11 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling |
| 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
| 0.3.49 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
| 0.3.48 | 2022-09-01 | | Added JDBC URL params |
| 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers |
| 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling |
| 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
| 0.3.49 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
| 0.3.48 | 2022-09-01 | | Added JDBC URL params |
| 0.3.47 | 2022-07-15 | [\#14494](https://github.com/airbytehq/airbyte/pull/14494) | Make S3 output filename configurable. |
| 0.3.46 | 2022-06-27 | [\#14190](https://github.com/airbytehq/airbyte/pull/13916) | Correctly cleanup S3 bucket when using a configured bucket path for S3 staging operations. |
| 0.3.45 | 2022-06-25 | [\#13916](https://github.com/airbytehq/airbyte/pull/13916) | Use the configured bucket path for S3 staging operations. |