Skip to content

Commit

Permalink
šŸ› Destination BigQuery-denormalized: Added conversion from JSON Datetā€¦
Browse files Browse the repository at this point in the history
ā€¦ime to BigQuery format (#7413)
  • Loading branch information
alexandertsukanov authored Nov 3, 2021
1 parent f2ab450 commit a915034
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496",
"name": "BigQuery (denormalized typed struct)",
"dockerRepository": "airbyte/destination-bigquery-denormalized",
"dockerImageTag": "0.1.6",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- name: Chargify (Keen)
destinationDefinitionId: 81740ce8-d764-4ea7-94df-16bb41de36ae
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# BigQuery Denormalized Destination Connector Bootstrap

Instead of splitting the final data into multiple tables, this destination leverages BigQuery capabilities with [Structured and Repeated fields](https://cloud.google.com/bigquery/docs/nested-repeated) to produce a single "big" table per stream. This does not write the `_airbyte_raw_*` tables in the destination and normalization from this connector is not supported at this time.

See [this](https://docs.airbyte.io/integrations/destinations/databricks) link for the nuances about the connector.
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public class BigQueryDenormalizedRecordConsumer extends BigQueryRecordConsumer {
private final Set<String> invalidKeys;

public BigQueryDenormalizedRecordConsumer(final BigQuery bigquery,
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final StandardNameTransformer namingResolver) {
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final StandardNameTransformer namingResolver) {
super(bigquery, writeConfigs, catalog, outputRecordCollector, false, false);
this.namingResolver = namingResolver;
invalidKeys = new HashSet<>();
Expand All @@ -59,6 +59,7 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage
final ObjectNode data = (ObjectNode) formatData(schema.getFields(), recordMessage.getData());
data.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString());
data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt);

return data;
}

Expand All @@ -67,6 +68,10 @@ protected JsonNode formatData(final FieldList fields, final JsonNode root) {
if (fields == null) {
return root;
}
List<String> dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(fields);
if (!dateTimeFields.isEmpty()) {
BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, (ObjectNode) root);
}
if (root.isObject()) {
final List<String> fieldNames = fields.stream().map(Field::getName).collect(Collectors.toList());
return Jsons.jsonNode(Jsons.keys(root).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.bigquery;

import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;

Expand Down Expand Up @@ -42,6 +43,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -73,6 +75,10 @@ class BigQueryDenormalizedDestinationTest {
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithFormats())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS4 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithJSONDateTimeFormats())
.withEmittedAt(NOW.toEpochMilli()));

private JsonNode config;

Expand Down Expand Up @@ -109,6 +115,7 @@ void setup(final TestInfo info) throws IOException {
MESSAGE_USERS1.getRecord().setNamespace(datasetId);
MESSAGE_USERS2.getRecord().setNamespace(datasetId);
MESSAGE_USERS3.getRecord().setNamespace(datasetId);
MESSAGE_USERS4.getRecord().setNamespace(datasetId);

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
dataset = bigquery.create(datasetInfo);
Expand Down Expand Up @@ -199,7 +206,7 @@ void testWriteWithFormat() throws Exception {

// Bigquery's datetime type accepts multiple input format but always outputs the same, so we can't
// expect to receive the value we sent.
assertEquals(extractJsonValues(resultJson, "updated_at"), Set.of("2018-08-19T12:11:35.220"));
assertEquals(extractJsonValues(resultJson, "updated_at"), Set.of("2021-10-11T06:36:53"));

final Schema expectedSchema = Schema.of(
Field.of("name", StandardSQLTypeName.STRING),
Expand All @@ -211,6 +218,29 @@ void testWriteWithFormat() throws Exception {
assertEquals(BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema(), expectedSchema);
}

@Test
void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception {
catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
.withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithDateTime()))
.withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE)));

final BigQueryDestination destination = new BigQueryDenormalizedDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

consumer.accept(MESSAGE_USERS4);
consumer.close();

final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
assertEquals(usersActual.size(), 1);
final JsonNode resultJson = usersActual.get(0);

// BigQuery Accepts "YYYY-MM-DD HH:MM:SS[.SSSSSS]" format
// returns "yyyy-MM-dd'T'HH:mm:ss" format
assertEquals(Set.of(new DateTime("2021-10-11T06:36:53+00:00").toString("yyyy-MM-dd'T'HH:mm:ss")), extractJsonValues(resultJson, "updated_at"));
//check nested datetime
assertEquals(Set.of(new DateTime("2021-11-11T06:36:53+00:00").toString("yyyy-MM-dd'T'HH:mm:ss")), extractJsonValues(resultJson.get("items"), "nested_datetime"));
}

