From 0105ca91b404bd71dc2ed1ec16eddefb3393b124 Mon Sep 17 00:00:00 2001 From: Nicolas MOREAU Date: Wed, 29 Sep 2021 02:06:14 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20BigQuery=20Denormalized=20Destin?= =?UTF-8?q?ation:=20Support=20for=20more=20bigquery=20types=20through=20th?= =?UTF-8?q?e=20format=20annotation=20(#6145)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * destination-bigquery-denormalized: introduce json spec format key handling * destination-bigquery-denormalized: Bump version --- .../079d5540-f236-4294-ba7c-ade8fd918496.json | 2 +- .../seed/destination_definitions.yaml | 2 +- .../Dockerfile | 2 +- .../BigQueryDenormalizedDestination.java | 11 +++ .../bigquery/JsonSchemaFormat.java | 69 ++++++++++++++++ .../BigQueryDenormalizedDestinationTest.java | 78 +++++++++++++++++++ .../destination/bigquery/BigQueryUtils.java | 5 ++ docs/integrations/destinations/bigquery.md | 1 + 8 files changed, 167 insertions(+), 3 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaFormat.java diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json index 309b10eb2548..ea1fd8ef6ef9 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496", "name": "BigQuery (denormalized typed struct)", "dockerRepository": "airbyte/destination-bigquery-denormalized", - "dockerImageTag": "0.1.5", + "dockerImageTag": "0.1.6", "documentationUrl": 
"https://docs.airbyte.io/integrations/destinations/bigquery" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 85ad7af1a062..e46dc179d3aa 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -27,7 +27,7 @@ - destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 name: BigQuery (denormalized typed struct) dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 0.1.5 + dockerImageTag: 0.1.6 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a name: Google Cloud Storage (GCS) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index 224e7f31259f..cc0756692987 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java index ace7b69dc9f0..2b550ac5a822 100644 --- 
a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java @@ -39,6 +39,7 @@ public class BigQueryDenormalizedDestination extends BigQueryDestination { protected static final String PROPERTIES_FIELD = "properties"; protected static final String NESTED_ARRAY_FIELD = "value"; private static final String TYPE_FIELD = "type"; + private static final String FORMAT_FIELD = "format"; @Override protected String getTargetTableName(String streamName) { @@ -134,6 +135,16 @@ private static Builder getField(BigQuerySQLNameTransformer namingResolver, Strin } } } + + // If a specific format is defined, use their specific type instead of the JSON's one + final JsonNode fieldFormat = fieldDefinition.get(FORMAT_FIELD); + if (fieldFormat != null) { + final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText()); + if (schemaFormat != null) { + builder.setType(schemaFormat.getBigQueryType()); + } + } + return builder; } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaFormat.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaFormat.java new file mode 100644 index 000000000000..15b0b8120cf4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaFormat.java @@ -0,0 +1,69 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in 
the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.bigquery; + +import com.google.cloud.bigquery.StandardSQLTypeName; + +/** + * Mapping of JsonSchema formats to BigQuery Standard SQL types. 
+ */ +public enum JsonSchemaFormat { + + DATE("date", StandardSQLTypeName.DATE), + DATETIME("date-time", StandardSQLTypeName.DATETIME), + TIME("time", StandardSQLTypeName.TIME); + + private final String jsonSchemaFormat; + private final StandardSQLTypeName bigQueryType; + + JsonSchemaFormat(String jsonSchemaFormat, StandardSQLTypeName bigQueryType) { + this.jsonSchemaFormat = jsonSchemaFormat; + this.bigQueryType = bigQueryType; + } + + public static JsonSchemaFormat fromJsonSchemaFormat(String value) { + for (JsonSchemaFormat type : values()) { + if (value.equals(type.jsonSchemaFormat)) { + return type; + } + } + return null; + } + + + public String getJsonSchemaFormat() { + return jsonSchemaFormat; + } + + public StandardSQLTypeName getBigQueryType() { + return bigQueryType; + } + + @Override + public String toString() { + return jsonSchemaFormat; + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index 7f9da9ace5f0..e6e35261f8fa 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -13,13 +13,17 @@ import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import 
com.google.cloud.bigquery.StandardSQLTypeName; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStream; @@ -40,6 +44,7 @@ import java.util.stream.StreamSupport; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -64,6 +69,10 @@ class BigQueryDenormalizedDestinationTest { .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) .withData(getDataWithEmptyObjectAndArray()) .withEmittedAt(NOW.toEpochMilli())); + private static final AirbyteMessage MESSAGE_USERS3 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) + .withData(getDataWithFormats()) + .withEmittedAt(NOW.toEpochMilli())); private JsonNode config; @@ -99,6 +108,7 @@ void setup(TestInfo info) throws IOException { final String datasetLocation = "EU"; MESSAGE_USERS1.getRecord().setNamespace(datasetId); MESSAGE_USERS2.getRecord().setNamespace(datasetId); + MESSAGE_USERS3.getRecord().setNamespace(datasetId); final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); dataset = bigquery.create(datasetInfo); @@ -166,7 +176,39 @@ void testNestedWrite(JsonNode schema, AirbyteMessage message) throws Exception { assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name")); assertEquals(extractJsonValues(resultJson, 
"grants"), extractJsonValues(expectedUsersJson, "grants")); assertEquals(extractJsonValues(resultJson, "domain"), extractJsonValues(expectedUsersJson, "domain")); + } + + @Test + void testWriteWithFormat() throws Exception { + catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() + .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithFormats())) + .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); + final BigQueryDestination destination = new BigQueryDenormalizedDestination(); + final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + + consumer.accept(MESSAGE_USERS3); + consumer.close(); + + final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); + final JsonNode expectedUsersJson = MESSAGE_USERS3.getRecord().getData(); + assertEquals(usersActual.size(), 1); + final JsonNode resultJson = usersActual.get(0); + assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name")); + assertEquals(extractJsonValues(resultJson, "date_of_birth"), extractJsonValues(expectedUsersJson, "date_of_birth")); + + // Bigquery's datetime type accepts multiple input format but always outputs the same, so we can't expect to receive the value we sent. 
+ assertEquals(extractJsonValues(resultJson, "updated_at"), Set.of("2018-08-19T12:11:35.220")); + + final Schema expectedSchema = Schema.of( + Field.of("name", StandardSQLTypeName.STRING), + Field.of("date_of_birth", StandardSQLTypeName.DATE), + Field.of("updated_at", StandardSQLTypeName.DATETIME), + Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING), + Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP) + ); + + assertEquals(BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema(), expectedSchema); } private Set extractJsonValues(JsonNode node, String attributeName) { @@ -252,6 +294,34 @@ private static JsonNode getSchema() { } + private static JsonNode getSchemaWithFormats() { + return Jsons.deserialize( + "{\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"name\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"date_of_birth\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date\"\n" + + " },\n" + + " \"updated_at\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date-time\"\n" + + " }\n" + + " }\n" + + "}"); + } + private static JsonNode getSchemaWithInvalidArrayType() { return Jsons.deserialize( "{\n" @@ -310,7 +380,15 @@ private static JsonNode getData() { + " }\n" + " ]\n" + "}"); + } + private static JsonNode getDataWithFormats() { + return Jsons.deserialize( + "{\n" + + " \"name\": \"Andrii\",\n" + + " \"date_of_birth\": \"1996-01-25\",\n" + + " \"updated_at\": \"2018-08-19 12:11:35.22\"\n" + + "}"); } private static JsonNode getDataWithEmptyObjectAndArray() { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java 
b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 8b73e12c6a39..437c4e5c7595 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -120,4 +120,9 @@ public static String getDatasetLocation(JsonNode config) { } } + static TableDefinition getTableDefinition(BigQuery bigquery, String datasetName, String tableName) { + final TableId tableId = TableId.of(datasetName, tableName); + return bigquery.getTable(tableId).getDefinition(); + } + } diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index c1a49a3ca81a..f5355d62814d 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -162,6 +162,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.6 | 2021-09-16 | [#6145](https://github.com/airbytehq/airbyte/pull/6145) | BigQuery Denormalized support for date, datetime & time types through the json "format" key | 0.1.5 | 2021-09-07 | [#5881](https://github.com/airbytehq/airbyte/pull/5881) | BigQuery Denormalized NPE fix | 0.1.4 | 2021-09-04 | [#5813](https://github.com/airbytehq/airbyte/pull/5813) | fix Stackoverflow error when receive a schema from source where "Array" type doesn't contain a required "items" element | | 0.1.3 | 2021-08-07 | [#5261](https://github.com/airbytehq/airbyte/pull/5261) | 🐛 Destination BigQuery(Denormalized): Fix processing arrays of records |