🐛 Destination Redshift: Fix DATs blind spot - connection check (#13090)
* airbyte-12459: Removed RedshiftDataTmpTableMode.java as not needed enum. Code refactoring and clean-up.

* airbyte-12459: Removed RedshiftDataTmpTableMode.java as not needed enum. Code refactoring and clean-up.

* airbyte-12459: Merged master.

* airbyte-12459: Merged master.

* auto-bump connector version

Co-authored-by: Oleksandr Sheheda <alexandr-shegeda@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
3 people authored May 25, 2022
1 parent 524605c commit 5371f87
Showing 12 changed files with 49 additions and 98 deletions.
destination_definitions.yaml

@@ -225,7 +225,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
-dockerImageTag: 0.3.35
+dockerImageTag: 0.3.37
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
destination_specs.yaml

@@ -3619,7 +3619,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
-- dockerImage: "airbyte/destination-redshift:0.3.35"
+- dockerImage: "airbyte/destination-redshift:0.3.37"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
Dockerfile

@@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.3.36
+LABEL io.airbyte.version=0.3.37
LABEL io.airbyte.name=airbyte/destination-redshift
RedshiftDestination.java

@@ -8,7 +8,6 @@
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;
-import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,8 +26,8 @@ public class RedshiftDestination extends SwitchingDestination<RedshiftDestinatio
private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftDestination.class);

enum DestinationType {
-INSERT_WITH_SUPER_TMP_TYPE,
-COPY_S3_WITH_SUPER_TMP_TYPE
+STANDARD,
+COPY_S3
}

public RedshiftDestination() {
@@ -41,8 +40,8 @@ public static DestinationType getTypeFromConfig(final JsonNode config) {

public static Map<DestinationType, Destination> getTypeToDestination() {
return Map.of(
-DestinationType.INSERT_WITH_SUPER_TMP_TYPE, new RedshiftInsertDestination(RedshiftDataTmpTableMode.SUPER),
-DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, new RedshiftStagingS3Destination(RedshiftDataTmpTableMode.SUPER));
+DestinationType.STANDARD, new RedshiftInsertDestination(),
+DestinationType.COPY_S3, new RedshiftStagingS3Destination());
}

public static DestinationType determineUploadMode(final JsonNode config) {
@@ -55,14 +54,14 @@ public static DestinationType determineUploadMode(final JsonNode config) {
&& isNullOrEmpty(secretAccessKeyNode)) {
LOGGER.warn("The \"standard\" upload mode is not performant, and is not recommended for production. " +
"Please use the Amazon S3 upload mode if you are syncing a large amount of data.");
-return DestinationType.INSERT_WITH_SUPER_TMP_TYPE;
+return DestinationType.STANDARD;
}

if (isNullOrEmpty(bucketNode) && isNullOrEmpty(regionNode) && isNullOrEmpty(accessKeyIdNode)
&& isNullOrEmpty(secretAccessKeyNode)) {
throw new RuntimeException("Error: Partially missing S3 Configuration.");
}
-return DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE;
+return DestinationType.COPY_S3;
}

public static void main(final String[] args) throws Exception {
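The rename is behavior-preserving: STANDARD still routes to direct INSERTs and COPY_S3 to S3 staging. Below is a minimal sketch of the selection logic in use, not taken from this diff; it assumes Jackson's ObjectMapper and assumes the sketch lives in the connector's own package (DestinationType is package-private), with config keys mirroring the stubs in the connector's tests.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Sketch only: assumed to live in io.airbyte.integrations.destination.redshift
// so the package-private DestinationType values can be used directly.
public class UploadModeSketch {

  public static void main(final String[] args) {
    final ObjectMapper mapper = new ObjectMapper();

    // No S3 keys at all: falls back to STANDARD (direct INSERTs) and logs the
    // performance warning shown in the diff.
    final ObjectNode insertConfig = mapper.createObjectNode();
    System.out.println(RedshiftDestination.determineUploadMode(insertConfig)); // STANDARD

    // All four S3 keys present: selects COPY_S3 (stage to S3, then COPY).
    final ObjectNode s3Config = mapper.createObjectNode();
    s3Config.put("s3_bucket_name", "my-bucket");
    s3Config.put("s3_bucket_region", "us-east-1");
    s3Config.put("access_key_id", "AKIA...");
    s3Config.put("secret_access_key", "secret");
    System.out.println(RedshiftDestination.determineUploadMode(s3Config)); // COPY_S3

    // Supplying only some of the four keys throws
    // RuntimeException("Error: Partially missing S3 Configuration.").
  }
}
```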
RedshiftInsertDestination.java

@@ -12,7 +12,6 @@
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
-import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import java.util.Map;
import java.util.Optional;
@@ -30,8 +29,8 @@ public class RedshiftInsertDestination extends AbstractJdbcDestination {
"ssl", "true",
"sslfactory", "com.amazon.redshift.ssl.NonValidatingFactory");

-public RedshiftInsertDestination(final RedshiftDataTmpTableMode redshiftDataTmpTableMode) {
-super(DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations(redshiftDataTmpTableMode));
+public RedshiftInsertDestination() {
+super(DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations());
}

@Override
RedshiftStagingS3Destination.java

@@ -22,7 +22,6 @@
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
-import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.s3.S3Destination;
@@ -42,11 +41,9 @@
public class RedshiftStagingS3Destination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class);
-private final RedshiftDataTmpTableMode redshiftDataTmpTableMode;

-public RedshiftStagingS3Destination(final RedshiftDataTmpTableMode redshiftDataTmpTableMode) {
-super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations(redshiftDataTmpTableMode));
-this.redshiftDataTmpTableMode = redshiftDataTmpTableMode;
+public RedshiftStagingS3Destination() {
+super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations());
}

@Override
@@ -56,7 +53,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {

final NamingConventionTransformer nameTransformer = getNamingResolver();
final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations =
-new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode);
+new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config);
final DataSource dataSource = getDataSource(config);
try {
final JdbcDatabase database = new DefaultJdbcDatabase(dataSource);
@@ -113,7 +110,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
return new StagingConsumerFactory().create(
outputRecordCollector,
getDatabase(getDataSource(config)),
-new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, redshiftDataTmpTableMode),
+new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config),
getNamingResolver(),
CsvSerializedBuffer.createFunction(null, () -> new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX)),
config,
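This is the connection-check path named in the commit title: check() now builds the same constructor-free RedshiftS3StagingSqlOperations that getConsumer() uses at sync time, so what the check validates matches what a sync actually runs. A hedged sketch of exercising the check from caller code follows; the config keys are assumptions mirroring the documented Redshift spec, not taken from this diff.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.destination.redshift.RedshiftDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;

// Sketch only: host, credentials, and key names below are placeholders.
public class ConnectionCheckSketch {

  public static void main(final String[] args) throws Exception {
    final ObjectNode config = new ObjectMapper().createObjectNode();
    config.put("host", "example-cluster.redshift.amazonaws.com"); // placeholder
    config.put("port", 5439);
    config.put("database", "dev");
    config.put("schema", "public");
    config.put("username", "airbyte");
    config.put("password", "secret");
    // Without the four s3_* keys the check runs through the STANDARD (insert)
    // destination; adding them routes it through COPY_S3, which also verifies
    // S3 access before touching JDBC.

    final AirbyteConnectionStatus status = new RedshiftDestination().check(config);
    System.out.println(status.getStatus() + ": " + status.getMessage());
  }
}
```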

RedshiftDataTmpTableMode.java: this file was deleted.

RedshiftS3StagingSqlOperations.java

@@ -11,7 +11,6 @@
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
-import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.integrations.destination.redshift.manifest.Entry;
import io.airbyte.integrations.destination.redshift.manifest.Manifest;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
@@ -33,10 +32,8 @@ public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implem
private final ObjectMapper objectMapper;

public RedshiftS3StagingSqlOperations(NamingConventionTransformer nameTransformer,
-AmazonS3 s3Client,
-S3DestinationConfig s3Config,
-RedshiftDataTmpTableMode redshiftDataTmpTableMode) {
-super(redshiftDataTmpTableMode);
+AmazonS3 s3Client,
+S3DestinationConfig s3Config) {
this.nameTransformer = nameTransformer;
this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config);
this.s3Config = s3Config;
@@ -84,11 +81,11 @@ private String putManifest(final String manifestContents, String stagingPath) {

@Override
public void copyIntoTmpTableFromStage(JdbcDatabase database,
-                                        String stageName,
-                                        String stagingPath,
-                                        List<String> stagedFiles,
-                                        String dstTableName,
-                                        String schemaName)
+                                String stageName,
+                                String stagingPath,
+                                List<String> stagedFiles,
+                                String dstTableName,
+                                String schemaName)
throws Exception {
LOGGER.info("Starting copy to tmp table from stage: {} in destination from stage: {}, schema: {}, .", dstTableName, stagingPath, schemaName);
final var possibleManifest = Optional.ofNullable(createManifest(stagedFiles, stagingPath));
@@ -104,12 +101,12 @@ private void executeCopy(final String manifestPath, JdbcDatabase db, String sche
final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig();
final var copyQuery = String.format(
"""
-            COPY %s.%s FROM '%s'
-            CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s'
-            CSV GZIP
-            REGION '%s' TIMEFORMAT 'auto'
-            STATUPDATE OFF
-            MANIFEST;""",
+        COPY %s.%s FROM '%s'
+        CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s'
+        CSV GZIP
+        REGION '%s' TIMEFORMAT 'auto'
+        STATUPDATE OFF
+        MANIFEST;""",
schemaName,
tmpTableName,
getFullS3Path(s3Config.getBucketName(), manifestPath),
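The staged CSV.GZ files are listed in a JSON manifest uploaded to S3, and one COPY ... MANIFEST statement loads them all. The sketch below simply renders the query template from the diff with invented placeholder values (schema, table, bucket, credentials, region) to show the SQL Redshift finally receives.

```java
// Illustrative only: renders the COPY template from the diff; every argument
// below is a placeholder invented for the example.
public class CopyQuerySketch {

  public static void main(final String[] args) {
    final String copyQuery = String.format(
        """
        COPY %s.%s FROM '%s'
        CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s'
        CSV GZIP
        REGION '%s' TIMEFORMAT 'auto'
        STATUPDATE OFF
        MANIFEST;""",
        "public",                              // schemaName
        "users_tmp",                           // tmpTableName
        "s3://my-bucket/staging/xyz.manifest", // full S3 path of the manifest
        "AKIA...",                             // access key id
        "****",                                // secret access key
        "us-east-1");                          // bucket region
    System.out.println(copyQuery);
  }
}
```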
RedshiftSqlOperations.java

@@ -12,7 +12,6 @@
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.integrations.destination.jdbc.SqlOperationsUtils;
-import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
@@ -58,15 +57,21 @@ having count(*) = 3)
ALTER TABLE %1$s RENAME %3$s_reserve to %3$s;
""";

-private final RedshiftDataTmpTableMode redshiftDataTmpTableMode;
-
-public RedshiftSqlOperations(final RedshiftDataTmpTableMode redshiftDataTmpTableMode) {
-this.redshiftDataTmpTableMode = redshiftDataTmpTableMode;
+public RedshiftSqlOperations() {
}

@Override
public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
-return redshiftDataTmpTableMode.getTmpTableSqlStatement(schemaName, tableName);
+return String.format("""
+    CREATE TABLE IF NOT EXISTS %s.%s (
+    %s VARCHAR PRIMARY KEY,
+    %s SUPER,
+    %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)
+    """, schemaName, tableName,
+    JavaBaseConstants.COLUMN_NAME_AB_ID,
+    JavaBaseConstants.COLUMN_NAME_DATA,
+    JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
}

@Override
@@ -88,7 +93,7 @@ public void insertRecordsInternal(final JdbcDatabase database,
JavaBaseConstants.COLUMN_NAME_AB_ID,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
-final String recordQueryComponent = redshiftDataTmpTableMode.getInsertRowMode();
+final String recordQueryComponent = "(?, JSON_PARSE(?), ?),\n";
SqlOperationsUtils.insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, database, records);
}

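With the enum gone, the SUPER behavior is inlined: raw tables always type the data column as SUPER, and each record is bound as (?, JSON_PARSE(?), ?) so Redshift parses the JSON string on insert. Below is a hedged rendering of the resulting DDL; the three column names are assumed to be the standard JavaBaseConstants values and are not shown in this diff.

```java
// Hedged rendering of createTableQuery with sample schema/table names; the
// column names are assumptions about the JavaBaseConstants values.
public class CreateTableSketch {

  public static void main(final String[] args) {
    final String ddl = String.format("""
        CREATE TABLE IF NOT EXISTS %s.%s (
        %s VARCHAR PRIMARY KEY,
        %s SUPER,
        %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)
        """,
        "public", "_airbyte_raw_users",
        "_airbyte_ab_id",      // JavaBaseConstants.COLUMN_NAME_AB_ID (assumed value)
        "_airbyte_data",       // JavaBaseConstants.COLUMN_NAME_DATA (assumed value)
        "_airbyte_emitted_at"  // JavaBaseConstants.COLUMN_NAME_EMITTED_AT (assumed value)
    );
    System.out.println(ddl);
    // Inserts then append "(?, JSON_PARSE(?), ?),\n" per record, so the raw
    // JSON string is parsed into the SUPER column by Redshift itself.
  }
}
```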
RedshiftDestinationTest.java

@@ -19,21 +19,21 @@ public class RedshiftDestinationTest {

@Test
@DisplayName("When given S3 credentials should use COPY with SUPER Datatype")
-public void useCopyStrategyTestWithSuperTmpTable() {
+public void useS3Staging() {
final var stubConfig = mapper.createObjectNode();
stubConfig.put("s3_bucket_name", "fake-bucket");
stubConfig.put("s3_bucket_region", "fake-region");
stubConfig.put("access_key_id", "test");
stubConfig.put("secret_access_key", "test key");

-assertEquals(DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE, RedshiftDestination.determineUploadMode(stubConfig));
+assertEquals(DestinationType.COPY_S3, RedshiftDestination.determineUploadMode(stubConfig));
}

@Test
@DisplayName("When not given S3 credentials should use INSERT with SUPER Datatype")
-public void useInsertStrategyTestWithSuperDatatype() {
+public void useStandardInsert() {
final var stubConfig = mapper.createObjectNode();
-assertEquals(DestinationType.INSERT_WITH_SUPER_TMP_TYPE, RedshiftDestination.determineUploadMode(stubConfig));
+assertEquals(DestinationType.STANDARD, RedshiftDestination.determineUploadMode(stubConfig));
}

}
RedshiftSqlOperationsTest.java

@@ -15,15 +15,12 @@
import org.junit.jupiter.api.Test;

import io.airbyte.commons.json.Jsons;
-import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;

@DisplayName("RedshiftSqlOperations")
public class RedshiftSqlOperationsTest {

private static final Random RANDOM = new Random();

-private static final RedshiftDataTmpTableMode redshiftDataTmpTableMode = RedshiftDataTmpTableMode.SUPER;

private String generateBigString(final int addExtraCharacters) {
final int length = RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE + addExtraCharacters;
return RANDOM
@@ -44,7 +41,7 @@ public void isValidDataForValid() {
.put("NZD", 1.14)
.build());

-RedshiftSqlOperations uut = new RedshiftSqlOperations(redshiftDataTmpTableMode);
+RedshiftSqlOperations uut = new RedshiftSqlOperations();
boolean isValid = uut.isValidData(testNode);
assertEquals(true, isValid);
}
@@ -60,7 +57,7 @@ public void isValidDataForInvalidNode() {
.put("NZD", 1.14)
.build());

-RedshiftSqlOperations uut = new RedshiftSqlOperations(redshiftDataTmpTableMode);
+RedshiftSqlOperations uut = new RedshiftSqlOperations();
boolean isValid = uut.isValidData(testNode);
assertEquals(false, isValid);
}
@@ -91,7 +88,7 @@ public void isValidDataForInvalidObject() {
.put("key20", generateBigString(-1))
.build());

-RedshiftSqlOperations uut = new RedshiftSqlOperations(redshiftDataTmpTableMode);
+RedshiftSqlOperations uut = new RedshiftSqlOperations();
boolean isValid = uut.isValidData(testNode);
assertEquals(false, isValid);
}
5 changes: 3 additions & 2 deletions docs/integrations/destinations/redshift.md
@@ -136,8 +136,9 @@ Each stream will be output into its own raw table in Redshift. Each table will c

## Changelog

-| Version | Date | Pull Request | Subject |
-|:--------|:-----------| :----- | :------ |
+| Version | Date | Pull Request | Subject |
+|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.3.37 | 2022-05-23 | [13090](https://github.com/airbytehq/airbyte/pull/13090) | Removed redshiftDataTmpTableMode. Some refactoring. |
| 0.3.36 | 2022-05-23 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.3.35 | 2022-05-18 | [12940](https://github.com/airbytehq/airbyte/pull/12940) | Fixed maximum record size for SUPER type |
| 0.3.34 | 2022-05-16 | [12869](https://github.com/airbytehq/airbyte/pull/12869) | Fixed NPE in S3 staging check |
