diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/build.gradle b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/build.gradle index 31b2bcf7241e..ece5ce8ecf7e 100644 --- a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/build.gradle @@ -26,4 +26,5 @@ dependencies { integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-clickhouse') // https://mvnrepository.com/artifact/org.testcontainers/clickhouse integrationTestJavaImplementation libs.connectors.destination.testcontainers.clickhouse + integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs) } diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java index c275ca914410..af8a7674ecc5 100644 --- a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java @@ -17,6 +17,7 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.standardtest.destination.DataTypeTestArgumentProvider; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.integrations.util.HostPortResolver; @@ -25,7 +26,8 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.BindMode; @@ -56,7 +58,7 @@ private static JdbcDatabase getDatabase(final JsonNode config) { config.get(JdbcUtils.USERNAME_KEY).asText(), config.has(JdbcUtils.PASSWORD_KEY) ? config.get(JdbcUtils.PASSWORD_KEY).asText() : null, ClickhouseDestination.DRIVER_CLASS, - jdbcStr)); + jdbcStr), new ClickhouseTestSourceOperations()); } @Override @@ -66,7 +68,7 @@ protected String getImageName() { @Override protected boolean supportsNormalization() { - return false; + return true; } @Override @@ -109,16 +111,15 @@ protected String getDefaultSchema(final JsonNode config) { @Override protected JsonNode getConfig() { - // Note: ClickHouse official JDBC driver uses HTTP protocol, its default port is 8123 - // dbt clickhouse adapter uses native protocol, its default port is 9000 - // Since we disabled normalization and dbt test, we only use the JDBC port here. return Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, HostPortResolver.resolveIpAddress(db)) + .put(JdbcUtils.TCP_PORT_KEY, NATIVE_SECURE_PORT) .put(JdbcUtils.PORT_KEY, HTTPS_PORT) .put(JdbcUtils.DATABASE_KEY, DB_NAME) .put(JdbcUtils.USERNAME_KEY, USER_NAME) .put(JdbcUtils.PASSWORD_KEY, "") .put(JdbcUtils.SCHEMA_KEY, DB_NAME) + .put(JdbcUtils.SSL_KEY, true) .build()); } @@ -194,49 +195,20 @@ protected void tearDown(final TestDestinationEnv testEnv) { db.close(); } - /** - * The SQL script generated by old version of dbt in 'test' step isn't compatible with ClickHouse, - * so we skip this test for now. - * - * Ref: https://github.com/dbt-labs/dbt-core/issues/3905 - * - * @throws Exception - */ - @Disabled - public void testCustomDbtTransformations() throws Exception { - super.testCustomDbtTransformations(); - } - - @Disabled - public void testCustomDbtTransformationsFailure() throws Exception {} - - /** - * The normalization container needs native port, while destination container needs HTTP port, we - * can't inject the port switch statement into DestinationAcceptanceTest.runSync() method for this - * test, so we skip it. - * - * @throws Exception - */ - @Disabled - public void testIncrementalDedupeSync() throws Exception { - super.testIncrementalDedupeSync(); - } - - /** - * The normalization container needs native port, while destination container needs HTTP port, we - * can't inject the port switch statement into DestinationAcceptanceTest.runSync() method for this - * test, so we skip it. - * - * @throws Exception - */ - @Disabled - public void testSyncWithNormalization(final String messagesFilename, final String catalogFilename) throws Exception { - super.testSyncWithNormalization(messagesFilename, catalogFilename); - } - - @Disabled - public void specNormalizationValueShouldBeCorrect() throws Exception { - super.specNormalizationValueShouldBeCorrect(); + @ParameterizedTest + @ArgumentsSource(DataTypeTestArgumentProvider.class) + public void testDataTypeTestWithNormalization(final String messagesFilename, + final String catalogFilename, + final DataTypeTestArgumentProvider.TestCompatibility testCompatibility) + throws Exception { + + // arrays are not fully supported yet in jdbc driver + // https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseArray.java + if (messagesFilename.contains("array")) { + return; + } + + super.testDataTypeTestWithNormalization(messagesFilename, catalogFilename, testCompatibility); } } diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestDataComparator.java b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestDataComparator.java index 00e64e6256ec..96c343f8394c 100644 --- a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestDataComparator.java @@ -6,13 +6,32 @@ import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClickhouseTestDataComparator extends AdvancedTestDataComparator { + private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseTestDataComparator.class); private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer(); + private static final String CLICKHOUSE_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSX"; + + // https://clickhouse.com/docs/en/sql-reference/data-types/date32/ + private final LocalDate minSupportedDate = LocalDate.parse("1970-01-01"); + private final LocalDate maxSupportedDate = LocalDate.parse("2149-06-06"); + private final ZonedDateTime minSupportedDateTime = ZonedDateTime.parse( + "1925-01-01T00:00:00.000Z"); + private final ZonedDateTime maxSupportedDateTime = ZonedDateTime.parse( + "2283-11-10T20:23:45.000Z"); + @Override protected List resolveIdentifier(final String identifier) { final List result = new ArrayList<>(); @@ -26,4 +45,103 @@ protected List resolveIdentifier(final String identifier) { return result; } + @Override + protected boolean compareNumericValues(final String firstNumericValue, + final String secondNumericValue) { + // clickhouse stores double 1.14 as 1.1400000000000001 + // https://clickhouse.com/docs/en/sql-reference/data-types/float/ + double epsilon = 0.000000000000001d; + + double firstValue = Double.parseDouble(firstNumericValue); + double secondValue = Double.parseDouble(secondNumericValue); + + return Math.abs(firstValue - secondValue) < epsilon; + } + + @Override + protected boolean compareBooleanValues(final String firstValue, final String secondValue) { + return parseBool(firstValue) == parseBool(secondValue); + } + + @Override + protected boolean compareDateValues(final String airbyteMessageValue, + final String destinationValue) { + final LocalDate expectedDate = LocalDate.parse(airbyteMessageValue); + final LocalDate actualDate = LocalDate.parse(destinationValue); + + if (expectedDate.isBefore(minSupportedDate) || expectedDate.isAfter(maxSupportedDate)) { + // inserting any dates that are out of supported range causes registers overflow in clickhouseDB, + // so actually you end up with unpredicted values, more + // https://clickhouse.com/docs/en/sql-reference/data-types/date32 + LOGGER.warn( + "Test value is out of range and would be corrupted by Snowflake, so we skip this verification"); + return true; + } + + return actualDate.equals(expectedDate); + } + + @Override + protected boolean compareDateTimeWithTzValues(final String airbyteMessageValue, + final String destinationValue) { + try { + ZonedDateTime airbyteDate = ZonedDateTime.parse(airbyteMessageValue, + getAirbyteDateTimeWithTzFormatter()).withZoneSameInstant(ZoneOffset.UTC); + ZonedDateTime destinationDate = parseDestinationDateWithTz(destinationValue); + + if (airbyteDate.isBefore(minSupportedDateTime) || airbyteDate.isAfter(maxSupportedDateTime)) { + // inserting any dates that are out of supported range causes registers overflow in clickhouseDB, + // so actually you end up with unpredicted values, more + // https://clickhouse.com/docs/en/sql-reference/data-types/datetime64 + LOGGER.warn( + "Test value is out of range and would be corrupted by Snowflake, so we skip this verification"); + return true; + } + return airbyteDate.equals(destinationDate); + } catch (DateTimeParseException e) { + LOGGER.warn( + "Fail to convert values to ZonedDateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}", + airbyteMessageValue, destinationValue, e); + return compareTextValues(airbyteMessageValue, destinationValue); + } + } + + @Override + protected ZonedDateTime parseDestinationDateWithTz(final String destinationValue) { + return ZonedDateTime.parse(destinationValue, + DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT)).withZoneSameInstant( + ZoneOffset.UTC); + } + + @Override + protected boolean compareDateTimeValues(final String airbyteMessageValue, + final String destinationValue) { + final LocalDateTime expectedDateTime = LocalDateTime.parse(airbyteMessageValue); + final LocalDateTime actualDateTime = LocalDateTime.parse(destinationValue, + DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT)); + + if (expectedDateTime.isBefore(minSupportedDateTime.toLocalDateTime()) + || expectedDateTime.isAfter(maxSupportedDateTime.toLocalDateTime())) { + // inserting any dates that are out of supported range causes registers overflow in clickhouseDB, + // so actually you end up with unpredicted values, more + // https://clickhouse.com/docs/en/sql-reference/data-types/datetime64 + LOGGER.warn( + "Test value is out of range and would be corrupted by Snowflake, so we skip this verification"); + return true; + } + + return expectedDateTime.equals(actualDateTime); + } + + private boolean parseBool(final String valueAsString) { + // boolen as a String may be returned as true\false and as 0\1 + // https://clickhouse.com/docs/en/sql-reference/data-types/boolean + try { + return Integer.parseInt(valueAsString) > 0; + } catch (final NumberFormatException ex) { + return Boolean.parseBoolean(valueAsString); + } + + } + } diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java new file mode 100644 index 000000000000..cc90ad1e528f --- /dev/null +++ b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.clickhouse; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.db.DataTypeUtils; +import io.airbyte.db.jdbc.JdbcSourceOperations; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +public class ClickhouseTestSourceOperations extends JdbcSourceOperations { + + @Override + protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + node.put(columnName, DateTimeFormatter.ISO_DATE.format(resultSet.getTimestamp(index).toLocalDateTime())); + } + + @Override + protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); + final LocalDate date = timestamp.toLocalDate(); + + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern( + DataTypeUtils.DATE_FORMAT_WITH_MILLISECONDS_PATTERN); + + node.put(columnName, resolveEra(date, timestamp.format(dateTimeFormatter))); + } + +}