From e4e11cc0cfd60a79b06528cd34ba6c8bfee4a64f Mon Sep 17 00:00:00 2001 From: Andrii Leonets <30464745+DoNotPanicUA@users.noreply.github.com> Date: Thu, 28 Apr 2022 18:26:48 +0300 Subject: [PATCH] Pubsub, Pulsar, Redis, Redshift, Rocket destinations : Enable DAT tests (#12143) * enable DAT tests for Pulsar * Enable DAT test for pubsub, redis, redshift, rocket * format * fix normalized data fetch * cover "other" result type for arrays * remove deserialization because now we have already parsed node * fix bugspot * fix unicode case --- .../PubsubDestinationAcceptanceTest.java | 22 ++++++ .../pulsar/PulsarRecordConsumer.java | 3 +- .../PulsarDestinationAcceptanceTest.java | 22 ++++++ .../redis/RedisDestinationAcceptanceTest.java | 22 ++++++ ...RedshiftCopyDestinationAcceptanceTest.java | 69 ++++++++++++++----- .../redshift/RedshiftTestDataComparator.java | 57 +++++++++++++++ .../RocksetDestinationAcceptanceTest.java | 22 ++++++ 7 files changed, 200 insertions(+), 17 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.java diff --git a/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java index a85de5db1945..aaadb701bd9f 100644 --- a/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java @@ -38,6 +38,8 @@ import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -91,6 +93,26 @@ private AirbyteStreamNameNamespacePair fromJsonNode(final JsonNode j) { return new AirbyteStreamNameNamespacePair(stream, namespace); } + @Override + protected TestDataComparator getTestDataComparator() { + return new AdvancedTestDataComparator(); + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + return true; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + @Override protected List retrieveRecords(final TestDestinationEnv testEnv, final String streamName, diff --git a/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java b/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java index c22ac5c056c5..129b9e86348c 100644 --- a/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java @@ -11,6 +11,7 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -69,7 +70,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) { .set(PulsarDestination.COLUMN_NAME_AB_ID, key) .set(PulsarDestination.COLUMN_NAME_STREAM, recordMessage.getStream()) .set(PulsarDestination.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()) - .set(PulsarDestination.COLUMN_NAME_DATA, recordMessage.getData().toString().getBytes()) + .set(PulsarDestination.COLUMN_NAME_DATA, recordMessage.getData().toString().getBytes(StandardCharsets.UTF_8)) .build(); sendRecord(producer, value); diff --git a/airbyte-integrations/connectors/destination-pulsar/src/test-integration/java/io/airbyte/integrations/destination/pulsar/PulsarDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-pulsar/src/test-integration/java/io/airbyte/integrations/destination/pulsar/PulsarDestinationAcceptanceTest.java index 26dae59de485..f31ec96dbfa6 100644 --- a/airbyte-integrations/connectors/destination-pulsar/src/test-integration/java/io/airbyte/integrations/destination/pulsar/PulsarDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-pulsar/src/test-integration/java/io/airbyte/integrations/destination/pulsar/PulsarDestinationAcceptanceTest.java @@ -15,6 +15,8 @@ import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.io.IOException; import java.net.InetAddress; import java.net.NetworkInterface; @@ -103,6 +105,26 @@ protected String getDefaultSchema(final JsonNode config) { return ""; } + @Override + protected TestDataComparator getTestDataComparator() { + return new AdvancedTestDataComparator(); + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + return true; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + @Override protected List retrieveNormalizedRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace) throws IOException { diff --git a/airbyte-integrations/connectors/destination-redis/src/test-integration/java/io/airbyte/integrations/destination/redis/RedisDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redis/src/test-integration/java/io/airbyte/integrations/destination/redis/RedisDestinationAcceptanceTest.java index ca80552dc3cb..9dcc2312e2fc 100644 --- a/airbyte-integrations/connectors/destination-redis/src/test-integration/java/io/airbyte/integrations/destination/redis/RedisDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redis/src/test-integration/java/io/airbyte/integrations/destination/redis/RedisDestinationAcceptanceTest.java @@ -7,6 +7,8 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; @@ -68,6 +70,26 @@ protected boolean implementsNamespaces() { return true; } + @Override + protected TestDataComparator getTestDataComparator() { + return new AdvancedTestDataComparator(); + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + return true; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + @Override protected List retrieveRecords(TestDestinationEnv testEnv, String streamName, diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java index f0e729a1ca5e..231252bb6b1a 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java @@ -5,20 +5,24 @@ package io.airbyte.integrations.destination.redshift; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.db.Databases; -import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.nio.file.Path; import java.sql.SQLException; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import org.jooq.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Integration test testing {@link RedshiftCopyS3Destination}. The default Redshift integration test @@ -26,6 +30,8 @@ */ public class RedshiftCopyDestinationAcceptanceTest extends DestinationAcceptanceTest { + private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftCopyDestinationAcceptanceTest.class); + // config from which to create / delete schemas. private JsonNode baseConfig; // config which refers to the schema that the test is being run in. @@ -34,6 +40,8 @@ public class RedshiftCopyDestinationAcceptanceTest extends DestinationAcceptance protected TestDestinationEnv testDestinationEnv; + private final ObjectMapper mapper = new ObjectMapper(); + @Override protected String getImageName() { return "airbyte/destination-redshift:dev"; @@ -55,6 +63,26 @@ protected JsonNode getFailCheckConfig() { return invalidConfig; } + @Override + protected TestDataComparator getTestDataComparator() { + return new RedshiftTestDataComparator(); + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + return true; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + @Override protected List retrieveRecords(final TestDestinationEnv env, final String streamName, @@ -63,7 +91,7 @@ protected List retrieveRecords(final TestDestinationEnv env, throws Exception { return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() - .map(j -> Jsons.deserialize(j.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) + .map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA)) .collect(Collectors.toList()); } @@ -93,17 +121,27 @@ protected List retrieveNormalizedRecords(final TestDestinationEnv test return retrieveRecordsFromTable(tableName, namespace); } - @Override - protected List resolveIdentifier(final String identifier) { - final List result = new ArrayList<>(); - final String resolved = namingResolver.getIdentifier(identifier); - result.add(identifier); - result.add(resolved); - if (!resolved.startsWith("\"")) { - result.add(resolved.toLowerCase()); - result.add(resolved.toUpperCase()); - } - return result; + private JsonNode getJsonFromRecord(Record record) { + ObjectNode node = mapper.createObjectNode(); + + Arrays.stream(record.fields()).forEach(field -> { + var value = record.get(field); + + switch (field.getDataType().getTypeName()) { + case "varchar", "other": + var stringValue = (value != null ? value.toString() : null); + if (stringValue != null && (stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\[.*\\]$") + || stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\{.*\\}$"))) { + node.set(field.getName(), Jsons.deserialize(stringValue)); + } else { + node.put(field.getName(), stringValue); + } + break; + default: + node.put(field.getName(), (value != null ? value.toString() : null)); + } + }); + return node; } private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { @@ -111,8 +149,7 @@ private List retrieveRecordsFromTable(final String tableName, final St ctx -> ctx .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) .stream() - .map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat())) - .map(Jsons::deserialize) + .map(this::getJsonFromRecord) .collect(Collectors.toList())); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.java new file mode 100644 index 000000000000..6b018e9cd7f4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.redshift; + +import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import java.time.DateTimeException; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RedshiftTestDataComparator extends AdvancedTestDataComparator { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftTestDataComparator.class); + + private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); + + protected static final String REDSHIFT_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd HH:mm:ssX"; + + @Override + protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { + return ZonedDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(REDSHIFT_DATETIME_WITH_TZ_FORMAT)).withZoneSameInstant(ZoneOffset.UTC); + } + + @Override + protected boolean compareDateTimeValues(String airbyteMessageValue, String destinationValue) { + try { + var format = DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT); + LocalDateTime dateTime = LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(REDSHIFT_DATETIME_WITH_TZ_FORMAT)); + return super.compareDateTimeValues(airbyteMessageValue, format.format(dateTime)); + } catch (DateTimeException e) { + LOGGER.warn("Fail to convert values to DateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}", + airbyteMessageValue, destinationValue, e); + return compareTextValues(airbyteMessageValue, destinationValue); + } + } + + @Override + protected List resolveIdentifier(final String identifier) { + final List result = new ArrayList<>(); + final String resolved = namingResolver.getIdentifier(identifier); + result.add(identifier); + result.add(resolved); + if (!resolved.startsWith("\"")) { + result.add(resolved.toLowerCase()); + result.add(resolved.toUpperCase()); + } + return result; + } + +} diff --git a/airbyte-integrations/connectors/destination-rockset/src/test-integration/java/io/airbyte/integrations/destination/rockset/RocksetDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-rockset/src/test-integration/java/io/airbyte/integrations/destination/rockset/RocksetDestinationAcceptanceTest.java index cfd96d02dffa..a0efe58576ac 100644 --- a/airbyte-integrations/connectors/destination-rockset/src/test-integration/java/io/airbyte/integrations/destination/rockset/RocksetDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-rockset/src/test-integration/java/io/airbyte/integrations/destination/rockset/RocksetDestinationAcceptanceTest.java @@ -18,6 +18,8 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; @@ -54,6 +56,26 @@ protected JsonNode getConfig() throws IOException { return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json"))); } + @Override + protected TestDataComparator getTestDataComparator() { + return new AdvancedTestDataComparator(); + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + return true; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + @Override protected JsonNode getFailCheckConfig() throws Exception { return Jsons.jsonNode(