Skip to content

Commit

Permalink
BigQuery destination : Enable DAT tests (#13155)
Browse files Browse the repository at this point in the history
* Enable basic DAT tests

* adopt basic tests + avoid DateTime issue #13123

* disable array tests due to issue #13154

* format

* DON"T MERGE

* fix emitted_at

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>

* Revert "fix emitted_at"

This reverts commit e752a24.

* Revert "DON"T MERGE"

This reverts commit dc2806b.

* google format

Co-authored-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
DoNotPanicUA and grubberr authored May 27, 2022
1 parent 3dcda7a commit 87847ba
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery')
integrationTestJavaImplementation project(':airbyte-db:lib')

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,37 @@
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.ConnectionProperty;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.bigquery.BigQueryResultSet;
import io.airbyte.db.bigquery.BigQuerySourceOperations;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -100,15 +100,36 @@ protected boolean supportNamespaceTest() {
return true;
}

/**
 * Supplies the BigQuery-specific test data comparator.
 *
 * @return a freshly constructed {@link BigQueryTestDataComparator}; presumably the base acceptance
 *         test uses it to compare expected and actual records (confirm against the base class)
 */
@Override
protected TestDataComparator getTestDataComparator() {
return new BigQueryTestDataComparator();
}

/**
 * Reports that this destination supports the basic data type test.
 *
 * @return always {@code true}; presumably this opts the destination into the base class's basic
 *         data type test (confirm against the base class)
 */
@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

/**
 * Reports that the array data type test is not supported for this destination.
 *
 * @return always {@code false}; the test stays off because of the normalization problem tracked as
 *         issue #13154
 */
@Override
protected boolean supportArrayDataTypeTest() {
// #13154 Normalization issue
return false;
}

/**
 * Reports that this destination supports the object data type test.
 *
 * @return always {@code true}; presumably this opts the destination into the base class's object
 *         data type test (confirm against the base class)
 */
@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

/**
 * Exposes the naming convention transformer used by this test class.
 *
 * @return a non-empty {@link Optional} wrapping the class-level {@code NAME_TRANSFORMER}
 */
@Override
protected Optional<NamingConventionTransformer> getNameTransformer() {
return Optional.of(NAME_TRANSFORMER);
}

@Override
protected void assertNamespaceNormalization(final String testCaseId,
final String expectedNormalizedNamespace,
final String actualNormalizedNamespace) {
final String expectedNormalizedNamespace,
final String actualNormalizedNamespace) {
final String message = String.format("Test case %s failed; if this is expected, please override assertNamespaceNormalization", testCaseId);
if (testCaseId.equals("S3A-1")) {
// bigquery allows namespace starting with a number, and prepending underscore
Expand All @@ -134,9 +155,9 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test

@Override
protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
final String streamName,
final String namespace,
final JsonNode streamSchema)
final String streamName,
final String namespace,
final JsonNode streamSchema)
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace))
.stream()
Expand All @@ -145,52 +166,24 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
.collect(Collectors.toList());
}

/**
 * Lists the spellings under which an identifier may show up: the identifier exactly as given,
 * followed by the form produced by {@code namingResolver.getIdentifier}.
 *
 * @param identifier the identifier to resolve
 * @return a new mutable list holding the raw identifier first and its resolved form second
 */
@Override
protected List<String> resolveIdentifier(final String identifier) {
  final List<String> candidates = new ArrayList<>();
  Collections.addAll(candidates, identifier, namingResolver.getIdentifier(identifier));
  return candidates;
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));

