Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛Destination-clickhouse-strict-encrypt: enabled normalization tests #15069

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ dependencies {
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-clickhouse')
// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
integrationTestJavaImplementation libs.connectors.destination.testcontainers.clickhouse
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DataTypeTestArgumentProvider;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.integrations.util.HostPortResolver;
Expand All @@ -25,7 +26,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
Expand Down Expand Up @@ -56,7 +58,7 @@ private static JdbcDatabase getDatabase(final JsonNode config) {
config.get(JdbcUtils.USERNAME_KEY).asText(),
config.has(JdbcUtils.PASSWORD_KEY) ? config.get(JdbcUtils.PASSWORD_KEY).asText() : null,
ClickhouseDestination.DRIVER_CLASS,
jdbcStr));
jdbcStr), new ClickhouseTestSourceOperations());
}

@Override
Expand All @@ -66,7 +68,7 @@ protected String getImageName() {

@Override
protected boolean supportsNormalization() {
return false;
return true;
}

@Override
Expand Down Expand Up @@ -109,16 +111,15 @@ protected String getDefaultSchema(final JsonNode config) {

@Override
protected JsonNode getConfig() {
// Note: ClickHouse official JDBC driver uses HTTP protocol, its default port is 8123
// dbt clickhouse adapter uses native protocol, its default port is 9000
// Since we disabled normalization and dbt test, we only use the JDBC port here.
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveIpAddress(db))
.put(JdbcUtils.TCP_PORT_KEY, NATIVE_SECURE_PORT)
.put(JdbcUtils.PORT_KEY, HTTPS_PORT)
.put(JdbcUtils.DATABASE_KEY, DB_NAME)
.put(JdbcUtils.USERNAME_KEY, USER_NAME)
.put(JdbcUtils.PASSWORD_KEY, "")
.put(JdbcUtils.SCHEMA_KEY, DB_NAME)
.put(JdbcUtils.SSL_KEY, true)
.build());
}

Expand Down Expand Up @@ -194,49 +195,20 @@ protected void tearDown(final TestDestinationEnv testEnv) {
db.close();
}

/**
* The SQL script generated by old version of dbt in 'test' step isn't compatible with ClickHouse,
* so we skip this test for now.
*
* Ref: https://github.com/dbt-labs/dbt-core/issues/3905
*
* @throws Exception
*/
@Disabled
public void testCustomDbtTransformations() throws Exception {
super.testCustomDbtTransformations();
}

@Disabled
public void testCustomDbtTransformationsFailure() throws Exception {}

/**
* The normalization container needs native port, while destination container needs HTTP port, we
* can't inject the port switch statement into DestinationAcceptanceTest.runSync() method for this
* test, so we skip it.
*
* @throws Exception
*/
@Disabled
public void testIncrementalDedupeSync() throws Exception {
super.testIncrementalDedupeSync();
}

/**
* The normalization container needs native port, while destination container needs HTTP port, we
* can't inject the port switch statement into DestinationAcceptanceTest.runSync() method for this
* test, so we skip it.
*
* @throws Exception
*/
@Disabled
public void testSyncWithNormalization(final String messagesFilename, final String catalogFilename) throws Exception {
super.testSyncWithNormalization(messagesFilename, catalogFilename);
}