private Set<String> extractJsonValues(final JsonNode node, final String attributeName) {
final List<JsonNode> valuesNode = node.findValues(attributeName);
final Set<String> resultSet = new HashSet<>();
Expand All @@ -233,7 +263,6 @@ private List<JsonNode> retrieveRecordsAsJson(final String tableName) throws Exce
.newBuilder(
String.format("select TO_JSON_STRING(t) as jsonValue from %s.%s t;", dataset.getDatasetId().getDataset(), tableName.toLowerCase()))
.setUseLegacySql(false).build();

BigQueryUtils.executeQuery(bigquery, queryConfig);

return StreamSupport
Expand All @@ -249,171 +278,4 @@ private static Stream<Arguments> schemaAndDataProvider() {
arguments(getSchemaWithInvalidArrayType(), MESSAGE_USERS1),
arguments(getSchema(), MESSAGE_USERS2));
}

private static JsonNode getSchema() {
return Jsons.deserialize(
"{\n"
+ " \"type\": [\n"
+ " \"object\"\n"
+ " ],\n"
+ " \"properties\": {\n"
+ " \"name\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " },\n"
+ " \"permissions\": {\n"
+ " \"type\": [\n"
+ " \"array\"\n"
+ " ],\n"
+ " \"items\": {\n"
+ " \"type\": [\n"
+ " \"object\"\n"
+ " ],\n"
+ " \"properties\": {\n"
+ " \"domain\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " },\n"
+ " \"grants\": {\n"
+ " \"type\": [\n"
+ " \"array\"\n"
+ " ],\n"
+ " \"items\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}");

}

private static JsonNode getSchemaWithFormats() {
return Jsons.deserialize(
"{\n"
+ " \"type\": [\n"
+ " \"object\"\n"
+ " ],\n"
+ " \"properties\": {\n"
+ " \"name\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " },\n"
+ " \"date_of_birth\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ],\n"
+ " \"format\": \"date\"\n"
+ " },\n"
+ " \"updated_at\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ],\n"
+ " \"format\": \"date-time\"\n"
+ " }\n"
+ " }\n"
+ "}");
}

private static JsonNode getSchemaWithInvalidArrayType() {
return Jsons.deserialize(
"{\n"
+ " \"type\": [\n"
+ " \"object\"\n"
+ " ],\n"
+ " \"properties\": {\n"
+ " \"name\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " },\n"
+ " \"permissions\": {\n"
+ " \"type\": [\n"
+ " \"array\"\n"
+ " ],\n"
+ " \"items\": {\n"
+ " \"type\": [\n"
+ " \"object\"\n"
+ " ],\n"
+ " \"properties\": {\n"
+ " \"domain\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " },\n"
+ " \"grants\": {\n"
+ " \"type\": [\n"
+ " \"array\"\n" // missed "items" element
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}");

}

private static JsonNode getData() {
return Jsons.deserialize(
"{\n"
+ " \"name\": \"Andrii\",\n"
+ " \"permissions\": [\n"
+ " {\n"
+ " \"domain\": \"abs\",\n"
+ " \"grants\": [\n"
+ " \"admin\"\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"domain\": \"tools\",\n"
+ " \"grants\": [\n"
+ " \"read\", \"write\"\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ "}");
}

private static JsonNode getDataWithFormats() {
return Jsons.deserialize(
"{\n"
+ " \"name\": \"Andrii\",\n"
+ " \"date_of_birth\": \"1996-01-25\",\n"
+ " \"updated_at\": \"2018-08-19 12:11:35.22\"\n"
+ "}");
}

private static JsonNode getDataWithEmptyObjectAndArray() {
return Jsons.deserialize(
"{\n"
+ " \"name\": \"Andrii\",\n"
+ " \"permissions\": [\n"
+ " {\n"
+ " \"domain\": \"abs\",\n"
+ " \"items\": {},\n" // empty object
+ " \"grants\": [\n"
+ " \"admin\"\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"domain\": \"tools\",\n"
+ " \"grants\": [],\n" // empty array
+ " \"items\": {\n" // object with empty array and object
+ " \"object\": {},\n"
+ " \"array\": []\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ "}");

}

}
Loading

0 comments on commit a915034

Please sign in to comment.