Skip to content

Commit

Permalink
Pubsub, Pulsar, Redis, Redshift, Rocket destinations : Enable DAT tes…
Browse files Browse the repository at this point in the history
…ts (#12143)

* enable DAT tests for Pulsar

* Enable DAT test for pubsub, redis, redshift, rocket

* format

* fix normalized data fetch

* cover "other" result type for arrays

* remove deserialization because now we have already parsed node

* fix bugspot

* fix unicode case
  • Loading branch information
DoNotPanicUA authored Apr 28, 2022
1 parent 31485d7 commit aab1533
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -91,6 +93,26 @@ private AirbyteStreamNameNamespacePair fromJsonNode(final JsonNode j) {
return new AirbyteStreamNameNamespacePair(stream, namespace);
}

@Override
protected TestDataComparator getTestDataComparator() {
return new AdvancedTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
final String streamName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -69,7 +70,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) {
.set(PulsarDestination.COLUMN_NAME_AB_ID, key)
.set(PulsarDestination.COLUMN_NAME_STREAM, recordMessage.getStream())
.set(PulsarDestination.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt())
.set(PulsarDestination.COLUMN_NAME_DATA, recordMessage.getData().toString().getBytes())
.set(PulsarDestination.COLUMN_NAME_DATA, recordMessage.getData().toString().getBytes(StandardCharsets.UTF_8))
.build();

sendRecord(producer, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
Expand Down Expand Up @@ -103,6 +105,26 @@ protected String getDefaultSchema(final JsonNode config) {
return "";
}

@Override
protected TestDataComparator getTestDataComparator() {
return new AdvancedTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -68,6 +70,26 @@ protected boolean implementsNamespaces() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new AdvancedTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
String streamName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,33 @@
package io.airbyte.integrations.destination.redshift;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Integration test testing {@link RedshiftCopyS3Destination}. The default Redshift integration test
* credentials contain S3 credentials - this automatically causes COPY to be selected.
*/
public class RedshiftCopyDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftCopyDestinationAcceptanceTest.class);

// config from which to create / delete schemas.
private JsonNode baseConfig;
// config which refers to the schema that the test is being run in.
Expand All @@ -34,6 +40,8 @@ public class RedshiftCopyDestinationAcceptanceTest extends DestinationAcceptance

protected TestDestinationEnv testDestinationEnv;

private final ObjectMapper mapper = new ObjectMapper();

@Override
protected String getImageName() {
return "airbyte/destination-redshift:dev";
Expand All @@ -55,6 +63,26 @@ protected JsonNode getFailCheckConfig() {
return invalidConfig;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new RedshiftTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
final String streamName,
Expand All @@ -63,7 +91,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(j -> Jsons.deserialize(j.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA))
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -93,26 +121,35 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test
return retrieveRecordsFromTable(tableName, namespace);
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
private JsonNode getJsonFromRecord(Record record) {
ObjectNode node = mapper.createObjectNode();

Arrays.stream(record.fields()).forEach(field -> {
var value = record.get(field);

switch (field.getDataType().getTypeName()) {
case "varchar", "other":
var stringValue = (value != null ? value.toString() : null);
if (stringValue != null && (stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\[.*\\]$")
|| stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\{.*\\}$"))) {
node.set(field.getName(), Jsons.deserialize(stringValue));
} else {
node.put(field.getName(), stringValue);
}
break;
default:
node.put(field.getName(), (value != null ? value.toString() : null));
}
});
return node;
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
return getDatabase().query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift;

import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedshiftTestDataComparator extends AdvancedTestDataComparator {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftTestDataComparator.class);

private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer();

protected static final String REDSHIFT_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd HH:mm:ssX";

@Override
protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {
return ZonedDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(REDSHIFT_DATETIME_WITH_TZ_FORMAT)).withZoneSameInstant(ZoneOffset.UTC);
}

@Override
protected boolean compareDateTimeValues(String airbyteMessageValue, String destinationValue) {
try {
var format = DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT);
LocalDateTime dateTime = LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(REDSHIFT_DATETIME_WITH_TZ_FORMAT));
return super.compareDateTimeValues(airbyteMessageValue, format.format(dateTime));
} catch (DateTimeException e) {
LOGGER.warn("Fail to convert values to DateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}",
airbyteMessageValue, destinationValue, e);
return compareTextValues(airbyteMessageValue, destinationValue);
}
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -54,6 +56,26 @@ protected JsonNode getConfig() throws IOException {
return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));
}

@Override
protected TestDataComparator getTestDataComparator() {
return new AdvancedTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected JsonNode getFailCheckConfig() throws Exception {
return Jsons.jsonNode(
Expand Down

0 comments on commit aab1533

Please sign in to comment.