@Disabled
public void specNormalizationValueShouldBeCorrect() throws Exception {
super.specNormalizationValueShouldBeCorrect();
@ParameterizedTest
@ArgumentsSource(DataTypeTestArgumentProvider.class)
public void testDataTypeTestWithNormalization(final String messagesFilename,
final String catalogFilename,
final DataTypeTestArgumentProvider.TestCompatibility testCompatibility)
throws Exception {

// arrays are not fully supported yet in jdbc driver
// https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseArray.java
if (messagesFilename.contains("array")) {
return;
}

super.testDataTypeTestWithNormalization(messagesFilename, catalogFilename, testCompatibility);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,32 @@

import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClickhouseTestDataComparator extends AdvancedTestDataComparator {

private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseTestDataComparator.class);
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

private static final String CLICKHOUSE_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSX";

// https://clickhouse.com/docs/en/sql-reference/data-types/date32/
private final LocalDate minSupportedDate = LocalDate.parse("1970-01-01");
private final LocalDate maxSupportedDate = LocalDate.parse("2149-06-06");
private final ZonedDateTime minSupportedDateTime = ZonedDateTime.parse(
"1925-01-01T00:00:00.000Z");
private final ZonedDateTime maxSupportedDateTime = ZonedDateTime.parse(
"2283-11-10T20:23:45.000Z");

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
Expand All @@ -26,4 +45,103 @@ protected List<String> resolveIdentifier(final String identifier) {
return result;
}

@Override
protected boolean compareNumericValues(final String firstNumericValue,
final String secondNumericValue) {
// clickhouse stores double 1.14 as 1.1400000000000001
// https://clickhouse.com/docs/en/sql-reference/data-types/float/
double epsilon = 0.000000000000001d;

double firstValue = Double.parseDouble(firstNumericValue);
double secondValue = Double.parseDouble(secondNumericValue);

return Math.abs(firstValue - secondValue) < epsilon;
}

@Override
protected boolean compareBooleanValues(final String firstValue, final String secondValue) {
return parseBool(firstValue) == parseBool(secondValue);
}

@Override
protected boolean compareDateValues(final String airbyteMessageValue,
final String destinationValue) {
final LocalDate expectedDate = LocalDate.parse(airbyteMessageValue);
final LocalDate actualDate = LocalDate.parse(destinationValue);

if (expectedDate.isBefore(minSupportedDate) || expectedDate.isAfter(maxSupportedDate)) {
// inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
// so actually you end up with unpredicted values, more
// https://clickhouse.com/docs/en/sql-reference/data-types/date32
LOGGER.warn(
"Test value is out of range and would be corrupted by Snowflake, so we skip this verification");
return true;
}

return actualDate.equals(expectedDate);
}

@Override
protected boolean compareDateTimeWithTzValues(final String airbyteMessageValue,
final String destinationValue) {
try {
ZonedDateTime airbyteDate = ZonedDateTime.parse(airbyteMessageValue,
getAirbyteDateTimeWithTzFormatter()).withZoneSameInstant(ZoneOffset.UTC);
ZonedDateTime destinationDate = parseDestinationDateWithTz(destinationValue);

if (airbyteDate.isBefore(minSupportedDateTime) || airbyteDate.isAfter(maxSupportedDateTime)) {
// inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
// so actually you end up with unpredicted values, more
// https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
LOGGER.warn(
"Test value is out of range and would be corrupted by Snowflake, so we skip this verification");
return true;
}
return airbyteDate.equals(destinationDate);
} catch (DateTimeParseException e) {
LOGGER.warn(
"Fail to convert values to ZonedDateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}",
airbyteMessageValue, destinationValue, e);
return compareTextValues(airbyteMessageValue, destinationValue);
}
}

@Override
protected ZonedDateTime parseDestinationDateWithTz(final String destinationValue) {
return ZonedDateTime.parse(destinationValue,
DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT)).withZoneSameInstant(
ZoneOffset.UTC);
}

@Override
protected boolean compareDateTimeValues(final String airbyteMessageValue,
final String destinationValue) {
final LocalDateTime expectedDateTime = LocalDateTime.parse(airbyteMessageValue);
final LocalDateTime actualDateTime = LocalDateTime.parse(destinationValue,
DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT));

if (expectedDateTime.isBefore(minSupportedDateTime.toLocalDateTime())
|| expectedDateTime.isAfter(maxSupportedDateTime.toLocalDateTime())) {
// inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
// so actually you end up with unpredicted values, more
// https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
LOGGER.warn(
"Test value is out of range and would be corrupted by Snowflake, so we skip this verification");
return true;
}

return expectedDateTime.equals(actualDateTime);
}

private boolean parseBool(final String valueAsString) {
// boolen as a String may be returned as true\false and as 0\1
// https://clickhouse.com/docs/en/sql-reference/data-types/boolean
try {
return Integer.parseInt(valueAsString) > 0;
} catch (final NumberFormatException ex) {
return Boolean.parseBoolean(valueAsString);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.clickhouse;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class ClickhouseTestSourceOperations extends JdbcSourceOperations {

@Override
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DateTimeFormatter.ISO_DATE.format(resultSet.getTimestamp(index).toLocalDateTime()));
}

@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();

DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(
DataTypeUtils.DATE_FORMAT_WITH_MILLISECONDS_PATTERN);

node.put(columnName, resolveEra(date, timestamp.format(dateTimeFormatter)));
}

}