diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 1646920b6e7a..26aa7ec6354b 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -191,7 +191,7 @@ - name: Redshift destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc dockerRepository: airbyte/destination-redshift - dockerImageTag: 0.3.31 + dockerImageTag: 0.3.32 documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift icon: redshift.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 57011e9c801f..551fb543b7b1 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3414,7 +3414,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-redshift:0.3.31" +- dockerImage: "airbyte/destination-redshift:0.3.32" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift" connectionSpecification: diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/resources/redshift_normalization_migration/test_pokemon_super.sql b/airbyte-integrations/bases/base-normalization/integration_tests/resources/redshift_normalization_migration/test_pokemon_super.sql index 6dc72ef6cc9c..40266a78b251 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/resources/redshift_normalization_migration/test_pokemon_super.sql +++ b/airbyte-integrations/bases/base-normalization/integration_tests/resources/redshift_normalization_migration/test_pokemon_super.sql @@ -1 +1,6 @@ -select forms from {{ ref('pokemon' )}} where forms != 
json_parse('[{"name":"ditto","url":"https://pokeapi.co/api/v2/pokemon-form/132/"}]') +SELECT + forms +FROM + {{ ref('pokemon') }} +WHERE + forms != json_parse('[{"name":"ditto","url":"https://pokeapi.co/api/v2/pokemon-form/132/"}]') diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index 46a7fe898fee..20f394fc6f3b 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.3.31 +LABEL io.airbyte.version=0.3.32 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java index a7ab36123bfc..bf283eb31a67 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftDestination.java @@ -5,15 +5,11 @@ package io.airbyte.integrations.destination.redshift; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination; import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.Map; -import java.util.function.Consumer; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,17 +51,15 @@ public static DestinationType determineUploadMode(final JsonNode config) { final var accessKeyIdNode = config.get("access_key_id"); final var secretAccessKeyNode = config.get("secret_access_key"); - // Since region is a Json schema enum with an empty string default, we consider the empty string an - // unset field. - final var emptyRegion = regionNode == null || regionNode.asText().equals(""); - - if (bucketNode == null && emptyRegion && accessKeyIdNode == null && secretAccessKeyNode == null) { + if (isNullOrEmpty(bucketNode) && isNullOrEmpty(regionNode) && isNullOrEmpty(accessKeyIdNode) + && isNullOrEmpty(secretAccessKeyNode)) { LOGGER.warn("The \"standard\" upload mode is not performant, and is not recommended for production. " + - "Please use the Amazon S3 upload mode if you are syncing a large amount of data."); + "Please use the Amazon S3 upload mode if you are syncing a large amount of data."); return DestinationType.INSERT_WITH_SUPER_TMP_TYPE; } - if (bucketNode == null || regionNode == null || accessKeyIdNode == null || secretAccessKeyNode == null) { + if (isNullOrEmpty(bucketNode) || isNullOrEmpty(regionNode) || isNullOrEmpty(accessKeyIdNode) + || isNullOrEmpty(secretAccessKeyNode)) { throw new RuntimeException("Error: Partially missing S3 Configuration."); } return DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE; @@ -78,4 +72,8 @@ public static void main(final String[] args) throws Exception { LOGGER.info("completed destination: {}", RedshiftDestination.class); } + private static boolean isNullOrEmpty(JsonNode jsonNode) { + return jsonNode == null || jsonNode.asText().equals(""); + } + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java 
index 6f757f728705..7dee925f655c 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.java @@ -14,7 +14,7 @@ import java.util.Map; import java.util.Optional; -public class RedshiftInsertDestination extends AbstractJdbcDestination { +public class RedshiftInsertDestination extends AbstractJdbcDestination { private static final String DRIVER_CLASS = "com.amazon.redshift.jdbc.Driver"; private static final String USERNAME = "username"; @@ -67,4 +67,5 @@ public static JsonNode getJdbcConfig(final JsonNode redshiftConfig) { .put(SCHEMA, schema) .build()); } + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftSqlOperations.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftSqlOperations.java index 2b3f68368125..df706ee9385e 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftSqlOperations.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftSqlOperations.java @@ -23,28 +23,29 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RedshiftSqlOperations extends JdbcSqlOperations{ +public class RedshiftSqlOperations extends JdbcSqlOperations { private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftSqlOperations.class); protected static final int REDSHIFT_VARCHAR_MAX_BYTE_SIZE = 65535; private static final String SELECT_ALL_TABLES_WITH_NOT_SUPER_TYPE_SQL_STATEMENT = """ - select tablename, schemaname - from pg_table_def - where tablename in ( - select tablename as tablename - from 
pg_table_def - where schemaname = '%1$s' - and tablename like '%%airbyte_raw%%' - and "column" in ('%2$s', '%3$s', '%4$s') - group by tablename - having count(*) = 3) - and schemaname = '%1$s' - and type <> 'super' - and "column" = '_airbyte_data'; - """; - - private static final String ALTER_TMP_TABLES_WITH_NOT_SUPER_TYPE_TO_SUPER_TYPE = """ + select tablename, schemaname + from pg_table_def + where tablename in ( + select tablename as tablename + from pg_table_def + where schemaname = '%1$s' + and tablename like '%%airbyte_raw%%' + and "column" in ('%2$s', '%3$s', '%4$s') + group by tablename + having count(*) = 3) + and schemaname = '%1$s' + and type <> 'super' + and "column" = '_airbyte_data'; + """; + + private static final String ALTER_TMP_TABLES_WITH_NOT_SUPER_TYPE_TO_SUPER_TYPE = + """ ALTER TABLE %1$s ADD COLUMN %2$s_super super; ALTER TABLE %1$s ADD COLUMN %3$s_reserve TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP; UPDATE %1$s SET %2$s_super = JSON_PARSE(%2$s); @@ -68,9 +69,9 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN @Override public void insertRecordsInternal(final JdbcDatabase database, - final List records, - final String schemaName, - final String tmpTableName) + final List records, + final String schemaName, + final String tmpTableName) throws SQLException { LOGGER.info("actual size of batch: {}", records.size()); @@ -97,9 +98,10 @@ public boolean isValidData(final JsonNode data) { } /** - * In case of redshift we need to discover all tables with not super type and update them after to SUPER type. This would be done once. + * In case of redshift we need to discover all tables with not super type and update them after to + * SUPER type. This would be done once. * - * @param database - Database object for interacting with a JDBC connection. + * @param database - Database object for interacting with a JDBC connection. * @param writeConfigSet - list of write configs. 
*/ @@ -117,11 +119,11 @@ public void onDestinationCloseOperations(final JdbcDatabase database, final Set< } /** - * @param database - Database object for interacting with a JDBC connection. + * @param database - Database object for interacting with a JDBC connection. * @param schemaName - schema to update. */ private List discoverNotSuperTables(final JdbcDatabase database, - final String schemaName) { + final String schemaName) { List schemaAndTableWithNotSuperType = new ArrayList<>(); try { LOGGER.info("Discovering NOT SUPER table types..."); @@ -136,8 +138,8 @@ private List discoverNotSuperTables(final JdbcDatabase database, if (tablesNameWithoutSuperDatatype.isEmpty()) { return Collections.emptyList(); } else { - tablesNameWithoutSuperDatatype.forEach(e -> - schemaAndTableWithNotSuperType.add(e.get("schemaname").textValue() + "." + e.get("tablename").textValue())); + tablesNameWithoutSuperDatatype + .forEach(e -> schemaAndTableWithNotSuperType.add(e.get("schemaname").textValue() + "." + e.get("tablename").textValue())); return schemaAndTableWithNotSuperType; } } catch (SQLException e) { @@ -149,14 +151,15 @@ private List discoverNotSuperTables(final JdbcDatabase database, /** * We prepare one query for all tables with not super type for updating. * - * @param database - Database object for interacting with a JDBC connection. + * @param database - Database object for interacting with a JDBC connection. * @param schemaAndTableWithNotSuperType - list of tables with not super type. */ private void updateVarcharDataColumnToSuperDataColumn(final JdbcDatabase database, final List schemaAndTableWithNotSuperType) { LOGGER.info("Updating VARCHAR data column to SUPER..."); StringBuilder finalSqlStatement = new StringBuilder(); // To keep the previous data, we need to add next columns: _airbyte_data, _airbyte_emitted_at - // We do such workflow because we can't directly CAST VARCHAR to SUPER column. 
_airbyte_emitted_at column recreated to keep + // We do such workflow because we can't directly CAST VARCHAR to SUPER column. _airbyte_emitted_at + // column recreated to keep // the COLUMN order. This order is required to INSERT the values in correct way. schemaAndTableWithNotSuperType.forEach(schemaAndTable -> { LOGGER.info("Altering table {} column _airbyte_data to SUPER.", schemaAndTable); @@ -172,5 +175,5 @@ private void updateVarcharDataColumnToSuperDataColumn(final JdbcDatabase databas throw new RuntimeException(e); } } -} +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/enums/RedshiftDataTmpTableMode.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/enums/RedshiftDataTmpTableMode.java index fcd8725e9c1a..7f120831a2c6 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/enums/RedshiftDataTmpTableMode.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/enums/RedshiftDataTmpTableMode.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.destination.redshift.enums; import io.airbyte.integrations.base.JavaBaseConstants; @@ -6,7 +10,9 @@ * This enum determines the type for _airbyte_data_ column at _airbyte_raw_**some_table_name** */ public enum RedshiftDataTmpTableMode { + SUPER { + @Override public String getTableCreationMode() { return "SUPER"; @@ -16,6 +22,7 @@ public String getTableCreationMode() { public String getInsertRowMode() { return "(?, JSON_PARSE(?), ?),\n"; } + }; public abstract String getTableCreationMode(); @@ -24,13 +31,14 @@ public String getInsertRowMode() { public String getTmpTableSqlStatement(String schemaName, String tableName) { return String.format(""" - CREATE TABLE IF NOT EXISTS %s.%s ( - %s VARCHAR PRIMARY KEY, - %s %s, - %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP) - """, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, + CREATE TABLE IF NOT EXISTS %s.%s ( + %s VARCHAR PRIMARY KEY, + %s %s, + %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP) + """, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, getTableCreationMode(), JavaBaseConstants.COLUMN_NAME_EMITTED_AT); } + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java index 180d26f87413..0fc18225ad1f 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java @@ -32,7 +32,6 @@ import 
org.apache.commons.lang3.RandomStringUtils; import org.jooq.Record; import org.jooq.Result; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; /** @@ -49,20 +48,20 @@ class RedshiftInsertDestinationAcceptanceTest extends RedshiftCopyDestinationAcc private static final String USERS_STREAM_NAME = "users_" + RandomStringUtils.randomAlphabetic(5); private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "john").put("id", "10").build())) - .withEmittedAt(NOW.toEpochMilli())); + .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) + .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "john").put("id", "10").build())) + .withEmittedAt(NOW.toEpochMilli())); private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "susan").put("id", "30").build())) - .withEmittedAt(NOW.toEpochMilli())); + .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) + .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "susan").put("id", "30").build())) + .withEmittedAt(NOW.toEpochMilli())); private static final AirbyteMessage USER_IN_THE_DB = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "Alex").put("id", "1").build())) - .withEmittedAt(NOW.toEpochMilli())); + .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) + .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "Alex").put("id", "1").build())) + .withEmittedAt(NOW.toEpochMilli())); private static final AirbyteMessage MESSAGE_STATE = new 
AirbyteMessage().withType(AirbyteMessage.Type.STATE) - .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); + .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); public JsonNode getStaticConfig() { return removeStagingConfigurationFromRedshift(Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")))); @@ -77,14 +76,14 @@ public static JsonNode removeStagingConfigurationFromRedshift(final JsonNode con return original; } - void setup(){ + void setup() { MESSAGE_USERS1.getRecord().setNamespace(DATASET_ID); MESSAGE_USERS2.getRecord().setNamespace(DATASET_ID); catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, DATASET_ID, - io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING), - io.airbyte.protocol.models.Field.of("id", JsonSchemaType.STRING)) - .withDestinationSyncMode(DestinationSyncMode.APPEND))); + CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, DATASET_ID, + io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING), + io.airbyte.protocol.models.Field.of("id", JsonSchemaType.STRING)) + .withDestinationSyncMode(DestinationSyncMode.APPEND))); } @Test @@ -108,9 +107,9 @@ void testIfSuperTmpTableWasCreatedAfterVarcharTmpTable() throws Exception { final List usersActual = retrieveRecords(testDestinationEnv, USERS_STREAM_NAME, DATASET_ID, config); final List expectedUsersJson = Lists.newArrayList( - MESSAGE_USERS1.getRecord().getData(), - MESSAGE_USERS2.getRecord().getData(), - USER_IN_THE_DB.getRecord().getData()); + MESSAGE_USERS1.getRecord().getData(), + MESSAGE_USERS2.getRecord().getData(), + USER_IN_THE_DB.getRecord().getData()); assertEquals(expectedUsersJson.size(), usersActual.size()); assertTrue(expectedUsersJson.containsAll(usersActual) && usersActual.containsAll(expectedUsersJson)); } @@ 
-119,18 +118,19 @@ private void createTmpTableWithVarchar(final Database database, final String str // As we don't care about the previous data we just simulate the flow when previous table exists. database.query(q -> { q.fetch(String.format("CREATE SCHEMA IF NOT EXISTS %s", DATASET_ID)); - q.fetch(String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s VARCHAR PRIMARY KEY, %s VARCHAR, %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)", - DATASET_ID, - streamName, - JavaBaseConstants.COLUMN_NAME_AB_ID, - JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT)); + q.fetch(String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (%s VARCHAR PRIMARY KEY, %s VARCHAR, %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)", + DATASET_ID, + streamName, + JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT)); // Simulate existing record q.fetch(String.format(""" - insert into %s.%s (_airbyte_ab_id, _airbyte_data, _airbyte_emitted_at) values - ('9', '{\"id\":\"1\",\"name\":\"Alex\"}', '2022-02-09 12:02:13.322000 +00:00')""", - DATASET_ID, - streamName)); + insert into %s.%s (_airbyte_ab_id, _airbyte_data, _airbyte_emitted_at) values + ('9', '{\"id\":\"1\",\"name\":\"Alex\"}', '2022-02-09 12:02:13.322000 +00:00')""", + DATASET_ID, + streamName)); return null; }); } @@ -142,23 +142,26 @@ private void createTmpTableWithVarchar(final Database database, final String str * @param expectedType - data type of _airbyte_data to expect * @return if current datatype of _airbyte_data column is expectedType. * - * PG_TABLE_DEF table Stores information about table columns. - * PG_TABLE_DEF only returns information about tables that are visible to the user. + * PG_TABLE_DEF table Stores information about table columns. PG_TABLE_DEF only returns + * information about tables that are visible to the user. 
* - * PG_TABLE_DEF + * PG_TABLE_DEF * * @throws SQLException */ private boolean isTmpTableDataColumnInExpectedType(final Database database, - final String dataSet, - final String streamName, - final String expectedType) throws SQLException { + final String dataSet, + final String streamName, + final String expectedType) + throws SQLException { Result query = database.query(q -> { return q.fetch(String.format(""" - set search_path to %s; - select type from pg_table_def where tablename = \'%s\' and "column" = \'%s\'""", - dataSet, streamName, JavaBaseConstants.COLUMN_NAME_DATA)); + set search_path to %s; + select type from pg_table_def where tablename = \'%s\' and "column" = \'%s\'""", + dataSet, streamName, JavaBaseConstants.COLUMN_NAME_DATA)); }); return query.get(0).getValue(TYPE).toString().trim().contains(expectedType); } + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java index 0271c3b34cdc..d540d18f95a1 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java @@ -31,8 +31,6 @@ import org.apache.commons.lang3.RandomStringUtils; import org.jooq.Record; import org.jooq.Result; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class RedshiftS3StagingInsertDestinationAcceptanceTest extends RedshiftCopyDestinationAcceptanceTest { @@ -45,33 +43,33 @@ public class 
RedshiftS3StagingInsertDestinationAcceptanceTest extends RedshiftCo private static final String BOOKS_STREAM_NAME = "books_" + RandomStringUtils.randomAlphabetic(5); private static final AirbyteMessage MESSAGE_BOOKS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(BOOKS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "Harry Potter").put("id", "10").build())) - .withEmittedAt(NOW.toEpochMilli())); + .withRecord(new AirbyteRecordMessage().withStream(BOOKS_STREAM_NAME) + .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "Harry Potter").put("id", "10").build())) + .withEmittedAt(NOW.toEpochMilli())); private static final AirbyteMessage MESSAGE_BOOKS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(BOOKS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "The Great Gatsby").put("id", "30").build())) - .withEmittedAt(NOW.toEpochMilli())); + .withRecord(new AirbyteRecordMessage().withStream(BOOKS_STREAM_NAME) + .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "The Great Gatsby").put("id", "30").build())) + .withEmittedAt(NOW.toEpochMilli())); private static final AirbyteMessage BOOKS_IN_THE_DB = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(BOOKS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "Brave New World").put("id", "1").build())) - .withEmittedAt(NOW.toEpochMilli())); + .withRecord(new AirbyteRecordMessage().withStream(BOOKS_STREAM_NAME) + .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "Brave New World").put("id", "1").build())) + .withEmittedAt(NOW.toEpochMilli())); private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE) - .withState(new 
AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); + .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); public JsonNode getStaticConfig() { return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json"))); } - void setup(){ + void setup() { MESSAGE_BOOKS1.getRecord().setNamespace(DATASET_ID); MESSAGE_BOOKS2.getRecord().setNamespace(DATASET_ID); catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createConfiguredAirbyteStream(BOOKS_STREAM_NAME, DATASET_ID, - io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING), - io.airbyte.protocol.models.Field.of("id", JsonSchemaType.STRING)) - .withDestinationSyncMode(DestinationSyncMode.APPEND))); + CatalogHelpers.createConfiguredAirbyteStream(BOOKS_STREAM_NAME, DATASET_ID, + io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING), + io.airbyte.protocol.models.Field.of("id", JsonSchemaType.STRING)) + .withDestinationSyncMode(DestinationSyncMode.APPEND))); } @Test @@ -95,9 +93,9 @@ void testIfSuperTmpTableWasCreatedAfterVarcharTmpTableDuringS3Staging() throws E final List booksActual = retrieveRecords(testDestinationEnv, BOOKS_STREAM_NAME, DATASET_ID, config); final List expectedUsersJson = Lists.newArrayList( - MESSAGE_BOOKS1.getRecord().getData(), - MESSAGE_BOOKS2.getRecord().getData(), - BOOKS_IN_THE_DB.getRecord().getData()); + MESSAGE_BOOKS1.getRecord().getData(), + MESSAGE_BOOKS2.getRecord().getData(), + BOOKS_IN_THE_DB.getRecord().getData()); assertEquals(expectedUsersJson.size(), booksActual.size()); assertTrue(expectedUsersJson.containsAll(booksActual) && booksActual.containsAll(expectedUsersJson)); } @@ -106,18 +104,19 @@ private void createTmpTableWithVarchar(final Database database, final String str // As we don't care about the previous data we just simulate the flow when previous table exists. 
database.query(q -> { q.fetch(String.format("CREATE SCHEMA IF NOT EXISTS %s", DATASET_ID)); - q.fetch(String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s VARCHAR PRIMARY KEY, %s VARCHAR, %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)", - DATASET_ID, - streamName, - JavaBaseConstants.COLUMN_NAME_AB_ID, - JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT)); + q.fetch(String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (%s VARCHAR PRIMARY KEY, %s VARCHAR, %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)", + DATASET_ID, + streamName, + JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT)); // Simulate existing record q.fetch(String.format(""" - insert into %s.%s (_airbyte_ab_id, _airbyte_data, _airbyte_emitted_at) values - ('9', '{\"id\":\"1\",\"name\":\"Brave New World\"}', '2022-02-09 12:02:13.322000 +00:00')""", - DATASET_ID, - streamName)); + insert into %s.%s (_airbyte_ab_id, _airbyte_data, _airbyte_emitted_at) values + ('9', '{\"id\":\"1\",\"name\":\"Brave New World\"}', '2022-02-09 12:02:13.322000 +00:00')""", + DATASET_ID, + streamName)); return null; }); } @@ -129,23 +128,26 @@ private void createTmpTableWithVarchar(final Database database, final String str * @param expectedType - data type of _airbyte_data to expect * @return if current datatype of _airbyte_data column is expectedType. * - * PG_TABLE_DEF table Stores information about table columns. - * PG_TABLE_DEF only returns information about tables that are visible to the user. + * PG_TABLE_DEF table Stores information about table columns. PG_TABLE_DEF only returns + * information about tables that are visible to the user. 
* - * PG_TABLE_DEF + * PG_TABLE_DEF * * @throws SQLException */ private boolean isTmpTableDataColumnInExpectedType(final Database database, - final String dataSet, - final String streamName, - final String expectedType) throws SQLException { + final String dataSet, + final String streamName, + final String expectedType) + throws SQLException { Result query = database.query(q -> { return q.fetch(String.format(""" - set search_path to %s; - select type from pg_table_def where tablename = \'%s\' and "column" = \'%s\'""", - dataSet, streamName, JavaBaseConstants.COLUMN_NAME_DATA)); + set search_path to %s; + select type from pg_table_def where tablename = \'%s\' and "column" = \'%s\'""", + dataSet, streamName, JavaBaseConstants.COLUMN_NAME_DATA)); }); return query.get(0).getValue(TYPE).toString().trim().contains(expectedType); } + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java index a5a093d358fa..19e3733e8ef4 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java @@ -5,8 +5,6 @@ package io.airbyte.integrations.destination.redshift; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; import io.airbyte.commons.jackson.MoreMappers; @@ -37,4 +35,5 @@ public void useInsertStrategyTestWithSuperDatatype() { final var stubConfig = mapper.createObjectNode(); 
assertEquals(DestinationType.INSERT_WITH_SUPER_TMP_TYPE, RedshiftDestination.determineUploadMode(stubConfig)); } + } diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 2c1859f7f4d8..8339f8e7c8ec 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -123,6 +123,7 @@ All Redshift connections are encrypted using SSL | Version | Date | Pull Request | Subject | |:--------|:-----------| :----- | :------ | +| 0.3.32 | 2022-04-20 | [12085](https://github.com/airbytehq/airbyte/pull/12085) | Fixed bug with switching between INSERT and COPY config | | 0.3.31 | 2022-04-19 | [\#12064](https://github.com/airbytehq/airbyte/pull/12064) | Added option to support SUPER datatype in _airbyte_raw_** table | | 0.3.29 | 2022-04-05 | [11729](https://github.com/airbytehq/airbyte/pull/11729) | Fixed bug with dashes in schema name | | | 0.3.28 | 2022-03-18 | [\#11254](https://github.com/airbytehq/airbyte/pull/11254) | Fixed missing records during S3 staging |