final QueryJobConfiguration queryConfig =
QueryJobConfiguration
.newBuilder(
String.format("SELECT * FROM `%s`.`%s` order by %s asc;", schema, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.setUseLegacySql(false).build();
.setUseLegacySql(false)
.setConnectionProperties(Collections.singletonList(ConnectionProperty.of("time_zone", "UTC")))
.build();

final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return StreamSupport
.stream(queryResults.iterateAll().spliterator(), false)
.map(row -> {
final Map<String, Object> jsonMap = Maps.newHashMap();
for (final Field field : fields) {
final Object value = getTypedFieldValue(row, field);
jsonMap.put(field.getName(), value);
}
return jsonMap;
})
.map(Jsons::jsonNode)
.collect(Collectors.toList());
}

private Object getTypedFieldValue(final FieldValueList row, final Field field) {
final FieldValue fieldValue = row.get(field.getName());
if (fieldValue.getValue() != null) {
return switch (field.getType().getStandardType()) {
case FLOAT64, NUMERIC -> fieldValue.getDoubleValue();
case INT64 -> fieldValue.getNumericValue().intValue();
case STRING -> fieldValue.getStringValue();
case BOOL -> fieldValue.getBooleanValue();
default -> fieldValue.getValue();
};
} else {
return null;
}
return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ class BigQueryDestinationTest {
private static Stream<Arguments> datasetIdResetterProvider() {
// parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id`
return Stream.of(
Arguments.arguments(new DatasetIdResetter(config -> {})),
Arguments.arguments(new DatasetIdResetter(config -> {
})),
Arguments.arguments(new DatasetIdResetter(
config -> {
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
Expand Down Expand Up @@ -152,9 +153,9 @@ void setup(final TestInfo info) throws IOException {

catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId,
io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
io.airbyte.protocol.models.Field
.of("id", JsonSchemaType.STRING))
io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
io.airbyte.protocol.models.Field
.of("id", JsonSchemaType.STRING))
.withDestinationSyncMode(DestinationSyncMode.APPEND),
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaType.STRING))));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery;

import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test data comparator for the BigQuery destination.
 *
 * <p>
 * Destination values that end with a literal {@code Z} are parsed with
 * {@link #BIGQUERY_DATETIME_FORMAT}; all other values are parsed with the Airbyte formats inherited
 * from {@link AdvancedTestDataComparator}. Date-time comparisons involving values before the year
 * 1583 are skipped because of the known normalization issue #13123.
 */
public class BigQueryTestDataComparator extends AdvancedTestDataComparator {

  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTestDataComparator.class);

  private static final String BIGQUERY_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

  // DateTimeFormatter is immutable and thread-safe, so it is built once instead of per comparison.
  private static final DateTimeFormatter BIGQUERY_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT);

  // #13123 Normalization issue: values before this instant are not validated.
  private static final ZonedDateTime BROKEN_DATE = ZonedDateTime.of(1583, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);

  private static final String VALIDATION_SKIPPED_MESSAGE =
      "Validation is skipped due to known Normalization issue. Values older then 1583 year and with time zone stored wrongly(lose days).";

  private final StandardNameTransformer namingResolver = new StandardNameTransformer();

  /**
   * Lists the spellings under which an identifier may appear in the destination.
   *
   * @param identifier the identifier to resolve
   * @return a new mutable list with the raw identifier first and the form returned by the
   *         {@link StandardNameTransformer} second
   */
  @Override
  protected List<String> resolveIdentifier(final String identifier) {
    final List<String> result = new ArrayList<>();
    result.add(identifier);
    result.add(namingResolver.getIdentifier(identifier));
    return result;
  }

  /**
   * Picks the formatter for a destination value: the cached BigQuery formatter when the value ends
   * with {@code Z} (after at least one other character), otherwise a formatter for the supplied
   * Airbyte pattern.
   */
  private static DateTimeFormatter destinationFormatter(final String destinationValue, final String airbytePattern) {
    return destinationValue.matches(".+Z") ? BIGQUERY_DATETIME_FORMATTER : DateTimeFormatter.ofPattern(airbytePattern);
  }

  /**
   * Parses a destination date, accepting both the BigQuery date-time rendering and the Airbyte date
   * format.
   *
   * @return the parsed date, or {@code null} when {@code dateValue} is {@code null}
   */
  private LocalDate parseDate(final String dateValue) {
    if (dateValue == null) {
      return null;
    }
    return LocalDate.parse(dateValue, destinationFormatter(dateValue, AIRBYTE_DATE_FORMAT));
  }

  /**
   * Parses a destination date-time, accepting both the BigQuery rendering and the Airbyte date-time
   * format.
   *
   * @return the parsed date-time, or {@code null} when {@code dateTimeValue} is {@code null}
   */
  private LocalDateTime parseDateTime(final String dateTimeValue) {
    if (dateTimeValue == null) {
      return null;
    }
    return LocalDateTime.parse(dateTimeValue, destinationFormatter(dateTimeValue, AIRBYTE_DATETIME_FORMAT));
  }

  /**
   * Compares an expected date-time (Airbyte format) with the value read from the destination.
   *
   * @param expectedValue expected date-time in {@code AIRBYTE_DATETIME_FORMAT}
   * @param actualValue destination value; may be {@code null}
   * @return {@code true} when both denote the same local date-time, or when the expected value is
   *         before 1583 and validation is skipped (#13123)
   */
  @Override
  protected boolean compareDateTimeValues(String expectedValue, String actualValue) {
    // The destination value is parsed first, as before, so a malformed value fails in the same order.
    final LocalDateTime destinationDate = parseDateTime(actualValue);
    final LocalDateTime expectedDate = LocalDateTime.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT));
    // #13123 Normalization issue
    if (expectedDate.isBefore(BROKEN_DATE.toLocalDateTime())) {
      LOGGER.warn(VALIDATION_SKIPPED_MESSAGE);
      return true;
    }
    return expectedDate.equals(destinationDate);
  }

  /**
   * Compares an expected date (Airbyte format) with the value read from the destination.
   *
   * @param expectedValue expected date in {@code AIRBYTE_DATE_FORMAT}
   * @param actualValue destination value; may be {@code null}
   * @return {@code true} when both denote the same local date
   */
  @Override
  protected boolean compareDateValues(String expectedValue, String actualValue) {
    final LocalDate destinationDate = parseDate(actualValue);
    final LocalDate expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT));
    return expectedDate.equals(destinationDate);
  }

  /**
   * Parses a destination value rendered in {@link #BIGQUERY_DATETIME_FORMAT} and interprets it as
   * UTC.
   *
   * @param destinationValue the value read from the destination
   * @return the parsed instant at offset UTC
   */
  @Override
  protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {
    return ZonedDateTime.of(LocalDateTime.parse(destinationValue, BIGQUERY_DATETIME_FORMATTER), ZoneOffset.UTC);
  }

  /**
   * Compares date-time-with-time-zone values, skipping validation for destination values before
   * 1583 (#13123) and otherwise delegating to the parent implementation.
   *
   * @param airbyteMessageValue the value from the Airbyte message
   * @param destinationValue the value read from the destination
   * @return {@code true} when validation is skipped or the parent comparison succeeds
   */
  @Override
  protected boolean compareDateTimeWithTzValues(String airbyteMessageValue, String destinationValue) {
    // #13123 Normalization issue
    if (parseDestinationDateWithTz(destinationValue).isBefore(BROKEN_DATE)) {
      LOGGER.warn(VALIDATION_SKIPPED_MESSAGE);
      return true;
    }
    return super.compareDateTimeWithTzValues(airbyteMessageValue, destinationValue);
  }

}

0 comments on commit 87847ba

Please sign in to comment.